| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763 |
- # The contents of this file are subject to the BitTorrent Open Source License
- # Version 1.1 (the License). You may not copy or use this file, in either
- # source code or executable form, except in compliance with the License. You
- # may obtain a copy of the License at http://www.bittorrent.com/license/.
- #
- # Software distributed under the License is distributed on an AS IS basis,
- # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
- # for the specific language governing rights and limitations under the
- # License.
- # Written by Bram Cohen and Greg Hazel
- from __future__ import division
- import sys
- from BTL.platform import app_name
- from BTL.translation import _
- from BitTorrent import BTFailure
- from BTL.obsoletepythonsupport import *
- from BTL.hash import sha
- from BitTorrent.RawServer_twisted import Handler
- from BitTorrent.Connector import Connector
- from BitTorrent.HTTPConnector import HTTPConnector
- from BitTorrent.LocalDiscovery import LocalDiscovery
- from BitTorrent.InternetWatcher import InternetSubscriber
- from BTL.DictWithLists import DictWithInts, OrderedDict
- from BTL.platform import bttime
- from BTL.rand_tools import iter_rand_pos
- import random
- import logging
- import urlparse
# Debug switch. When True, outgoing attempts are skipped unless the peer is
# 127.0.0.1, starts with "192.168", or uses port 80, and incoming connections
# are ignored unless they come from 127.0.0.1 or a "192.168" address.
ONLY_LOCAL = False

# Optional connection filter hook. When set to a callable it is invoked as
# GLOBAL_FILTER(ip, port, direction) with direction "in" or "out"; a false
# return value causes the connection to be skipped. None disables filtering.
# (A stub filter function used to be defined here and was immediately
# overwritten with None; it has been removed as dead code.)
GLOBAL_FILTER = None

# header, reserved, download id, my id, [length, message]

# Clamp range and safety multiplier for the adaptive connect timeout
# maintained by set_timeout_metrics().
LOWER_BOUND = 1
UPPER_BOUND = 120
BUFFER = 1.2

# When use_timeout_order is True, connection attempts start with
# timeout_order[0] and step up through the list on each timeout; otherwise a
# fixed 30 second timeout is used.
use_timeout_order = False
timeout_order = [3, 15, 30]

# Enables extra "connection made"/"connection failed" log lines.
debug = False
def set_timeout_metrics(delta):
    """Fold one measured connect time into the adaptive timeout table.

    @param delta: seconds the last successful connection took to establish.

    The first entry of timeout_order becomes the mean of its previous
    (un-buffered) value and delta, re-scaled by BUFFER and clamped to
    [LOWER_BOUND, UPPER_BOUND]. The last entry is 30 times the first and the
    middle entry is half of the last. timeout_order is modified in place.
    """
    sample = max(delta, 0.0001)
    # The stored value already includes the BUFFER margin; strip it before
    # averaging so the margin is not compounded on every update.
    previous = timeout_order[0] / BUFFER
    estimate = (previous + sample) / 2
    estimate *= BUFFER
    estimate = max(LOWER_BOUND, estimate)
    estimate = min(UPPER_BOUND, estimate)

    timeout_order[0] = estimate
    timeout_order[2] = timeout_order[0] * 30
    timeout_order[1] = timeout_order[2] / 2
