defer.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. # The contents of this file are subject to the Python Software Foundation
  2. # License Version 2.3 (the License). You may not copy or use this file, in
  3. # either source code or executable form, except in compliance with the License.
  4. # You may obtain a copy of the License at http://www.python.org/license.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10. import sys
  11. import weakref
  12. import traceback
  13. import BTL.stackthreading as threading
  14. from twisted.internet import defer
  15. from twisted.python import failure
  16. debug = False
  17. tp_Failure = failure.Failure
  18. # used to emulate sys.exc_info()
  19. def exc_info(self):
  20. return self.type, self.value, self.tb
  21. tp_Failure.exc_info = exc_info
  22. # Maybe Dangerous. If you're having memory leaks, look here.
  23. # used to prevent traceback stripping for standard re-raises
  24. #old_cleanFailure = tp_Failure.cleanFailure
  25. #def cleanFailure(self):
  26. # self.tb2 = self.tb
  27. # old_cleanFailure(self)
  28. # self.tb = self.tb2
  29. #tp_Failure.cleanFailure = cleanFailure
  30. class Failure(tp_Failure):
  31. def __init__(self, *a, **kw):
  32. tp_Failure.__init__(self, *a, **kw)
  33. # magic to allow re-raise of failures to do proper stack appending
  34. if hasattr(self.value, 'failure'):
  35. self.stack = self.stack[:-2] + self.value.failure.stack
  36. self.frames = self.frames[:-2] + self.value.failure.frames
  37. self.value.failure = self
  38. failure.Failure = Failure
  39. fail = defer.fail
  40. succeed = defer.succeed
  41. execute = defer.execute
  42. maybeDeferred = defer.maybeDeferred
  43. timeout = defer.timeout
  44. DeferredQueue = defer.DeferredQueue
  45. Deferred = defer.Deferred
  46. def getResult(self):
  47. if isinstance(self.result, tp_Failure):
  48. r = self.result
  49. self.addErrback(lambda fuckoff: None)
  50. r.raiseException()
  51. return self.result
  52. Deferred.getResult = getResult
  53. Deferred_errback = Deferred.errback
  54. def errback(self, fail):
  55. assert isinstance(fail, (tp_Failure, Exception)), repr(fail)
  56. # this can check the wrong failure type if the imports occur in the
  57. # wrong order.
  58. #Deferred_errback(self, fail)
  59. if not isinstance(fail, tp_Failure):
  60. fail = Failure(fail)
  61. self._startRunCallbacks(fail)
  62. errback.__doc__ = Deferred_errback.__doc__
  63. Deferred.errback = errback
  64. def addLogback(self, logger, logmsg):
  65. if not callable(logger):
  66. logger = logger.error
  67. def logback(failure):
  68. logger(logmsg, exc_info=failure.exc_info())
  69. return self.addErrback(logback)
  70. Deferred.addLogback = addLogback
  71. # not totally safe, but a start.
  72. # This lets you call callback/errback from any thread.
  73. # The next step would be for addCallbak and addErrback to be safe.
  74. class ThreadableDeferred(Deferred):
  75. def __init__(self, queue_func):
  76. assert callable(queue_func)
  77. self.queue_func = queue_func
  78. Deferred.__init__(self)
  79. def callback(self, result):
  80. self.queue_func(Deferred.callback, self, result)
  81. def errback(self, result):
  82. self.queue_func(Deferred.errback, self, result)
  83. # go ahead and forget to call start()!
  84. class ThreadedDeferred(Deferred):
  85. def __init__(self, queue_func, f, *args, **kwargs):
  86. Deferred.__init__(self)
  87. daemon = False
  88. if 'daemon' in kwargs:
  89. daemon = kwargs.pop('daemon')
  90. self.f = f
  91. start = True
  92. if queue_func is None:
  93. start = False
  94. queue_func = lambda f, *a, **kw : f(*a, **kw)
  95. self.queue_func = queue_func
  96. self.args = args
  97. self.kwargs = kwargs
  98. self.t = threading.Thread(target=self.run)
  99. self.t.setDaemon(daemon)
  100. if start:
  101. self.start()
  102. def start(self):
  103. self.t.start()
  104. def run(self):
  105. try:
  106. r = self.f(*self.args, **self.kwargs)
  107. self.queue_func(self.callback, r)
  108. except:
  109. self.queue_func(self.errback, Failure())
  110. class DeferredEvent(Deferred, threading._Event):
  111. def __init__(self, *a, **kw):
  112. threading._Event.__init__(self)
  113. Deferred.__init__(self, *a, **kw)
  114. def set(self):
  115. threading._Event.set(self)
  116. self.callback(None) # hmm, None?
  117. def run_deferred(df, f, *a, **kw):
  118. try:
  119. v = f(*a, **kw)
  120. except:
  121. df.errback(Failure())
  122. else:
  123. df.callback(v)
  124. return df
  125. def run_deferred_and_queue(df, queue_task, f, *args, **kwargs):
  126. try:
  127. v = f(*args, **kwargs)
  128. except:
  129. queue_task(df.errback, Failure())
  130. del df
  131. else:
  132. if isinstance(v, Deferred):
  133. # v is owned by the caller, so add the callback
  134. # now, but the task itself should queue.
  135. # lamdba over df here would break 'del df' above
  136. # so do it with a local function.
  137. def make_queueback(func):
  138. return lambda r : queue_task(func, r)
  139. v.addCallback(make_queueback(df.callback))
  140. v.addErrback(make_queueback(df.errback))
  141. else:
  142. queue_task(df.callback, v)
  143. def defer_to_thread(local_queue_task, thread_queue_task, f, *args, **kwargs):
  144. df = Deferred()
  145. thread_queue_task(run_deferred_and_queue, df, local_queue_task,
  146. f, *args, **kwargs)
  147. return df
  148. def wrap_task(add_task):
  149. return lambda _f, *args, **kwargs : add_task(0, _f, *args, **kwargs)