| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391 |
- # by Greg Hazel
- from __future__ import generators
- import urllib
- import logging
- from BTL.DictWithLists import OrderedDict
- from BitTorrent.Connector import Connector
- from bisect import bisect_right
- from urlparse import urlparse
- from BitTorrent.HTTPDownloader import parseContentRange
- noisy = False
- #if noisy:
- connection_logger = logging.getLogger("BitTorrent.HTTPConnector")
- connection_logger.setLevel(logging.DEBUG)
- stream_handler = logging.StreamHandler()
- connection_logger.addHandler(stream_handler)
- log = connection_logger.debug
- class BatchRequests(object):
- def __init__(self):
- self.requests = {}
- # you should add from the perspective of a BatchRequest
- def _add_request(self, filename, begin, length, br):
- r = (filename, begin, length)
- assert r not in self.requests
- self.requests[r] = br
- def got_request(self, filename, begin, data):
- length = len(data)
- r = (filename, begin, length)
- br = self.requests.pop(r)
- br.got_request(filename, begin, length, data)
- return br
-
- class BatchRequest(object):
-
- def __init__(self, parent, start):
- self.parent = parent
- self.numactive = 0
- self.start = start
- self.requests = OrderedDict()
- def add_request(self, filename, begin, length):
- r = (filename, begin, length)
- assert r not in self.requests
- self.parent._add_request(filename, begin, length, self)
- self.requests[r] = None
- self.numactive += 1
- def got_request(self, filename, begin, length, data):
- self.requests[(filename, begin, length)] = data
- self.numactive -= 1
- def get_result(self):
- if self.numactive > 0:
- return None
- chunks = []
- for k in self.requests.itervalues():
- chunks.append(k)
- return ''.join(chunks)
-
- # kind of like storage wrapper for webserver interaction
- class URLage(object):
- def __init__(self, files):
- # a list of bytes ranges and filenames for window-based IO
- self.ranges = []
- self._build_url_structs(files)
-
- def _build_url_structs(self, files):
- total = 0
- for filename, length in files:
- if length > 0:
- self.ranges.append((total, total + length, filename))
- total += length
- self.total_length = total
- def _intervals(self, pos, amount):
- r = []
- stop = pos + amount
- p = max(bisect_right(self.ranges, (pos, )) - 1, 0)
- for begin, end, filename in self.ranges[p:]:
- if begin >= stop:
- break
- r.append((filename, max(pos, begin) - begin, min(end, stop) - begin))
- return r
- def _request(self, host, filename, pos, amount, prefix, append):
- b = pos
- e = b + amount - 1
- f = prefix
- if append:
- f += filename
- s = '\r\n'.join([
- "GET /%s HTTP/1.1" % (urllib.quote(f)),
- "Host: %s" % host,
- "Connection: Keep-Alive",
- "Range: bytes=%s-%s" % (b, e),
- "", ""])
- if noisy: log(s)
- return s
- def build_requests(self, brs, host, pos, amount, prefix, append):
- r = []
- br = BatchRequest(brs, pos)
- for filename, pos, end in self._intervals(pos, amount):
- s = self._request(host, filename, pos, end - pos, prefix, append)
- br.add_request(filename, pos, end - pos)
- r.append((filename, s))
- return r
-
- class HTTPConnector(Connector):
- """Implements the HTTP syntax with a BitTorrent Connector interface.
- Connection-level semantics are as normal, but the download is always
- unchoked after it's connected."""
- MAX_LINE_LENGTH = 16384
- UNCHOKED_SEED_COUNT = 5
- RATE_PERCENTAGE = 10
- def __init__(self, parent, piece_size, urlage, connection, id, outgoing, log_prefix):
- self.piece_size = piece_size
- self._header_lines = []
- self.manual_close = False
- self.urlage = urlage
- self.batch_requests = BatchRequests()
- # pipeline tracker
- self.request_paths = []
- # range request glue
- self.request_blocks = {}
- scheme, host, path, params, query, fragment = urlparse(id)
- if path and path[0] == '/':
- path = path[1:]
- self.host = host
- self.prefix = path
- self.append = not(len(self.urlage.ranges) == 1 and path and path[-1] != '/')
- Connector.__init__(self, parent, connection, id, outgoing, log_prefix=log_prefix)
- # blarg
- self._buffer = []
- self._buffer_len = 0
- def close(self):
- self.manual_close = True
- Connector.close(self)
- def send_handshake(self):
- if noisy: self.logger.info('connection made: %s' % self.id)
- self.complete = True
- self.parent.connection_handshake_completed(self)
- # ARGH. -G
- def _download_send_request(index, begin, length):
- piece_size = self.download.multidownload.storage.piece_size
- if begin + length > piece_size:
- raise ValueError("Issuing request that exceeds piece size: "
- "(%d + %d == %d) > %d" %
- (begin, length, begin + length, piece_size))
- self.download.multidownload.active_requests_add(index)
- a = self.download.multidownload.rm._break_up(begin, length)
- for b, l in a:
- self.download.active_requests.add((index, b, l))
- self.request_blocks[(index, begin, length)] = a
- assert self == self.download.connector
- self.send_request(index, begin, length)
- self.download.send_request = _download_send_request
- # prefer full pieces to reduce http overhead
- self.download.prefer_full = True
- self.download._got_have_all()
- self.download.got_unchoke()
- def send_request(self, index, begin, length):
- if noisy:
- self.logger.info("SEND %s %d %d %d" % ('GET', index, begin, length))
- b = (index * self.piece_size) + begin
- r = self.urlage.build_requests(self.batch_requests, self.host, b,
- length, self.prefix, self.append)
- for filename, s in r:
- self.request_paths.append(filename)
- self._write(s)
- def send_interested(self):
- pass
- def send_not_interested(self):
- pass
- def send_choke(self):
- self.choke_sent = self.upload.choked
- def send_unchoke(self):
- self.choke_sent = self.upload.choked
- def send_cancel(self, index, begin, length):
- pass
- def send_have(self, index):
- pass
- def send_bitfield(self, bitfield):
- pass
-
- def send_keepalive(self):
- # is there something I can do here?
- pass
- # yields the number of bytes it wants next, gets those in self._message
- def _read_messages(self):
- completing = False
-
- while True:
- self._header_lines = []
- yield None
- line = self._message.upper()
- if noisy: self.logger.info(line)
- l = line.split(None, 2)
- version = l[0]
- status = l[1]
- try:
- message = l[2]
- except IndexError:
- # sometimes there is no message
- message = ""
- if not version.startswith("HTTP"):
- self.protocol_violation('Not HTTP: %r' % self._message)
- return
-
- if status not in ('301', '302', '303', '206'):
- self.protocol_violation('Bad status message: %s' %
- self._message)
- return
- headers = {}
- while True:
- yield None
- if len(self._message) == 0:
- break
- if ':' not in self._message:
- self.protocol_violation('Bad header: %s' % self._message)
- return
- header, value = self._message.split(':', 1)
- header = header.lower()
- headers[header] = value.strip()
- if noisy: self.logger.info("incoming headers: %s" % (headers, ))
- # reset the header buffer so we can loop
- self._header_lines = []
- if status in ('301', '302', '303'):
- url = headers.get('location')
- if not url:
- self.protocol_violation('No location: %s' % self._message)
- return
- self.logger.warning("Redirect: %s" % url)
- self.parent.start_http_connection(url)
- return
- filename = self.request_paths.pop(0)
-
- start, end, realLength = parseContentRange(headers['content-range'])
- length = (end - start) + 1
- cl = int(headers.get('content-length', length))
- if cl != length:
- raise ValueError('Got c-l:%d bytes instead of l:%d' % (cl, length))
- yield length
- if len(self._message) != length:
- raise ValueError('Got m:%d bytes instead of l:%d' %
- (len(self._message), length))
-
- if noisy:
- self.logger.info("GOT %s %d %d" % ('GET', start, len(self._message)))
- self.got_anything = True
- br = self.batch_requests.got_request(filename, start, self._message)
- data = br.get_result()
- if data:
- index = br.start // self.piece_size
- if index >= self.parent.numpieces:
- return
- begin = br.start - (index * self.piece_size)
-
- if noisy:
- self.logger.info("GOT %s %d %d %d" % ('GET', index, begin, length))
- r = (index, begin, length)
- a = self.request_blocks.pop(r)
- for b, l in a:
- d = buffer(data, b - begin, l)
- self.download.got_piece(index, b, d)
- if noisy:
- self.logger.info("REMAINING: %d" % len(self.request_blocks))
- def data_came_in(self, conn, s):
- #self.logger.info( "HTTPConnector self=%s received string len(s): %d" % (self,len(s)))
- self.received_data = True
-
- if not self.download:
- # this is really annoying.
- self.sloppy_pre_connection_counter += len(s)
- else:
- l = self.sloppy_pre_connection_counter + len(s)
- self.sloppy_pre_connection_counter = 0
- self.download.fire_raw_received_listeners(l)
- self._buffer.append(s)
- self._buffer_len += len(s)
- #self.logger.info( "_buffer now has length: %s, _next_len=%s" %
- # (self._buffer_len, self._next_len ) )
- # not my favorite loop.
- # the goal is: read self._next_len bytes, or if it's None return all
- # data up to a \r\n
- while True:
- if self.closed:
- return
- if self._next_len == None:
- if self._header_lines:
- d = ''.join(self._buffer)
- m = self._header_lines.pop(0)
- else:
- if '\n' not in s:
- break
- d = ''.join(self._buffer)
- header = d.split('\n', 1)[0]
- self._header_lines.append(header)
- m = self._header_lines.pop(0)
- if len(m) > self.MAX_LINE_LENGTH:
- self.protocol_violation('Line length exceeded.')
- self.close()
- return
- self._next_len = len(m) + len('\n')
- m = m.rstrip('\r')
- else:
- if self._next_len > self._buffer_len:
- break
- d = ''.join(self._buffer)
- m = d[:self._next_len]
- s = d[self._next_len:]
- self._buffer = [s]
- self._buffer_len = len(s)
- self._message = m
- try:
- self._next_len = self._reader.next()
- except StopIteration:
- self.close()
- return
- def _optional_restart(self):
- if noisy: self.logger.info("_optional_restart: got_anything:%s manual_close:%s" % (self.got_anything, self.manual_close))
- if self.manual_close:
- return
- # we disconnect from the http seed in cases where we're getting
- # plenty of bandwidth elsewhere. the first measure is the number
- # of unchoked seeds we're connected to. the second is the
- # percentage of the total rate that the seed makes up.
- md = self.download.multidownload
- seed_count = md.get_unchoked_seed_count()
- # -1 because this http seed it counted still
- seed_count -= 1
- if seed_count > self.UNCHOKED_SEED_COUNT:
- torrent_rate = md.get_downrate()
- scale = (self.RATE_PERCENTAGE / 100.0)
- if self.download.get_rate() < (torrent_rate * scale):
- a = seed_count
- b = self.UNCHOKED_SEED_COUNT
- c = self.download.get_rate()
- d = torrent_rate * scale
- self.logger.info("Swarm performance: %s > %s && %s < %s" % (a, b, c, d))
- return
-
- if noisy: self.logger.info("restarting: %s" % self.id)
- # http keep-alive has a per-connection limit on the number of
- # requests also, it times out. both result it a dropped connection,
- # so re-make it. idealistically, the connection would hang around
- # even if dropped, and reconnect if we needed to make a new request
- # (that way we don't thrash the piece picker everytime we reconnect)
- self.parent.start_http_connection(self.id)
|