# HTTPConnector.py
  1. # by Greg Hazel
  2. from __future__ import generators
  3. import urllib
  4. import logging
  5. from BTL.DictWithLists import OrderedDict
  6. from BitTorrent.Connector import Connector
  7. from bisect import bisect_right
  8. from urlparse import urlparse
  9. from BitTorrent.HTTPDownloader import parseContentRange
  10. noisy = False
  11. #if noisy:
  12. connection_logger = logging.getLogger("BitTorrent.HTTPConnector")
  13. connection_logger.setLevel(logging.DEBUG)
  14. stream_handler = logging.StreamHandler()
  15. connection_logger.addHandler(stream_handler)
  16. log = connection_logger.debug
  17. class BatchRequests(object):
  18. def __init__(self):
  19. self.requests = {}
  20. # you should add from the perspective of a BatchRequest
  21. def _add_request(self, filename, begin, length, br):
  22. r = (filename, begin, length)
  23. assert r not in self.requests
  24. self.requests[r] = br
  25. def got_request(self, filename, begin, data):
  26. length = len(data)
  27. r = (filename, begin, length)
  28. br = self.requests.pop(r)
  29. br.got_request(filename, begin, length, data)
  30. return br
  31. class BatchRequest(object):
  32. def __init__(self, parent, start):
  33. self.parent = parent
  34. self.numactive = 0
  35. self.start = start
  36. self.requests = OrderedDict()
  37. def add_request(self, filename, begin, length):
  38. r = (filename, begin, length)
  39. assert r not in self.requests
  40. self.parent._add_request(filename, begin, length, self)
  41. self.requests[r] = None
  42. self.numactive += 1
  43. def got_request(self, filename, begin, length, data):
  44. self.requests[(filename, begin, length)] = data
  45. self.numactive -= 1
  46. def get_result(self):
  47. if self.numactive > 0:
  48. return None
  49. chunks = []
  50. for k in self.requests.itervalues():
  51. chunks.append(k)
  52. return ''.join(chunks)
  53. # kind of like storage wrapper for webserver interaction
  54. class URLage(object):
  55. def __init__(self, files):
  56. # a list of bytes ranges and filenames for window-based IO
  57. self.ranges = []
  58. self._build_url_structs(files)
  59. def _build_url_structs(self, files):
  60. total = 0
  61. for filename, length in files:
  62. if length > 0:
  63. self.ranges.append((total, total + length, filename))
  64. total += length
  65. self.total_length = total
  66. def _intervals(self, pos, amount):
  67. r = []
  68. stop = pos + amount
  69. p = max(bisect_right(self.ranges, (pos, )) - 1, 0)
  70. for begin, end, filename in self.ranges[p:]:
  71. if begin >= stop:
  72. break
  73. r.append((filename, max(pos, begin) - begin, min(end, stop) - begin))
  74. return r
  75. def _request(self, host, filename, pos, amount, prefix, append):
  76. b = pos
  77. e = b + amount - 1
  78. f = prefix
  79. if append:
  80. f += filename
  81. s = '\r\n'.join([
  82. "GET /%s HTTP/1.1" % (urllib.quote(f)),
  83. "Host: %s" % host,
  84. "Connection: Keep-Alive",
  85. "Range: bytes=%s-%s" % (b, e),
  86. "", ""])
  87. if noisy: log(s)
  88. return s
  89. def build_requests(self, brs, host, pos, amount, prefix, append):
  90. r = []
  91. br = BatchRequest(brs, pos)
  92. for filename, pos, end in self._intervals(pos, amount):
  93. s = self._request(host, filename, pos, end - pos, prefix, append)
  94. br.add_request(filename, pos, end - pos)
  95. r.append((filename, s))
  96. return r
