epollreactor.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
# See LICENSE for details.

"""An epoll() based implementation of the twisted main loop.

This is just like pollreactor, but it uses epoll.

To install the event loop (and you should do this before any connections,
listeners or connectors are added)::

    from BTL import epollreactor
    epollreactor.install()

API Stability: stable

Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
"""
  12. # System imports
  13. ##try:
  14. ## import epoll as select
  15. ##except ImportError:
  16. ## import select
  17. # do or do not, there is no try
  18. import epoll as select
  19. ########################################################
  20. ## http://twistedmatrix.com/trac/ticket/1953#comment:20
  21. from twisted.python import log, failure
  22. from twisted.internet.tcp import BaseClient
  23. def failIfNotConnected(self, err):
  24. if (self.connected or self.disconnected or
  25. not hasattr(self, "connector")):
  26. return
  27. self.connector.connectionFailed(failure.Failure(err))
  28. if hasattr(self, "reactor"):
  29. # this doesn't happen if we failed in __init__
  30. self.stopReading()
  31. self.stopWriting()
  32. del self.connector
  33. try:
  34. self._closeSocket()
  35. except AttributeError:
  36. pass
  37. else:
  38. del self.socket, self.fileno
  39. BaseClient.failIfNotConnected = failIfNotConnected
  40. ########################################################
  41. import errno, sys
  42. from zope.interface import implements
  43. # Twisted imports
  44. from twisted.python import log, threadable, failure
  45. from twisted.internet import main, posixbase, error
  46. from twisted.internet.interfaces import IReactorFDSet
  47. # globals
  48. reads = {}
  49. writes = {}
  50. selectables = {}
  51. poller = select.poll()
  52. POLL_DISCONNECTED = (select.POLLHUP | select.POLLERR | select.POLLNVAL)
  53. class PollReactor(posixbase.PosixReactorBase):
  54. """A reactor that uses poll(2)."""
  55. implements(IReactorFDSet)
  56. def _updateRegistration(self, fd):
  57. """Register/unregister an fd with the poller."""
  58. try:
  59. poller.unregister(fd)
  60. except KeyError:
  61. pass
  62. mask = 0
  63. if reads.has_key(fd): mask = mask | select.POLLIN
  64. if writes.has_key(fd): mask = mask | select.POLLOUT
  65. if mask != 0:
  66. poller.register(fd, mask)
  67. else:
  68. if selectables.has_key(fd): del selectables[fd]
  69. def _dictRemove(self, selectable, mdict):
  70. try:
  71. # the easy way
  72. fd = selectable.fileno()
  73. # make sure the fd is actually real. In some situations we can get
  74. # -1 here.
  75. mdict[fd]
  76. except:
  77. # the hard way: necessary because fileno() may disappear at any
  78. # moment, thanks to python's underlying sockets impl
  79. for fd, fdes in selectables.items():
  80. if selectable is fdes:
  81. break
  82. else:
  83. # Hmm, maybe not the right course of action? This method can't
  84. # fail, because it happens inside error detection...
  85. return
  86. if mdict.has_key(fd):
  87. del mdict[fd]
  88. self._updateRegistration(fd)
  89. def addReader(self, reader):
  90. """Add a FileDescriptor for notification of data available to read.
  91. """
  92. fd = reader.fileno()
  93. if not reads.has_key(fd):
  94. selectables[fd] = reader
  95. reads[fd] = 1
  96. self._updateRegistration(fd)
  97. def addWriter(self, writer, writes=writes, selectables=selectables):
  98. """Add a FileDescriptor for notification of data available to write.
  99. """
  100. fd = writer.fileno()
  101. if not writes.has_key(fd):
  102. selectables[fd] = writer
  103. writes[fd] = 1
  104. self._updateRegistration(fd)
  105. def removeReader(self, reader, reads=reads):
  106. """Remove a Selectable for notification of data available to read.
  107. """
  108. return self._dictRemove(reader, reads)
  109. def removeWriter(self, writer, writes=writes):
  110. """Remove a Selectable for notification of data available to write.
  111. """
  112. return self._dictRemove(writer, writes)
  113. def removeAll(self, reads=reads, writes=writes, selectables=selectables):
  114. """Remove all selectables, and return a list of them."""
  115. if self.waker is not None:
  116. self.removeReader(self.waker)
  117. result = selectables.values()
  118. fds = selectables.keys()
  119. reads.clear()
  120. writes.clear()
  121. selectables.clear()
  122. for fd in fds:
  123. poller.unregister(fd)
  124. if self.waker is not None:
  125. self.addReader(self.waker)
  126. return result
  127. def doPoll(self, timeout,
  128. reads=reads,
  129. writes=writes,
  130. selectables=selectables,
  131. select=select,
  132. log=log,
  133. POLLIN=select.POLLIN,
  134. POLLOUT=select.POLLOUT):
  135. """Poll the poller for new events."""
  136. if timeout is not None:
  137. timeout = int(timeout * 1000) # convert seconds to milliseconds
  138. try:
  139. l = poller.poll(timeout)
  140. except select.error, e:
  141. if e[0] == errno.EINTR:
  142. return
  143. else:
  144. raise
  145. _drdw = self._doReadOrWrite
  146. for fd, event in l:
  147. try:
  148. selectable = selectables[fd]
  149. except KeyError:
  150. # Handles the infrequent case where one selectable's
  151. # handler disconnects another.
  152. continue
  153. log.callWithLogger(selectable, _drdw, selectable, fd, event, POLLIN, POLLOUT, log)
  154. doIteration = doPoll
  155. def _doReadOrWrite(self, selectable, fd, event, POLLIN, POLLOUT, log,
  156. faildict={
  157. error.ConnectionDone: failure.Failure(error.ConnectionDone()),
  158. error.ConnectionLost: failure.Failure(error.ConnectionLost())
  159. }):
  160. why = None
  161. inRead = False
  162. if event & POLL_DISCONNECTED and not (event & POLLIN):
  163. why = main.CONNECTION_LOST
  164. else:
  165. try:
  166. if event & POLLIN:
  167. why = selectable.doRead()
  168. inRead = True
  169. if not why and event & POLLOUT:
  170. why = selectable.doWrite()
  171. inRead = False
  172. if not selectable.fileno() == fd:
  173. why = error.ConnectionFdescWentAway('Filedescriptor went away')
  174. inRead = False
  175. except:
  176. log.deferr()
  177. why = sys.exc_info()[1]
  178. if why:
  179. self._disconnectSelectable(selectable, why, inRead)
  180. def install():
  181. """Install the poll() reactor."""
  182. p = PollReactor()
  183. from twisted.internet import main
  184. main.installReactor(p)
  185. __all__ = ["PollReactor", "install"]