Connecter.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. # Written by Bram Cohen
  2. # see LICENSE.txt for license information
  3. from BitTornado.bitfield import Bitfield
  4. from BitTornado.clock import clock
  5. from binascii import b2a_hex
  6. try:
  7. True
  8. except:
  9. True = 1
  10. False = 0
  11. DEBUG1 = False
  12. DEBUG2 = False
  13. def toint(s):
  14. return long(b2a_hex(s), 16)
  15. def tobinary(i):
  16. return (chr(i >> 24) + chr((i >> 16) & 0xFF) +
  17. chr((i >> 8) & 0xFF) + chr(i & 0xFF))
  18. CHOKE = chr(0)
  19. UNCHOKE = chr(1)
  20. INTERESTED = chr(2)
  21. NOT_INTERESTED = chr(3)
  22. # index
  23. HAVE = chr(4)
  24. # index, bitfield
  25. BITFIELD = chr(5)
  26. # index, begin, length
  27. REQUEST = chr(6)
  28. # index, begin, piece
  29. PIECE = chr(7)
  30. # index, begin, piece
  31. CANCEL = chr(8)
  32. class Connection:
  33. def __init__(self, connection, connecter, ccount):
  34. self.connection = connection
  35. self.connecter = connecter
  36. self.ccount = ccount
  37. self.got_anything = False
  38. self.next_upload = None
  39. self.outqueue = []
  40. self.partial_message = None
  41. self.download = None
  42. self.send_choke_queued = False
  43. self.just_unchoked = None
  44. def get_ip(self, real=False):
  45. return self.connection.get_ip(real)
  46. def get_id(self):
  47. return self.connection.get_id()
  48. def get_readable_id(self):
  49. return self.connection.get_readable_id()
  50. def close(self):
  51. if DEBUG1:
  52. print (self.ccount,'connection closed')
  53. self.connection.close()
  54. def is_locally_initiated(self):
  55. return self.connection.is_locally_initiated()
  56. def is_encrypted(self):
  57. return self.connection.is_encrypted()
  58. def send_interested(self):
  59. self._send_message(INTERESTED)
  60. def send_not_interested(self):
  61. self._send_message(NOT_INTERESTED)
  62. def send_choke(self):
  63. if self.partial_message:
  64. self.send_choke_queued = True
  65. else:
  66. self._send_message(CHOKE)
  67. self.upload.choke_sent()
  68. self.just_unchoked = 0
  69. def send_unchoke(self):
  70. if self.send_choke_queued:
  71. self.send_choke_queued = False
  72. if DEBUG1:
  73. print (self.ccount,'CHOKE SUPPRESSED')
  74. else:
  75. self._send_message(UNCHOKE)
  76. if ( self.partial_message or self.just_unchoked is None
  77. or not self.upload.interested or self.download.active_requests ):
  78. self.just_unchoked = 0
  79. else:
  80. self.just_unchoked = clock()
  81. def send_request(self, index, begin, length):
  82. self._send_message(REQUEST + tobinary(index) +
  83. tobinary(begin) + tobinary(length))
  84. if DEBUG1:
  85. print (self.ccount,'sent request',index,begin,begin+length)
  86. def send_cancel(self, index, begin, length):
  87. self._send_message(CANCEL + tobinary(index) +
  88. tobinary(begin) + tobinary(length))
  89. if DEBUG1:
  90. print (self.ccount,'sent cancel',index,begin,begin+length)
  91. def send_bitfield(self, bitfield):
  92. self._send_message(BITFIELD + bitfield)
  93. def send_have(self, index):
  94. self._send_message(HAVE + tobinary(index))
  95. def send_keepalive(self):
  96. self._send_message('')
  97. def _send_message(self, s):
  98. if DEBUG2:
  99. if s:
  100. print (self.ccount,'SENDING MESSAGE',ord(s[0]),len(s))
  101. else:
  102. print (self.ccount,'SENDING MESSAGE',-1,0)
  103. s = tobinary(len(s))+s
  104. if self.partial_message:
  105. self.outqueue.append(s)
  106. else:
  107. self.connection.send_message_raw(s)
  108. def send_partial(self, bytes):
  109. if self.connection.closed:
  110. return 0
  111. if self.partial_message is None:
  112. s = self.upload.get_upload_chunk()
  113. if s is None:
  114. return 0
  115. index, begin, piece = s
  116. self.partial_message = ''.join((
  117. tobinary(len(piece) + 9), PIECE,
  118. tobinary(index), tobinary(begin), piece.tostring() ))
  119. if DEBUG1:
  120. print (self.ccount,'sending chunk',index,begin,begin+len(piece))
  121. if bytes < len(self.partial_message):
  122. self.connection.send_message_raw(self.partial_message[:bytes])
  123. self.partial_message = self.partial_message[bytes:]
  124. return bytes
  125. q = [self.partial_message]
  126. self.partial_message = None
  127. if self.send_choke_queued:
  128. self.send_choke_queued = False
  129. self.outqueue.append(tobinary(1)+CHOKE)
  130. self.upload.choke_sent()
  131. self.just_unchoked = 0
  132. q.extend(self.outqueue)
  133. self.outqueue = []
  134. q = ''.join(q)
  135. self.connection.send_message_raw(q)
  136. return len(q)
  137. def get_upload(self):
  138. return self.upload
  139. def get_download(self):
  140. return self.download
  141. def set_download(self, download):
  142. self.download = download
  143. def backlogged(self):
  144. return not self.connection.is_flushed()
  145. def got_request(self, i, p, l):
  146. self.upload.got_request(i, p, l)
  147. if self.just_unchoked:
  148. self.connecter.ratelimiter.ping(clock() - self.just_unchoked)
  149. self.just_unchoked = 0
  150. class Connecter:
  151. def __init__(self, make_upload, downloader, choker, numpieces,
  152. totalup, config, ratelimiter, sched = None):
  153. self.downloader = downloader
  154. self.make_upload = make_upload
  155. self.choker = choker
  156. self.numpieces = numpieces
  157. self.config = config
  158. self.ratelimiter = ratelimiter
  159. self.rate_capped = False
  160. self.sched = sched
  161. self.totalup = totalup
  162. self.rate_capped = False
  163. self.connections = {}
  164. self.external_connection_made = 0
  165. self.ccount = 0
  166. def how_many_connections(self):
  167. return len(self.connections)
  168. def connection_made(self, connection):
  169. self.ccount += 1
  170. c = Connection(connection, self, self.ccount)
  171. if DEBUG2:
  172. print (c.ccount,'connection made')
  173. self.connections[connection] = c
  174. c.upload = self.make_upload(c, self.ratelimiter, self.totalup)
  175. c.download = self.downloader.make_download(c)
  176. self.choker.connection_made(c)
  177. return c
  178. def connection_lost(self, connection):
  179. c = self.connections[connection]
  180. if DEBUG2:
  181. print (c.ccount,'connection closed')
  182. del self.connections[connection]
  183. if c.download:
  184. c.download.disconnected()
  185. self.choker.connection_lost(c)
  186. def connection_flushed(self, connection):
  187. conn = self.connections[connection]
  188. if conn.next_upload is None and (conn.partial_message is not None
  189. or len(conn.upload.buffer) > 0):
  190. self.ratelimiter.queue(conn)
  191. def got_piece(self, i):
  192. for co in self.connections.values():
  193. co.send_have(i)
  194. def got_message(self, connection, message):
  195. c = self.connections[connection]
  196. t = message[0]
  197. if DEBUG2:
  198. print (c.ccount,'message received',ord(t))
  199. if t == BITFIELD and c.got_anything:
  200. if DEBUG2:
  201. print (c.ccount,'misplaced bitfield')
  202. connection.close()
  203. return
  204. c.got_anything = True
  205. if (t in [CHOKE, UNCHOKE, INTERESTED, NOT_INTERESTED] and
  206. len(message) != 1):
  207. if DEBUG2:
  208. print (c.ccount,'bad message length')
  209. connection.close()
  210. return
  211. if t == CHOKE:
  212. c.download.got_choke()
  213. elif t == UNCHOKE:
  214. c.download.got_unchoke()
  215. elif t == INTERESTED:
  216. if not c.download.have.complete():
  217. c.upload.got_interested()
  218. elif t == NOT_INTERESTED:
  219. c.upload.got_not_interested()
  220. elif t == HAVE:
  221. if len(message) != 5:
  222. if DEBUG2:
  223. print (c.ccount,'bad message length')
  224. connection.close()
  225. return
  226. i = toint(message[1:])
  227. if i >= self.numpieces:
  228. if DEBUG2:
  229. print (c.ccount,'bad piece number')
  230. connection.close()
  231. return
  232. if c.download.got_have(i):
  233. c.upload.got_not_interested()
  234. elif t == BITFIELD:
  235. try:
  236. b = Bitfield(self.numpieces, message[1:])
  237. except ValueError:
  238. if DEBUG2:
  239. print (c.ccount,'bad bitfield')
  240. connection.close()
  241. return
  242. if c.download.got_have_bitfield(b):
  243. c.upload.got_not_interested()
  244. elif t == REQUEST:
  245. if len(message) != 13:
  246. if DEBUG2:
  247. print (c.ccount,'bad message length')
  248. connection.close()
  249. return
  250. i = toint(message[1:5])
  251. if i >= self.numpieces:
  252. if DEBUG2:
  253. print (c.ccount,'bad piece number')
  254. connection.close()
  255. return
  256. c.got_request(i, toint(message[5:9]),
  257. toint(message[9:]))
  258. elif t == CANCEL:
  259. if len(message) != 13:
  260. if DEBUG2:
  261. print (c.ccount,'bad message length')
  262. connection.close()
  263. return
  264. i = toint(message[1:5])
  265. if i >= self.numpieces:
  266. if DEBUG2:
  267. print (c.ccount,'bad piece number')
  268. connection.close()
  269. return
  270. c.upload.got_cancel(i, toint(message[5:9]),
  271. toint(message[9:]))
  272. elif t == PIECE:
  273. if len(message) <= 9:
  274. if DEBUG2:
  275. print (c.ccount,'bad message length')
  276. connection.close()
  277. return
  278. i = toint(message[1:5])
  279. if i >= self.numpieces:
  280. if DEBUG2:
  281. print (c.ccount,'bad piece number')
  282. connection.close()
  283. return
  284. if c.download.got_piece(i, toint(message[5:9]), message[9:]):
  285. self.got_piece(i)
  286. else:
  287. connection.close()