-
class GaurdedInitialConnection(Handler):
    """Handler for a single outgoing connection attempt guarded by a timeout.

    The owning ConnectionManager (``parent``) creates one instance per
    attempt. The instance schedules a timeout task when the attempt starts
    and, on success, wraps the socket in a Connector and hands it to the
    parent; on failure it lets the parent move on to another spare peer.
    """

    def __init__(self, parent, id, encrypt=False, log_prefix="", lan=False,
                 urgent=False, timeout=None):
        """
        @param parent: object providing add_task, pending_connections,
            cache_complete_peer and the other callbacks used below.
        @param id: peer id (or None if unknown).
        @param encrypt: passed to Connector as obfuscate_outgoing.
        @param log_prefix: logging prefix passed to the Connector.
        @param lan: passed through to the peer cache and the Connector.
        @param urgent: stored with the cached peer entry.
        @param timeout: delay, passed to parent.add_task, after which the
            attempt is cancelled.
        """
        self.parent = parent
        self.id = id
        self.encrypt = encrypt
        self.log_prefix = log_prefix
        self.lan = lan
        self.urgent = urgent
        self.timeout = timeout
        # pending timeout task, created in connection_starting
        self.t = None
        # transport-level connector; assigned by the creator of this handler
        self.connector = None

    def _make_connector(self, s):
        """Cache the peer as reachable and wrap socket s in a Connector."""
        peer_addr = (s.ip, s.port)
        self.parent.cache_complete_peer(peer_addr, self.id, type(self),
                                        encrypt=self.encrypt,
                                        urgent=self.urgent,
                                        lan=self.lan)
        return Connector(self.parent, s, self.id, True,
                         obfuscate_outgoing=self.encrypt,
                         log_prefix=self.log_prefix,
                         lan=self.lan)

    def connection_starting(self, addr):
        """Record the start time and arm the cancel-on-timeout task."""
        self.start = bttime()
        self.t = self.parent.add_task(self.timeout,
                                      self.parent._cancel_connection, addr)

    def _abort_timeout(self):
        """Cancel the timeout task if it is still pending and forget it."""
        if self.t and self.t.active():
            self.t.cancel()
        self.t = None

    def connection_made(self, s):
        """Transport connected: update timing stats and register a Connector."""
        elapsed = bttime() - self.start
        set_timeout_metrics(elapsed)
        peer_addr = (s.ip, s.port)
        if debug:
            self.parent.logger.warning('connection made: %s %s' %
                                       (peer_addr, elapsed))

        del self.parent.pending_connections[peer_addr]
        self._abort_timeout()

        self.parent._add_connection(self._make_connector(s))

        # if the pending queue filled and put the remaining connections
        # into the spare list, this will push more connections in to pending
        self.parent.replace_connection()

    def connection_failed(self, s, exception):
        """Transport failed: clean up and possibly request a holepunch."""
        peer_addr = (s.ip, s.port)
        if debug:
            self.parent.logger.warning('connection failed: %s %s' %
                                       (peer_addr,
                                        exception.getErrorMessage()))

        # A preempted attempt did not really fail; queue it again.
        if s.connector.wasPreempted():
            self.parent._resubmit_connection(peer_addr)

        del self.parent.pending_connections[peer_addr]
        self._abort_timeout()

        # only holepunch if this connection timed out entirely
        if self.timeout >= timeout_order[-1]:
            relay = self.parent.find_connection_in_common(peer_addr)
            if relay:
                relay.send_holepunch_request(peer_addr)

        self.parent.replace_connection()
class HTTPInitialConnection(GaurdedInitialConnection):
    """Outgoing connection attempt to an HTTP source.

    Identical to GaurdedInitialConnection except that a successful socket is
    wrapped in an HTTPConnector; here ``self.id`` carries the URL passed by
    ConnectionManager._connect_http.
    """

    def _make_connector(self, s):
        """Cache the peer as reachable and wrap socket s in an HTTPConnector."""
        peer_addr = (s.ip, s.port)
        self.parent.cache_complete_peer(peer_addr, self.id, type(self),
                                        urgent=self.urgent)
        # Reaches through the parent into the downloader for the values the
        # HTTPConnector constructor needs.
        downloader = self.parent.downloader
        piece_size = downloader.storage.piece_size
        urlage = downloader.urlage
        return HTTPConnector(self.parent, piece_size, urlage, s, self.id,
                             True, log_prefix=self.log_prefix)
