1
0

EventLoop.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. # The contents of this file are subject to the Python Software Foundation
  2. # License Version 2.3 (the License). You may not copy or use this file, in
  3. # either source code or executable form, except in compliance with the License.
  4. # You may obtain a copy of the License at http://www.python.org/license.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10. #
  11. # by Greg Hazel
  12. import sys
  13. import Queue
  14. import BTL.stackthreading as threading
  15. from BTL import defer
  16. from BTL.yielddefer import launch_coroutine, wrap_task
  17. class EventLoop(object):
  18. def __init__(self):
  19. self.thread = threading.Thread(target=self.run)
  20. self.queue = Queue.Queue()
  21. self.killswitch = threading.Event()
  22. def __getattr__(self, attr):
  23. return getattr(self.thread, attr)
  24. def add_task(self, _f, *a, **kw):
  25. self.queue.put((_f, a, kw))
  26. def exit(self):
  27. self.killswitch.set()
  28. self.add_task(lambda : None)
  29. def run(self):
  30. while not self.killswitch.isSet():
  31. func, args, kwargs = self.queue.get(True)
  32. try:
  33. v = func(*args, **kwargs)
  34. except:
  35. # interpreter shutdown
  36. if not sys:
  37. return
  38. exc_type, value, tb = sys.exc_info()
  39. threading._print_traceback(sys.stderr, self.stack,
  40. "thread %s" % self.thread.getName(),
  41. 1,
  42. exc_type, value, tb)
  43. del tb
  44. class RoutineLoop(object):
  45. def __init__(self, queue_task):
  46. self.killswitch = threading.Event()
  47. self.queue = defer.DeferredQueue()
  48. self.main_df = launch_coroutine(queue_task, self.run)
  49. def add_task(self, _f, *a, **kw):
  50. df = _f(*a, **kw)
  51. self.queue.put((df,))
  52. def add_deferred(self, df):
  53. self.queue.put((df,))
  54. def exit(self):
  55. self.killswitch.set()
  56. self.add_deferred(defer.succeed(True))
  57. def run(self):
  58. while not self.killswitch.isSet():
  59. event_df = self.queue.get()
  60. yield event_df
  61. (df,) = event_df.getResult()
  62. yield df
  63. try:
  64. r = df.getResult()
  65. except:
  66. # interpreter shutdown
  67. if not sys:
  68. return
  69. exc_type, value, tb = sys.exc_info()
  70. # no base_stack, unless we wan't to keep stack from the add_task
  71. threading._print_traceback(sys.stderr, [],
  72. "RoutineLoop", 1,
  73. exc_type, value, tb)
  74. del tb