| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- # The contents of this file are subject to the BitTorrent Open Source License
- # Version 1.1 (the License). You may not copy or use this file, in either
- # source code or executable form, except in compliance with the License. You
- # may obtain a copy of the License at http://www.bittorrent.com/license/.
- #
- # Software distributed under the License is distributed on an AS IS basis,
- # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
- # for the specific language governing rights and limitations under the
- # License.
- # Written by Bram Cohen, Uoti Urpala
- import array
- import random
- from BTL.sparse_set import SparseSet
- from BTL.obsoletepythonsupport import set
- from BitTorrent.Download import Download
# Module-level switch selecting which piece-bucket implementation is imported
# from BitTorrent.PieceSetBuckets (and later instantiated by MultiDownload).
SPARSE_SET = True
if not SPARSE_SET:
    from BitTorrent.PieceSetBuckets import SortedPieceBuckets, resolve_typecode
else:
    from BitTorrent.PieceSetBuckets import PieceSetBuckets

# NOTE(review): RENDER is not read by any code visible in this file.
RENDER = False
class PerIPStats(object):
    """Mutable bookkeeping record kept once per remote IP address.

    Every instance starts out with the same blank state:

    * ``numgood`` -- counter, starts at 0.
    * ``bad`` -- fresh empty dict (never shared between instances).
    * ``numconnections`` -- counter, starts at 0.
    * ``lastdownload`` -- starts as None.
    * ``peerid`` -- starts as None.
    """

    def __init__(self):
        # Counters start from zero.
        self.numgood = self.numconnections = 0
        # A new dict per instance so records never alias each other.
        self.bad = dict()
        # Nothing is known about the peer yet.
        self.lastdownload = self.peerid = None