-
class ConnectionManager(InternetSubscriber):
    """Tracks and drives the peer connections of one torrent.

    A connection attempt moves through these collections:

      - ``spares``: attempts that could not be started because
        ``config['max_initiate']`` was reached, keyed by (addr, peer id).
      - ``pending_connections``: attempts submitted to the rawserver,
        keyed by addr.
      - ``connectors``: transport-connected Connectors.
      - ``complete_connectors``: Connectors whose handshake completed.

    ``cached_peers`` remembers peers (up to ``cache_limit``) so they can be
    re-tried on reopen() or when the internet connection becomes active.
    """

    def __init__(self, make_upload, downloader, choker,
                 numpieces, ratelimiter,
                 rawserver, config, private, my_id, add_task, infohash,
                 context, addcontactfunc, reported_port, tracker_ips,
                 log_prefix):
        """
        @param downloader: MultiDownload for this torrent.
        @param my_id: my peer id.
        @param tracker_ips: list of tracker ip addresses.
               ConnectionManager does not drop connections from the tracker.
               This allows trackers to perform NAT checks even when there
               are max_allow_in connections.
        @param log_prefix: string used as the prefix for all
               log entries generated by the ConnectionManager and its
               created Connectors.
        """
        self.make_upload = make_upload
        self.downloader = downloader
        self.choker = choker
        # duplicated from the downloader's storage for convenience
        self.piece_size = downloader.storage.piece_size
        self.numpieces = numpieces
        self.ratelimiter = ratelimiter
        self.rawserver = rawserver
        self.my_id = my_id
        self.private = private
        self.config = config
        self.add_task = add_task
        self.infohash = infohash
        self.context = context
        self.addcontact = addcontactfunc
        self.reported_port = reported_port
        self.everinc = False
        self.tracker_ips = tracker_ips
        self.log_prefix = log_prefix
        self.logger = logging.getLogger(self.log_prefix)
        self.closed = False

        # submitted
        self.pending_connections = {}
        # transport connected
        self.connectors = set()
        # protocol active
        # we do a lot of iterating and few mutations, so use a list
        self.complete_connectors = []  # set()
        # use a dict for a little semi-randomness
        self.spares = {}  # OrderedDict()

        # addr -> (complete, (pid, handler, args, kwargs))
        self.cached_peers = OrderedDict()
        self.cache_limit = 300

        self.connector_ips = DictWithInts()
        self.connector_ids = DictWithInts()

        self.banned = set()

        self._ka_task = self.add_task(config['keepalive_interval'],
                                      self.send_keepalives)
        # peer exchange is only scheduled for non-private torrents
        self._pex_task = None
        if not self.private:
            self._pex_task = self.add_task(config['pex_interval'],
                                           self.send_pex)

        self.reopen(reported_port)

    def cleanup(self):
        """Close connections if still open and cancel the periodic tasks."""
        if not self.closed:
            self.close_connections()
        del self.context
        self.cached_peers.clear()
        if self._ka_task.active():
            self._ka_task.cancel()
        if self._pex_task and self._pex_task.active():
            self._pex_task.cancel()

    def reopen(self, port):
        """(Re)activate the manager, reporting port as our listening port.

        Unthrottles existing connections, retries every cached peer and
        subscribes to the rawserver's internet watcher.
        """
        self.closed = False
        self.reported_port = port
        self.unthrottle_connections()
        for addr in self.cached_peers:
            self._fire_cached_connection(addr)
        self.rawserver.internet_watcher.add_subscriber(self)

    def internet_active(self):
        """Internet watcher callback: retry every cached peer."""
        for addr in self.cached_peers.iterkeys():
            self._fire_cached_connection(addr)

    def remove_addr_from_cache(self, addr):
        """Forget addr; a no-op if it is not cached."""
        # could have been an incoming connection
        # or could have been dropped by the cache limit
        if addr in self.cached_peers:
            del self.cached_peers[addr]

    def try_one_connection(self):
        """Retry one randomly chosen cached peer.

        Returns False if the cache is empty, True otherwise.
        """
        keys = self.cached_peers.keys()
        if not keys:
            return False
        addr = random.choice(keys)
        self._fire_cached_connection(addr)
        return True

    def _fire_cached_connection(self, addr):
        """Start a connection to addr using its cached parameters."""
        v = self.cached_peers[addr]
        complete, (id, handler, a, kw) = v
        return self._start_connection(addr, id, handler, *a, **kw)

    def cache_complete_peer(self, addr, pid, handler, *a, **kw):
        """Cache a peer we have successfully connected to."""
        self.cache_peer(addr, pid, handler, 1, *a, **kw)

    def cache_incomplete_peer(self, addr, pid, handler, *a, **kw):
        """Cache a peer we have only attempted to connect to."""
        self.cache_peer(addr, pid, handler, 0, *a, **kw)

    def cache_peer(self, addr, pid, handler, complete, *a, **kw):
        """Store the parameters needed to reconnect to addr.

        @param complete: true if a connection to the peer has succeeded.

        When the cache is full, room is made by evicting the first
        incomplete entry found, or, failing that, the first key of the
        cache. An incomplete entry never overwrites a complete one.
        """
        # obey the cache size limit
        if (addr not in self.cached_peers and
                len(self.cached_peers) >= self.cache_limit):
            for k, v in self.cached_peers.iteritems():
                if not v[0]:
                    # safe: we stop iterating right after the deletion
                    del self.cached_peers[k]
                    break
            else:
                # cache full of completes: evict the first key (presumably
                # the oldest entry, given OrderedDict -- confirm).
                # yes, this can cache an incomplete when the cache is full
                # of completes, but only 1 because of the filter above.
                oldaddr = self.cached_peers.keys()[0]
                del self.cached_peers[oldaddr]
        elif not complete:
            if addr in self.cached_peers and self.cached_peers[addr][0]:
                # don't overwrite a complete with an incomplete.
                return
        self.cached_peers[addr] = (complete, (pid, handler, a, kw))

    def send_keepalives(self):
        """Periodic task: reschedule itself and send keepalives."""
        self._ka_task = self.add_task(self.config['keepalive_interval'],
                                      self.send_keepalives)
        for c in self.complete_connectors:
            c.send_keepalive()

    def send_pex(self):
        """Periodic task: reschedule itself and send peer-exchange data.

        Every complete connector receives the set of (ip, listening_port)
        pairs of all complete connectors that reported a listening port.
        """
        self._pex_task = self.add_task(self.config['pex_interval'],
                                       self.send_pex)
        pex_set = set()
        for c in self.complete_connectors:
            if c.listening_port:
                pex_set.add((c.ip, c.listening_port))
        for c in self.complete_connectors:
            c.send_pex(pex_set)

    def hashcheck_succeeded(self, i):
        """Announce piece i to every complete connector."""
        for c in self.complete_connectors:
            # should we send a have message if peer already has the piece?
            # yes! it is low bandwidth and useful for that peer.
            c.send_have(i)

    def find_connection_in_common(self, addr):
        """Return a complete connector whose remote pex set contains addr.

        Returns None when there is no such connector.
        """
        for c in self.complete_connectors:
            if addr in c.remote_pex_set:
                return c

    # returns False if the connection info has been pushed on to self.spares
    # other filters and a successful connection return True
    def start_connection(self, addr, id=None, encrypt=False, lan=False):
        """@param addr: domain name/ip address and port pair.
        @param id: peer id.
        """
        return self._start_connection(addr, id, GaurdedInitialConnection,
                                      encrypt=encrypt,
                                      lan=lan)

    def start_http_connection(self, url):
        """Resolve the host of url and connect to it as an HTTP source.

        The port defaults to 80 when the URL's network location has none.
        """
        r = urlparse.urlparse(url)
        host = r[1]
        if ':' in host:
            # Split on the last colon only, so a network location with more
            # than one colon (e.g. "user:pw@host:80") no longer fails to
            # unpack.
            host, port = host.rsplit(':', 1)
            port = int(port)
        else:
            port = 80
        df = self.rawserver.gethostbyname(host)
        df.addCallback(self._connect_http, port, url)
        df.addLogback(self.logger.warning, "Resolve failed")

    def _connect_http(self, ip, port, url):
        """Resolver callback: start the HTTP connection; url is the peer id."""
        self._start_connection((ip, port), url,
                               HTTPInitialConnection, urgent=True)

    def _start_connection(self, addr, pid, handler, *a, **kw):
        """@param addr: domain name/ip address and port pair.
        @param pid: peer id.
        @param handler: handler class instantiated as
               handler(self, pid, *a, **kw).

        Returns False only when the attempt was deferred to self.spares
        because max_initiate was reached; every other outcome (filtered out
        or actually started) returns True.
        """
        if self.closed:
            return True
        if addr[0] in self.banned:
            return True
        if pid == self.my_id:
            return True

        for v in self.connectors:
            if pid and v.id == pid:
                return True
            if self.config['one_connection_per_ip'] and v.ip == addr[0]:
                return True

        total_outstanding = len(self.connectors)
        # it's possible the pending connections could eventually complete,
        # so we have to account for those when enforcing max_initiate
        total_outstanding += len(self.pending_connections)

        if total_outstanding >= self.config['max_initiate']:
            self.spares[(addr, pid)] = (handler, a, kw)
            return False

        # if these fail, I'm getting a very weird addr object
        assert isinstance(addr, tuple)
        assert isinstance(addr[0], str)
        assert isinstance(addr[1], int)

        if (ONLY_LOCAL and addr[0] != "127.0.0.1" and
                not addr[0].startswith("192.168") and addr[1] != 80):
            return True

        if GLOBAL_FILTER and not GLOBAL_FILTER(addr[0], addr[1], "out"):
            return True

        if addr not in self.cached_peers:
            self.cache_incomplete_peer(addr, pid, handler, *a, **kw)

        # sometimes we try to connect to a peer we're already trying to
        # connect to
        #assert addr not in self.pending_connections
        if addr in self.pending_connections:
            return True

        kw['log_prefix'] = self.log_prefix
        timeout = 30
        if use_timeout_order:
            timeout = timeout_order[0]
        kw.setdefault('timeout', timeout)
        h = handler(self, pid, *a, **kw)
        self.pending_connections[addr] = (h, (addr, pid, handler, a, kw))
        # NOTE(review): kw is the same dict stored in pending_connections
        # above, so popping here also removes 'urgent' from the record used
        # for retries -- confirm that retries are meant to be non-urgent.
        urgent = kw.pop('urgent', False)
        connector = self.rawserver.start_connection(addr, h, self.context,
                                                    # we'll handle timeouts.
                                                    # not so fond of this.
                                                    timeout=None,
                                                    urgent=urgent)
        h.connector = connector
        return True

    def _resubmit_connection(self, addr):
        """Queue the pending attempt to addr on self.spares for a retry."""
        # we leave it on pending_connections.
        # so the standard connection_failed handling occurs.
        h, info = self.pending_connections[addr]
        addr, pid, handler, a, kw = info
        self.spares[(addr, pid)] = (handler, a, kw)

    def _cancel_connection(self, addr):
        """Timeout task: abort the pending attempt to addr.

        With use_timeout_order enabled and the attempt not yet at the
        largest timeout, the attempt is re-queued on self.spares with the
        next larger timeout before being closed.
        """
        if addr not in self.pending_connections:
            # already made
            return

        # we leave it on pending_connections.
        # so the standard connection_failed handling occurs.
        h, info = self.pending_connections[addr]
        addr, pid, handler, a, kw = info

        # NOTE(review): the nesting of the re-queue below was reconstructed
        # from a copy of this file that had lost its indentation. It is
        # placed inside this branch because re-queueing unconditionally
        # would retry timed-out peers forever -- confirm against upstream.
        if use_timeout_order and h.timeout < timeout_order[-1]:
            for t in timeout_order:
                if t > h.timeout:
                    h.timeout = t
                    break
            else:
                h.timeout = timeout_order[-1]
            # this feels odd
            kw['timeout'] = h.timeout
            self.spares[(addr, pid)] = (handler, a, kw)

        # do this last, since twisted might fire the event handler from
        # inside the function
        # HMM:
        # should be stopConnecting, but I've seen this fail.
        # close does the same thing, but disconnects in the case where the
        # connection was made. Not sure how that occurs without addr being
        # in self.pending_connections
        # Maybe this was fixed recently in CRLR.
        #h.connector.stopConnecting()
        h.connector.close()

    def connection_handshake_completed(self, connector):
        """Promote connector to a complete connection.

        Creates its upload/download objects, informs the choker, sends our
        port to DHT-capable peers and, if configured, starts a reverse DNS
        lookup that stores the result in connector.hostname.
        """
        self.connector_ips.add(connector.ip)
        self.connector_ids.add(connector.id)

        self.complete_connectors.append(connector)

        connector.upload = self.make_upload(connector)
        connector.download = self.downloader.make_download(connector)
        self.choker.connection_made(connector)
        if connector.uses_dht:
            connector.send_port(self.reported_port)

        if self.config['resolve_hostnames']:
            df = self.rawserver.gethostbyaddr(connector.ip)

            def save_hostname(hostname_tuple):
                hostname, aliases, ips = hostname_tuple
                connector.hostname = hostname
            df.addCallback(save_hostname)
            # lookup failures are deliberately ignored
            df.addErrback(lambda failure: None)

    def got_port(self, connector):
        """Pass a peer's DHT port to the addcontact callback, if usable."""
        if (self.addcontact and connector.uses_dht and
                connector.dht_port is not None):
            self.addcontact(connector.connection.ip, connector.dht_port)

    def ever_got_incoming(self):
        """Return the everinc flag."""
        return self.everinc

    def how_many_connections(self):
        """Return the number of connections with a completed handshake."""
        return len(self.complete_connectors)

    def replace_connection(self):
        """Start spare connection attempts until a limit is hit again."""
        if self.closed:
            return
        while self.spares:
            k, v = self.spares.popitem()
            addr, id = k
            handler, a, kw = v
            started = self._start_connection(addr, id, handler, *a, **kw)
            if not started:
                # start_connection decided to push this connection back on
                # to self.spares because a limit was hit. break now or loop
                # forever
                break

    def throttle_connections(self):
        """Pause reading on every connection."""
        self.throttled = True
        for c in iter_rand_pos(self.connectors):
            c.connection.pause_reading()

    def unthrottle_connections(self):
        """Resume reading on every connection, unless re-throttled midway."""
        self.throttled = False
        for c in iter_rand_pos(self.connectors):
            c.connection.resume_reading()
            # arg. resume actually flushes the buffers in iocpreactor, so
            # we have to check the state constantly
            if self.throttled:
                break

    def close_connection(self, id):
        """Close every open connection whose peer id equals id."""
        for c in self.connectors:
            if c.id == id and not c.closed:
                c.connection.close()
                c.closed = True

    def close_connections(self):
        """Mark the manager closed and close pending and open connections."""
        self.rawserver.internet_watcher.remove_subscriber(self)
        self.closed = True

        pending = self.pending_connections.values()
        # drop connections which could be made after we're not interested
        for h, info in pending:
            h.connector.close()

        for c in self.connectors:
            if not c.closed:
                c.connection.close()
                c.closed = True

    def singleport_connection(self, connector):
        """hand-off from SingleportListener once the infohash is known and
        thus we can map a connection on to a particular Torrent.

        Returns False if the connection is refused (banned ip, max_allow_in
        reached by a non-tracker ip, or manager closed), True otherwise.
        """
        if connector.ip in self.banned:
            return False
        m = self.config['max_allow_in']
        if (m and len(self.connectors) >= m and
                connector.ip not in self.tracker_ips):
            return False
        self._add_connection(connector)
        if self.closed:
            return False
        connector.set_parent(self)
        connector.connection.context = self.context
        return True

    def _add_connection(self, connector):
        """Register connector; close or pause it to match our state."""
        self.connectors.add(connector)

        if self.closed:
            connector.connection.close()
        elif self.throttled:
            connector.connection.pause_reading()

    def ban(self, ip):
        """Refuse future connections to and from ip."""
        self.banned.add(ip)

    def connection_lost(self, connector):
        """Remove a dropped connector from all bookkeeping structures."""
        assert isinstance(connector, Connector)
        self.connectors.remove(connector)

        if self.ratelimiter:
            self.ratelimiter.dequeue(connector)

        # only handshake-complete connectors were registered in these
        if connector.complete:
            self.connector_ips.remove(connector.ip)
            self.connector_ids.remove(connector.id)

            self.complete_connectors.remove(connector)
            self.choker.connection_lost(connector)
