# RawServer_twisted.py
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).  You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.

# Written by Greg Hazel
  11. import os
  12. import sys
  13. import socket
  14. import signal
  15. import string
  16. import struct
  17. import thread
  18. import logging
  19. from BTL.translation import _
  20. from BTL.obsoletepythonsupport import set
  21. from BTL.defer import DeferredEvent, Failure
  22. from BTL.SaneThreadedResolver import SaneThreadedResolver
  23. ##############################################################
  24. profile = False
  25. if profile:
  26. try:
  27. from BTL.profile import Profiler, Stats
  28. prof_file_name = 'rawserver.prof'
  29. except ImportError, e:
  30. print "profiling not available:", e
  31. profile = False
  32. ##############################################################
  33. from twisted.python import threadable
  34. # needed for twisted 1.3
  35. # otherwise the 'thread safety' functions are not 'thread safe'
  36. threadable.init(1)
  37. from BTL.reactor_magic import noSignals, reactor, is_iocpreactor
  38. # as far as I know, we work with twisted 1.3 and >= 2.0
  39. #import twisted.copyright
  40. #if twisted.copyright.version.split('.') < 2:
  41. # raise ImportError(_("RawServer_twisted requires twisted 2.0.0 or greater"))
  42. from twisted.protocols.policies import TimeoutMixin
  43. from twisted.internet.protocol import DatagramProtocol, Protocol, ClientFactory
  44. from twisted.internet.threads import deferToThread
  45. from twisted.internet import error, interfaces
  46. from BTL.ConnectionRateLimitReactor import connectionRateLimitReactor
  47. letters = set(string.letters)
  48. main_thread = thread.get_ident()
  49. rawserver_logger = logging.getLogger('RawServer')
  50. NOLINGER = struct.pack('ii', 1, 0)
  51. # python sucks.
  52. SHUT_RD = getattr(socket, 'SHUT_RD', 0)
  53. SHUT_WR = getattr(socket, 'SHUT_WR', 1)
  54. # this is a base class for all the callbacks the server could use
  55. class Handler(object):
  56. # called when the connection is being attempted
  57. def connection_starting(self, addr):
  58. pass
  59. # called when the connection is ready for writiing
  60. def connection_made(self, s):
  61. pass
  62. # called when a connection attempt failed (failed, refused, or requested)
  63. def connection_failed(self, s, exception):
  64. pass
  65. def data_came_in(self, addr, data):
  66. pass
  67. # called once when the current write buffer empties completely
  68. def connection_flushed(self, s):
  69. pass
  70. # called when a connection dies (lost or requested)
  71. def connection_lost(self, s):
  72. pass
  73. class ConnectionWrapper(object):
  74. def __init__(self, rawserver, handler, context):
  75. if handler is None:
  76. raise ValueError("Handler should not be None")
  77. self.ip = None # peer ip
  78. self.port = None # peer port
  79. self.dying = False
  80. self.paused = False
  81. self.encrypt = None
  82. self.flushed = False
  83. self.connector = None
  84. self.transport = None
  85. self.write_open = True
  86. self.reset_timeout = None
  87. self.callback_connection = None
  88. self.post_init(rawserver, handler, context)
  89. def post_init(self, rawserver, handler, context):
  90. if handler is None:
  91. raise ValueError("Handler should not be None")
  92. self.rawserver = rawserver
  93. self.handler = handler
  94. self.context = context
  95. if self.rawserver:
  96. self.rawserver.single_sockets.add(self)
  97. def attach_connector(self, connector):
  98. self.connector = connector
  99. addr = connector.getDestination()
  100. try:
  101. self.ip = addr.host
  102. self.port = addr.port
  103. except:
  104. # unix sockets, for example
  105. pass
  106. def get_socket(self):
  107. s = None
  108. if interfaces.ISystemHandle.providedBy(self.transport):
  109. s = self.transport.getHandle()
  110. return s
  111. def pause_reading(self):
  112. # interfaces are the stupedist crap ever
  113. if (hasattr(interfaces.IProducer, "providedBy") and
  114. not interfaces.IProducer.providedBy(self.transport)):
  115. print "No producer", self.ip, self.port, self.transport
  116. return
  117. # not explicitly needed, but iocpreactor has a bug where the author is a moron
  118. if self.paused:
  119. return
  120. self.transport.pauseProducing()
  121. self.paused = True
  122. def resume_reading(self):
  123. if (hasattr(interfaces.IProducer, "providedBy") and
  124. not interfaces.IProducer.providedBy(self.transport)):
  125. print "No producer", self.ip, self.port, self.transport
  126. return
  127. # not explicitly needed, but iocpreactor has a bug where the author is a moron
  128. if not self.paused:
  129. return
  130. self.paused = False
  131. try:
  132. self.transport.resumeProducing()
  133. except Exception, e:
  134. # I bet these are harmless
  135. print "resumeProducing error", type(e), e
  136. def attach_transport(self, callback_connection, transport, reset_timeout):
  137. self.transport = transport
  138. self.callback_connection = callback_connection
  139. self.reset_timeout = reset_timeout
  140. if hasattr(self.transport, 'registerProducer'):
  141. # Multicast uses sendto, which does not buffer.
  142. # It has no producer api
  143. self.transport.registerProducer(self, False)
  144. try:
  145. addr = self.transport.getPeer()
  146. except:
  147. # udp, for example
  148. addr = self.transport.getHost()
  149. try:
  150. self.ip = addr.host
  151. self.port = addr.port
  152. except:
  153. # unix sockets, for example
  154. pass
  155. tos = self.rawserver.config.get('peer_socket_tos', 0)
  156. if tos != 0:
  157. s = self.get_socket()
  158. try:
  159. s.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, tos)
  160. except socket.error:
  161. pass
  162. def sendto(self, packet, flags, addr):
  163. ret = None
  164. try:
  165. ret = self.transport.write(packet, addr)
  166. except:
  167. # dont be so noisy here
  168. pass
  169. # rawserver_logger.warning("UDP sendto failed", exc_info=sys.exc_info())
  170. return ret
  171. def write(self, b):
  172. self.flushed = False
  173. if not self.write_open:
  174. return
  175. if self.encrypt is not None:
  176. b = self.encrypt(b)
  177. # bleh
  178. if isinstance(b, buffer):
  179. b = str(b)
  180. self.transport.write(b)
  181. def resumeProducing(self):
  182. self.flushed = True
  183. # why do you tease me so?
  184. if self.handler is not None:
  185. # calling flushed from the write is bad form
  186. self.rawserver.add_task(0, self.handler.connection_flushed, self)
  187. def pauseProducing(self):
  188. # auto pause by not resuming
  189. pass
  190. def stopProducing(self):
  191. self.write_open = False
  192. def is_flushed(self):
  193. return self.flushed
  194. def shutdown(self, how):
  195. if how == SHUT_WR:
  196. if hasattr(self.transport, "loseWriteConnection"):
  197. self.transport.loseWriteConnection()
  198. else:
  199. # twisted 1.3 sucks
  200. try:
  201. self.transport.socket.shutdown(how)
  202. except:
  203. pass
  204. elif how == SHUT_RD:
  205. self.transport.stopListening()
  206. else:
  207. self.close()
  208. def stopConnecting(self):
  209. return self.connector.stopConnecting()
  210. def close(self):
  211. self.stopProducing()
  212. if self.rawserver.config.get('close_with_rst', True):
  213. try:
  214. s = self.get_socket()
  215. s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, NOLINGER)
  216. except:
  217. pass
  218. if self.transport:
  219. try:
  220. self.transport.unregisterProducer()
  221. except KeyError:
  222. # bug in iocpreactor: http://twistedmatrix.com/trac/ticket/1657
  223. pass
  224. if (hasattr(self.transport, 'protocol') and
  225. isinstance(self.transport.protocol, CallbackDatagramProtocol)):
  226. # udp connections should only call stopListening
  227. self.transport.stopListening()
  228. else:
  229. self.transport.loseConnection()
  230. elif self.connector:
  231. self.connector.disconnect()
  232. def _cleanup(self):
  233. self.handler = None
  234. del self.transport
  235. del self.connector
  236. del self.context
  237. if self.callback_connection:
  238. if self.callback_connection.can_timeout:
  239. self.callback_connection.setTimeout(None)
  240. self.callback_connection.connection = None
  241. del self.callback_connection
  242. class CallbackConnection(object):
  243. def attachTransport(self, transport, s):
  244. s.attach_transport(self, transport=transport,
  245. reset_timeout=self.optionalResetTimeout)
  246. self.connection = s
  247. def connectionMade(self):
  248. s = self.connection
  249. s.handler.connection_made(s)
  250. self.optionalResetTimeout()
  251. self.factory.rawserver.connectionMade(s)
  252. def connectionLost(self, reason):
  253. reactor.callLater(0, self.post_connectionLost, reason)
  254. # twisted api inconsistancy workaround
  255. # sometimes connectionLost is called (not queued) from inside write()
  256. def post_connectionLost(self, reason):
  257. s = self.connection
  258. #print s.ip, s.port, reason.getErrorMessage()
  259. self.factory.rawserver._remove_socket(s, was_connected=True)
  260. def dataReceived(self, data):
  261. self.optionalResetTimeout()
  262. s = self.connection
  263. s.rawserver._make_wrapped_call(s.handler.data_came_in,
  264. s, data, wrapper=s)
  265. def datagramReceived(self, data, (host, port)):
  266. s = self.connection
  267. s.rawserver._make_wrapped_call(s.handler.data_came_in,
  268. (host, port), data, wrapper=s)
  269. def optionalResetTimeout(self):
  270. if self.can_timeout:
  271. self.resetTimeout()
  272. class CallbackProtocol(CallbackConnection, TimeoutMixin, Protocol):
  273. def makeConnection(self, transport):
  274. self.attachTransport(transport, self.wrapper)
  275. self.can_timeout = True
  276. self.setTimeout(self.factory.rawserver.config.get('socket_timeout', 30))
  277. return Protocol.makeConnection(self, transport)
  278. class CallbackDatagramProtocol(CallbackConnection, DatagramProtocol):
  279. def startProtocol(self):
  280. self.can_timeout = False
  281. self.attachTransport(self.transport, self.connection)
  282. return DatagramProtocol.startProtocol(self)
  283. class ConnectionFactory(ClientFactory):
  284. protocol = CallbackProtocol
  285. def __init__(self, rawserver, outgoing):
  286. self.rawserver = rawserver
  287. self.outgoing = outgoing
  288. def add_connection_data(self, data):
  289. self.data = data
  290. def get_connection_data(self):
  291. return self.data
  292. def get_wrapper(self):
  293. if self.outgoing:
  294. wrapper = self.get_connection_data()
  295. else:
  296. args = self.get_connection_data()
  297. wrapper = ConnectionWrapper(*args)
  298. return wrapper
  299. def startedConnecting(self, connector):
  300. peer = connector.getDestination()
  301. addr = (peer.host, peer.port)
  302. wrapper = self.get_wrapper()
  303. if wrapper.handler is not None:
  304. wrapper.handler.connection_starting(addr)
  305. def buildProtocol(self, addr):
  306. protocol = ClientFactory.buildProtocol(self, addr)
  307. protocol.wrapper = self.get_wrapper()
  308. return protocol
  309. def clientConnectionFailed(self, connector, reason):
  310. wrapper = self.get_wrapper()
  311. # opt-out
  312. if not wrapper.dying:
  313. # this might not work - reason is not an exception
  314. wrapper.handler.connection_failed(wrapper, reason)
  315. wrapper.dying = True
  316. self.rawserver._remove_socket(wrapper)
  317. # storage for socket creation requestions, and proxy once the connection is made
  318. class SocketRequestProxy(object):
  319. def __init__(self, port, bind, protocol):
  320. self.port = port
  321. self.bind = bind
  322. self.protocol = protocol
  323. self.connection = None
  324. def __getattr__(self, name):
  325. return getattr(self.connection, name)
  326. def close(self):
  327. # closing the proxy doesn't mean anything.
  328. # you can stop_listening(), and then start again.
  329. # the socket only exists while it is listening
  330. if self.connection:
  331. self.connection.close()
  332. class RawServerMixin(object):
  333. def __init__(self, config=None, noisy=True):
  334. self.noisy = noisy
  335. self.config = config
  336. if not self.config:
  337. self.config = {}
  338. self.sigint_flag = None
  339. self.sigint_installed = False
  340. # going away soon. call _context_wrap on the context.
  341. def _make_wrapped_call(self, _f, *args, **kwargs):
  342. wrapper = kwargs.pop('wrapper', None)
  343. try:
  344. _f(*args, **kwargs)
  345. except KeyboardInterrupt:
  346. raise
  347. except Exception, e: # hopefully nothing raises strings
  348. # Incoming sockets can be assigned to a particular torrent during
  349. # a data_came_in call, and it's possible (though not likely) that
  350. # there could be a torrent-specific exception during the same call.
  351. # Therefore read the context after the call.
  352. context = None
  353. if wrapper is not None:
  354. context = wrapper.context
  355. if context is not None:
  356. context.got_exception(Failure())
  357. elif self.noisy:
  358. rawserver_logger.exception("Error in _make_wrapped_call for %s",
  359. _f.__name__)
  360. # must be called from the main thread
  361. def install_sigint_handler(self, flag = None):
  362. if flag is not None:
  363. self.sigint_flag = flag
  364. signal.signal(signal.SIGINT, self._handler)
  365. self.sigint_installed = True
  366. def _handler(self, signum, frame):
  367. if self.sigint_flag:
  368. self.external_add_task(0, self.sigint_flag.set)
  369. elif self.doneflag:
  370. self.external_add_task(0, self.doneflag.set)
  371. # Allow pressing ctrl-c multiple times to raise KeyboardInterrupt,
  372. # in case the program is in an infinite loop
  373. signal.signal(signal.SIGINT, signal.default_int_handler)
  374. class RawServer(RawServerMixin):
  375. """RawServer encapsulates I/O and task scheduling.
  376. I/O corresponds to the arrival data on a file descriptor,
  377. and a task is a scheduled callback. A task is scheduled
  378. using add_task or external_add_task. add_task is used from within the
  379. thread running the RawServer, external_add_task from other threads.
  380. tracker.py provides a simple example of how to use RawServer.
  381. 1. creates an instance of RawServer
  382. r = RawServer(config)
  383. 2. creates a socket by a call to create_serversocket.
  384. s = r.create_serversocket(config['port'], config['bind'])
  385. 3. tells the raw server to listen to the socket and associate
  386. a protocol handler with the socket.
  387. r.start_listening(s,
  388. HTTPHandler(t.get, config['min_time_between_log_flushes']))
  389. 4. tells the raw_server to listen for I/O or scheduled tasks
  390. until the r.stop() is called.
  391. r.listen_forever()
  392. When a remote client opens a connection, a new socket is
  393. returned from the server socket's accept method and the
  394. socket is assigned the same handler as was assigned to the
  395. server socket.
  396. As data arrives on a socket, the handler's data_came_in
  397. member function is called. It is up to the handler to
  398. interpret the data and/or pass it on to other objects.
  399. In the tracker, the HTTP protocol handler passes the arriving data
  400. to an HTTPConnector object which maintains state specific
  401. to a given connection.
  402. For outgoing connections, the call start_connection() is used.
  403. """
  404. def __init__(self, config=None, noisy=True):
  405. """config is a dict that contains option-value pairs.
  406. """
  407. RawServerMixin.__init__(self, config, noisy)
  408. self.doneflag = None
  409. # init is fine until the loop starts
  410. self.ident = thread.get_ident()
  411. self.associated = False
  412. self.single_sockets = set()
  413. self.unix_sockets = set()
  414. self.udp_sockets = set()
  415. self.listened = False
  416. self.connections = 0
  417. ##############################################################
  418. if profile:
  419. try:
  420. os.unlink(prof_file_name)
  421. except:
  422. pass
  423. self.prof = Profiler()
  424. ##############################################################
  425. self.connection_limit = self.config.get('max_incomplete', 10)
  426. connectionRateLimitReactor(reactor, self.connection_limit)
  427. # bleh
  428. self.add_pending_connection = reactor.add_pending_connection
  429. self.remove_pending_connection = reactor.remove_pending_connection
  430. self.reactor = reactor
  431. self.reactor.resolver = SaneThreadedResolver(self.reactor)
  432. #from twisted.internet import task
  433. #l2 = task.LoopingCall(self._print_connection_count)
  434. #l2.start(1)
  435. ##############################################################
  436. def _print_connection_count(self):
  437. def _sl(x):
  438. if hasattr(x, "__len__"):
  439. return str(len(x))
  440. else:
  441. return str(x)
  442. c = len(self.single_sockets)
  443. u = len(self.udp_sockets)
  444. c -= u
  445. #s = "Connections(" + str(id(self)) + "): tcp(" + str(c) + ") upd(" + str(u) + ")"
  446. #rawserver_logger.debug(s)
  447. d = dict()
  448. for s in self.single_sockets:
  449. state = "None"
  450. if not s.dying and s.transport:
  451. try:
  452. state = s.transport.state
  453. except:
  454. state = "has transport"
  455. else:
  456. state = "No transport"
  457. if state not in d:
  458. d[state] = 0
  459. d[state] += 1
  460. #rawserver_logger.debug(d)
  461. print d
  462. sizes = "cc(" + _sl(self.connections)
  463. sizes += ") ss(" + _sl(self.single_sockets)
  464. sizes += ") us(" + _sl(self.udp_sockets) + ")"
  465. #rawserver_logger.debug(sizes)
  466. print sizes
  467. ##############################################################
  468. def get_remote_endpoints(self):
  469. addrs = [(s.ip, s.port) for s in self.single_sockets]
  470. return addrs
  471. ## def add_task(self, delay, _f, *args, **kwargs):
  472. ## """Schedule the passed function 'func' to be called after
  473. ## 'delay' seconds and pass the 'args'.
  474. ##
  475. ## This should only be called by RawServer's thread."""
  476. ## #assert thread.get_ident() == self.ident
  477. ## return reactor.callLater(delay, _f, *args, **kwargs)
  478. add_task = reactor.callLater
  479. def external_add_task(self, delay, _f, *args, **kwargs):
  480. """Schedule the passed function 'func' to be called after
  481. 'delay' seconds and pass 'args'.
  482. This should be called by threads other than RawServer's thread."""
  483. if delay == 0:
  484. return reactor.callFromThread(_f, *args, **kwargs)
  485. else:
  486. return reactor.callFromThread(reactor.callLater, delay,
  487. _f, *args, **kwargs)
  488. def create_unixserversocket(self, filename):
  489. s = SocketRequestProxy(0, filename, 'unix')
  490. factory = ConnectionFactory(self, outgoing=False)
  491. s.listening_port = reactor.listenUNIX(s.bind, factory)
  492. s.factory = factory
  493. s.listening_port.listening = True
  494. return s
  495. def create_serversocket(self, port, bind=''):
  496. s = SocketRequestProxy(port, bind, 'tcp')
  497. factory = ConnectionFactory(self, outgoing=False)
  498. try:
  499. s.listening_port = reactor.listenTCP(s.port, factory,
  500. interface=s.bind)
  501. except error.CannotListenError, e:
  502. if e[0] != 0:
  503. raise e.socketError
  504. else:
  505. raise
  506. s.factory = factory
  507. s.listening_port.listening = True
  508. return s
  509. def _create_udpsocket(self, port, bind, create_func):
  510. s = SocketRequestProxy(port, bind, 'udp')
  511. protocol = CallbackDatagramProtocol()
  512. c = ConnectionWrapper(self, Handler(), None)
  513. s.connection = c
  514. protocol.connection = c
  515. try:
  516. s.listening_port = create_func(s.port, protocol, interface=s.bind)
  517. except error.CannotListenError, e:
  518. raise e.socketError
  519. s.listening_port.listening = True
  520. return s
  521. def create_udpsocket(self, port, bind=''):
  522. return self._create_udpsocket(port, bind,
  523. create_func = reactor.listenUDP)
  524. def create_multicastsocket(self, port, bind=''):
  525. return self._create_udpsocket(port, bind,
  526. create_func = reactor.listenMulticast)
  527. def _start_listening(self, s):
  528. if not s.listening_port.listening:
  529. s.listening_port.startListening()
  530. s.listening_port.listening = True
  531. def start_listening(self, serversocket, handler, context=None):
  532. data = (self, handler, context)
  533. serversocket.factory.add_connection_data(data)
  534. self._start_listening(serversocket)
  535. def start_listening_udp(self, serversocket, handler, context=None):
  536. c = serversocket.connection
  537. c.post_init(self, handler, context)
  538. self._start_listening(serversocket)
  539. self.udp_sockets.add(c)
  540. start_listening_multicast = start_listening_udp
  541. def stop_listening(self, serversocket):
  542. listening_port = serversocket.listening_port
  543. try:
  544. listening_port.stopListening()
  545. except AttributeError:
  546. # AttributeError: 'MulticastPort' object has no attribute 'handle_disconnected_stopListening'
  547. # sigh.
  548. pass
  549. listening_port.listening = False
  550. def stop_listening_udp(self, serversocket):
  551. self.stop_listening(serversocket)
  552. self.udp_sockets.remove(serversocket.connection)
  553. self.single_sockets.remove(serversocket.connection)
  554. stop_listening_multicast = stop_listening_udp
  555. def start_connection(self, dns, handler, context=None, do_bind=True,
  556. timeout=30, urgent=False):
  557. """creates the client-side of a connection and associates it with
  558. the passed handler. Data received on this conneciton are passed
  559. to the handler's data_came_in method."""
  560. addr = dns[0]
  561. port = int(dns[1])
  562. if len(letters.intersection(addr)) > 0:
  563. rawserver_logger.warning("Don't pass host names to RawServer")
  564. # this blocks, that's why we throw the warning
  565. addr = socket.gethostbyname(addr)
  566. bindaddr = None
  567. if do_bind:
  568. bindaddr = self.config.get('bind', '')
  569. if isinstance(bindaddr, str) and len(bindaddr) >= 0:
  570. bindaddr = (bindaddr, 0)
  571. else:
  572. bindaddr = None
  573. if handler is None:
  574. raise ValueError("Handler should not be None")
  575. c = ConnectionWrapper(self, handler, context)
  576. factory = ConnectionFactory(self, outgoing=True)
  577. factory.add_connection_data(c)
  578. if self.connection_limit:
  579. connector = reactor.connectTCP(addr, port, factory,
  580. owner=id(context),
  581. bindAddress=bindaddr, timeout=timeout,
  582. urgent=urgent)
  583. else:
  584. connector = reactor.connectTCP(addr, port, factory,
  585. bindAddress=bindaddr, timeout=timeout)
  586. c.attach_connector(connector)
  587. self.single_sockets.add(c)
  588. return c
  589. def associate_thread(self):
  590. assert not self.associated, \
  591. "RawServer has already been associated with a thread"
  592. self.ident = thread.get_ident()
  593. reactor.ident = self.ident
  594. self.associated = True
  595. def listen_forever(self, doneflag=None):
  596. """Main event processing loop for RawServer.
  597. RawServer listens until the doneFlag is set by some other
  598. thread. The doneFlag tells all threads to clean-up and then
  599. exit."""
  600. if not doneflag:
  601. doneflag = DeferredEvent()
  602. assert isinstance(doneflag, DeferredEvent)
  603. self.doneflag = doneflag
  604. if not self.associated:
  605. self.associate_thread()
  606. if self.listened:
  607. Exception(_("listen_forever() should only be called once per reactor."))
  608. if main_thread == thread.get_ident() and not self.sigint_installed:
  609. self.install_sigint_handler()
  610. if is_iocpreactor and main_thread == thread.get_ident():
  611. def pulse():
  612. self.add_task(1, pulse)
  613. pulse()
  614. reactor.callLater(0, self.doneflag.addCallback, self._safestop)
  615. self.listened = True
  616. reactor.suggestThreadPoolSize(3)
  617. if profile:
  618. self.prof.enable()
  619. if noSignals:
  620. reactor.run(installSignalHandlers=False)
  621. else:
  622. reactor.run()
  623. if profile:
  624. self.prof.disable()
  625. st = Stats(self.prof.getstats())
  626. st.sort()
  627. f = open(prof_file_name, 'wb')
  628. st.dump(file=f)
  629. def listen_once(self, period=1e9):
  630. rawserver_logger.warning(_("listen_once() might not return until there "
  631. "is activity, and might not process the "
  632. "event you want. Use listen_forever()."))
  633. reactor.iterate(period)
  634. def stop(self):
  635. if self.doneflag and not self.doneflag.isSet():
  636. self.doneflag.set()
  637. def _safestop(self, r=None):
  638. if not threadable.isInIOThread():
  639. self.external_add_task(0, self._stop)
  640. else:
  641. self._stop()
  642. def _stop(self, r=None):
  643. assert thread.get_ident() == self.ident
  644. connections = list(self.single_sockets)
  645. for connection in connections:
  646. try:
  647. connection.close()
  648. except:
  649. pass
  650. reactor.suggestThreadPoolSize(0)
  651. try:
  652. reactor.stop()
  653. except RuntimeError:
  654. # exceptions.RuntimeError: can't stop reactor that isn't running
  655. pass
  656. def _remove_socket(self, s, was_connected=False):
  657. # opt-out
  658. if not s.dying:
  659. self._make_wrapped_call(s.handler.connection_lost, s, wrapper=s)
  660. s._cleanup()
  661. self.single_sockets.remove(s)
  662. if was_connected:
  663. self.connections -= 1
  664. def connectionMade(self, s):
  665. self.connections += 1
  666. def gethostbyname(self, name):
  667. return self.reactor.resolve(name)
  668. def gethostbyaddr(self, addr):
  669. return deferToThread(socket.gethostbyaddr, addr)