| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594 |
- # Written by Bram Cohen
- # see LICENSE.txt for license information
- from BitTornado.CurrentRateMeasure import Measure
- from BitTornado.bitfield import Bitfield
- from random import shuffle
- from BitTornado.clock import clock
# Fallback for interpreters on which the names True/False are not defined.
# Only a NameError means "the name is missing", so nothing else is caught.
# The names are installed through globals() rather than by plain assignment
# so the module still parses on interpreters where True/False are keywords.
try:
    True
except NameError:
    globals()['True'] = 1
    globals()['False'] = 0

# Age (in clock() units, presumably seconds -- i.e. one hour) after which an
# entry in Downloader.disconnectedseeds is dropped.
EXPIRE_TIME = 60 * 60
class PerIPStats:
    """Mutable per-address record kept in ``Downloader.perip``.

    Attributes:
        numgood: count of good pieces credited to this address.
        bad: dict mapping piece index -> number of failures reported.
        numconnections: number of currently open connections.
        lastdownload: most recently created download object, or None.
        peerid: readable id of the most recent connection, or None.
    """

    def __init__(self, ip):
        # ``ip`` is accepted but not stored; the owner keys records by it.
        self.lastdownload = self.peerid = None
        self.numconnections = self.numgood = 0
        self.bad = {}
class BadDataGuard:
    """Records good/bad piece reports for one download and escalates.

    The guard shares the per-IP statistics object of its download, so
    reports from several connections of the same address accumulate.
    """

    def __init__(self, download):
        owner = download.downloader
        address = download.ip
        self.download = download
        self.ip = address
        self.downloader = owner
        self.stats = owner.perip[address]
        self.lastindex = None

    def failed(self, index, bump = False):
        """Record a failure for piece ``index`` and kick/ban if warranted.

        A kick is attempted once more than one distinct piece has failed;
        a ban once at least three distinct pieces have failed and that
        count exceeds one thirtieth of the good-piece count.  When no ban
        is attempted and ``bump`` is true, the picker is asked to bump the
        piece instead.
        """
        stats = self.stats
        owner = self.downloader
        owner.gotbaddata[self.ip] = 1
        stats.bad[index] = stats.bad.get(index, 0) + 1

        if len(stats.bad) > 1:
            target = self.download
            if target is None:
                # Our own download is gone; fall back to the only other
                # connection from this address, if there is exactly one.
                if stats.numconnections == 1 and stats.lastdownload is not None:
                    owner.try_kick(stats.lastdownload)
            else:
                owner.try_kick(target)

        failures = len(stats.bad)
        if failures >= 3 and failures > int(stats.numgood/30):
            owner.try_ban(self.ip)
        elif bump:
            owner.picker.bump(index)

    def good(self, index):
        """Credit one good piece, counting each piece index only once in a row."""
        # lastindex is a hack to only increase numgood by one for each good
        # piece, however many chunks come from the connection(s) from this IP
        if index == self.lastindex:
            return
        self.stats.numgood += 1
        self.lastindex = index
