1
0

yielddefer25.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. # yielddefer is an async programming mechanism with a blocking look-alike syntax
  2. #
  3. # The contents of this file are subject to the Python Software Foundation
  4. # License Version 2.3 (the License). You may not copy or use this file, in
  5. # either source code or executable form, except in compliance with the License.
  6. # You may obtain a copy of the License at http://www.python.org/license.
  7. #
  8. # Software distributed under the License is distributed on an AS IS basis,
  9. # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
  10. # for the specific language governing rights and limitations under the
  11. # License.
  12. #
  13. # launch_coroutine maintains the illusion that the passed function
  14. # (a generator) runs from beginning to end yielding when necessary
  15. # for some job to complete and then continuing where it left off.
  16. #
# def f():
#     ...
#     yield some_thing_that_takes_time()
#     ...
#     result = yield something_else()
#     ...
  23. #
# From inside a generator launched with launch_coroutine:
#   - wait on a deferred to be called back by yielding it
#   - return None by simply returning
#   - return an exception by raising one
#   - return a value by yielding a non-Deferred
  29. #
  30. # by Greg Hazel
  31. from __future__ import generators
  32. import sys
  33. import types
  34. import traceback
  35. from BTL.defer import Deferred, Failure
  36. from BTL.stackthreading import _print_traceback
  37. from twisted.python import failure
  38. debug = False
  39. name_debug = False
  40. class GenWithDeferred(object):
  41. if debug:
  42. __slots__ = ['gen', 'current_deferred', 'deferred', 'queue_task' 'stack']
  43. else:
  44. __slots__ = ['gen', 'current_deferred', 'deferred', 'queue_task']
  45. def __init__(self, gen, deferred, queue_task):
  46. self.gen = gen
  47. self.deferred = deferred
  48. self.queue_task = queue_task
  49. self.current_deferred = None
  50. if debug:
  51. try:
  52. raise ZeroDivisionError
  53. except ZeroDivisionError:
  54. f = sys.exc_info()[2].tb_frame.f_back
  55. self.stack = traceback.extract_stack(f)
  56. # cut out GenWithDeferred() and launch_coroutine
  57. self.stack = self.stack[:-2]
  58. def cleanup(self):
  59. del self.gen
  60. del self.deferred
  61. del self.queue_task
  62. del self.current_deferred
  63. if debug:
  64. del self.stack
  65. if name_debug:
  66. def __getattr__(self, attr):
  67. if '_recall' not in attr:
  68. raise AttributeError(attr)
  69. return self._recall
  70. def _queue_task_chain(self, v):
  71. recall = getattr(self, "_recall_%s" % self.gen.gi_frame.f_code.co_name)
  72. self.queue_task(recall)
  73. return v
  74. else:
  75. def _queue_task_chain(self, v):
  76. self.queue_task(self._recall)
  77. return v
  78. def next(self):
  79. if not self.current_deferred:
  80. return self.gen.next()
  81. if isinstance(self.current_deferred.result, failure.Failure):
  82. r = self.current_deferred.result
  83. self.current_deferred.addErrback(lambda fuckoff: None)
  84. return self.gen.throw(*r.exc_info())
  85. return self.gen.send(self.current_deferred.result)
  86. def _recall(self):
  87. try:
  88. df = self.next()
  89. except StopIteration:
  90. self.deferred.callback(None)
  91. self.cleanup()
  92. except Exception, e:
  93. exc_type, value, tb = sys.exc_info()
  94. ## Magic Traceback Hacking
  95. if debug:
  96. # interpreter shutdown
  97. if not sys:
  98. return
  99. # HERE. This should really be logged or else bittorrent-
  100. # curses will never be able to properly output. --Dave
  101. _print_traceback(sys.stderr, self.stack,
  102. "generator %s" % self.gen.gi_frame.f_code.co_name, 0,
  103. exc_type, value, tb)
  104. else:
  105. #if (tb.tb_lineno != self.gen.gi_frame.f_lineno or
  106. # tb.f_code.co_filename != self.gen.gi_frame.f_code.co_filename):
  107. # tb = FakeTb(self.gen.gi_frame, tb)
  108. pass
  109. ## Magic Traceback Hacking
  110. self.deferred.errback(Failure(value, exc_type, tb))
  111. del tb
  112. self.cleanup()
  113. else:
  114. if not isinstance(df, Deferred):
  115. self.deferred.callback(df)
  116. self.cleanup()
  117. return
  118. self.current_deferred = df
  119. df.addCallback(self._queue_task_chain)
  120. df.addErrback(self._queue_task_chain)
  121. del df
  122. class FakeTb(object):
  123. __slots__ = ['tb_frame', 'tb_lineno', 'tb_orig', 'tb_next']
  124. def __init__(self, frame, tb):
  125. self.tb_frame = frame
  126. self.tb_lineno = frame.f_lineno
  127. self.tb_orig = tb
  128. self.tb_next = tb.tb_next
  129. def _launch_generator(queue_task, g, main_df):
  130. gwd = GenWithDeferred(g, main_df, queue_task)
  131. ## the first one is fired for you
  132. ##gwd._recall()
  133. # the first one is not fired for you, because if it errors the sys.exc_info
  134. # causes an unresolvable circular reference that makes the gwd.deferred never
  135. # be deleted.
  136. gwd._queue_task_chain(None)
  137. def launch_coroutine(queue_task, f, *args, **kwargs):
  138. main_df = Deferred()
  139. try:
  140. g = f(*args, **kwargs)
  141. except Exception, e:
  142. if debug:
  143. traceback.print_exc()
  144. main_df.errback(Failure())
  145. else:
  146. if isinstance(g, types.GeneratorType):
  147. _launch_generator(queue_task, g, main_df)
  148. else:
  149. # we got a non-generator, just callback with the return value
  150. main_df.callback(g)
  151. return main_df
  152. # decorator
  153. def coroutine(func, queue_task):
  154. def replacement(*a, **kw):
  155. return launch_coroutine(queue_task, func, *a, **kw)
  156. return replacement
  157. def wrap_task(add_task):
  158. return lambda _f, *args, **kwargs : add_task(0, _f, *args, **kwargs)
  159. _wrap_task = wrap_task