1
0

yielddefer.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. # yielddefer is an async programming mechanism with a blocking look-alike syntax
  2. #
  3. # The contents of this file are subject to the Python Software Foundation
  4. # License Version 2.3 (the License). You may not copy or use this file, in
  5. # either source code or executable form, except in compliance with the License.
  6. # You may obtain a copy of the License at http://www.python.org/license.
  7. #
  8. # Software distributed under the License is distributed on an AS IS basis,
  9. # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
  10. # for the specific language governing rights and limitations under the
  11. # License.
  12. #
  13. # launch_coroutine maintains the illusion that the passed function
  14. # (a generator) runs from beginning to end yielding when necessary
  15. # for some job to complete and then continuing where it left off.
  16. #
  17. # def f():
  18. # ...
  19. # df = some_thing_that_takes_time()
  20. # yield df
  21. # df.getResult() # Even if not expecting result.
  22. # ...
  23. # df = something_else()
  24. # yield df
  25. # result = df.getResult()
  26. # ...
  27. #
  28. # Upon resuming from a yield point, the generator should
  29. # call getResult() even if no result is expected, so that
  30. # exceptions generated while yielding are raised.
  31. #
  32. # from inside a generator launched with launch_coroutine:
  33. # wait on a deferred to be called back by yielding it
  34. # return None by simply returning
  35. # return an exception by throwing one
  36. # return a value by yielding a non-Deferred
  37. #
  38. # by Greg Hazel
  39. from __future__ import generators
  40. import sys
  41. import types
  42. import traceback
  43. from BTL.defer import Deferred, Failure, wrap_task
  44. from BTL.stackthreading import _print_traceback
  45. debug = False
  46. name_debug = False
  47. class GenWithDeferred(object):
  48. if debug:
  49. __slots__ = ['gen', 'deferred', 'queue_task', 'stack']
  50. else:
  51. __slots__ = ['gen', 'deferred', 'queue_task']
  52. def __init__(self, gen, deferred, queue_task):
  53. self.gen = gen
  54. self.deferred = deferred
  55. self.queue_task = queue_task
  56. if debug:
  57. try:
  58. raise ZeroDivisionError
  59. except ZeroDivisionError:
  60. f = sys.exc_info()[2].tb_frame.f_back
  61. self.stack = traceback.extract_stack(f)
  62. # cut out GenWithDeferred() and launch_coroutine
  63. self.stack = self.stack[:-2]
  64. def cleanup(self):
  65. del self.gen
  66. del self.deferred
  67. del self.queue_task
  68. if debug:
  69. del self.stack
  70. if name_debug:
  71. def __getattr__(self, attr):
  72. if '_recall' not in attr:
  73. raise AttributeError(attr)
  74. return self._recall
  75. def _queue_task_chain(self, v):
  76. recall = getattr(self, "_recall_%s" % self.gen.gi_frame.f_code.co_name)
  77. self.queue_task(recall)
  78. return v
  79. else:
  80. def _queue_task_chain(self, v):
  81. self.queue_task(self._recall)
  82. return v
  83. def _recall(self):
  84. try:
  85. t = self.gen.next()
  86. except StopIteration:
  87. self.deferred.callback(None)
  88. self.cleanup()
  89. except Exception, e:
  90. exc_type, value, tb = sys.exc_info()
  91. ## Magic Traceback Hacking
  92. if debug:
  93. # interpreter shutdown
  94. if not sys:
  95. return
  96. stream = sys.stderr
  97. # HERE. This should really be logged or else bittorrent-
  98. # curses will never be able to properly output. --Dave
  99. _print_traceback(stream, self.stack,
  100. "generator %s" % self.gen.gi_frame.f_code.co_name, 0,
  101. exc_type, value, tb)
  102. else:
  103. #if (tb.tb_lineno != self.gen.gi_frame.f_lineno or
  104. # tb.f_code.co_filename != self.gen.gi_frame.f_code.co_filename):
  105. # tb = FakeTb(self.gen.gi_frame, tb)
  106. pass
  107. ## Magic Traceback Hacking
  108. self.deferred.errback(Failure(value, exc_type, tb))
  109. del tb
  110. self.cleanup()
  111. else:
  112. if not isinstance(t, Deferred):
  113. self.deferred.callback(t)
  114. self.cleanup()
  115. return
  116. t.addCallback(self._queue_task_chain)
  117. t.addErrback(self._queue_task_chain)
  118. del t
  119. class FakeTb(object):
  120. __slots__ = ['tb_frame', 'tb_lineno', 'tb_orig', 'tb_next']
  121. def __init__(self, frame, tb):
  122. self.tb_frame = frame
  123. self.tb_lineno = frame.f_lineno
  124. self.tb_orig = tb
  125. self.tb_next = tb.tb_next
  126. def _launch_generator(queue_task, g, main_df):
  127. gwd = GenWithDeferred(g, main_df, queue_task)
  128. ## the first one is fired for you
  129. ##gwd._recall()
  130. # the first one is not fired for you, because if it errors the sys.exc_info
  131. # causes an unresolvable circular reference that makes the gwd.deferred never
  132. # be deleted.
  133. gwd._queue_task_chain(None)
  134. def launch_coroutine(queue_task, f, *args, **kwargs):
  135. main_df = Deferred()
  136. try:
  137. g = f(*args, **kwargs)
  138. except Exception, e:
  139. if debug:
  140. traceback.print_exc()
  141. main_df.errback(Failure())
  142. else:
  143. if isinstance(g, types.GeneratorType):
  144. _launch_generator(queue_task, g, main_df)
  145. else:
  146. # we got a non-generator, just callback with the return value
  147. main_df.callback(g)
  148. return main_df
  149. def coroutine(queue_task):
  150. def make_coroutine(_f):
  151. def replacement(*a, **kw):
  152. return launch_coroutine(queue_task, _f, *a, **kw)
  153. return replacement
  154. return make_coroutine