class SingleDownload:
    """Download-side state for one peer connection.

    Tracks choke/interest flags, the peer's piece bitfield, the requests
    currently outstanding to the peer, and rate measurements.  All policy
    decisions are delegated to the owning ``downloader`` and its
    ``storage`` and ``picker`` objects.
    """

    def __init__(self, downloader, connection):
        self.downloader = downloader
        self.connection = connection
        self.choked = True
        self.interested = False
        # Outstanding requests to this peer as (index, begin, length) tuples.
        self.active_requests = []
        self.measure = Measure(downloader.max_rate_period)
        self.peermeasure = Measure(downloader.max_rate_period)
        self.have = Bitfield(downloader.numpieces)
        # last:  refreshed whenever a requested chunk arrives.
        # last2: refreshed on chunk arrival, on unchoke, and when interest
        #        is sent while unchoked.  Both are compared in is_snubbed().
        self.last = -1000
        self.last2 = -1000
        # A piece index the picker most recently offered for this peer.
        self.example_interest = None
        self.backlog = 2
        self.ip = connection.get_ip()
        self.guard = BadDataGuard(self)

    def _backlog(self, just_unchoked):
        """Recompute, store and return the request backlog limit.

        The limit is the smaller of a rate-derived figure and the
        downloader's queue limit (plus 2 when ``just_unchoked`` is true).
        Values above 50 are scaled by 0.075 but never drop below 50.

        The result is always an int: it is used as a slice bound in
        fix_download_endgame(), where a float would raise TypeError.
        """
        rate_based = 2 + int(4*self.measure.get_rate()/self.downloader.chunksize)
        limit_based = (2*just_unchoked) + self.downloader.queue_limit()
        self.backlog = int(min(rate_based, limit_based))
        if self.backlog > 50:
            self.backlog = max(50, int(self.backlog * 0.075))
        return self.backlog

    def disconnected(self):
        """Tear down bookkeeping after the connection has been lost."""
        self.downloader.lost_peer(self)
        if self.have.complete():
            self.downloader.picker.lost_seed()
        else:
            for i in range(len(self.have)):
                if self.have[i]:
                    self.downloader.picker.lost_have(i)
        if self.have.complete() and self.downloader.storage.is_endgame():
            self.downloader.add_disconnected_seed(self.connection.get_readable_id())
        self._letgo()
        # Detach the guard so later reports no longer try to kick this object.
        self.guard.download = None

    def _letgo(self):
        """Give up every outstanding request and let other peers take over."""
        if self in self.downloader.queued_out:
            del self.downloader.queued_out[self]
        if not self.active_requests:
            return
        if self.downloader.endgamemode:
            # In endgame the requests stay in downloader.all_requests.
            self.active_requests = []
            return
        lost = {}
        for index, begin, length in self.active_requests:
            self.downloader.storage.request_lost(index, begin, length)
            lost[index] = 1
        lost_pieces = list(lost)
        self.active_requests = []
        if self.downloader.paused:
            return
        # Offer the freed requests to unchoked peers in random order ...
        ds = [d for d in self.downloader.downloads if not d.choked]
        shuffle(ds)
        for d in ds:
            d._request_more()
        # ... and express interest to choked peers that hold a lost piece
        # which still has requests to hand out.
        for d in self.downloader.downloads:
            if d.choked and not d.interested:
                for piece in lost_pieces:
                    if d.have[piece] and self.downloader.storage.do_I_have_requests(piece):
                        d.send_interested()
                        break

    def got_choke(self):
        """Handle a choke from the peer; outstanding requests are dropped."""
        if not self.choked:
            self.choked = True
            self._letgo()

    def got_unchoke(self):
        """Handle an unchoke; start requesting if we are interested."""
        if self.choked:
            self.choked = False
            if self.interested:
                self._request_more(new_unchoke = True)
            self.last2 = clock()

    def is_choked(self):
        return self.choked

    def is_interested(self):
        return self.interested

    def send_interested(self):
        """Tell the peer we are interested, once."""
        if not self.interested:
            self.interested = True
            self.connection.send_interested()
            if not self.choked:
                self.last2 = clock()

    def send_not_interested(self):
        """Tell the peer we are no longer interested, once."""
        if self.interested:
            self.interested = False
            self.connection.send_not_interested()

    def got_piece(self, index, begin, piece):
        """Handle an incoming chunk.

        Returns False when the chunk was not requested (its length is
        added to ``downloader.discarded``) or when storage rejects it;
        otherwise returns whether storage now has piece ``index``.
        """
        length = len(piece)
        request = (index, begin, length)
        try:
            self.active_requests.remove(request)
        except ValueError:
            self.downloader.discarded += length
            return False
        if self.downloader.endgamemode:
            self.downloader.all_requests.remove(request)
        self.last = clock()
        self.last2 = clock()
        self.measure.update_rate(length)
        self.downloader.measurefunc(length)
        if not self.downloader.storage.piece_came_in(index, begin, piece, self.guard):
            self.downloader.piece_flunked(index)
            return False
        if self.downloader.storage.do_I_have(index):
            self.downloader.picker.complete(index)
        if self.downloader.endgamemode:
            # The same chunk may be outstanding elsewhere: cancel it there.
            for d in self.downloader.downloads:
                if d is self:
                    continue
                if not d.interested:
                    assert not d.active_requests
                    continue
                if d.choked:
                    assert not d.active_requests
                    d.fix_download_endgame()
                    continue
                try:
                    d.active_requests.remove(request)
                except ValueError:
                    continue
                d.connection.send_cancel(index, begin, length)
                d.fix_download_endgame()
        self._request_more()
        self.downloader.check_complete(index)
        return self.downloader.storage.do_I_have(index)

    def _request_more(self, new_unchoke = False):
        """Fill the request pipeline up to the current backlog limit.

        Must only be called while unchoked.  In endgame mode the work is
        handed to fix_download_endgame().
        """
        assert not self.choked
        if self.downloader.endgamemode:
            self.fix_download_endgame(new_unchoke)
            return
        if self.downloader.paused:
            return
        if len(self.active_requests) >= self._backlog(new_unchoke):
            # Nothing outstanding and a zero backlog: park this download
            # until the downloader's queue limit allows requests again.
            if not (self.active_requests or self.backlog):
                self.downloader.queued_out[self] = 1
            return
        # Pieces whose last available request was handed out in this call.
        lost_interests = []
        while len(self.active_requests) < self.backlog:
            interest = self.downloader.picker.next(self.have,
                               self.downloader.storage.do_I_have_requests,
                               self.downloader.too_many_partials())
            if interest is None:
                break
            self.example_interest = interest
            self.send_interested()
            loop = True
            while len(self.active_requests) < self.backlog and loop:
                begin, length = self.downloader.storage.new_request(interest)
                self.downloader.picker.requested(interest)
                self.active_requests.append((interest, begin, length))
                self.connection.send_request(interest, begin, length)
                self.downloader.chunk_requested(length)
                if not self.downloader.storage.do_I_have_requests(interest):
                    loop = False
                    lost_interests.append(interest)
        if not self.active_requests:
            self.send_not_interested()
        if lost_interests:
            # Other idle-but-interested peers may have been interested only
            # in the pieces just exhausted; re-evaluate their interest.
            for d in self.downloader.downloads:
                if d.active_requests or not d.interested:
                    continue
                if d.example_interest is not None and self.downloader.storage.do_I_have_requests(d.example_interest):
                    continue
                for lost in lost_interests:
                    if d.have[lost]:
                        break
                else:
                    continue
                interest = self.downloader.picker.next(d.have,
                                   self.downloader.storage.do_I_have_requests,
                                   self.downloader.too_many_partials())
                if interest is None:
                    d.send_not_interested()
                else:
                    d.example_interest = interest
        if self.downloader.storage.is_endgame():
            self.downloader.start_endgame()

    def fix_download_endgame(self, new_unchoke = False):
        """Endgame counterpart of _request_more().

        Requests a random selection of the globally outstanding requests
        that this peer can serve and that are not already pending here,
        up to the backlog limit, and updates interest accordingly.
        """
        if self.downloader.paused:
            return
        if len(self.active_requests) >= self._backlog(new_unchoke):
            if not (self.active_requests or self.backlog) and not self.choked:
                self.downloader.queued_out[self] = 1
            return
        want = [a for a in self.downloader.all_requests
                if self.have[a[0]] and a not in self.active_requests]
        if not (self.active_requests or want):
            self.send_not_interested()
            return
        if want:
            self.send_interested()
        if self.choked:
            return
        shuffle(want)
        # self.backlog is an int (see _backlog), so this slice bound is valid.
        del want[self.backlog - len(self.active_requests):]
        self.active_requests.extend(want)
        for piece, begin, length in want:
            self.connection.send_request(piece, begin, length)
            self.downloader.chunk_requested(length)

    def got_have(self, index):
        """Handle a HAVE for piece ``index``; returns True if the peer is now complete."""
        storage = self.downloader.storage
        if index == self.downloader.numpieces-1:
            # The final piece is whatever remains after the full-size ones.
            size = storage.total_length-(self.downloader.numpieces-1)*storage.piece_length
        else:
            size = storage.piece_length
        self.downloader.totalmeasure.update_rate(size)
        self.peermeasure.update_rate(size)
        if not self.have[index]:
            self.have[index] = True
            self.downloader.picker.got_have(index)
            if self.have.complete():
                self.downloader.picker.became_seed()
                if storage.am_I_complete():
                    # Both sides are complete: remember the seed and hang up.
                    self.downloader.add_disconnected_seed(self.connection.get_readable_id())
                    self.connection.close()
            elif self.downloader.endgamemode:
                self.fix_download_endgame()
            elif ( not self.downloader.paused
                   and not self.downloader.picker.is_blocked(index)
                   and storage.do_I_have_requests(index) ):
                if not self.choked:
                    self._request_more()
                else:
                    self.send_interested()
        return self.have.complete()

    def _check_interests(self):
        """Send interest if the peer holds any piece we could request."""
        if self.interested or self.downloader.paused:
            return
        for i in range(len(self.have)):
            if ( self.have[i] and not self.downloader.picker.is_blocked(i)
                 and ( self.downloader.endgamemode
                       or self.downloader.storage.do_I_have_requests(i) ) ):
                self.send_interested()
                return

    def got_have_bitfield(self, have):
        """Handle the peer's bitfield.

        Returns False after closing the connection when both sides are
        complete; otherwise returns whether the peer is complete.
        """
        if self.downloader.storage.am_I_complete() and have.complete():
            if self.downloader.super_seeding:
                self.connection.send_bitfield(have.tostring()) # be nice, show you're a seed too
            self.connection.close()
            self.downloader.add_disconnected_seed(self.connection.get_readable_id())
            return False
        self.have = have
        if have.complete():
            self.downloader.picker.got_seed()
        else:
            for i in range(len(have)):
                if have[i]:
                    self.downloader.picker.got_have(i)
        if self.downloader.endgamemode and not self.downloader.paused:
            for piece, begin, length in self.downloader.all_requests:
                if self.have[piece]:
                    self.send_interested()
                    break
        else:
            self._check_interests()
        return have.complete()

    def get_rate(self):
        return self.measure.get_rate()

    def is_snubbed(self):
        """Return whether no chunk has arrived for longer than snub_time.

        Side effect: if we are interested and unchoked but ``last2`` is
        older than snub_time, all outstanding requests are cancelled and
        the peer is treated as having choked us.
        """
        if ( self.interested and not self.choked
             and clock() - self.last2 > self.downloader.snub_time ):
            for index, begin, length in self.active_requests:
                self.connection.send_cancel(index, begin, length)
            self.got_choke()    # treat it just like a choke
        return clock() - self.last > self.downloader.snub_time
