Encrypter.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657
  1. # Written by Bram Cohen
  2. # see LICENSE.txt for license information
  3. from cStringIO import StringIO
  4. from binascii import b2a_hex
  5. from socket import error as socketerror
  6. from urllib import quote
  7. from traceback import print_exc
  8. from BitTornado.BTcrypto import Crypto
  9. try:
  10. True
  11. except:
  12. True = 1
  13. False = 0
  14. bool = lambda x: not not x
  15. DEBUG = False
  16. MAX_INCOMPLETE = 8
  17. protocol_name = 'BitTorrent protocol'
  18. option_pattern = chr(0)*8
  19. def toint(s):
  20. return long(b2a_hex(s), 16)
  21. def tobinary16(i):
  22. return chr((i >> 8) & 0xFF) + chr(i & 0xFF)
  23. hexchars = '0123456789ABCDEF'
  24. hexmap = []
  25. for i in xrange(256):
  26. hexmap.append(hexchars[(i&0xF0)/16]+hexchars[i&0x0F])
  27. def tohex(s):
  28. r = []
  29. for c in s:
  30. r.append(hexmap[ord(c)])
  31. return ''.join(r)
  32. def make_readable(s):
  33. if not s:
  34. return ''
  35. if quote(s).find('%') >= 0:
  36. return tohex(s)
  37. return '"'+s+'"'
  38. class IncompleteCounter:
  39. def __init__(self):
  40. self.c = 0
  41. def increment(self):
  42. self.c += 1
  43. def decrement(self):
  44. self.c -= 1
  45. def toomany(self):
  46. return self.c >= MAX_INCOMPLETE
  47. incompletecounter = IncompleteCounter()
  48. # header, options, download id, my id, [length, message]
  49. class Connection:
  50. def __init__(self, Encoder, connection, id,
  51. ext_handshake=False, encrypted = None, options = None):
  52. self.Encoder = Encoder
  53. self.connection = connection
  54. self.connecter = Encoder.connecter
  55. self.id = id
  56. self.locally_initiated = (id != None)
  57. self.readable_id = make_readable(id)
  58. self.complete = False
  59. self.keepalive = lambda: None
  60. self.closed = False
  61. self.buffer = ''
  62. self.bufferlen = None
  63. self.log = None
  64. self.read = self._read
  65. self.write = self._write
  66. self.cryptmode = 0
  67. self.encrypter = None
  68. if self.locally_initiated:
  69. incompletecounter.increment()
  70. if encrypted:
  71. self.encrypted = True
  72. self.encrypter = Crypto(True)
  73. self.write(self.encrypter.pubkey+self.encrypter.padding())
  74. else:
  75. self.encrypted = False
  76. self.write(chr(len(protocol_name)) + protocol_name +
  77. option_pattern + self.Encoder.download_id )
  78. self.next_len, self.next_func = 1+len(protocol_name), self.read_header
  79. elif ext_handshake:
  80. self.Encoder.connecter.external_connection_made += 1
  81. if encrypted: # passed an already running encrypter
  82. self.encrypter = encrypted
  83. self.encrypted = True
  84. self._start_crypto()
  85. self.next_len, self.next_func = 14, self.read_crypto_block3c
  86. else:
  87. self.encrypted = False
  88. self.options = options
  89. self.write(self.Encoder.my_id)
  90. self.next_len, self.next_func = 20, self.read_peer_id
  91. else:
  92. self.encrypted = None # don't know yet
  93. self.next_len, self.next_func = 1+len(protocol_name), self.read_header
  94. self.Encoder.raw_server.add_task(self._auto_close, 30)
  95. def _log_start(self): # only called with DEBUG = True
  96. self.log = open('peerlog.'+self.get_ip()+'.txt','a')
  97. self.log.write('connected - ')
  98. if self.locally_initiated:
  99. self.log.write('outgoing\n')
  100. else:
  101. self.log.write('incoming\n')
  102. self._logwritefunc = self.write
  103. self.write = self._log_write
  104. def _log_write(self, s):
  105. self.log.write('w:'+b2a_hex(s)+'\n')
  106. self._logwritefunc(s)
  107. def get_ip(self, real=False):
  108. return self.connection.get_ip(real)
  109. def get_id(self):
  110. return self.id
  111. def get_readable_id(self):
  112. return self.readable_id
  113. def is_locally_initiated(self):
  114. return self.locally_initiated
  115. def is_encrypted(self):
  116. return bool(self.encrypted)
  117. def is_flushed(self):
  118. return self.connection.is_flushed()
  119. def _read_header(self, s):
  120. if s == chr(len(protocol_name))+protocol_name:
  121. return 8, self.read_options
  122. return None
  123. def read_header(self, s):
  124. if self._read_header(s):
  125. if self.encrypted or self.Encoder.config['crypto_stealth']:
  126. return None
  127. return 8, self.read_options
  128. if self.locally_initiated and not self.encrypted:
  129. return None
  130. elif not self.Encoder.config['crypto_allowed']:
  131. return None
  132. if not self.encrypted:
  133. self.encrypted = True
  134. self.encrypter = Crypto(self.locally_initiated)
  135. self._write_buffer(s)
  136. return self.encrypter.keylength, self.read_crypto_header
  137. ################## ENCRYPTION SUPPORT ######################
  138. def _start_crypto(self):
  139. self.encrypter.setrawaccess(self._read,self._write)
  140. self.write = self.encrypter.write
  141. self.read = self.encrypter.read
  142. if self.buffer:
  143. self.buffer = self.encrypter.decrypt(self.buffer)
  144. def _end_crypto(self):
  145. self.read = self._read
  146. self.write = self._write
  147. self.encrypter = None
  148. def read_crypto_header(self, s):
  149. self.encrypter.received_key(s)
  150. self.encrypter.set_skey(self.Encoder.download_id)
  151. if self.locally_initiated:
  152. if self.Encoder.config['crypto_only']:
  153. cryptmode = '\x00\x00\x00\x02' # full stream encryption
  154. else:
  155. cryptmode = '\x00\x00\x00\x03' # header or full stream
  156. padc = self.encrypter.padding()
  157. self.write( self.encrypter.block3a
  158. + self.encrypter.block3b
  159. + self.encrypter.encrypt(
  160. ('\x00'*8) # VC
  161. + cryptmode # acceptable crypto modes
  162. + tobinary16(len(padc))
  163. + padc # PadC
  164. + '\x00\x00' ) ) # no initial payload data
  165. self._max_search = 520
  166. return 1, self.read_crypto_block4a
  167. self.write(self.encrypter.pubkey+self.encrypter.padding())
  168. self._max_search = 520
  169. return 0, self.read_crypto_block3a
  170. def _search_for_pattern(self, s, pat):
  171. p = s.find(pat)
  172. if p < 0:
  173. if len(s) >= len(pat):
  174. self._max_search -= len(s)+1-len(pat)
  175. if self._max_search < 0:
  176. self.close()
  177. return False
  178. self._write_buffer(s[1-len(pat):])
  179. return False
  180. self._write_buffer(s[p+len(pat):])
  181. return True
  182. ### INCOMING CONNECTION ###
  183. def read_crypto_block3a(self, s):
  184. if not self._search_for_pattern(s,self.encrypter.block3a):
  185. return -1, self.read_crypto_block3a # wait for more data
  186. return len(self.encrypter.block3b), self.read_crypto_block3b
  187. def read_crypto_block3b(self, s):
  188. if s != self.encrypter.block3b:
  189. return None
  190. self.Encoder.connecter.external_connection_made += 1
  191. self._start_crypto()
  192. return 14, self.read_crypto_block3c
  193. def read_crypto_block3c(self, s):
  194. if s[:8] != ('\x00'*8): # check VC
  195. return None
  196. self.cryptmode = toint(s[8:12]) % 4
  197. if self.cryptmode == 0:
  198. return None # no encryption selected
  199. if ( self.cryptmode == 1 # only header encryption
  200. and self.Encoder.config['crypto_only'] ):
  201. return None
  202. padlen = (ord(s[12])<<8)+ord(s[13])
  203. if padlen > 512:
  204. return None
  205. return padlen+2, self.read_crypto_pad3
  206. def read_crypto_pad3(self, s):
  207. s = s[-2:]
  208. ialen = (ord(s[0])<<8)+ord(s[1])
  209. if ialen > 65535:
  210. return None
  211. if self.cryptmode == 1:
  212. cryptmode = '\x00\x00\x00\x01' # header only encryption
  213. else:
  214. cryptmode = '\x00\x00\x00\x02' # full stream encryption
  215. padd = self.encrypter.padding()
  216. self.write( ('\x00'*8) # VC
  217. + cryptmode # encryption mode
  218. + tobinary16(len(padd))
  219. + padd ) # PadD
  220. if ialen:
  221. return ialen, self.read_crypto_ia
  222. return self.read_crypto_block3done()
  223. def read_crypto_ia(self, s):
  224. if DEBUG:
  225. self._log_start()
  226. self.log.write('r:'+b2a_hex(s)+'(ia)\n')
  227. if self.buffer:
  228. self.log.write('r:'+b2a_hex(self.buffer)+'(buffer)\n')
  229. return self.read_crypto_block3done(s)
  230. def read_crypto_block3done(self, ia=''):
  231. if DEBUG:
  232. if not self.log:
  233. self._log_start()
  234. if self.cryptmode == 1: # only handshake encryption
  235. assert not self.buffer # oops; check for exceptions to this
  236. self._end_crypto()
  237. if ia:
  238. self._write_buffer(ia)
  239. return 1+len(protocol_name), self.read_encrypted_header
  240. ### OUTGOING CONNECTION ###
  241. def read_crypto_block4a(self, s):
  242. if not self._search_for_pattern(s,self.encrypter.VC_pattern()):
  243. return -1, self.read_crypto_block4a # wait for more data
  244. self._start_crypto()
  245. return 6, self.read_crypto_block4b
  246. def read_crypto_block4b(self, s):
  247. self.cryptmode = toint(s[:4]) % 4
  248. if self.cryptmode == 1: # only header encryption
  249. if self.Encoder.config['crypto_only']:
  250. return None
  251. elif self.cryptmode != 2:
  252. return None # unknown encryption
  253. padlen = (ord(s[4])<<8)+ord(s[5])
  254. if padlen > 512:
  255. return None
  256. if padlen:
  257. return padlen, self.read_crypto_pad4
  258. return self.read_crypto_block4done()
  259. def read_crypto_pad4(self, s):
  260. # discard data
  261. return self.read_crypto_block4done()
  262. def read_crypto_block4done(self):
  263. if DEBUG:
  264. self._log_start()
  265. if self.cryptmode == 1: # only handshake encryption
  266. if not self.buffer: # oops; check for exceptions to this
  267. return None
  268. self._end_crypto()
  269. self.write(chr(len(protocol_name)) + protocol_name +
  270. option_pattern + self.Encoder.download_id)
  271. return 1+len(protocol_name), self.read_encrypted_header
  272. ### START PROTOCOL OVER ENCRYPTED CONNECTION ###
  273. def read_encrypted_header(self, s):
  274. return self._read_header(s)
  275. ################################################
  276. def read_options(self, s):
  277. self.options = s
  278. return 20, self.read_download_id
  279. def read_download_id(self, s):
  280. if ( s != self.Encoder.download_id
  281. or not self.Encoder.check_ip(ip=self.get_ip()) ):
  282. return None
  283. if not self.locally_initiated:
  284. if not self.encrypted:
  285. self.Encoder.connecter.external_connection_made += 1
  286. self.write(chr(len(protocol_name)) + protocol_name +
  287. option_pattern + self.Encoder.download_id + self.Encoder.my_id)
  288. return 20, self.read_peer_id
  289. def read_peer_id(self, s):
  290. if not self.encrypted and self.Encoder.config['crypto_only']:
  291. return None # allows older trackers to ping,
  292. # but won't proceed w/ connections
  293. if not self.id:
  294. self.id = s
  295. self.readable_id = make_readable(s)
  296. else:
  297. if s != self.id:
  298. return None
  299. self.complete = self.Encoder.got_id(self)
  300. if not self.complete:
  301. return None
  302. if self.locally_initiated:
  303. self.write(self.Encoder.my_id)
  304. incompletecounter.decrement()
  305. self._switch_to_read2()
  306. c = self.Encoder.connecter.connection_made(self)
  307. self.keepalive = c.send_keepalive
  308. return 4, self.read_len
  309. def read_len(self, s):
  310. l = toint(s)
  311. if l > self.Encoder.max_len:
  312. return None
  313. return l, self.read_message
  314. def read_message(self, s):
  315. if s != '':
  316. self.connecter.got_message(self, s)
  317. return 4, self.read_len
  318. def read_dead(self, s):
  319. return None
  320. def _auto_close(self):
  321. if not self.complete:
  322. self.close()
  323. def close(self):
  324. if not self.closed:
  325. self.connection.close()
  326. self.sever()
  327. def sever(self):
  328. if self.log:
  329. self.log.write('closed\n')
  330. self.log.close()
  331. self.closed = True
  332. del self.Encoder.connections[self.connection]
  333. if self.complete:
  334. self.connecter.connection_lost(self)
  335. elif self.locally_initiated:
  336. incompletecounter.decrement()
  337. def send_message_raw(self, message):
  338. self.write(message)
  339. def _write(self, message):
  340. if not self.closed:
  341. self.connection.write(message)
  342. def data_came_in(self, connection, s):
  343. self.read(s)
  344. def _write_buffer(self, s):
  345. self.buffer = s+self.buffer
  346. def _read(self, s):
  347. if self.log:
  348. self.log.write('r:'+b2a_hex(s)+'\n')
  349. self.Encoder.measurefunc(len(s))
  350. self.buffer += s
  351. while True:
  352. if self.closed:
  353. return
  354. # self.next_len = # of characters function expects
  355. # or 0 = all characters in the buffer
  356. # or -1 = wait for next read, then all characters in the buffer
  357. # not compatible w/ keepalives, switch out after all negotiation complete
  358. if self.next_len <= 0:
  359. m = self.buffer
  360. self.buffer = ''
  361. elif len(self.buffer) >= self.next_len:
  362. m = self.buffer[:self.next_len]
  363. self.buffer = self.buffer[self.next_len:]
  364. else:
  365. return
  366. try:
  367. x = self.next_func(m)
  368. except:
  369. self.next_len, self.next_func = 1, self.read_dead
  370. raise
  371. if x is None:
  372. self.close()
  373. return
  374. self.next_len, self.next_func = x
  375. if self.next_len < 0: # already checked buffer
  376. return # wait for additional data
  377. if self.bufferlen is not None:
  378. self._read2('')
  379. return
  380. def _switch_to_read2(self):
  381. self._write_buffer = None
  382. if self.encrypter:
  383. self.encrypter.setrawaccess(self._read2,self._write)
  384. else:
  385. self.read = self._read2
  386. self.bufferlen = len(self.buffer)
  387. self.buffer = [self.buffer]
  388. def _read2(self, s): # more efficient, requires buffer['',''] & bufferlen
  389. if self.log:
  390. self.log.write('r:'+b2a_hex(s)+'\n')
  391. self.Encoder.measurefunc(len(s))
  392. while True:
  393. if self.closed:
  394. return
  395. p = self.next_len-self.bufferlen
  396. if self.next_len == 0:
  397. m = ''
  398. elif s:
  399. if p > len(s):
  400. self.buffer.append(s)
  401. self.bufferlen += len(s)
  402. return
  403. self.bufferlen = len(s)-p
  404. self.buffer.append(s[:p])
  405. m = ''.join(self.buffer)
  406. if p == len(s):
  407. self.buffer = []
  408. else:
  409. self.buffer=[s[p:]]
  410. s = ''
  411. elif p <= 0:
  412. # assert len(self.buffer) == 1
  413. s = self.buffer[0]
  414. self.bufferlen = len(s)-self.next_len
  415. m = s[:self.next_len]
  416. if p == 0:
  417. self.buffer = []
  418. else:
  419. self.buffer = [s[self.next_len:]]
  420. s = ''
  421. else:
  422. return
  423. try:
  424. x = self.next_func(m)
  425. except:
  426. self.next_len, self.next_func = 1, self.read_dead
  427. raise
  428. if x is None:
  429. self.close()
  430. return
  431. self.next_len, self.next_func = x
  432. if self.next_len < 0: # already checked buffer
  433. return # wait for additional data
  434. def connection_flushed(self, connection):
  435. if self.complete:
  436. self.connecter.connection_flushed(self)
  437. def connection_lost(self, connection):
  438. if self.Encoder.connections.has_key(connection):
  439. self.sever()
  440. class _dummy_banlist:
  441. def includes(self, x):
  442. return False
  443. class Encoder:
  444. def __init__(self, connecter, raw_server, my_id, max_len,
  445. schedulefunc, keepalive_delay, download_id,
  446. measurefunc, config, bans=_dummy_banlist() ):
  447. self.raw_server = raw_server
  448. self.connecter = connecter
  449. self.my_id = my_id
  450. self.max_len = max_len
  451. self.schedulefunc = schedulefunc
  452. self.keepalive_delay = keepalive_delay
  453. self.download_id = download_id
  454. self.measurefunc = measurefunc
  455. self.config = config
  456. self.connections = {}
  457. self.banned = {}
  458. self.external_bans = bans
  459. self.to_connect = []
  460. self.paused = False
  461. if self.config['max_connections'] == 0:
  462. self.max_connections = 2 ** 30
  463. else:
  464. self.max_connections = self.config['max_connections']
  465. schedulefunc(self.send_keepalives, keepalive_delay)
  466. def send_keepalives(self):
  467. self.schedulefunc(self.send_keepalives, self.keepalive_delay)
  468. if self.paused:
  469. return
  470. for c in self.connections.values():
  471. c.keepalive()
  472. def start_connections(self, list):
  473. if not self.to_connect:
  474. self.raw_server.add_task(self._start_connection_from_queue)
  475. self.to_connect = list
  476. def _start_connection_from_queue(self):
  477. if self.connecter.external_connection_made:
  478. max_initiate = self.config['max_initiate']
  479. else:
  480. max_initiate = int(self.config['max_initiate']*1.5)
  481. cons = len(self.connections)
  482. if cons >= self.max_connections or cons >= max_initiate:
  483. delay = 60
  484. elif self.paused or incompletecounter.toomany():
  485. delay = 1
  486. else:
  487. delay = 0
  488. dns, id, encrypted = self.to_connect.pop(0)
  489. self.start_connection(dns, id, encrypted)
  490. if self.to_connect:
  491. self.raw_server.add_task(self._start_connection_from_queue, delay)
  492. def start_connection(self, dns, id, encrypted = None):
  493. if ( self.paused
  494. or len(self.connections) >= self.max_connections
  495. or id == self.my_id
  496. or not self.check_ip(ip=dns[0]) ):
  497. return True
  498. if self.config['crypto_only']:
  499. if encrypted is None or encrypted: # fails on encrypted = 0
  500. encrypted = True
  501. else:
  502. return True
  503. for v in self.connections.values():
  504. if v is None:
  505. continue
  506. if id and v.id == id:
  507. return True
  508. ip = v.get_ip(True)
  509. if self.config['security'] and ip != 'unknown' and ip == dns[0]:
  510. return True
  511. try:
  512. c = self.raw_server.start_connection(dns)
  513. con = Connection(self, c, id, encrypted = encrypted)
  514. self.connections[c] = con
  515. c.set_handler(con)
  516. except socketerror:
  517. return False
  518. return True
  519. def _start_connection(self, dns, id, encrypted = None):
  520. def foo(self=self, dns=dns, id=id, encrypted=encrypted):
  521. self.start_connection(dns, id, encrypted)
  522. self.schedulefunc(foo, 0)
  523. def check_ip(self, connection=None, ip=None):
  524. if not ip:
  525. ip = connection.get_ip(True)
  526. if self.config['security'] and self.banned.has_key(ip):
  527. return False
  528. if self.external_bans.includes(ip):
  529. return False
  530. return True
  531. def got_id(self, connection):
  532. if connection.id == self.my_id:
  533. self.connecter.external_connection_made -= 1
  534. return False
  535. ip = connection.get_ip(True)
  536. for v in self.connections.values():
  537. if connection is not v:
  538. if connection.id == v.id:
  539. if ip == v.get_ip(True):
  540. v.close()
  541. else:
  542. return False
  543. if self.config['security'] and ip != 'unknown' and ip == v.get_ip(True):
  544. v.close()
  545. return True
  546. def external_connection_made(self, connection):
  547. if self.paused or len(self.connections) >= self.max_connections:
  548. connection.close()
  549. return False
  550. con = Connection(self, connection, None)
  551. self.connections[connection] = con
  552. connection.set_handler(con)
  553. return True
  554. def externally_handshaked_connection_made(self, connection, options,
  555. already_read, encrypted = None):
  556. if ( self.paused
  557. or len(self.connections) >= self.max_connections
  558. or not self.check_ip(connection=connection) ):
  559. connection.close()
  560. return False
  561. con = Connection(self, connection, None,
  562. ext_handshake = True, encrypted = encrypted, options = options)
  563. self.connections[connection] = con
  564. connection.set_handler(con)
  565. if already_read:
  566. con.data_came_in(con, already_read)
  567. return True
  568. def close_all(self):
  569. for c in self.connections.values():
  570. c.close()
  571. self.connections = {}
  572. def ban(self, ip):
  573. self.banned[ip] = 1
  574. def pause(self, flag):
  575. self.paused = flag