# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License). You may not copy or use this file, in either
# source code or executable form, except in compliance with the License. You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.

# Written by Bram Cohen and Greg Hazel

from __future__ import division

import sys
from BTL.platform import app_name
from BTL.translation import _

from BitTorrent import BTFailure
from BTL.obsoletepythonsupport import *
from BTL.hash import sha
from BitTorrent.RawServer_twisted import Handler
from BitTorrent.Connector import Connector
from BitTorrent.HTTPConnector import HTTPConnector
from BitTorrent.LocalDiscovery import LocalDiscovery
from BitTorrent.InternetWatcher import InternetSubscriber
from BTL.DictWithLists import DictWithInts, OrderedDict
from BTL.platform import bttime
from BTL.rand_tools import iter_rand_pos
import random
import logging
import urlparse

ONLY_LOCAL = False

GLOBAL_FILTER = None
def GLOBAL_FILTER(ip, port, direction=""):
    #print ip, direction
    return False
GLOBAL_FILTER = None

# header, reserved, download id, my id, [length, message]

LOWER_BOUND = 1
UPPER_BOUND = 120
BUFFER = 1.2

use_timeout_order = False
timeout_order = [3, 15, 30]

debug = False

def set_timeout_metrics(delta):
    delta = max(delta, 0.0001)
    avg = ((timeout_order[0] / BUFFER) + delta) / 2
    avg *= BUFFER
    avg = max(LOWER_BOUND, avg)
    avg = min(UPPER_BOUND, avg)
    timeout_order[0] = avg
    timeout_order[2] = timeout_order[0] * 30
    timeout_order[1] = timeout_order[2] / 2
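
# Worked example of the smoothing above (illustrative only): starting from
# the default timeout_order == [3, 15, 30], an attempt that connects in
# delta = 5 seconds gives avg = ((3 / 1.2) + 5) / 2 * 1.2 = 4.5, so
# timeout_order becomes [4.5, 67.5, 135.0]. Only the first entry is clamped
# to [LOWER_BOUND, UPPER_BOUND]; the second and third are derived from it.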


class GaurdedInitialConnection(Handler):

    def __init__(self, parent, id, encrypt=False, log_prefix="", lan=False,
                 urgent=False, timeout=None):
        self.t = None
        self.id = id
        self.lan = lan
        self.parent = parent
        self.urgent = urgent
        self.timeout = timeout
        self.encrypt = encrypt
        self.connector = None
        self.log_prefix = log_prefix

    def _make_connector(self, s):
        addr = (s.ip, s.port)
        self.parent.cache_complete_peer(addr, self.id, type(self),
                                        encrypt=self.encrypt,
                                        urgent=self.urgent,
                                        lan=self.lan)
        return Connector(self.parent, s, self.id, True,
                         obfuscate_outgoing=self.encrypt,
                         log_prefix=self.log_prefix,
                         lan=self.lan)

    def connection_starting(self, addr):
        self.start = bttime()
        self.t = self.parent.add_task(self.timeout,
                                      self.parent._cancel_connection, addr)

    def _abort_timeout(self):
        if self.t and self.t.active():
            self.t.cancel()
        self.t = None

    def connection_made(self, s):
        t = bttime() - self.start
        set_timeout_metrics(t)
        addr = (s.ip, s.port)

        if debug:
            self.parent.logger.warning('connection made: %s %s' %
                                       (addr, t))

        del self.parent.pending_connections[addr]
        self._abort_timeout()

        con = self._make_connector(s)
        self.parent._add_connection(con)

        # if the pending queue filled and put the remaining connections
        # into the spare list, this will push more connections in to pending
        self.parent.replace_connection()

    def connection_failed(self, s, exception):
        addr = (s.ip, s.port)

        if debug:
            self.parent.logger.warning('connection failed: %s %s' %
                                       (addr, exception.getErrorMessage()))

        if s.connector.wasPreempted():
            self.parent._resubmit_connection(addr)

        del self.parent.pending_connections[addr]
        self._abort_timeout()

        # only holepunch if this connection timed out entirely
        if self.timeout >= timeout_order[-1]:
            c = self.parent.find_connection_in_common(addr)
            if c:
                c.send_holepunch_request(addr)

        self.parent.replace_connection()


class HTTPInitialConnection(GaurdedInitialConnection):

    def _make_connector(self, s):
        addr = (s.ip, s.port)
        self.parent.cache_complete_peer(addr, self.id, type(self),
                                        urgent=self.urgent)
        # ow!
        piece_size = self.parent.downloader.storage.piece_size
        urlage = self.parent.downloader.urlage
        return HTTPConnector(self.parent, piece_size, urlage, s, self.id, True,
                             log_prefix=self.log_prefix)


class ConnectionManager(InternetSubscriber):

    def __init__(self, make_upload, downloader, choker,
                 numpieces, ratelimiter,
                 rawserver, config, private, my_id, add_task, infohash, context,
                 addcontactfunc, reported_port, tracker_ips, log_prefix):
        """
        @param downloader: MultiDownload for this torrent.
        @param my_id: my peer id.
        @param tracker_ips: list of tracker ip addresses.
            ConnectionManager does not drop connections from the tracker.
            This allows trackers to perform NAT checks even when there
            are max_allow_in connections.
        @param log_prefix: string used as the prefix for all
            log entries generated by the ConnectionManager and its
            created Connectors.
        """
        self.make_upload = make_upload
        self.downloader = downloader
        self.choker = choker
        # aaargh
        self.piece_size = downloader.storage.piece_size
        self.numpieces = numpieces
        self.ratelimiter = ratelimiter
        self.rawserver = rawserver
        self.my_id = my_id
        self.private = private
        self.config = config
        self.add_task = add_task
        self.infohash = infohash
        self.context = context
        self.addcontact = addcontactfunc
        self.reported_port = reported_port
        self.everinc = False
        self.tracker_ips = tracker_ips
        self.log_prefix = log_prefix
        self.logger = logging.getLogger(self.log_prefix)
        self.closed = False

        # submitted
        self.pending_connections = {}
        # transport connected
        self.connectors = set()
        # protocol active
        # we do a lot of iterating and few mutations, so use a list
        self.complete_connectors = [] # set()

        # use a dict for a little semi-randomness
        self.spares = {} # OrderedDict()
        self.cached_peers = OrderedDict()
        self.cache_limit = 300

        self.connector_ips = DictWithInts()
        self.connector_ids = DictWithInts()

        self.banned = set()

        self._ka_task = self.add_task(config['keepalive_interval'],
                                      self.send_keepalives)
        self._pex_task = None
        if not self.private:
            self._pex_task = self.add_task(config['pex_interval'],
                                           self.send_pex)

        self.reopen(reported_port)
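
    # Lifecycle note: __init__ schedules the recurring keepalive task (and,
    # for non-private torrents, the PEX task) and then calls reopen(), which
    # fires connections to any cached peers and subscribes to the internet
    # watcher. cleanup() is the counterpart: it closes connections, clears
    # the peer cache and cancels both recurring tasks.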

    def cleanup(self):
        if not self.closed:
            self.close_connections()
        del self.context
        self.cached_peers.clear()
        if self._ka_task.active():
            self._ka_task.cancel()
        if self._pex_task and self._pex_task.active():
            self._pex_task.cancel()

    def reopen(self, port):
        self.closed = False
        self.reported_port = port
        self.unthrottle_connections()
        for addr in self.cached_peers:
            self._fire_cached_connection(addr)
        self.rawserver.internet_watcher.add_subscriber(self)

    def internet_active(self):
        for addr in self.cached_peers.iterkeys():
            self._fire_cached_connection(addr)

    def remove_addr_from_cache(self, addr):
        # could have been an incoming connection
        # or could have been dropped by the cache limit
        if addr in self.cached_peers:
            del self.cached_peers[addr]

    def try_one_connection(self):
        keys = self.cached_peers.keys()
        if not keys:
            return False
        addr = random.choice(keys)
        self._fire_cached_connection(addr)
        return True

    def _fire_cached_connection(self, addr):
        v = self.cached_peers[addr]
        complete, (id, handler, a, kw) = v
        return self._start_connection(addr, id, handler, *a, **kw)

    def cache_complete_peer(self, addr, pid, handler, *a, **kw):
        self.cache_peer(addr, pid, handler, 1, *a, **kw)

    def cache_incomplete_peer(self, addr, pid, handler, *a, **kw):
        self.cache_peer(addr, pid, handler, 0, *a, **kw)

    def cache_peer(self, addr, pid, handler, complete, *a, **kw):
        # obey the cache size limit
        if (addr not in self.cached_peers and
            len(self.cached_peers) >= self.cache_limit):
            for k, v in self.cached_peers.iteritems():
                if not v[0]:
                    del self.cached_peers[k]
                    break
            else:
                # cache full of completes, delete a random peer.
                # yes, this can cache an incomplete when the cache is full of
                # completes, but only 1 because of the filter above.
                oldaddr = self.cached_peers.keys()[0]
                del self.cached_peers[oldaddr]
        elif not complete:
            if addr in self.cached_peers and self.cached_peers[addr][0]:
                # don't overwrite a complete with an incomplete.
                return
        self.cached_peers[addr] = (complete, (pid, handler, a, kw))

    def send_keepalives(self):
        self._ka_task = self.add_task(self.config['keepalive_interval'],
                                      self.send_keepalives)
        for c in self.complete_connectors:
            c.send_keepalive()

    def send_pex(self):
        self._pex_task = self.add_task(self.config['pex_interval'],
                                       self.send_pex)
        pex_set = set()
        for c in self.complete_connectors:
            if c.listening_port:
                pex_set.add((c.ip, c.listening_port))
        for c in self.complete_connectors:
            c.send_pex(pex_set)
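
    # PEX note: send_pex reschedules itself every config['pex_interval']
    # seconds (it is never scheduled for private torrents, see __init__) and
    # advertises the (ip, listening_port) pairs of all currently complete
    # connectors to every complete connector.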

    def hashcheck_succeeded(self, i):
        for c in self.complete_connectors:
            # should we send a have message if peer already has the piece?
            # yes! it is low bandwidth and useful for that peer.
            c.send_have(i)

    def find_connection_in_common(self, addr):
        for c in self.complete_connectors:
            if addr in c.remote_pex_set:
                return c

    # returns False if the connection info has been pushed on to self.spares
    # other filters and a successful connection return True
    def start_connection(self, addr, id=None, encrypt=False, lan=False):
        """@param addr: domain name/ip address and port pair.
           @param id: peer id.
        """
        return self._start_connection(addr, id, GaurdedInitialConnection,
                                      encrypt=encrypt,
                                      lan=lan)

    def start_http_connection(self, url):
        r = urlparse.urlparse(url)
        host = r[1]
        if ':' in host:
            host, port = host.split(':')
            port = int(port)
        else:
            port = 80
        df = self.rawserver.gethostbyname(host)
        df.addCallback(self._connect_http, port, url)
        df.addLogback(self.logger.warning, "Resolve failed")

    def _connect_http(self, ip, port, url):
        self._start_connection((ip, port), url,
                               HTTPInitialConnection, urgent=True)

    def _start_connection(self, addr, pid, handler, *a, **kw):
        """@param addr: domain name/ip address and port pair.
           @param pid: peer id.
        """
        if self.closed:
            return True

        if addr[0] in self.banned:
            return True
        if pid == self.my_id:
            return True
        for v in self.connectors:
            if pid and v.id == pid:
                return True
            if self.config['one_connection_per_ip'] and v.ip == addr[0]:
                return True

        total_outstanding = len(self.connectors)
        # it's possible the pending connections could eventually complete,
        # so we have to account for those when enforcing max_initiate
        total_outstanding += len(self.pending_connections)

        if total_outstanding >= self.config['max_initiate']:
            self.spares[(addr, pid)] = (handler, a, kw)
            return False

        # if these fail, I'm getting a very weird addr object
        assert isinstance(addr, tuple)
        assert isinstance(addr[0], str)
        assert isinstance(addr[1], int)

        if ONLY_LOCAL and addr[0] != "127.0.0.1" and not addr[0].startswith("192.168") and addr[1] != 80:
            return True

        if GLOBAL_FILTER and not GLOBAL_FILTER(addr[0], addr[1], "out"):
            return True

        if addr not in self.cached_peers:
            self.cache_incomplete_peer(addr, pid, handler, *a, **kw)

        # sometimes we try to connect to a peer we're already trying to
        # connect to
        #assert addr not in self.pending_connections
        if addr in self.pending_connections:
            return True

        kw['log_prefix'] = self.log_prefix
        timeout = 30
        if use_timeout_order:
            timeout = timeout_order[0]
        kw.setdefault('timeout', timeout)
        h = handler(self, pid, *a, **kw)
        self.pending_connections[addr] = (h, (addr, pid, handler, a, kw))
        urgent = kw.pop('urgent', False)
        connector = self.rawserver.start_connection(addr, h, self.context,
                                                    # we'll handle timeouts.
                                                    # not so fond of this.
                                                    timeout=None,
                                                    urgent=urgent)
        h.connector = connector

        return True
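
    # Flow note: when max_initiate is reached, the (addr, pid) -> (handler,
    # args) tuple is parked in self.spares and _start_connection returns
    # False; replace_connection() later pops entries from self.spares and
    # resubmits them as pending connections complete or fail.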

    def _resubmit_connection(self, addr):
        # we leave it on pending_connections.
        # so the standard connection_failed handling occurs.
        h, info = self.pending_connections[addr]
        addr, pid, handler, a, kw = info
        self.spares[(addr, pid)] = (handler, a, kw)

    def _cancel_connection(self, addr):
        if addr not in self.pending_connections:
            # already made
            return

        # we leave it on pending_connections.
        # so the standard connection_failed handling occurs.
        h, info = self.pending_connections[addr]
        addr, pid, handler, a, kw = info

        if use_timeout_order and h.timeout < timeout_order[-1]:
            for t in timeout_order:
                if t > h.timeout:
                    h.timeout = t
                    break
            else:
                h.timeout = timeout_order[-1]

            # this feels odd
            kw['timeout'] = h.timeout
            self.spares[(addr, pid)] = (handler, a, kw)

        # do this last, since twisted might fire the event handler from inside
        # the function
        # HMM:
        # should be stopConnecting, but I've seen this fail.
        # close does the same thing, but disconnects in the case where the
        # connection was made. Not sure how that occurs without addr being in
        # self.pending_connections
        # Maybe this was fixed recently in CRLR.
        #h.connector.stopConnecting()
        h.connector.close()
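
    # Retry note: with use_timeout_order enabled, a cancelled attempt is
    # re-queued in self.spares with the next larger value from timeout_order
    # (3s, then 15s, then 30s by default). connection_failed only asks a
    # peer we have in common (found via PEX) to send a holepunch request
    # once the attempt's timeout has reached timeout_order[-1].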

    def connection_handshake_completed(self, connector):
        self.connector_ips.add(connector.ip)
        self.connector_ids.add(connector.id)
        self.complete_connectors.append(connector)

        connector.upload = self.make_upload(connector)
        connector.download = self.downloader.make_download(connector)
        self.choker.connection_made(connector)
        if connector.uses_dht:
            connector.send_port(self.reported_port)

        if self.config['resolve_hostnames']:
            df = self.rawserver.gethostbyaddr(connector.ip)
            def save_hostname(hostname_tuple):
                hostname, aliases, ips = hostname_tuple
                connector.hostname = hostname
            df.addCallback(save_hostname)
            df.addErrback(lambda fuckoff : None)

    def got_port(self, connector):
        if self.addcontact and connector.uses_dht and \
           connector.dht_port != None:
            self.addcontact(connector.connection.ip, connector.dht_port)

    def ever_got_incoming(self):
        return self.everinc

    def how_many_connections(self):
        return len(self.complete_connectors)

    def replace_connection(self):
        if self.closed:
            return
        while self.spares:
            k, v = self.spares.popitem()
            addr, id = k
            handler, a, kw = v
            started = self._start_connection(addr, id, handler, *a, **kw)
            if not started:
                # start_connection decided to push this connection back on to
                # self.spares because a limit was hit. break now or loop
                # forever
                break

    def throttle_connections(self):
        self.throttled = True
        for c in iter_rand_pos(self.connectors):
            c.connection.pause_reading()

    def unthrottle_connections(self):
        self.throttled = False
        for c in iter_rand_pos(self.connectors):
            c.connection.resume_reading()
            # arg. resume actually flushes the buffers in iocpreactor, so
            # we have to check the state constantly
            if self.throttled:
                break

    def close_connection(self, id):
        for c in self.connectors:
            if c.id == id and not c.closed:
                c.connection.close()
                c.closed = True

    def close_connections(self):
        self.rawserver.internet_watcher.remove_subscriber(self)
        self.closed = True

        pending = self.pending_connections.values()
        # drop connections which could be made after we're not interested
        for h, info in pending:
            h.connector.close()

        for c in self.connectors:
            if not c.closed:
                c.connection.close()
                c.closed = True

    def singleport_connection(self, connector):
        """hand-off from SingleportListener once the infohash is known and
           thus we can map a connection on to a particular Torrent."""
        if connector.ip in self.banned:
            return False
        m = self.config['max_allow_in']
        if (m and len(self.connectors) >= m and
            connector.ip not in self.tracker_ips):
            return False
        self._add_connection(connector)
        if self.closed:
            return False
        connector.set_parent(self)
        connector.connection.context = self.context
        return True

    def _add_connection(self, connector):
        self.connectors.add(connector)
        if self.closed:
            connector.connection.close()
        elif self.throttled:
            connector.connection.pause_reading()

    def ban(self, ip):
        self.banned.add(ip)

    def connection_lost(self, connector):
        assert isinstance(connector, Connector)
        self.connectors.remove(connector)
        if self.ratelimiter:
            self.ratelimiter.dequeue(connector)

        if connector.complete:
            self.connector_ips.remove(connector.ip)
            self.connector_ids.remove(connector.id)
            self.complete_connectors.remove(connector)
            self.choker.connection_lost(connector)


class AnyportListener(Handler):

    def __init__(self, port, singleport):
        self.port = port
        self.singleport = singleport

        rawserver = singleport.rawserver
        s = rawserver.create_serversocket(port, config['bind'])
        rawserver.start_listening(s, self)

    def __getattr__(self, attr):
        return getattr(self.singleport, attr)


class SingleportListener(Handler):
    """Manages a server socket common to all torrents. When a remote
       peer opens a connection to the local peer, the SingleportListener
       maps that peer on to the appropriate torrent's connection manager
       (see SingleportListener.select_torrent).

       See Connector which upcalls to select_torrent after the infohash is
       received in the opening handshake."""
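
    # Incoming-connection flow: connection_made() wraps each new TCP
    # connection in a Connector and adds it to self.connectors; once the
    # Connector has read the handshake it calls select_torrent() (or
    # select_torrent_obfuscated()), which hands the connection to the
    # matching torrent's ConnectionManager via singleport_connection().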

    def __init__(self, rawserver, nattraverser, log_prefix,
                 use_local_discovery):
        self.rawserver = rawserver
        self.nattraverser = nattraverser
        self.port = 0
        self.ports = {}
        self.port_change_notification = None
        self.torrents = {}
        self.connectors = set()
        self.infohash = None
        self.obfuscated_torrents = {}
        self.local_discovery = None
        self.ld_services = {}
        self.use_local_discovery = use_local_discovery
        self._creating_local_discovery = False
        self.log_prefix = log_prefix
        self.logger = logging.getLogger(self.log_prefix)

    def _close(self, port):
        serversocket = self.ports[port][0]
        if self.nattraverser:
            try:
                self.nattraverser.unregister_port(port, "TCP")
            except:
                # blanket, just in case - we don't want to interrupt things
                self.logger.warning("UPnP deregistration error",
                                    exc_info=sys.exc_info())
        self.rawserver.stop_listening(serversocket)
        serversocket.close()
        if self.local_discovery:
            self.local_discovery.stop()
            self.local_discovery = None

    def _check_close(self, port):
        if not port or self.port == port or len(self.ports[port][1]) > 0:
            return
        self._close(port)
        del self.ports[port]

    def open_port(self, port, config):
        """Starts BitTorrent running as a server on the specified port."""
        if port in self.ports:
            self.port = port
            return
        s = self.rawserver.create_serversocket(port, config['bind'])
        if self.nattraverser:
            try:
                d = self.nattraverser.register_port(port, port, "TCP",
                                                    config['bind'],
                                                    app_name)
                def change(*a):
                    self.rawserver.external_add_task(0, self._change_port, *a)
                d.addCallback(change)
                def silent(*e):
                    pass
                d.addErrback(silent)
            except:
                # blanket, just in case - we don't want to interrupt things
                self.logger.warning("UPnP registration error",
                                    exc_info=sys.exc_info())
        self.rawserver.start_listening(s, self)
        oldport = self.port
        self.port = port
        self.ports[port] = [s, {}]
        self._check_close(oldport)
        if self.local_discovery:
            self.local_discovery.stop()
        if self.use_local_discovery:
            self._create_local_discovery()

    def _create_local_discovery(self):
        assert self.use_local_discovery
        self._creating_local_discovery = True
        try:
            self.local_discovery = LocalDiscovery(self.rawserver, self.port,
                                                  self._start_connection)
            self._creating_local_discovery = False
        except:
            self.rawserver.add_task(5, self._create_local_discovery)

    def _start_connection(self, addr, infohash):
        infohash = infohash.decode('hex')
        if infohash not in self.torrents:
            return
        connection_manager = self.torrents[infohash]
        # TODO: peer id?
        connection_manager.start_connection(addr, None)

    def _change_port(self, port):
        if self.port == port:
            return
        [serversocket, callbacks] = self.ports[self.port]
        self.ports[port] = [serversocket, callbacks]
        del self.ports[self.port]
        self.port = port
        for callback in callbacks:
            if callback:
                callback(port)

    def get_port(self, callback = None):
        if self.port:
            callbacks = self.ports[self.port][1]
            callbacks.setdefault(callback, 0)
            callbacks[callback] += 1
        return self.port

    def release_port(self, port, callback = None):
        callbacks = self.ports[port][1]
        callbacks[callback] -= 1
        if callbacks[callback] == 0:
            del callbacks[callback]
        self._check_close(port)

    def close_sockets(self):
        for port in self.ports.iterkeys():
            self._close(port)

    def add_torrent(self, infohash, connection_manager):
        if infohash in self.torrents:
            raise BTFailure(_("Can't start two separate instances of the same "
                              "torrent"))
        self.torrents[infohash] = connection_manager
        key = sha('req2' + infohash).digest()
        self.obfuscated_torrents[key] = connection_manager
        if self.local_discovery:
            service = self.local_discovery.announce(
                infohash.encode('hex'),
                connection_manager.my_id.encode('hex'))
            self.ld_services[infohash] = service
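
    # Obfuscation note: incoming obfuscated handshakes identify the torrent
    # by a stream id derived as sha('req2' + infohash).digest(); add_torrent
    # registers that key in self.obfuscated_torrents so that
    # select_torrent_obfuscated() can look the torrent up without seeing the
    # plaintext infohash.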

    def remove_torrent(self, infohash):
        del self.torrents[infohash]
        del self.obfuscated_torrents[sha('req2' + infohash).digest()]
        if infohash in self.ld_services:
            service = self.ld_services.pop(infohash)
            if self.local_discovery:
                self.local_discovery.unannounce(service)

    def connection_made(self, connection):
        """Called when TCP connection has finished opening, but before
           BitTorrent protocol has begun."""
        if ONLY_LOCAL and connection.ip != '127.0.0.1' and not connection.ip.startswith("192.168"):
            return
        if GLOBAL_FILTER and not GLOBAL_FILTER(connection.ip, connection.port, "in"):
            return
        connector = Connector(self, connection, None, False,
                              log_prefix=self.log_prefix)
        self.connectors.add(connector)

    def select_torrent(self, connector, infohash):
        """Called when infohash has been received allowing us to map
           the connection on to a given Torrent's ConnectionManager."""
        # call-up from Connector.
        if infohash in self.torrents:
            accepted = self.torrents[infohash].singleport_connection(connector)
            if not accepted:
                # the connection manager may refuse the connection, in which
                # case keep the connection in our list until it is dropped
                connector.close()
            else:
                # otherwise remove it
                self.connectors.remove(connector)

    def select_torrent_obfuscated(self, connector, streamid):
        if ONLY_LOCAL and connector.connection.ip != '127.0.0.1':
            return
        if streamid not in self.obfuscated_torrents:
            return
        self.obfuscated_torrents[streamid].singleport_connection(connector)

    def connection_lost(self, connector):
        assert isinstance(connector, Connector)
        self.connectors.remove(connector)

    def remove_addr_from_cache(self, addr):
        # since this was incoming, we don't cache the peer anyway
        pass