class Downloader:
    """Coordinates every SingleDownload.

    Owns the list of downloads, per-IP statistics, request rate limiting,
    endgame-mode bookkeeping, kick/ban records and the table of recently
    disconnected seeds.
    """

    def __init__(self, storage, picker, backlog, max_rate_period,
                 numpieces, chunksize, measurefunc, snub_time,
                 kickbans_ok, kickfunc, banfunc):
        self.storage = storage
        self.picker = picker
        self.backlog = backlog
        self.max_rate_period = max_rate_period
        self.measurefunc = measurefunc
        self.totalmeasure = Measure(max_rate_period*storage.piece_length/storage.request_size)
        self.numpieces = numpieces
        self.chunksize = chunksize
        self.snub_time = snub_time
        self.kickfunc = kickfunc      # called with a connection
        self.banfunc = banfunc        # called with an ip
        self.disconnectedseeds = {}   # readable id -> clock() when recorded
        self.downloads = []
        self.perip = {}               # ip -> PerIPStats
        self.gotbaddata = {}          # ip -> 1 for every ip that sent bad data
        self.kicked = {}              # ip -> readable id
        self.banned = {}              # ip -> readable id
        self.kickbans_ok = kickbans_ok
        self.kickbans_halted = False
        self.super_seeding = False
        self.endgamemode = False
        self.endgame_queued_pieces = []
        self.all_requests = []        # endgame: every outstanding request
        self.discarded = 0            # bytes received that were not requested
        self.download_rate = 0        # 0 disables request rate limiting
        self.bytes_requested = 0
        self.last_time = clock()
        self.queued_out = {}          # downloads parked by the queue limit
        self.requeueing = False
        self.paused = False

    def set_download_rate(self, rate):
        """Set the request rate limit; ``rate`` is scaled by 1000 (0 = unlimited)."""
        self.download_rate = rate * 1000
        self.bytes_requested = 0

    def queue_limit(self):
        """Return how many more chunks may be requested right now.

        With no rate limit a huge value is returned.  Otherwise the
        requested-bytes balance is reduced by elapsed time * rate, parked
        downloads are given a chance to request once the balance is
        negative, and the accumulated credit is capped at 5 * rate.
        """
        if not self.download_rate:
            return 10e10    # that's a big queue!
        t = clock()
        self.bytes_requested -= (t - self.last_time) * self.download_rate
        self.last_time = t
        # The requeueing flag stops the recursion that would otherwise
        # occur because _request_more() calls back into queue_limit().
        if not self.requeueing and self.queued_out and self.bytes_requested < 0:
            self.requeueing = True
            q = list(self.queued_out)
            shuffle(q)
            self.queued_out = {}
            for d in q:
                d._request_more()
            self.requeueing = False
        if -self.bytes_requested > 5*self.download_rate:
            self.bytes_requested = -5*self.download_rate
        return max(int(-self.bytes_requested/self.chunksize),0)

    def chunk_requested(self, size):
        """Charge ``size`` bytes against the rate-limit balance."""
        self.bytes_requested += size

    external_data_received = chunk_requested

    def make_download(self, connection):
        """Create, register and return a SingleDownload for ``connection``."""
        ip = connection.get_ip()
        perip = self.perip.get(ip)
        if perip is None:
            perip = self.perip[ip] = PerIPStats(ip)
            # The peer id is recorded only when the address is first seen.
            perip.peerid = connection.get_readable_id()
        perip.numconnections += 1
        d = SingleDownload(self, connection)
        perip.lastdownload = d
        self.downloads.append(d)
        return d

    def piece_flunked(self, index):
        """React to piece ``index`` having been rejected by storage."""
        if self.paused:
            return
        if self.endgamemode:
            if self.downloads:
                # Put every request of the piece back into the endgame pool.
                while self.storage.do_I_have_requests(index):
                    nb, nl = self.storage.new_request(index)
                    self.all_requests.append((index, nb, nl))
                for d in self.downloads:
                    d.fix_download_endgame()
                return
            self._reset_endgame()
            return
        ds = [d for d in self.downloads if not d.choked]
        shuffle(ds)
        for d in ds:
            d._request_more()
        ds = [d for d in self.downloads if not d.interested and d.have[index]]
        for d in ds:
            d.example_interest = index
            d.send_interested()

    def has_downloaders(self):
        return len(self.downloads)

    def lost_peer(self, download):
        """Unregister ``download``; leaves endgame if no peers remain."""
        ip = download.ip
        self.perip[ip].numconnections -= 1
        if self.perip[ip].lastdownload == download:
            self.perip[ip].lastdownload = None
        self.downloads.remove(download)
        if self.endgamemode and not self.downloads: # all peers gone
            self._reset_endgame()

    def _reset_endgame(self):
        """Hand all endgame requests back to storage and leave endgame mode."""
        self.storage.reset_endgame(self.all_requests)
        self.endgamemode = False
        self.all_requests = []
        self.endgame_queued_pieces = []

    def add_disconnected_seed(self, id):
        """Remember (or refresh) a seed we disconnected from."""
        self.disconnectedseeds[id]=clock()

    def num_disconnected_seeds(self):
        """Expire stale entries, then return the number of remembered seeds."""
        # if this isn't called by a stats-gathering function
        # it should be scheduled to run every minute or two.
        now = clock()
        expired = [seed for seed, t in self.disconnectedseeds.items()
                   if now - t > EXPIRE_TIME]
        for seed in expired:
            del self.disconnectedseeds[seed]
        return len(self.disconnectedseeds)

    def _check_kicks_ok(self):
        """Return whether kicking/banning is currently permitted.

        Once more than ten addresses have sent bad data, kicks and bans
        are switched off permanently; they also require more than two
        connected downloads.
        """
        if len(self.gotbaddata) > 10:
            self.kickbans_ok = False
            self.kickbans_halted = True
        return self.kickbans_ok and len(self.downloads) > 2

    def try_kick(self, download):
        """Kick ``download``'s connection if kicks are permitted."""
        if self._check_kicks_ok():
            download.guard.download = None
            ip = download.ip
            peerid = download.connection.get_readable_id()
            self.kicked[ip] = peerid
            self.perip[ip].peerid = peerid
            self.kickfunc(download.connection)

    def try_ban(self, ip):
        """Ban ``ip`` if bans are permitted; a ban supersedes a kick record."""
        if self._check_kicks_ok():
            self.banfunc(ip)
            self.banned[ip] = self.perip[ip].peerid
            if ip in self.kicked:
                del self.kicked[ip]

    def set_super_seed(self):
        self.super_seeding = True

    def check_complete(self, index):
        """Housekeeping after piece ``index`` arrived; True if all done.

        Leaves endgame mode when no endgame requests remain, runs any
        deferred requeue, and on completion notifies and disconnects
        every peer that is itself complete.
        """
        if self.endgamemode and not self.all_requests:
            self.endgamemode = False
        if self.endgame_queued_pieces and not self.endgamemode:
            self.requeue_piece_download()
        if self.storage.am_I_complete():
            assert not self.all_requests
            assert not self.endgamemode
            for d in [i for i in self.downloads if i.have.complete()]:
                d.connection.send_have(index)   # be nice, tell the other seed you completed
                self.add_disconnected_seed(d.connection.get_readable_id())
                d.connection.close()
            return True
        return False

    def too_many_partials(self):
        return len(self.storage.dirty) > (len(self.downloads)/2)

    def cancel_piece_download(self, pieces):
        """Cancel all outstanding requests for the given piece indices."""
        if self.endgamemode:
            if self.endgame_queued_pieces:
                for piece in pieces:
                    try:
                        self.endgame_queued_pieces.remove(piece)
                    except ValueError:
                        # The piece simply was not queued.
                        pass
            new_all_requests = []
            for index, nb, nl in self.all_requests:
                if index in pieces:
                    self.storage.request_lost(index, nb, nl)
                else:
                    new_all_requests.append((index, nb, nl))
            self.all_requests = new_all_requests

        for d in self.downloads:
            hit = False
            for index, nb, nl in d.active_requests:
                if index in pieces:
                    hit = True
                    d.connection.send_cancel(index, nb, nl)
                    # In endgame the request was already returned above.
                    if not self.endgamemode:
                        self.storage.request_lost(index, nb, nl)
            if hit:
                d.active_requests = [ r for r in d.active_requests
                                      if r[0] not in pieces ]
                d._request_more()
            if not self.endgamemode and d.choked:
                d._check_interests()

    def requeue_piece_download(self, pieces = None):
        """Make ``pieces`` (plus any previously deferred ones) requestable.

        While endgame requests are still outstanding the pieces are only
        remembered in ``endgame_queued_pieces``; check_complete() calls
        this method again once endgame ends.  Otherwise every download
        re-evaluates its interest/requests and the deferred queue is
        cleared.  The queue is always a private list, never the caller's.
        """
        queued = list(self.endgame_queued_pieces or [])
        if pieces is not None:
            for piece in pieces:
                if piece not in queued:
                    queued.append(piece)
        if self.endgamemode:
            if self.all_requests:
                self.endgame_queued_pieces = queued
                return
            self.endgamemode = False
        # The requeue pass below serves everything that was deferred.
        self.endgame_queued_pieces = []

        ds = list(self.downloads)
        shuffle(ds)
        for d in ds:
            if d.choked:
                d._check_interests()
            else:
                d._request_more()

    def start_endgame(self):
        """Enter endgame mode, pooling every currently outstanding request."""
        assert not self.endgamemode
        self.endgamemode = True
        assert not self.all_requests
        for d in self.downloads:
            if d.active_requests:
                assert d.interested and not d.choked
            for request in d.active_requests:
                assert not request in self.all_requests
                self.all_requests.append(request)
        for d in self.downloads:
            d.fix_download_endgame()

    def pause(self, flag):
        """Pause (true) or resume (false) all requesting."""
        self.paused = flag
        if flag:
            for d in self.downloads:
                for index, begin, length in d.active_requests:
                    d.connection.send_cancel(index, begin, length)
                d._letgo()
                d.send_not_interested()
            if self.endgamemode:
                self._reset_endgame()
        else:
            shuffle(self.downloads)
            for d in self.downloads:
                d._check_interests()
                if d.interested and not d.choked:
                    d._request_more()
|