class AnyportListener(Handler):
    """Listens on an extra port and forwards everything else to a
    SingleportListener.

    Any attribute not found on this object (including the Handler callbacks
    it does not define itself) is looked up on ``singleport``.
    """

    def __init__(self, port, singleport, config=None):
        """
        @param port: port to listen on.
        @param singleport: listener whose rawserver is used and to which
            attribute access is delegated.
        @param config: optional mapping with a 'bind' entry naming the
            address to bind to. The original code read an undefined global
            ``config`` here and therefore always raised NameError; the
            parameter is optional so existing two-argument calls now work.
            Without it '' is used -- assumed to mean "all interfaces", as
            in the usual default of config['bind']; TODO confirm.
        """
        self.port = port
        self.singleport = singleport
        rawserver = singleport.rawserver
        bind = ''
        if config is not None:
            bind = config['bind']
        s = rawserver.create_serversocket(port, bind)
        rawserver.start_listening(s, self)

    def __getattr__(self, attr):
        # __getattr__ is only called when normal lookup fails. If
        # 'singleport' itself is missing, delegating would recurse forever,
        # so report it as a plain missing attribute instead.
        if attr == 'singleport':
            raise AttributeError(attr)
        return getattr(self.singleport, attr)
class SingleportListener(Handler):
    """Manages a server socket common to all torrents. When a remote
    peer opens a connection to the local peer, the SingleportListener
    maps that peer on to the appropriate torrent's connection manager
    (see SingleportListener.select_torrent).

    See Connector which upcalls to select_torrent after the infohash is
    received in the opening handshake."""

    def __init__(self, rawserver, nattraverser, log_prefix,
                 use_local_discovery):
        """
        @param rawserver: used to create and listen on server sockets.
        @param nattraverser: optional object used to register and
            unregister port mappings; may be false to disable.
        @param log_prefix: logger name, also passed to created Connectors.
        @param use_local_discovery: whether to create a LocalDiscovery
            service when a port is opened.
        """
        self.rawserver = rawserver
        self.nattraverser = nattraverser
        self.log_prefix = log_prefix
        self.logger = logging.getLogger(self.log_prefix)
        self.use_local_discovery = use_local_discovery

        # currently active port (0 when none) and
        # port -> [serversocket, {callback: reference count}]
        self.port = 0
        self.ports = {}
        self.port_change_notification = None

        # infohash -> connection manager, and the same managers keyed by
        # sha('req2' + infohash) for obfuscated handshakes
        self.torrents = {}
        self.obfuscated_torrents = {}
        # incoming connectors not yet handed to a connection manager
        self.connectors = set()
        self.infohash = None

        self.local_discovery = None
        self.ld_services = {}
        self._creating_local_discovery = False

    def _close(self, port):
        """Unregister, stop listening on and close the socket for port."""
        sock = self.ports[port][0]
        if self.nattraverser:
            try:
                self.nattraverser.unregister_port(port, "TCP")
            except:
                # blanket, just in case - we don't want to interrupt things
                self.logger.warning("UPnP deregistration error",
                                    exc_info=sys.exc_info())
        self.rawserver.stop_listening(sock)
        sock.close()
        if self.local_discovery:
            self.local_discovery.stop()
            self.local_discovery = None

    def _check_close(self, port):
        """Close port unless it is unset, current, or still referenced."""
        if not port:
            return
        if self.port == port:
            return
        if len(self.ports[port][1]) > 0:
            return
        self._close(port)
        del self.ports[port]

    def open_port(self, port, config):
        """Starts BitTorrent running as a server on the specified port."""
        if port in self.ports:
            # already listening there; just make it the current port
            self.port = port
            return

        bind = config['bind']
        s = self.rawserver.create_serversocket(port, bind)
        if self.nattraverser:
            try:
                d = self.nattraverser.register_port(port, port, "TCP",
                                                    config['bind'],
                                                    app_name)

                def change(*a):
                    self.rawserver.external_add_task(0, self._change_port,
                                                     *a)
                d.addCallback(change)

                def silent(*e):
                    pass
                d.addErrback(silent)
            except:
                # blanket, just in case - we don't want to interrupt things
                self.logger.warning("UPnP registration error",
                                    exc_info=sys.exc_info())
        self.rawserver.start_listening(s, self)

        previous_port = self.port
        self.port = port
        self.ports[port] = [s, {}]
        self._check_close(previous_port)

        if self.local_discovery:
            self.local_discovery.stop()
        if self.use_local_discovery:
            self._create_local_discovery()

    def _create_local_discovery(self):
        """Create the LocalDiscovery service, retrying in 5s on failure."""
        assert self.use_local_discovery
        self._creating_local_discovery = True
        try:
            self.local_discovery = LocalDiscovery(self.rawserver, self.port,
                                                  self._start_connection)
            self._creating_local_discovery = False
        except:
            self.rawserver.add_task(5, self._create_local_discovery)

    def _start_connection(self, addr, infohash):
        """LocalDiscovery callback: connect to addr for a hex infohash."""
        infohash = infohash.decode('hex')
        if infohash not in self.torrents:
            return
        manager = self.torrents[infohash]
        # TODO: peer id?
        manager.start_connection(addr, None)

    def _change_port(self, port):
        """Re-key the current socket under port and notify its callbacks."""
        if self.port == port:
            return
        [serversocket, callbacks] = self.ports[self.port]
        self.ports[port] = [serversocket, callbacks]
        del self.ports[self.port]
        self.port = port
        for notify in callbacks:
            # None is a valid key (get_port's default) and is not callable
            if notify:
                notify(port)

    def get_port(self, callback = None):
        """Return the current port, taking a reference on it for callback.

        No reference is recorded when no port is open (port is 0).
        """
        if self.port:
            refcounts = self.ports[self.port][1]
            refcounts.setdefault(callback, 0)
            refcounts[callback] += 1
        return self.port

    def release_port(self, port, callback = None):
        """Drop callback's reference on port; close the port if unused."""
        refcounts = self.ports[port][1]
        refcounts[callback] -= 1
        if refcounts[callback] == 0:
            del refcounts[callback]
        self._check_close(port)

    def close_sockets(self):
        """Close the socket of every known port."""
        for port in self.ports.iterkeys():
            self._close(port)

    def add_torrent(self, infohash, connection_manager):
        """Register connection_manager as the target for infohash.

        Raises BTFailure if the infohash is already registered.
        """
        if infohash in self.torrents:
            raise BTFailure(_("Can't start two separate instances of the same "
                              "torrent"))
        self.torrents[infohash] = connection_manager
        stream_key = sha('req2' + infohash).digest()
        self.obfuscated_torrents[stream_key] = connection_manager
        if self.local_discovery:
            hex_infohash = infohash.encode('hex')
            hex_peer_id = connection_manager.my_id.encode('hex')
            service = self.local_discovery.announce(hex_infohash,
                                                    hex_peer_id)
            self.ld_services[infohash] = service

    def remove_torrent(self, infohash):
        """Unregister infohash and withdraw its local discovery service."""
        del self.torrents[infohash]
        del self.obfuscated_torrents[sha('req2' + infohash).digest()]
        if infohash in self.ld_services:
            service = self.ld_services.pop(infohash)
            if self.local_discovery:
                self.local_discovery.unannounce(service)

    def connection_made(self, connection):
        """Called when TCP connection has finished opening, but before
        BitTorrent protocol has begun."""
        if (ONLY_LOCAL and connection.ip != '127.0.0.1' and
                not connection.ip.startswith("192.168")):
            return
        if (GLOBAL_FILTER and
                not GLOBAL_FILTER(connection.ip, connection.port, "in")):
            return
        connector = Connector(self, connection, None, False,
                              log_prefix=self.log_prefix)
        self.connectors.add(connector)

    def select_torrent(self, connector, infohash):
        """Called when infohash has been received allowing us to map
        the connection on to a given Torrent's ConnectionManager."""
        # call-up from Connector.
        if infohash not in self.torrents:
            return
        manager = self.torrents[infohash]
        accepted = manager.singleport_connection(connector)
        if accepted:
            # the manager owns the connection now; drop it from our list
            self.connectors.remove(connector)
        else:
            # the connection manager may refuse the connection, in which
            # case keep the connection in our list until it is dropped
            connector.close()

    def select_torrent_obfuscated(self, connector, streamid):
        """Like select_torrent, but keyed by the obfuscated stream id."""
        # NOTE(review): unlike select_torrent this neither closes a refused
        # connector nor removes an accepted one from self.connectors --
        # confirm whether that is intended.
        if ONLY_LOCAL and connector.connection.ip != '127.0.0.1':
            return
        if streamid not in self.obfuscated_torrents:
            return
        self.obfuscated_torrents[streamid].singleport_connection(connector)

    def connection_lost(self, connector):
        """Forget a connector that dropped before being handed off."""
        assert isinstance(connector, Connector)
        self.connectors.remove(connector)

    def remove_addr_from_cache(self, addr):
        """No-op; present so this class matches ConnectionManager."""
        # since this was incoming, we don't cache the peer anyway
        pass
-
|