1
0

HTTPDownloader.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. # Written by John Hoffman
  2. # see LICENSE.txt for license information
  3. from BitTornado.CurrentRateMeasure import Measure
  4. from random import randint
  5. from urlparse import urlparse
  6. from httplib import HTTPConnection
  7. from urllib import quote
  8. from threading import Thread
  9. from BitTornado.__init__ import product_name,version_short
  10. try:
  11. True
  12. except:
  13. True = 1
  14. False = 0
  15. EXPIRE_TIME = 60 * 60
  16. VERSION = product_name+'/'+version_short
  17. class haveComplete:
  18. def complete(self):
  19. return True
  20. def __getitem__(self, x):
  21. return True
  22. haveall = haveComplete()
  23. class SingleDownload:
  24. def __init__(self, downloader, url):
  25. self.downloader = downloader
  26. self.baseurl = url
  27. try:
  28. (scheme, self.netloc, path, pars, query, fragment) = urlparse(url)
  29. except:
  30. self.downloader.errorfunc('cannot parse http seed address: '+url)
  31. return
  32. if scheme != 'http':
  33. self.downloader.errorfunc('http seed url not http: '+url)
  34. return
  35. try:
  36. self.connection = HTTPConnection(self.netloc)
  37. except:
  38. self.downloader.errorfunc('cannot connect to http seed: '+url)
  39. return
  40. self.seedurl = path
  41. if pars:
  42. self.seedurl += ';'+pars
  43. self.seedurl += '?'
  44. if query:
  45. self.seedurl += query+'&'
  46. self.seedurl += 'info_hash='+quote(self.downloader.infohash)
  47. self.measure = Measure(downloader.max_rate_period)
  48. self.index = None
  49. self.url = ''
  50. self.requests = []
  51. self.request_size = 0
  52. self.endflag = False
  53. self.error = None
  54. self.retry_period = 30
  55. self._retry_period = None
  56. self.errorcount = 0
  57. self.goodseed = False
  58. self.active = False
  59. self.cancelled = False
  60. self.resched(randint(2,10))
  61. def resched(self, len = None):
  62. if len is None:
  63. len = self.retry_period
  64. if self.errorcount > 3:
  65. len = len * (self.errorcount - 2)
  66. self.downloader.rawserver.add_task(self.download, len)
  67. def _want(self, index):
  68. if self.endflag:
  69. return self.downloader.storage.do_I_have_requests(index)
  70. else:
  71. return self.downloader.storage.is_unstarted(index)
  72. def download(self):
  73. self.cancelled = False
  74. if self.downloader.picker.am_I_complete():
  75. self.downloader.downloads.remove(self)
  76. return
  77. self.index = self.downloader.picker.next(haveall, self._want)
  78. if ( self.index is None and not self.endflag
  79. and not self.downloader.peerdownloader.has_downloaders() ):
  80. self.endflag = True
  81. self.index = self.downloader.picker.next(haveall, self._want)
  82. if self.index is None:
  83. self.endflag = True
  84. self.resched()
  85. else:
  86. self.url = ( self.seedurl+'&piece='+str(self.index) )
  87. self._get_requests()
  88. if self.request_size < self.downloader.storage._piecelen(self.index):
  89. self.url += '&ranges='+self._request_ranges()
  90. rq = Thread(target = self._request)
  91. rq.setDaemon(False)
  92. rq.start()
  93. self.active = True
  94. def _request(self):
  95. import encodings.ascii
  96. import encodings.punycode
  97. import encodings.idna
  98. self.error = None
  99. self.received_data = None
  100. try:
  101. self.connection.request('GET',self.url, None,
  102. {'User-Agent': VERSION})
  103. r = self.connection.getresponse()
  104. self.connection_status = r.status
  105. self.received_data = r.read()
  106. except Exception, e:
  107. self.error = 'error accessing http seed: '+str(e)
  108. try:
  109. self.connection.close()
  110. except:
  111. pass
  112. try:
  113. self.connection = HTTPConnection(self.netloc)
  114. except:
  115. self.connection = None # will cause an exception and retry next cycle
  116. self.downloader.rawserver.add_task(self.request_finished)
  117. def request_finished(self):
  118. self.active = False
  119. if self.error is not None:
  120. if self.goodseed:
  121. self.downloader.errorfunc(self.error)
  122. self.errorcount += 1
  123. if self.received_data:
  124. self.errorcount = 0
  125. if not self._got_data():
  126. self.received_data = None
  127. if not self.received_data:
  128. self._release_requests()
  129. self.downloader.peerdownloader.piece_flunked(self.index)
  130. if self._retry_period:
  131. self.resched(self._retry_period)
  132. self._retry_period = None
  133. return
  134. self.resched()
  135. def _got_data(self):
  136. if self.connection_status == 503: # seed is busy
  137. try:
  138. self.retry_period = max(int(self.received_data),5)
  139. except:
  140. pass
  141. return False
  142. if self.connection_status != 200:
  143. self.errorcount += 1
  144. return False
  145. self._retry_period = 1
  146. if len(self.received_data) != self.request_size:
  147. if self.goodseed:
  148. self.downloader.errorfunc('corrupt data from http seed - redownloading')
  149. return False
  150. self.measure.update_rate(len(self.received_data))
  151. self.downloader.measurefunc(len(self.received_data))
  152. if self.cancelled:
  153. return False
  154. if not self._fulfill_requests():
  155. return False
  156. if not self.goodseed:
  157. self.goodseed = True
  158. self.downloader.seedsfound += 1
  159. if self.downloader.storage.do_I_have(self.index):
  160. self.downloader.picker.complete(self.index)
  161. self.downloader.peerdownloader.check_complete(self.index)
  162. self.downloader.gotpiecefunc(self.index)
  163. return True
  164. def _get_requests(self):
  165. self.requests = []
  166. self.request_size = 0L
  167. while self.downloader.storage.do_I_have_requests(self.index):
  168. r = self.downloader.storage.new_request(self.index)
  169. self.requests.append(r)
  170. self.request_size += r[1]
  171. self.requests.sort()
  172. def _fulfill_requests(self):
  173. start = 0L
  174. success = True
  175. while self.requests:
  176. begin, length = self.requests.pop(0)
  177. if not self.downloader.storage.piece_came_in(self.index, begin,
  178. self.received_data[start:start+length]):
  179. success = False
  180. break
  181. start += length
  182. return success
  183. def _release_requests(self):
  184. for begin, length in self.requests:
  185. self.downloader.storage.request_lost(self.index, begin, length)
  186. self.requests = []
  187. def _request_ranges(self):
  188. s = ''
  189. begin, length = self.requests[0]
  190. for begin1, length1 in self.requests[1:]:
  191. if begin + length == begin1:
  192. length += length1
  193. continue
  194. else:
  195. if s:
  196. s += ','
  197. s += str(begin)+'-'+str(begin+length-1)
  198. begin, length = begin1, length1
  199. if s:
  200. s += ','
  201. s += str(begin)+'-'+str(begin+length-1)
  202. return s
  203. class HTTPDownloader:
  204. def __init__(self, storage, picker, rawserver,
  205. finflag, errorfunc, peerdownloader,
  206. max_rate_period, infohash, measurefunc, gotpiecefunc):
  207. self.storage = storage
  208. self.picker = picker
  209. self.rawserver = rawserver
  210. self.finflag = finflag
  211. self.errorfunc = errorfunc
  212. self.peerdownloader = peerdownloader
  213. self.infohash = infohash
  214. self.max_rate_period = max_rate_period
  215. self.gotpiecefunc = gotpiecefunc
  216. self.measurefunc = measurefunc
  217. self.downloads = []
  218. self.seedsfound = 0
  219. def make_download(self, url):
  220. self.downloads.append(SingleDownload(self, url))
  221. return self.downloads[-1]
  222. def get_downloads(self):
  223. if self.finflag.isSet():
  224. return []
  225. return self.downloads
  226. def cancel_piece_download(self, pieces):
  227. for d in self.downloads:
  228. if d.active and d.index in pieces:
  229. d.cancelled = True