| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328 |
- # Written by Bram Cohen
- # see LICENSE.txt for license information
- from BitTornado.bitfield import Bitfield
- from BitTornado.clock import clock
- from binascii import b2a_hex
- try:
- True
- except:
- True = 1
- False = 0
- DEBUG1 = False
- DEBUG2 = False
- def toint(s):
- return long(b2a_hex(s), 16)
- def tobinary(i):
- return (chr(i >> 24) + chr((i >> 16) & 0xFF) +
- chr((i >> 8) & 0xFF) + chr(i & 0xFF))
- CHOKE = chr(0)
- UNCHOKE = chr(1)
- INTERESTED = chr(2)
- NOT_INTERESTED = chr(3)
- # index
- HAVE = chr(4)
- # index, bitfield
- BITFIELD = chr(5)
- # index, begin, length
- REQUEST = chr(6)
- # index, begin, piece
- PIECE = chr(7)
- # index, begin, piece
- CANCEL = chr(8)
- class Connection:
- def __init__(self, connection, connecter, ccount):
- self.connection = connection
- self.connecter = connecter
- self.ccount = ccount
- self.got_anything = False
- self.next_upload = None
- self.outqueue = []
- self.partial_message = None
- self.download = None
- self.send_choke_queued = False
- self.just_unchoked = None
- def get_ip(self, real=False):
- return self.connection.get_ip(real)
- def get_id(self):
- return self.connection.get_id()
- def get_readable_id(self):
- return self.connection.get_readable_id()
- def close(self):
- if DEBUG1:
- print (self.ccount,'connection closed')
- self.connection.close()
- def is_locally_initiated(self):
- return self.connection.is_locally_initiated()
- def is_encrypted(self):
- return self.connection.is_encrypted()
- def send_interested(self):
- self._send_message(INTERESTED)
- def send_not_interested(self):
- self._send_message(NOT_INTERESTED)
- def send_choke(self):
- if self.partial_message:
- self.send_choke_queued = True
- else:
- self._send_message(CHOKE)
- self.upload.choke_sent()
- self.just_unchoked = 0
- def send_unchoke(self):
- if self.send_choke_queued:
- self.send_choke_queued = False
- if DEBUG1:
- print (self.ccount,'CHOKE SUPPRESSED')
- else:
- self._send_message(UNCHOKE)
- if ( self.partial_message or self.just_unchoked is None
- or not self.upload.interested or self.download.active_requests ):
- self.just_unchoked = 0
- else:
- self.just_unchoked = clock()
- def send_request(self, index, begin, length):
- self._send_message(REQUEST + tobinary(index) +
- tobinary(begin) + tobinary(length))
- if DEBUG1:
- print (self.ccount,'sent request',index,begin,begin+length)
- def send_cancel(self, index, begin, length):
- self._send_message(CANCEL + tobinary(index) +
- tobinary(begin) + tobinary(length))
- if DEBUG1:
- print (self.ccount,'sent cancel',index,begin,begin+length)
- def send_bitfield(self, bitfield):
- self._send_message(BITFIELD + bitfield)
- def send_have(self, index):
- self._send_message(HAVE + tobinary(index))
- def send_keepalive(self):
- self._send_message('')
- def _send_message(self, s):
- if DEBUG2:
- if s:
- print (self.ccount,'SENDING MESSAGE',ord(s[0]),len(s))
- else:
- print (self.ccount,'SENDING MESSAGE',-1,0)
- s = tobinary(len(s))+s
- if self.partial_message:
- self.outqueue.append(s)
- else:
- self.connection.send_message_raw(s)
- def send_partial(self, bytes):
- if self.connection.closed:
- return 0
- if self.partial_message is None:
- s = self.upload.get_upload_chunk()
- if s is None:
- return 0
- index, begin, piece = s
- self.partial_message = ''.join((
- tobinary(len(piece) + 9), PIECE,
- tobinary(index), tobinary(begin), piece.tostring() ))
- if DEBUG1:
- print (self.ccount,'sending chunk',index,begin,begin+len(piece))
- if bytes < len(self.partial_message):
- self.connection.send_message_raw(self.partial_message[:bytes])
- self.partial_message = self.partial_message[bytes:]
- return bytes
- q = [self.partial_message]
- self.partial_message = None
- if self.send_choke_queued:
- self.send_choke_queued = False
- self.outqueue.append(tobinary(1)+CHOKE)
- self.upload.choke_sent()
- self.just_unchoked = 0
- q.extend(self.outqueue)
- self.outqueue = []
- q = ''.join(q)
- self.connection.send_message_raw(q)
- return len(q)
- def get_upload(self):
- return self.upload
- def get_download(self):
- return self.download
- def set_download(self, download):
- self.download = download
- def backlogged(self):
- return not self.connection.is_flushed()
- def got_request(self, i, p, l):
- self.upload.got_request(i, p, l)
- if self.just_unchoked:
- self.connecter.ratelimiter.ping(clock() - self.just_unchoked)
- self.just_unchoked = 0
-
- class Connecter:
- def __init__(self, make_upload, downloader, choker, numpieces,
- totalup, config, ratelimiter, sched = None):
- self.downloader = downloader
- self.make_upload = make_upload
- self.choker = choker
- self.numpieces = numpieces
- self.config = config
- self.ratelimiter = ratelimiter
- self.rate_capped = False
- self.sched = sched
- self.totalup = totalup
- self.rate_capped = False
- self.connections = {}
- self.external_connection_made = 0
- self.ccount = 0
- def how_many_connections(self):
- return len(self.connections)
- def connection_made(self, connection):
- self.ccount += 1
- c = Connection(connection, self, self.ccount)
- if DEBUG2:
- print (c.ccount,'connection made')
- self.connections[connection] = c
- c.upload = self.make_upload(c, self.ratelimiter, self.totalup)
- c.download = self.downloader.make_download(c)
- self.choker.connection_made(c)
- return c
- def connection_lost(self, connection):
- c = self.connections[connection]
- if DEBUG2:
- print (c.ccount,'connection closed')
- del self.connections[connection]
- if c.download:
- c.download.disconnected()
- self.choker.connection_lost(c)
- def connection_flushed(self, connection):
- conn = self.connections[connection]
- if conn.next_upload is None and (conn.partial_message is not None
- or len(conn.upload.buffer) > 0):
- self.ratelimiter.queue(conn)
-
- def got_piece(self, i):
- for co in self.connections.values():
- co.send_have(i)
- def got_message(self, connection, message):
- c = self.connections[connection]
- t = message[0]
- if DEBUG2:
- print (c.ccount,'message received',ord(t))
- if t == BITFIELD and c.got_anything:
- if DEBUG2:
- print (c.ccount,'misplaced bitfield')
- connection.close()
- return
- c.got_anything = True
- if (t in [CHOKE, UNCHOKE, INTERESTED, NOT_INTERESTED] and
- len(message) != 1):
- if DEBUG2:
- print (c.ccount,'bad message length')
- connection.close()
- return
- if t == CHOKE:
- c.download.got_choke()
- elif t == UNCHOKE:
- c.download.got_unchoke()
- elif t == INTERESTED:
- if not c.download.have.complete():
- c.upload.got_interested()
- elif t == NOT_INTERESTED:
- c.upload.got_not_interested()
- elif t == HAVE:
- if len(message) != 5:
- if DEBUG2:
- print (c.ccount,'bad message length')
- connection.close()
- return
- i = toint(message[1:])
- if i >= self.numpieces:
- if DEBUG2:
- print (c.ccount,'bad piece number')
- connection.close()
- return
- if c.download.got_have(i):
- c.upload.got_not_interested()
- elif t == BITFIELD:
- try:
- b = Bitfield(self.numpieces, message[1:])
- except ValueError:
- if DEBUG2:
- print (c.ccount,'bad bitfield')
- connection.close()
- return
- if c.download.got_have_bitfield(b):
- c.upload.got_not_interested()
- elif t == REQUEST:
- if len(message) != 13:
- if DEBUG2:
- print (c.ccount,'bad message length')
- connection.close()
- return
- i = toint(message[1:5])
- if i >= self.numpieces:
- if DEBUG2:
- print (c.ccount,'bad piece number')
- connection.close()
- return
- c.got_request(i, toint(message[5:9]),
- toint(message[9:]))
- elif t == CANCEL:
- if len(message) != 13:
- if DEBUG2:
- print (c.ccount,'bad message length')
- connection.close()
- return
- i = toint(message[1:5])
- if i >= self.numpieces:
- if DEBUG2:
- print (c.ccount,'bad piece number')
- connection.close()
- return
- c.upload.got_cancel(i, toint(message[5:9]),
- toint(message[9:]))
- elif t == PIECE:
- if len(message) <= 9:
- if DEBUG2:
- print (c.ccount,'bad message length')
- connection.close()
- return
- i = toint(message[1:5])
- if i >= self.numpieces:
- if DEBUG2:
- print (c.ccount,'bad piece number')
- connection.close()
- return
- if c.download.got_piece(i, toint(message[5:9]), message[9:]):
- self.got_piece(i)
- else:
- connection.close()
|