class HTTPConnector(Connector):
    """Implements the HTTP syntax with a BitTorrent Connector interface.
    Connection-level semantics are as normal, but the download is always
    unchoked after it's connected."""

    # Longest header line accepted before the peer is treated as broken.
    MAX_LINE_LENGTH = 16384
    # Thresholds used by _optional_restart to decide whether reconnecting
    # to this HTTP seed is worth it (see that method).
    UNCHOKED_SEED_COUNT = 5
    RATE_PERCENTAGE = 10

    def __init__(self, parent, piece_size, urlage, connection, id, outgoing, log_prefix):
        # id is the seed's URL; urlage maps torrent offsets to file ranges.
        self.piece_size = piece_size
        # header lines split out of the buffer but not yet consumed
        self._header_lines = []
        self.manual_close = False
        self.urlage = urlage
        self.batch_requests = BatchRequests()
        # pipeline tracker
        self.request_paths = []
        # range request glue
        self.request_blocks = {}
        scheme, host, path, params, query, fragment = urlparse(id)
        if path and path[0] == '/':
            path = path[1:]
        self.host = host
        self.prefix = path
        # append the per-file name to the URL path unless the torrent has a
        # single range and the path already names a file (no trailing '/')
        self.append = not(len(self.urlage.ranges) == 1 and path and path[-1] != '/')
        Connector.__init__(self, parent, connection, id, outgoing, log_prefix=log_prefix)
        # blarg
        self._buffer = []
        self._buffer_len = 0

    def close(self):
        # Remember that the close was deliberate so _optional_restart does
        # not re-open the connection.
        self.manual_close = True
        Connector.close(self)

    def send_handshake(self):
        """Fake the BitTorrent handshake: mark the connection complete and
        immediately report an unchoked seed with every piece."""
        if noisy: self.logger.info('connection made: %s' % self.id)
        self.complete = True
        self.parent.connection_handshake_completed(self)
        # ARGH. -G
        # Closure monkey-patched onto the download object so that requests
        # are broken up and tracked in request_blocks before being sent.
        def _download_send_request(index, begin, length):
            piece_size = self.download.multidownload.storage.piece_size
            if begin + length > piece_size:
                raise ValueError("Issuing request that exceeds piece size: "
                                 "(%d + %d == %d) > %d" %
                                 (begin, length, begin + length, piece_size))
            self.download.multidownload.active_requests_add(index)
            # rm._break_up splits the range into block-sized pieces
            # (presumably the torrent's request size) — see RequestManager.
            a = self.download.multidownload.rm._break_up(begin, length)
            for b, l in a:
                self.download.active_requests.add((index, b, l))
            # remember the breakdown so _read_messages can re-deliver the
            # response data as individual blocks
            self.request_blocks[(index, begin, length)] = a
            assert self == self.download.connector
            self.send_request(index, begin, length)
        self.download.send_request = _download_send_request
        # prefer full pieces to reduce http overhead
        self.download.prefer_full = True
        self.download._got_have_all()
        self.download.got_unchoke()

    def send_request(self, index, begin, length):
        """Translate a (piece, offset, length) request into one or more HTTP
        range GETs and write them onto the connection."""
        if noisy:
            self.logger.info("SEND %s %d %d %d" % ('GET', index, begin, length))
        # absolute torrent offset of the requested range
        b = (index * self.piece_size) + begin
        r = self.urlage.build_requests(self.batch_requests, self.host, b,
                                       length, self.prefix, self.append)
        for filename, s in r:
            # request_paths pairs each pipelined request with its response
            # (HTTP/1.1 keep-alive answers in FIFO order)
            self.request_paths.append(filename)
            self._write(s)

    # The protocol messages below have no HTTP equivalent; they are
    # intentionally no-ops so the generic peer logic can call them.
    def send_interested(self):
        pass

    def send_not_interested(self):
        pass

    def send_choke(self):
        self.choke_sent = self.upload.choked

    def send_unchoke(self):
        self.choke_sent = self.upload.choked

    def send_cancel(self, index, begin, length):
        pass

    def send_have(self, index):
        pass

    def send_bitfield(self, bitfield):
        pass

    def send_keepalive(self):
        # is there something I can do here?
        pass

    # yields the number of bytes it wants next, gets those in self._message
    def _read_messages(self):
        """Generator-based HTTP response parser.

        Driven by data_came_in: `yield None` requests the next header line
        and `yield n` requests exactly n body bytes; the requested data is
        placed in self._message before the generator is resumed.
        """
        completing = False  # NOTE(review): unused — left for fidelity
        while True:
            self._header_lines = []
            yield None
            # status line, e.g. "HTTP/1.1 206 Partial Content"
            line = self._message.upper()
            if noisy: self.logger.info(line)
            l = line.split(None, 2)
            version = l[0]
            status = l[1]
            try:
                message = l[2]
            except IndexError:
                # sometimes there is no message
                message = ""
            if not version.startswith("HTTP"):
                self.protocol_violation('Not HTTP: %r' % self._message)
                return
            # only redirects and 206 Partial Content are acceptable here
            if status not in ('301', '302', '303', '206'):
                self.protocol_violation('Bad status message: %s' %
                                        self._message)
                return
            headers = {}
            # consume header lines until the blank line ending the header
            while True:
                yield None
                if len(self._message) == 0:
                    break
                if ':' not in self._message:
                    self.protocol_violation('Bad header: %s' % self._message)
                    return
                header, value = self._message.split(':', 1)
                header = header.lower()
                headers[header] = value.strip()
            if noisy: self.logger.info("incoming headers: %s" % (headers, ))
            # reset the header buffer so we can loop
            self._header_lines = []
            if status in ('301', '302', '303'):
                url = headers.get('location')
                if not url:
                    self.protocol_violation('No location: %s' % self._message)
                    return
                self.logger.warning("Redirect: %s" % url)
                self.parent.start_http_connection(url)
                return
            # responses arrive in the order the requests were pipelined
            filename = self.request_paths.pop(0)
            start, end, realLength = parseContentRange(headers['content-range'])
            # Content-Range bounds are inclusive
            length = (end - start) + 1
            cl = int(headers.get('content-length', length))
            if cl != length:
                raise ValueError('Got c-l:%d bytes instead of l:%d' % (cl, length))
            # ask data_came_in for exactly the body
            yield length
            if len(self._message) != length:
                raise ValueError('Got m:%d bytes instead of l:%d' %
                                 (len(self._message), length))
            if noisy:
                self.logger.info("GOT %s %d %d" % ('GET', start, len(self._message)))
            self.got_anything = True
            br = self.batch_requests.got_request(filename, start, self._message)
            data = br.get_result()
            # data is None until every sub-request of the batch completes
            if data:
                index = br.start // self.piece_size
                if index >= self.parent.numpieces:
                    return
                begin = br.start - (index * self.piece_size)
                if noisy:
                    self.logger.info("GOT %s %d %d %d" % ('GET', index, begin, length))
                r = (index, begin, length)
                # NOTE(review): `length` is the size of the *last* HTTP
                # response; request_blocks was keyed with the full request
                # length in send_handshake. These agree only when the batch
                # was served by a single response — confirm behavior for
                # batches spanning multiple files.
                a = self.request_blocks.pop(r)
                for b, l in a:
                    # buffer() is the Python 2 zero-copy slice builtin
                    d = buffer(data, b - begin, l)
                    self.download.got_piece(index, b, d)
                if noisy:
                    self.logger.info("REMAINING: %d" % len(self.request_blocks))

    def data_came_in(self, conn, s):
        """Feed raw socket data to the _read_messages generator, delivering
        either one header line or self._next_len bytes per step."""
        #self.logger.info( "HTTPConnector self=%s received string len(s): %d" % (self,len(s)))
        self.received_data = True
        if not self.download:
            # this is really annoying.
            self.sloppy_pre_connection_counter += len(s)
        else:
            l = self.sloppy_pre_connection_counter + len(s)
            self.sloppy_pre_connection_counter = 0
            self.download.fire_raw_received_listeners(l)
        self._buffer.append(s)
        self._buffer_len += len(s)
        #self.logger.info( "_buffer now has length: %s, _next_len=%s" %
        #                  (self._buffer_len, self._next_len ) )
        # not my favorite loop.
        # the goal is: read self._next_len bytes, or if it's None return all
        # data up to a \r\n
        while True:
            if self.closed:
                return
            if self._next_len == None:
                # header-line mode
                if self._header_lines:
                    d = ''.join(self._buffer)
                    m = self._header_lines.pop(0)
                else:
                    # NOTE(review): only the newest chunk is checked for a
                    # newline; one already buffered from an earlier chunk is
                    # not noticed until more data arrives — confirm intended.
                    if '\n' not in s:
                        break
                    d = ''.join(self._buffer)
                    header = d.split('\n', 1)[0]
                    self._header_lines.append(header)
                    m = self._header_lines.pop(0)
                if len(m) > self.MAX_LINE_LENGTH:
                    self.protocol_violation('Line length exceeded.')
                    self.close()
                    return
                # consume the line plus its '\n' from the buffer
                self._next_len = len(m) + len('\n')
                m = m.rstrip('\r')
            else:
                # fixed-length body mode: wait until enough data buffered
                if self._next_len > self._buffer_len:
                    break
                d = ''.join(self._buffer)
                m = d[:self._next_len]
            # drop the consumed prefix and hand m to the parser generator
            s = d[self._next_len:]
            self._buffer = [s]
            self._buffer_len = len(s)
            self._message = m
            try:
                # _reader is presumably the _read_messages generator set up
                # by the Connector base class — it yields the next want-size
                self._next_len = self._reader.next()
            except StopIteration:
                self.close()
                return

    def _optional_restart(self):
        """Re-open the HTTP connection after a drop, unless it was closed on
        purpose or the swarm is already supplying plenty of bandwidth."""
        if noisy: self.logger.info("_optional_restart: got_anything:%s manual_close:%s" % (self.got_anything, self.manual_close))
        if self.manual_close:
            return
        # we disconnect from the http seed in cases where we're getting
        # plenty of bandwidth elsewhere. the first measure is the number
        # of unchoked seeds we're connected to. the second is the
        # percentage of the total rate that the seed makes up.
        md = self.download.multidownload
        seed_count = md.get_unchoked_seed_count()
        # -1 because this http seed it counted still
        seed_count -= 1
        if seed_count > self.UNCHOKED_SEED_COUNT:
            torrent_rate = md.get_downrate()
            scale = (self.RATE_PERCENTAGE / 100.0)
            if self.download.get_rate() < (torrent_rate * scale):
                a = seed_count
                b = self.UNCHOKED_SEED_COUNT
                c = self.download.get_rate()
                d = torrent_rate * scale
                self.logger.info("Swarm performance: %s > %s && %s < %s" % (a, b, c, d))
                return
        if noisy: self.logger.info("restarting: %s" % self.id)
        # http keep-alive has a per-connection limit on the number of
        # requests also, it times out. both result it a dropped connection,
        # so re-make it. idealistically, the connection would hang around
        # even if dropped, and reconnect if we needed to make a new request
        # (that way we don't thrash the piece picker everytime we reconnect)
        self.parent.start_http_connection(self.id)