class MultiDownload(object):
    """Owner of all per-connection ``Download`` objects and shared piece state.

    The object keeps the list of live downloads, per-IP statistics, the
    piece-availability buckets (``piece_states``) and two sets of byte-arrival
    listeners, and forwards have/hash-check events to the picker, the request
    manager ``rm``, storage and the attached connection manager.
    """

    def __init__(self, config, storage, rm, urlage, picker, numpieces,
                 finished, errorfunc, kickfunc, banfunc, get_downrate):
        """Store collaborators and build the initial piece-state buckets.

        config       -- mapping; 'download_chunk_size' and 'snub_time' are
                        read here, 'retaliate_to_garbled_data' in kick()/ban().
        storage      -- object queried for have_set, have.numfalse and
                        do_I_have().
        rm           -- object queried for active_requests, endgame,
                        want_requests() and new_request().
        urlage       -- stored only.
        picker       -- object notified of have/complete events.
        numpieces    -- number of pieces; every piece index in
                        range(numpieces) starts in bucket 0.
        finished     -- zero-argument callable invoked by hashchecked() once
                        storage reports no missing pieces.
        errorfunc    -- stored only.
        kickfunc     -- callable invoked with a connector by kick().
        banfunc      -- callable invoked with an IP by ban().
        get_downrate -- stored only.
        """
        self.config = config
        self.storage = storage
        self.rm = rm
        self.urlage = urlage
        self.picker = picker
        self.errorfunc = errorfunc
        self.rerequester = None
        self.entered_endgame = False
        self.connection_manager = None
        self.chunksize = config['download_chunk_size']
        self.numpieces = numpieces
        self.finished = finished
        self.snub_time = config['snub_time']
        self.kickfunc = kickfunc
        self.banfunc = banfunc
        self.get_downrate = get_downrate
        self.downloads = []
        self.perip = {}
        self.bad_peers = {}
        self.discarded_bytes = 0
        self.useful_received_listeners = set()
        self.raw_received_listeners = set()

        if SPARSE_SET:
            self.piece_states = PieceSetBuckets()
            # Bucket 0 initially holds every piece index.
            nothing = SparseSet()
            nothing.add(0, self.numpieces)
            self.piece_states.buckets.append(nothing)
            # The bucket object's internals are populated by hand here:
            # every piece maps to bucket index 0.
            self.piece_states.place_in_buckets = dict.fromkeys(
                xrange(self.numpieces), 0)
        else:
            typecode = resolve_typecode(self.numpieces)
            self.piece_states = SortedPieceBuckets(typecode)
            nothing = array.array(typecode, range(self.numpieces))
            self.piece_states.buckets.append(nothing)
            # Same manual initialisation; each piece maps to the pair
            # (0, i) -- presumably (bucket index, position in bucket).
            nowhere = [(i, (0, i)) for i in xrange(self.numpieces)]
            self.piece_states.place_in_buckets = dict(nowhere)

        # Bumped on every change that affects aggregate_piece_states().
        self.last_update = 0
        self.all_requests = set()

    def attach_connection_manager(self, connection_manager):
        """Remember the connection manager notified by hashchecked()."""
        self.connection_manager = connection_manager

    def aggregate_piece_states(self):
        """Return ``(numpieces, last_update, d)`` describing piece state.

        ``d`` maps 'h' to ``storage.have_set``, 't' to a set of the keys of
        ``rm.active_requests``, and each integer bucket index to the
        corresponding bucket of ``piece_states``.
        """
        d = {}
        d['h'] = self.storage.have_set
        d['t'] = set(self.rm.active_requests.iterkeys())
        for i, bucket in enumerate(self.piece_states.buckets):
            d[i] = bucket
        return (self.numpieces, self.last_update, d)

    def get_unchoked_seed_count(self):
        """Count downloads whose peer has every piece and is not choking."""
        seed_count = 0
        for d in self.downloads:
            if d.have.numfalse == 0 and not d.choked:
                seed_count += 1
        return seed_count

    def get_adjusted_distributed_copies(self):
        """Return the picker's distributed-copies figure plus corrections.

        Adds the fraction of pieces already in storage and, when a
        rerequester with a truthy ``tracker_num_seeds`` is set, that seed
        count as well.
        """
        # Compensate for the fact that the piece picker does not contain
        # all the pieces.
        num = self.picker.get_distributed_copies()
        percent_have = (float(len(self.storage.have_set)) /
                        float(self.numpieces))
        num += percent_have
        if self.rerequester and self.rerequester.tracker_num_seeds:
            num += self.rerequester.tracker_num_seeds
        return num

    def active_requests_add(self, r):
        """Record that the active-request set changed; ``r`` is unused."""
        self.last_update += 1

    def active_requests_remove(self, r):
        """Record that the active-request set changed; ``r`` is unused."""
        self.last_update += 1

    def got_have(self, piece):
        """Note that one more peer has ``piece``."""
        self.picker.got_have(piece)
        self.last_update += 1
        p = self.piece_states
        # remove() is expected to return the piece's current bucket index;
        # the piece is re-added one bucket higher.
        p.add(piece, p.remove(piece) + 1)

    def got_have_all(self):
        """Note that a peer announced having every piece."""
        self.picker.got_have_all()
        self.last_update += 1
        self.piece_states.prepend_bucket()

    def lost_have(self, piece):
        """Note that one fewer peer has ``piece``."""
        self.picker.lost_have(piece)
        self.last_update += 1
        p = self.piece_states
        # Mirror of got_have(): re-add the piece one bucket lower.
        p.add(piece, p.remove(piece) - 1)

    def lost_have_all(self):
        """Note that a peer which had every piece went away."""
        self.picker.lost_have_all()
        self.last_update += 1
        self.piece_states.popleft_bucket()

    def check_enter_endgame(self):
        """Switch to endgame once, the first time ``rm.endgame`` is set.

        Rebuilds ``all_requests`` from every download's active requests and
        then lets each download adjust itself via fix_download_endgame().
        """
        if self.entered_endgame or not self.rm.endgame:
            return
        self.entered_endgame = True
        self.all_requests = set()
        for d in self.downloads:
            self.all_requests.update(d.active_requests)
        for d in self.downloads:
            d.fix_download_endgame()

    def hashchecked(self, index):
        """React to the hash check of piece ``index`` having completed.

        If storage does not have the piece afterwards, the piece is
        requested again (endgame: through ``rm`` and every download;
        otherwise: through the unchoked downloads in random order).
        If storage has it, the picker and connection manager are notified,
        and when no piece is missing any more, connections to peers that
        also have everything are closed and ``finished()`` is called.
        """
        if not self.storage.do_I_have(index):
            if self.rm.endgame:
                while self.rm.want_requests(index):
                    nb, nl = self.rm.new_request(index)
                    self.all_requests.add((index, nb, nl))
                for d in self.downloads:
                    d.fix_download_endgame()
            else:
                ds = [d for d in self.downloads if not d.choked]
                random.shuffle(ds)
                for d in ds:
                    d._request_more([index])
            return

        self.picker.complete(index)
        self.active_requests_remove(index)
        self.connection_manager.hashcheck_succeeded(index)
        if self.storage.have.numfalse == 0:
            for d in self.downloads:
                if d.have.numfalse == 0:
                    d.connector.close()
            self.finished()

    def make_download(self, connector):
        """Create, register and return a ``Download`` for ``connector``.

        Also updates the per-IP record for ``connector.ip`` (connection
        count, last download, peer id) and hooks the new download's
        byte-arrival events up to this object's listeners.
        """
        ip = connector.ip
        # Only build a PerIPStats when this IP has not been seen before.
        perip = self.perip.get(ip)
        if perip is None:
            perip = self.perip[ip] = PerIPStats()
        perip.numconnections += 1
        d = Download(self, connector)
        d.add_useful_received_listener(self.fire_useful_received_listeners)
        d.add_raw_received_listener(self.fire_raw_received_listeners)
        perip.lastdownload = d
        perip.peerid = connector.id
        self.downloads.append(d)
        return d

    def add_useful_received_listener(self, listener):
        """Register ``listener`` for useful arrivals on any managed download.

        Listeners are called with the byte count for useful arrivals to any
        of the downloads managed by this MultiDownload object (see
        Download.add_useful_received_listener for the per-download variant).
        """
        self.useful_received_listeners.add(listener)

    def add_raw_received_listener(self, listener):
        """Register ``listener`` for every byte arrival.

        Listeners are called whenever bytes arrive (i.e., to
        Connector.data_came_in) regardless of whether those bytes are useful.
        """
        self.raw_received_listeners.add(listener)

    def remove_useful_received_listener(self, listener):
        """Unregister ``listener``; raises KeyError if it is not registered."""
        self.useful_received_listeners.remove(listener)

    def fire_useful_received_listeners(self, bytes):
        """Call every useful-arrival listener with ``bytes``.

        Iterates over a snapshot so that a listener may add or remove
        listeners (including itself) from within the callback without
        triggering "Set changed size during iteration".
        """
        for f in list(self.useful_received_listeners):
            f(bytes)

    def remove_raw_received_listener(self, listener):
        """Unregister ``listener``; raises KeyError if it is not registered."""
        self.raw_received_listeners.remove(listener)

    def fire_raw_received_listeners(self, bytes):
        """Call every raw-arrival listener with ``bytes``.

        Iterates over a snapshot for the same reason as
        fire_useful_received_listeners().
        """
        for f in list(self.raw_received_listeners):
            f(bytes)

    def lost_peer(self, download):
        """Forget ``download`` and update the per-IP record of its peer."""
        self.downloads.remove(download)
        stats = self.perip[download.connector.ip]
        stats.numconnections -= 1
        if stats.lastdownload == download:
            stats.lastdownload = None

    def kick(self, download):
        """Report a protocol violation and, if configured, kick the peer.

        The kick itself only happens when the
        'retaliate_to_garbled_data' config entry is truthy.
        """
        download.connector.protocol_violation("peer sent bad data")
        if not self.config['retaliate_to_garbled_data']:
            return
        # kickfunc will schedule connection.close() to be executed later; we
        # might now be inside RawServer event loop with events from that
        # connection already queued, and trying to handle them after doing
        # close() now could cause problems.
        self.kickfunc(download.connector)

    def ban(self, ip):
        """Ban ``ip`` and record it in ``bad_peers``, if retaliation is on.

        Raises KeyError if no per-IP record exists for ``ip``.
        """
        if not self.config['retaliate_to_garbled_data']:
            return
        self.banfunc(ip)
        self.bad_peers[ip] = (True, self.perip[ip])
|