Connector.py 38 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032
  1. # The contents of this file are subject to the BitTorrent Open Source License
  2. # Version 1.1 (the License). You may not copy or use this file, in either
  3. # source code or executable form, except in compliance with the License. You
  4. # may obtain a copy of the License at http://www.bittorrent.com/license/.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10. # Originally written by Bram Cohen, heavily modified by Uoti Urpala
  11. # Fast extensions added by David Harrison
  12. from __future__ import generators
  13. # DEBUG
  14. # If you think FAST_EXTENSION is causing problems then set the following:
  15. disable_fast_extension = False
  16. # END DEBUG
  17. noisy = False
  18. log_data = False
  19. # for crypto
  20. from random import randrange
  21. from BTL.hash import sha
  22. from Crypto.Cipher import ARC4
  23. # urandom comes from obsoletepythonsupport
  24. import struct
  25. from struct import pack, unpack
  26. from cStringIO import StringIO
  27. from BTL.bencode import bencode, bdecode
  28. from BitTorrent.RawServer_twisted import Handler
  29. from BTL.bitfield import Bitfield
  30. from BTL import IPTools
  31. from BTL.obsoletepythonsupport import *
  32. from BitTorrent.ClientIdentifier import identify_client
  33. from BTL.platform import app_name
  34. from BitTorrent import version
  35. import logging
  36. def toint(s):
  37. return struct.unpack("!i", s)[0]
  38. def tobinary(i):
  39. return struct.pack("!i", i)
  40. class BTMessages(object):
  41. def __init__(self, messages):
  42. self.message_to_chr = {}
  43. self.chr_to_message = {}
  44. for o, v in messages.iteritems():
  45. c = chr(o)
  46. self.chr_to_message[c] = v
  47. self.message_to_chr[v] = c
  48. def __getitem__(self, key):
  49. return self.chr_to_message.get(key, "UNKNOWN: %r" % key)
  50. message_dict = BTMessages({
  51. 0:'CHOKE',
  52. 1:'UNCHOKE',
  53. 2:'INTERESTED',
  54. 3:'NOT_INTERESTED',
  55. 4:'HAVE',
  56. # index, bitfield
  57. 5:'BITFIELD',
  58. # index, begin, length
  59. 6:'REQUEST',
  60. # index, begin, piece
  61. 7:'PIECE',
  62. # index, begin, piece
  63. 8:'CANCEL',
  64. # 2-byte port message
  65. 9:'PORT',
  66. # no args
  67. 10:'WANT_METAINFO',
  68. 11:'METAINFO',
  69. # index
  70. 12:'SUSPECT_PIECE',
  71. # index
  72. 13:'SUGGEST_PIECE', # FAST_EXTENSION
  73. # no args
  74. 14:'HAVE_ALL', # FAST_EXTENSION
  75. 15:'HAVE_NONE', # FAST_EXTENSION
  76. # index, begin, length
  77. 16:'REJECT_REQUEST', # FAST_EXTENSION
  78. # index
  79. 17:'ALLOWED_FAST', # FAST_EXTENSION
  80. # compact_addr
  81. 18:'HOLE_PUNCH', # NAT_TRAVERSAL
  82. # message id, bencoded payload
  83. 20:'UTORRENT_MSG', # UTORRENT
  84. })
  85. # put all the message identifiers in the module
  86. locals().update(message_dict.message_to_chr)
  87. # I am not even shitting you.
  88. AZUREUS_SUCKS = CHOKE
  89. UTORRENT_MSG_INFO = chr(0)
  90. # in reality this could be variable
  91. UTORRENT_MSG_PEX = chr(1)
  92. # reserved flags:
  93. # reserved[0]
  94. # 0x80 Azureus Messaging Protocol
  95. AZUREUS = 0x80
  96. # reserved[5]
  97. # 0x10 uTorrent extensions: peer exchange, encrypted connections,
  98. # broadcast listen port.
  99. UTORRENT = 0x10
  100. # reserved[7]
  101. DHT = 0x01
  102. FAST_EXTENSION = 0x04 # suggest, haveall, havenone, reject request,
  103. # and allow fast extensions.
  104. NAT_TRAVERSAL = 0x08 # holepunch
  105. LAST_BYTE = DHT
  106. if not disable_fast_extension:
  107. LAST_BYTE |= FAST_EXTENSION
  108. LAST_BYTE |= NAT_TRAVERSAL
  109. FLAGS = ['\0'] * 8
  110. #FLAGS[0] = chr( AZUREUS )
  111. FLAGS[5] = chr( UTORRENT )
  112. FLAGS[7] = chr( LAST_BYTE )
  113. FLAGS = ''.join(FLAGS)
  114. protocol_name = 'BitTorrent protocol'
  115. # for crypto
  116. dh_prime = 0xFFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A63A36210000000000090563
  117. PAD_MAX = 200 # less than protocol maximum, and later assumed to be < 256
  118. DH_BYTES = 96
  119. def bytetonum(x):
  120. return long(x.encode('hex'), 16)
  121. def numtobyte(x):
  122. x = hex(x).lstrip('0x').rstrip('Ll')
  123. x = '0'*(192 - len(x)) + x
  124. return x.decode('hex')
  125. if noisy:
  126. connection_logger = logging.getLogger("BitTorrent.Connector")
  127. connection_logger.setLevel(logging.DEBUG)
  128. stream_handler = logging.StreamHandler()
  129. connection_logger.addHandler(stream_handler)
  130. log = connection_logger.debug
  131. # Tracker NAT checking:
  132. # Aside: When you start up a Torrent, the first connection after contacting
  133. # the tracker is probably a callback from the tracker to perform a NatCheck.
  134. # (I was a bit confused about where this connection was coming from that
  135. # didn't have any bits set in the handshake's reserved bytes when
  136. # with there were no other peers. Call me stupid.) --Dave
  137. class Connector(Handler):
  138. """Implements the syntax of the BitTorrent protocol.
  139. See Upload.py and Download.py for the connection-level
  140. semantics."""
  141. def __init__(self, parent, connection, id, is_local,
  142. obfuscate_outgoing=False, log_prefix = "", lan=False):
  143. self.parent = parent
  144. self.connection = connection
  145. self.id = id
  146. self.ip = connection.ip
  147. self.port = connection.port
  148. self.addr = (self.ip, self.port)
  149. self.hostname = None
  150. self.locally_initiated = is_local
  151. if self.locally_initiated:
  152. self.max_message_length = self.parent.config['max_message_length']
  153. self.listening_port = self.port
  154. else:
  155. self.listening_port = None
  156. self.complete = False
  157. self.lan = lan
  158. self.closed = False
  159. self.got_anything = False
  160. self.next_upload = None
  161. self.upload = None
  162. self.download = None
  163. self._buffer = StringIO()
  164. self._reader = self._read_messages()
  165. self._next_len = self._reader.next()
  166. self._message = None
  167. self._partial_message = None
  168. self._outqueue = StringIO()
  169. self._decrypt = None
  170. self._privkey = None
  171. self.choke_sent = True
  172. self.uses_utorrent_extension = False
  173. self.uses_utorrent_pex = False
  174. self.utorrent_pex_id = None
  175. self.uses_azureus_extension = False
  176. self.uses_azureus_pex = False
  177. self.uses_dht = False
  178. self.uses_fast_extension = False
  179. self.uses_nat_traversal = False
  180. self.obfuscate_outgoing = obfuscate_outgoing
  181. self.dht_port = None
  182. self.local_pex_set = set()
  183. self.remote_pex_set = set()
  184. self.sloppy_pre_connection_counter = 0
  185. self._sent_listeners = set()
  186. self.received_data = False
  187. self.log_prefix = log_prefix
  188. if self.locally_initiated:
  189. self.logger = logging.getLogger(
  190. self.log_prefix + '.' + repr(self.parent.infohash) +
  191. '.peer_id_not_yet')
  192. else:
  193. self.logger = logging.getLogger(
  194. self.log_prefix + '.infohash_not_yet.peer_id_not_yet')
  195. self.logger.setLevel(logging.DEBUG)
  196. if noisy:
  197. self.logger.addHandler(stream_handler)
  198. if self.locally_initiated:
  199. self.send_handshake()
  200. # Greg's comments: ow ow ow
  201. self.connection.handler = self
  202. def protocol_violation(self, s):
  203. msg = "%s %s" % (s, self.addr)
  204. if self.id:
  205. msg += " %r" % (identify_client(self.id), )
  206. if noisy:
  207. log("FAUX PAS: %s" % msg)
  208. self.logger.info(msg)
  209. def send_handshake(self):
  210. if self.obfuscate_outgoing:
  211. privkey = bytetonum(urandom(20))
  212. self._privkey = privkey
  213. pubkey = pow(2, privkey, dh_prime)
  214. out = numtobyte(pubkey) + urandom(randrange(PAD_MAX))
  215. self.connection.write(out)
  216. else:
  217. if noisy:
  218. l = [ c.encode('hex') for c in list(FLAGS) ]
  219. log("sending reserved: %s" % ' '.join(l))
  220. self.connection.write(''.join((chr(len(protocol_name)),
  221. protocol_name,
  222. FLAGS,
  223. self.parent.infohash)))
  224. # if we already have the peer's id, just send ours.
  225. # otherwise we wait for it.
  226. if self.id is not None:
  227. self.connection.write(self.parent.my_id)
  228. def set_parent(self, parent):
  229. self.parent = parent
  230. self.max_message_length = self.parent.config['max_message_length']
  231. def close(self):
  232. if noisy: log("CLOSE")
  233. if not self.closed:
  234. self.parent.remove_addr_from_cache(self.addr)
  235. self.connection.close()
  236. def send_interested(self):
  237. if noisy:
  238. log("SEND %s" % message_dict[INTERESTED])
  239. self._send_message(INTERESTED)
  240. def send_not_interested(self):
  241. if noisy:
  242. log("SEND %s" % message_dict[NOT_INTERESTED])
  243. self._send_message(NOT_INTERESTED)
  244. def send_choke(self):
  245. if self._partial_message is None:
  246. if noisy:
  247. log("SEND %s" % message_dict[CHOKE])
  248. self._send_message(CHOKE)
  249. self.choke_sent = True
  250. self.upload.sent_choke()
  251. def send_unchoke(self):
  252. if self._partial_message is None:
  253. if noisy:
  254. log("SEND %s" % message_dict[UNCHOKE])
  255. self._send_message(UNCHOKE)
  256. self.choke_sent = False
  257. def send_port(self, port):
  258. if noisy:
  259. log("SEND %s" % message_dict[PORT])
  260. self._send_message(PORT, pack('!H', port))
  261. def send_request(self, index, begin, length):
  262. if noisy:
  263. log("SEND %s %d %d %d" % (message_dict[REQUEST], index, begin, length))
  264. self._send_message(pack("!ciii", REQUEST, index, begin, length))
  265. def send_cancel(self, index, begin, length):
  266. self._send_message(pack("!ciii", CANCEL, index, begin, length))
  267. def send_bitfield(self, bitfield):
  268. if noisy:
  269. log("SEND %s" % message_dict[BITFIELD])
  270. self._send_message(BITFIELD, bitfield)
  271. def send_have(self, index):
  272. if noisy:
  273. log("SEND %s" % message_dict[HAVE])
  274. self._send_message(pack("!ci", HAVE, index))
  275. def send_have_all(self):
  276. assert(self.uses_fast_extension)
  277. if noisy:
  278. log("SEND %s" % message_dict[HAVE_ALL])
  279. self._send_message(pack("!c", HAVE_ALL))
  280. def send_have_none(self):
  281. assert(self.uses_fast_extension)
  282. if noisy:
  283. log("SEND %s" % message_dict[HAVE_NONE])
  284. self._send_message(pack("!c", HAVE_NONE))
  285. def send_reject_request(self, index, begin, length):
  286. assert(self.uses_fast_extension)
  287. self._send_message(pack("!ciii", REJECT_REQUEST, index, begin, length))
  288. def send_allowed_fast(self, index):
  289. assert(self.uses_fast_extension)
  290. self._send_message(pack("!ci", ALLOWED_FAST, index))
  291. def send_keepalive(self):
  292. self._send_message('')
  293. def send_holepunch_request(self, addr):
  294. # disabled, for now.
  295. return
  296. if not self.uses_nat_traversal:
  297. # maybe close?
  298. return
  299. d = {'t': 'r'}
  300. d['p'] = IPTools.compact(*addr)
  301. self._send_message(HOLE_PUNCH, d)
  302. def send_pex(self, pex_set):
  303. if not (self.uses_utorrent_extension and self.uses_utorrent_pex):
  304. return
  305. added = pex_set.difference(self.local_pex_set)
  306. dropped = self.local_pex_set.difference(pex_set)
  307. self.local_pex_set = pex_set
  308. if added or dropped:
  309. d = {}
  310. d['added'] = IPTools.compact_sequence(added)
  311. # TODO: set seeds bytes
  312. d['added.f'] = chr(0) * len(added) # hmm..
  313. d['dropped'] = IPTools.compact_sequence(dropped)
  314. self._send_message(UTORRENT_MSG,
  315. chr(self.utorrent_pex_id), bencode(d))
  316. def add_sent_listener(self, listener):
  317. """Passed a function/functor that accepts a single byte argument,
  318. which is called everytime this uploader sends a chunk."""
  319. self._sent_listeners.add(listener)
  320. def remove_sent_listener(self, listener):
  321. self._sent_listeners.remove(listener)
  322. def fire_sent_listeners(self, bytes):
  323. for f in self._sent_listeners:
  324. f(bytes)
  325. def send_partial(self, bytes):
  326. if self.closed:
  327. return 0
  328. if self._partial_message is None and not self.upload.buffer:
  329. return 0
  330. if self._partial_message is None:
  331. buf = StringIO()
  332. while self.upload.buffer and buf.tell() < bytes:
  333. t, piece = self.upload.buffer.pop(0)
  334. index, begin, length = t
  335. msg = pack("!icii%s" % len(piece), len(piece) + 9, PIECE,
  336. index, begin)
  337. buf.write(msg)
  338. buf.write(piece)
  339. if noisy: log("SEND PIECE %d %d" % (index, begin))
  340. self._partial_message = buf.getvalue()
  341. if bytes < len(self._partial_message):
  342. self.fire_sent_listeners(bytes)
  343. self.connection.write(buffer(self._partial_message, 0, bytes))
  344. self._partial_message = buffer(self._partial_message, bytes)
  345. return bytes
  346. if self.choke_sent != self.upload.choked:
  347. if self.upload.choked:
  348. self._outqueue.write(pack("!ic", 1, CHOKE))
  349. self.upload.sent_choke()
  350. else:
  351. self._outqueue.write(pack("!ic", 1, UNCHOKE))
  352. self.choke_sent = self.upload.choked
  353. buf = StringIO()
  354. buf.write(self._partial_message)
  355. self._partial_message = None
  356. buf.write(self._outqueue.getvalue())
  357. # optimize for cpu (reduce mallocs)
  358. #self._outqueue.truncate(0)
  359. # optimize for memory (free buffer memory)
  360. self._outqueue.close()
  361. self._outqueue = StringIO()
  362. queue = buf.getvalue()
  363. self.fire_sent_listeners(len(queue))
  364. self.connection.write(queue)
  365. return len(queue)
  366. # yields the number of bytes it wants next, gets those in self._message
  367. def _read_messages(self):
  368. # be compatible with encrypted clients. Thanks Uoti
  369. yield 1 + len(protocol_name)
  370. if (self._privkey is not None or
  371. self._message != chr(len(protocol_name)) + protocol_name):
  372. if self.locally_initiated:
  373. if self._privkey is None:
  374. return
  375. dhstr = self._message
  376. yield DH_BYTES - len(dhstr)
  377. dhstr += self._message
  378. pub = bytetonum(dhstr)
  379. S = numtobyte(pow(pub, self._privkey, dh_prime))
  380. pub = self._privkey = dhstr = None
  381. SKEY = self.parent.infohash
  382. x = sha('req3' + S).digest()
  383. streamid = sha('req2'+SKEY).digest()
  384. streamid = ''.join([chr(ord(streamid[i]) ^ ord(x[i]))
  385. for i in range(20)])
  386. encrypt = ARC4.new(sha('keyA' + S + SKEY).digest()).encrypt
  387. encrypt('x'*1024)
  388. padlen = randrange(PAD_MAX)
  389. x = sha('req1' + S).digest() + streamid + encrypt(
  390. '\x00'*8 + '\x00'*3+'\x02'+'\x00'+chr(padlen)+
  391. urandom(padlen)+'\x00\x00')
  392. self.connection.write(x)
  393. self.connection.encrypt = encrypt
  394. decrypt = ARC4.new(sha('keyB' + S + SKEY).digest()).decrypt
  395. decrypt('x'*1024)
  396. VC = decrypt('\x00'*8) # actually encrypt
  397. x = ''
  398. while 1:
  399. yield 1
  400. x += self._message
  401. i = (x + str(self._rest)).find(VC)
  402. if i >= 0:
  403. break
  404. yield len(self._rest)
  405. x += self._message
  406. if len(x) >= 520:
  407. self.protocol_violation('VC not found')
  408. return
  409. yield i + 8 + 4 + 2 - len(x)
  410. x = decrypt((x + self._message)[-6:])
  411. self._decrypt = decrypt
  412. if x[0:4] != '\x00\x00\x00\x02':
  413. self.protocol_violation('bad crypto method selected, not 2')
  414. return
  415. padlen = (ord(x[4]) << 8) + ord(x[5])
  416. if padlen > 512:
  417. self.protocol_violation('padlen too long')
  418. return
  419. self.connection.write(''.join((chr(len(protocol_name)),
  420. protocol_name, FLAGS,
  421. self.parent.infohash)))
  422. yield padlen
  423. else:
  424. dhstr = self._message
  425. yield DH_BYTES - len(dhstr)
  426. dhstr += self._message
  427. privkey = bytetonum(urandom(20))
  428. pub = numtobyte(pow(2, privkey, dh_prime))
  429. self.connection.write(''.join((pub, urandom(randrange(PAD_MAX)))))
  430. pub = bytetonum(dhstr)
  431. S = numtobyte(pow(pub, privkey, dh_prime))
  432. dhstr = pub = privkey = None
  433. streamid = sha('req1' + S).digest()
  434. x = ''
  435. while 1:
  436. yield 1
  437. x += self._message
  438. i = (x + str(self._rest)).find(streamid)
  439. if i >= 0:
  440. break
  441. yield len(self._rest)
  442. x += self._message
  443. if len(x) >= 532:
  444. self.protocol_violation('incoming VC not found')
  445. return
  446. yield i + 20 + 20 + 8 + 4 + 2 - len(x)
  447. self._message = (x + self._message)[-34:]
  448. streamid = self._message[0:20]
  449. x = sha('req3' + S).digest()
  450. streamid = ''.join([chr(ord(streamid[i]) ^ ord(x[i]))
  451. for i in range(20)])
  452. self.parent.select_torrent_obfuscated(self, streamid)
  453. if self.parent.infohash is None:
  454. self.protocol_violation('download id unknown/rejected')
  455. return
  456. self.logger = logging.getLogger(
  457. self.log_prefix + '.' + repr(self.parent.infohash) +
  458. '.peer_id_not_yet')
  459. SKEY = self.parent.infohash
  460. decrypt = ARC4.new(sha('keyA' + S + SKEY).digest()).decrypt
  461. decrypt('x'*1024)
  462. s = decrypt(self._message[20:34])
  463. if s[0:8] != '\x00' * 8:
  464. self.protocol_violation('BAD VC')
  465. return
  466. crypto_provide = toint(s[8:12])
  467. padlen = (ord(s[12]) << 8) + ord(s[13])
  468. if padlen > 512:
  469. self.protocol_violation('BAD padlen, too long')
  470. return
  471. self._decrypt = decrypt
  472. yield padlen + 2
  473. s = self._message
  474. encrypt = ARC4.new(sha('keyB' + S + SKEY).digest()).encrypt
  475. encrypt('x'*1024)
  476. self.connection.encrypt = encrypt
  477. if not crypto_provide & 2:
  478. self.protocol_violation("peer doesn't support crypto mode 2")
  479. return
  480. padlen = randrange(PAD_MAX)
  481. s = '\x00' * 11 + '\x02\x00' + chr(padlen) + urandom(padlen)
  482. self.connection.write(s)
  483. S = SKEY = s = x = streamid = VC = padlen = None
  484. yield 1 + len(protocol_name)
  485. if self._message != chr(len(protocol_name)) + protocol_name:
  486. self.protocol_violation('classic handshake fails')
  487. return
  488. yield 8 # reserved
  489. if noisy:
  490. l = [ c.encode('hex') for c in list(self._message) ]
  491. log("reserved: %s" % ' '.join(l))
  492. if ord(self._message[0]) & AZUREUS:
  493. if noisy: log("Implements Azureus extensions")
  494. if ord(FLAGS[0]) & AZUREUS:
  495. self.uses_azureus_extension = True
  496. if ord(self._message[5]) & UTORRENT:
  497. if noisy: log("Implements uTorrent extensions")
  498. if ord(FLAGS[5]) & UTORRENT:
  499. self.uses_utorrent_extension = True
  500. if ord(self._message[7]) & DHT:
  501. if noisy: log("Implements DHT")
  502. if ord(FLAGS[7]) & DHT:
  503. self.uses_dht = True
  504. if ord(self._message[7]) & FAST_EXTENSION:
  505. if noisy: log("Implements FAST_EXTENSION")
  506. if not disable_fast_extension:
  507. self.uses_fast_extension = True
  508. if ord(self._message[7]) & NAT_TRAVERSAL:
  509. if noisy: log("Implements NAT_TRAVERSAL")
  510. if ord(FLAGS[7]) & NAT_TRAVERSAL:
  511. self.uses_nat_traversal = True
  512. yield 20 # download id (i.e., infohash)
  513. if self.parent.infohash is None: # incoming connection
  514. # modifies self.parent if successful
  515. self.parent.select_torrent(self, self._message)
  516. if self.parent.infohash is None:
  517. # could be turned away due to connection limits
  518. #self.protocol_violation("no infohash from parent (peer from a "
  519. # "torrent you're not running: %s)" %
  520. # self._message.encode('hex'))
  521. return
  522. elif self._message != self.parent.infohash:
  523. self.protocol_violation("incorrect infohash from parent")
  524. return
  525. if not self.locally_initiated:
  526. self.connection.write(''.join((chr(len(protocol_name)),
  527. protocol_name, FLAGS,
  528. self.parent.infohash,
  529. self.parent.my_id)))
  530. yield 20 # peer id
  531. if noisy: log("peer id: %r" % self._message)
  532. # if we don't already have the peer's id, send ours
  533. if not self.id:
  534. self.id = self._message
  535. ns = (self.log_prefix + '.' + repr(self.parent.infohash) +
  536. '.' + repr(self.id)[1:-1])
  537. self.logger = logging.getLogger(ns)
  538. if self.id == self.parent.my_id:
  539. #self.protocol_violation("talking to self")
  540. return
  541. if self.id in self.parent.connector_ids:
  542. if self.parent.my_id > self.id:
  543. #self.protocol_violation("duplicate connection (id collision)")
  544. return
  545. if (self.parent.config['one_connection_per_ip'] and
  546. self.ip in self.parent.connector_ips):
  547. self.protocol_violation("duplicate connection (ip collision)")
  548. return
  549. if self.locally_initiated:
  550. self.connection.write(self.parent.my_id)
  551. else:
  552. self.parent.everinc = True
  553. else:
  554. # assert the id we have and the one we got are the same
  555. if self._message != self.id:
  556. self.protocol_violation("incorrect id have:%r got:%r" % (self.id, self._message))
  557. # this is not critical enough to disconnect. some clients have
  558. # an option to do this on purpose
  559. #return
  560. if self.uses_utorrent_extension:
  561. response = {'m': {'ut_pex': ord(UTORRENT_MSG_PEX)},
  562. 'v': ('%s %s' % (app_name, version)).encode('utf8'),
  563. 'e': 0,
  564. 'p': self.parent.reported_port,
  565. }
  566. response = bencode(response)
  567. self._send_message(UTORRENT_MSG,
  568. UTORRENT_MSG_INFO, response)
  569. self.complete = True
  570. self.parent.connection_handshake_completed(self)
  571. message_count = 0
  572. while True:
  573. yield 4 # message length
  574. l = toint(self._message)
  575. if l > self.max_message_length:
  576. d = '%s%s' % (self._message, self._rest)
  577. d = d[:10]
  578. self.protocol_violation("message length exceeds max "
  579. "(%s > %s): %r, count:%d" %
  580. (l, self.max_message_length, d,
  581. message_count))
  582. return
  583. if l > 0:
  584. yield l
  585. self._got_message(self._message)
  586. message_count += 1
  587. def _got_utorrent_msg(self, msg_type, d):
  588. if msg_type == UTORRENT_MSG_INFO:
  589. version = d.get('v')
  590. port = d.get('p')
  591. if port:
  592. self.listening_port = int(port)
  593. encryption = d.get('e')
  594. messages = d.get('m')
  595. if 'ut_pex' in messages:
  596. self.uses_utorrent_pex = True
  597. self.utorrent_pex_id = messages['ut_pex']
  598. if not isinstance(self.utorrent_pex_id, (int, long)):
  599. try:
  600. raise TypeError("LTEX message ids must be int not %r" % self.utorrent_pex_id)
  601. except:
  602. self.logger.exception("ut_pex support failed")
  603. self.uses_utorrent_pex = False
  604. elif msg_type == UTORRENT_MSG_PEX:
  605. for i, addr in enumerate(IPTools.uncompact_sequence(d['added'])):
  606. self.remote_pex_set.add(addr)
  607. if len(d['added.f']) > i:
  608. if (ord(d['added.f'][i]) & 2 and
  609. self.parent.downloader.storage.get_amount_left() == 0):
  610. # don't connect to seeds if we're done
  611. continue
  612. self.parent.start_connection(addr)
  613. dropped_gen = IPTools.uncompact_sequence(d['dropped'])
  614. self.remote_pex_set.difference_update(dropped_gen)
  615. def _got_azureus_msg(self, msg_type, d):
  616. port = d.get('tcp_port')
  617. if port:
  618. self.listening_port = int(port)
  619. m = d.get('messages', [])
  620. for msg in m:
  621. if msg.get('id') == 'AZ_PEER_EXCHANGE':
  622. self.uses_azureus_pex = True
  623. def _got_holepunch_msg(self, d):
  624. msg_type = d.get('t')
  625. if msg_type == 'r': # request
  626. print 'hole punch requested from', self.addr, 'to', d['p']
  627. d = {'t': 'i'}
  628. d['p'] = IPTools.compact(addr)
  629. self._send_message(HOLE_PUNCH, d)
  630. elif msg_type == 'i': # initiate
  631. print 'told to initiate connection(s) to:' + str(d['p'])
  632. else:
  633. self.protocol_violation("unknown hole punch msg type: %r" %
  634. msg_type)
  635. def _got_message(self, message):
  636. t = message[0]
  637. if t in [BITFIELD, HAVE_ALL, HAVE_NONE] and self.got_anything:
  638. self.protocol_violation("%s after got anything" % message_dict[t])
  639. self.close()
  640. return
  641. if t == UTORRENT_MSG and self.uses_utorrent_extension:
  642. msg_type = message[1]
  643. d = bdecode(message[2:])
  644. if noisy: log("UTORRENT_MSG: %r:%r" % (msg_type, d))
  645. self._got_utorrent_msg(msg_type, d)
  646. return
  647. if t == AZUREUS_SUCKS and self.uses_azureus_extension:
  648. magic_intro = 17
  649. msg_type = message[:magic_intro]
  650. d = bdecode(message[magic_intro:])
  651. if noisy: log("AZUREUS_MSG: %r:%r" % (msg_type, d))
  652. self._got_azureus_msg(msg_type, d)
  653. return
  654. if t == HOLE_PUNCH and self.uses_nat_traversal:
  655. d = ebdecode(message)
  656. if noisy: log("HOLE_PUNCH: %r" % d)
  657. self._got_holepunch_msg(d)
  658. return
  659. self.got_anything = True
  660. if (t in (CHOKE, UNCHOKE, INTERESTED, NOT_INTERESTED,
  661. HAVE_ALL, HAVE_NONE) and
  662. len(message) != 1):
  663. self.protocol_violation("%s with message length %d" %
  664. (message_dict[t], len(message)))
  665. if noisy: log("UNKNOWN: %r" % message)
  666. self.close()
  667. return
  668. if t == CHOKE:
  669. if noisy: log("GOT %s" % message_dict[t])
  670. self.download.got_choke()
  671. elif t == UNCHOKE:
  672. if noisy: log("GOT %s" % message_dict[t])
  673. self.download.got_unchoke()
  674. elif t == INTERESTED:
  675. if noisy: log("GOT %s" % message_dict[t])
  676. self.upload.got_interested()
  677. elif t == NOT_INTERESTED:
  678. if noisy: log("GOT %s" % message_dict[t])
  679. self.upload.got_not_interested()
  680. elif t == HAVE:
  681. if len(message) != 5:
  682. self.protocol_violation("HAVE length: %d != 5" %
  683. len(message))
  684. self.close()
  685. return
  686. i = unpack("!xi", message)[0]
  687. if noisy: log("GOT HAVE %d" % i)
  688. if i >= self.parent.numpieces:
  689. self.protocol_violation("HAVE %d >= %d" %
  690. (i, self.parent.numpieces))
  691. self.close()
  692. return
  693. self.download.got_have(i)
  694. elif t == BITFIELD:
  695. try:
  696. b = Bitfield(self.parent.numpieces, message[1:])
  697. except ValueError, e:
  698. self.protocol_violation("BITFIELD %s" %
  699. (e,))
  700. self.close()
  701. return
  702. self.download.got_have_bitfield(b)
  703. elif t == REQUEST:
  704. if len(message) != 13:
  705. self.protocol_violation("REQUEST length %d != 13" %
  706. len(message))
  707. self.close()
  708. return
  709. i, a, b = unpack("!xiii", message)
  710. if noisy: log("GOT REQUEST %d %d %d" % (i, a, b))
  711. if i >= self.parent.numpieces:
  712. self.protocol_violation(
  713. "Requested piece index out of range: %d > %d" %
  714. (i, self.parent.numpieces))
  715. self.close()
  716. return
  717. if a + b > self.parent.piece_size:
  718. self.protocol_violation(
  719. "Requested range exceeds piece size: "
  720. "(b:%d + l:%d == %d) > %d" %
  721. (a, b, a + b, self.parent.piece_size))
  722. self.close()
  723. return
  724. if self.download.have[i]:
  725. self.protocol_violation(
  726. "Requested piece index %d which the peer already has" %
  727. (i,))
  728. self.close()
  729. return
  730. self.upload.got_request(i, a, b)
  731. elif t == CANCEL:
  732. if len(message) != 13:
  733. self.protocol_violation("CANCEL length %d != 13" %
  734. len(message))
  735. self.close()
  736. return
  737. i, a, b = unpack("!xiii", message)
  738. if noisy: log("GOT CANCEL %d %d %d" % (i, a, b))
  739. if i >= self.parent.numpieces:
  740. self.protocol_violation(
  741. "Cancelled piece index %d > numpieces which is %d" %
  742. (i,self.parent.numpieces))
  743. self.close()
  744. return
  745. self.upload.got_cancel(i, a, b)
  746. elif t == PIECE:
  747. if len(message) <= 9:
  748. self.protocol_violation("PIECE %d <= 9" %
  749. len(message))
  750. self.close()
  751. return
  752. n = len(message) - 9
  753. i, a, b = unpack("!xii%ss" % n, message)
  754. if noisy: log("GOT PIECE %d %d" % (i, a))
  755. if i >= self.parent.numpieces:
  756. self.protocol_violation("PIECE %d >= %d" %
  757. (i, self.parent.numpieces))
  758. self.close()
  759. return
  760. self.download.got_piece(i, a, b)
  761. elif t == PORT:
  762. if len(message) != 3:
  763. self.protocol_violation("PORT %d != 3" %
  764. len(message))
  765. self.close()
  766. return
  767. self.dht_port = unpack('!H', message[1:3])[0]
  768. self.parent.got_port(self)
  769. elif t == SUGGEST_PIECE:
  770. if not self.uses_fast_extension:
  771. self.protocol_violation(
  772. "Received 'SUGGEST_PIECE' when fast extension disabled.")
  773. self.close()
  774. return
  775. if len(message) != 5:
  776. self.protocol_violation("SUGGEST_PIECE length: %d != 5" %
  777. len(message))
  778. self.close()
  779. return
  780. i = unpack("!xi", message)[0]
  781. if noisy: log("GOT SUGGEST_PIECE %d" % i)
  782. if i >= self.parent.numpieces:
  783. self.protocol_violation(
  784. "Received 'SUGGEST_PIECE' with piece id %d > numpieces." %
  785. self.parent.numpieces)
  786. self.close()
  787. return
  788. self.download.got_suggest_piece(i)
  789. elif t == HAVE_ALL:
  790. if noisy: log("GOT %s" % message_dict[t])
  791. if not self.uses_fast_extension:
  792. self.protocol_violation(
  793. "Received 'HAVE_ALL' when fast extension disabled.")
  794. self.close()
  795. return
  796. self.download.got_have_all()
  797. elif t == HAVE_NONE:
  798. if noisy: log("GOT %s" % message_dict[t])
  799. if not self.uses_fast_extension:
  800. self.protocol_violation(
  801. "Received 'HAVE_NONE' when fast extension disabled.")
  802. self.close()
  803. return
  804. self.download.got_have_none()
  805. elif t == REJECT_REQUEST:
  806. if not self.uses_fast_extension:
  807. self.protocol_violation(
  808. "Received 'REJECT_REQUEST' when fast extension disabled.")
  809. self.close()
  810. return
  811. if len(message) != 13:
  812. self.protocol_violation(
  813. "Received 'REJECT_REQUEST' with length %d != 13." %
  814. len(message))
  815. self.close()
  816. return
  817. i, a, b = unpack("!xiii", message)
  818. if noisy: log("GOT REJECT_REQUEST %d %d" % (i,a))
  819. if i >= self.parent.numpieces:
  820. self.protocol_violation("REJECT %d >= %d" %
  821. (i, self.parent.numpieces))
  822. self.close()
  823. return
  824. self.download.got_reject_request(i, a, b)
  825. elif t == ALLOWED_FAST:
  826. if not self.uses_fast_extension:
  827. self.protocol_violation(
  828. "Received 'ALLOWED_FAST' when fast extension disabled.")
  829. self.close()
  830. return
  831. if len(message) != 5:
  832. self.protocol_violation("ALLOWED_FAST length: %d != 5" %
  833. len(message))
  834. self.close()
  835. return
  836. i = unpack("!xi", message)[0]
  837. if noisy: log("GOT ALLOWED_FAST %d" % i)
  838. self.download.got_allowed_fast(i)
  839. else:
  840. if noisy: log("GOT %s length %d" % (message_dict[t], len(message)))
  841. self.protocol_violation("unhandled message %s" % message_dict[t])
  842. self.close()
  843. def _write(self, s):
  844. if self._partial_message is not None:
  845. self._outqueue.write(s)
  846. else:
  847. self.connection.write(s)
  848. def _send_message(self, *msg_a):
  849. if self.closed:
  850. return
  851. l = 0
  852. for e in msg_a:
  853. l += len(e)
  854. d = [tobinary(l), ]
  855. d.extend(msg_a)
  856. s = ''.join(d)
  857. self._write(s)
  858. def data_came_in(self, conn, s):
  859. self.received_data = True
  860. if not self.download:
  861. # this is really annoying.
  862. self.sloppy_pre_connection_counter += len(s)
  863. else:
  864. l = self.sloppy_pre_connection_counter + len(s)
  865. self.sloppy_pre_connection_counter = 0
  866. self.download.fire_raw_received_listeners(l)
  867. if log_data:
  868. assert self.addr == (conn.ip, conn.port)
  869. open('%s_%d.log' % self.addr, 'ab').write(s)
  870. while True:
  871. if self.closed:
  872. return
  873. i = self._next_len - self._buffer.tell()
  874. if i > len(s):
  875. # not enough bytes, keep buffering
  876. self._buffer.write(s)
  877. return
  878. if self._buffer.tell() > 0:
  879. # collect buffer + current for message
  880. self._buffer.write(buffer(s, 0, i))
  881. m = self._buffer.getvalue()
  882. # optimize for cpu (reduce mallocs)
  883. #self._buffer.truncate(0)
  884. # optimize for memory (free buffer memory)
  885. self._buffer.close()
  886. self._buffer = StringIO()
  887. else:
  888. # painful string copy
  889. m = s[:i]
  890. s = buffer(s, i)
  891. if self._decrypt is not None:
  892. m = self._decrypt(m)
  893. self._message = m
  894. self._rest = s
  895. try:
  896. self._next_len = self._reader.next()
  897. except StopIteration:
  898. self.close()
  899. return
  900. except:
  901. self.protocol_violation("Message parsing failed")
  902. self.logger.exception("Message parsing failed")
  903. self.close()
  904. return
  905. def _optional_restart(self):
  906. if (self.locally_initiated and not self.received_data and
  907. not self.obfuscate_outgoing):
  908. self.parent.start_connection(self.addr, id=None, encrypt=True)
  909. def connection_lost(self, conn):
  910. assert conn is self.connection
  911. self.closed = True
  912. self._reader = None
  913. self.parent.connection_lost(self)
  914. self._optional_restart()
  915. self.connection = None
  916. if self.complete:
  917. if self.download is not None:
  918. self.download.disconnected()
  919. self.upload = None
  920. self.download = None
  921. del self._buffer
  922. del self.parent
  923. self._sent_listeners.clear()
  924. del self._message
  925. del self._partial_message
  926. self.local_pex_set.clear()
  927. def connection_flushed(self, connection):
  928. if (self.complete and self.next_upload is None and
  929. (self._partial_message is not None
  930. or (self.upload and self.upload.buffer))):
  931. if self.lan:
  932. # bypass upload rate limiter
  933. self.send_partial(self.parent.ratelimiter.unitsize)
  934. else:
  935. self.parent.ratelimiter.queue(self)