ServerPortHandler.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. # Written by John Hoffman
  2. # see LICENSE.txt for license information
  3. from cStringIO import StringIO
  4. #from RawServer import RawServer
  5. from BTcrypto import Crypto
  6. try:
  7. True
  8. except:
  9. True = 1
  10. False = 0
  11. from BT1.Encrypter import protocol_name
  12. default_task_id = []
  13. class SingleRawServer:
  14. def __init__(self, info_hash, multihandler, doneflag, protocol):
  15. self.info_hash = info_hash
  16. self.doneflag = doneflag
  17. self.protocol = protocol
  18. self.multihandler = multihandler
  19. self.rawserver = multihandler.rawserver
  20. self.finished = False
  21. self.running = False
  22. self.handler = None
  23. self.taskqueue = []
  24. def shutdown(self):
  25. if not self.finished:
  26. self.multihandler.shutdown_torrent(self.info_hash)
  27. def _shutdown(self):
  28. if not self.finished:
  29. self.finished = True
  30. self.running = False
  31. self.rawserver.kill_tasks(self.info_hash)
  32. if self.handler:
  33. self.handler.close_all()
  34. def _external_connection_made(self, c, options, already_read,
  35. encrypted = None ):
  36. if self.running:
  37. c.set_handler(self.handler)
  38. self.handler.externally_handshaked_connection_made(
  39. c, options, already_read, encrypted = encrypted)
  40. ### RawServer functions ###
  41. def add_task(self, func, delay=0, id = default_task_id):
  42. if id is default_task_id:
  43. id = self.info_hash
  44. if not self.finished:
  45. self.rawserver.add_task(func, delay, id)
  46. # def bind(self, port, bind = '', reuse = False):
  47. # pass # not handled here
  48. def start_connection(self, dns, handler = None):
  49. if not handler:
  50. handler = self.handler
  51. c = self.rawserver.start_connection(dns, handler)
  52. return c
  53. # def listen_forever(self, handler):
  54. # pass # don't call with this
  55. def start_listening(self, handler):
  56. self.handler = handler
  57. self.running = True
  58. return self.shutdown # obviously, doesn't listen forever
  59. def is_finished(self):
  60. return self.finished
  61. def get_exception_flag(self):
  62. return self.rawserver.get_exception_flag()
  63. class NewSocketHandler: # hand a new socket off where it belongs
  64. def __init__(self, multihandler, connection):
  65. self.multihandler = multihandler
  66. self.connection = connection
  67. connection.set_handler(self)
  68. self.closed = False
  69. self.buffer = ''
  70. self.complete = False
  71. self.read = self._read
  72. self.write = connection.write
  73. self.next_len, self.next_func = 1+len(protocol_name), self.read_header
  74. self.multihandler.rawserver.add_task(self._auto_close, 30)
  75. def _auto_close(self):
  76. if not self.complete:
  77. self.close()
  78. def close(self):
  79. if not self.closed:
  80. self.connection.close()
  81. self.closed = True
  82. # copied from Encrypter and modified
  83. def _read_header(self, s):
  84. if s == chr(len(protocol_name))+protocol_name:
  85. self.protocol = protocol_name
  86. return 8, self.read_options
  87. return None
  88. def read_header(self, s):
  89. if self._read_header(s):
  90. if self.multihandler.config['crypto_only']:
  91. return None
  92. return 8, self.read_options
  93. if not self.multihandler.config['crypto_allowed']:
  94. return None
  95. self.encrypted = True
  96. self.encrypter = Crypto(False)
  97. self._write_buffer(s)
  98. return self.encrypter.keylength, self.read_crypto_header
  99. def read_crypto_header(self, s):
  100. self.encrypter.received_key(s)
  101. self.write(self.encrypter.pubkey+self.encrypter.padding())
  102. self._max_search = 520
  103. return 0, self.read_crypto_block3a
  104. def _search_for_pattern(self, s, pat):
  105. p = s.find(pat)
  106. if p < 0:
  107. self._max_search -= len(s)+1-len(pat)
  108. if self._max_search < 0:
  109. self.close()
  110. return False
  111. self._write_buffer(s[1-len(pat):])
  112. return False
  113. self._write_buffer(s[p+len(pat):])
  114. return True
  115. def read_crypto_block3a(self, s):
  116. if not self._search_for_pattern(s,self.encrypter.block3a):
  117. return -1, self.read_crypto_block3a # wait for more data
  118. return 20, self.read_crypto_block3b
  119. def read_crypto_block3b(self, s):
  120. for k in self.multihandler.singlerawservers.keys():
  121. if self.encrypter.test_skey(s,k):
  122. self.multihandler.singlerawservers[k]._external_connection_made(
  123. self.connection, None, self.buffer,
  124. encrypted = self.encrypter )
  125. return True
  126. return None
  127. def read_options(self, s):
  128. self.options = s
  129. return 20, self.read_download_id
  130. def read_download_id(self, s):
  131. if self.multihandler.singlerawservers.has_key(s):
  132. if self.multihandler.singlerawservers[s].protocol == self.protocol:
  133. self.multihandler.singlerawservers[s]._external_connection_made(
  134. self.connection, self.options, self.buffer)
  135. return True
  136. return None
  137. def read_dead(self, s):
  138. return None
  139. def data_came_in(self, garbage, s):
  140. self.read(s)
  141. def _write_buffer(self, s):
  142. self.buffer = s+self.buffer
  143. def _read(self, s):
  144. self.buffer += s
  145. while True:
  146. if self.closed:
  147. return
  148. # self.next_len = # of characters function expects
  149. # or 0 = all characters in the buffer
  150. # or -1 = wait for next read, then all characters in the buffer
  151. if self.next_len <= 0:
  152. m = self.buffer
  153. self.buffer = ''
  154. elif len(self.buffer) >= self.next_len:
  155. m = self.buffer[:self.next_len]
  156. self.buffer = self.buffer[self.next_len:]
  157. else:
  158. return
  159. try:
  160. x = self.next_func(m)
  161. except:
  162. self.next_len, self.next_func = 1, self.read_dead
  163. raise
  164. if x is None:
  165. self.close()
  166. return
  167. if x == True:
  168. self.complete = True
  169. return
  170. self.next_len, self.next_func = x
  171. if self.next_len < 0: # already checked buffer
  172. return # wait for additional data
  173. def connection_flushed(self, ss):
  174. pass
  175. def connection_lost(self, ss):
  176. self.closed = True
  177. class MultiHandler:
  178. def __init__(self, rawserver, doneflag, config):
  179. self.rawserver = rawserver
  180. self.masterdoneflag = doneflag
  181. self.config = config
  182. self.singlerawservers = {}
  183. self.connections = {}
  184. self.taskqueues = {}
  185. def newRawServer(self, info_hash, doneflag, protocol=protocol_name):
  186. new = SingleRawServer(info_hash, self, doneflag, protocol)
  187. self.singlerawservers[info_hash] = new
  188. return new
  189. def shutdown_torrent(self, info_hash):
  190. self.singlerawservers[info_hash]._shutdown()
  191. del self.singlerawservers[info_hash]
  192. def listen_forever(self):
  193. self.rawserver.listen_forever(self)
  194. for srs in self.singlerawservers.values():
  195. srs.finished = True
  196. srs.running = False
  197. srs.doneflag.set()
  198. ### RawServer handler functions ###
  199. # be wary of name collisions
  200. def external_connection_made(self, ss):
  201. NewSocketHandler(self, ss)