| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- # The contents of this file are subject to the Python Software Foundation
- # License Version 2.3 (the License). You may not copy or use this file, in
- # either source code or executable form, except in compliance with the License.
- # You may obtain a copy of the License at http://www.python.org/license.
- #
- # Software distributed under the License is distributed on an AS IS basis,
- # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
- # for the specific language governing rights and limitations under the
- # License.
- #
- # by Greg Hazel
- import sys
- import Queue
- import BTL.stackthreading as threading
- from BTL import defer
- from BTL.yielddefer import launch_coroutine, wrap_task
- class EventLoop(object):
-
- def __init__(self):
- self.thread = threading.Thread(target=self.run)
- self.queue = Queue.Queue()
- self.killswitch = threading.Event()
- def __getattr__(self, attr):
- return getattr(self.thread, attr)
-
- def add_task(self, _f, *a, **kw):
- self.queue.put((_f, a, kw))
- def exit(self):
- self.killswitch.set()
- self.add_task(lambda : None)
- def run(self):
- while not self.killswitch.isSet():
- func, args, kwargs = self.queue.get(True)
- try:
- v = func(*args, **kwargs)
- except:
- # interpreter shutdown
- if not sys:
- return
- exc_type, value, tb = sys.exc_info()
- threading._print_traceback(sys.stderr, self.stack,
- "thread %s" % self.thread.getName(),
- 1,
- exc_type, value, tb)
- del tb
- class RoutineLoop(object):
- def __init__(self, queue_task):
- self.killswitch = threading.Event()
- self.queue = defer.DeferredQueue()
- self.main_df = launch_coroutine(queue_task, self.run)
- def add_task(self, _f, *a, **kw):
- df = _f(*a, **kw)
- self.queue.put((df,))
- def add_deferred(self, df):
- self.queue.put((df,))
- def exit(self):
- self.killswitch.set()
- self.add_deferred(defer.succeed(True))
- def run(self):
- while not self.killswitch.isSet():
- event_df = self.queue.get()
- yield event_df
- (df,) = event_df.getResult()
-
- yield df
- try:
- r = df.getResult()
- except:
- # interpreter shutdown
- if not sys:
- return
- exc_type, value, tb = sys.exc_info()
- # no base_stack, unless we wan't to keep stack from the add_task
- threading._print_traceback(sys.stderr, [],
- "RoutineLoop", 1,
- exc_type, value, tb)
- del tb
-
-
|