Download.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526
  1. # The contents of this file are subject to the BitTorrent Open Source License
  2. # Version 1.1 (the License). You may not copy or use this file, in either
  3. # source code or executable form, except in compliance with the License. You
  4. # may obtain a copy of the License at http://www.bittorrent.com/license/.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10. # Written by Bram Cohen, Uoti Urpala, David Harrison, and Greg Hazel
  11. import random
  12. import logging
  13. from BTL.obsoletepythonsupport import *
  14. from BTL.platform import bttime
  15. from BitTorrent.CurrentRateMeasure import Measure
  16. from BTL.bitfield import Bitfield
# Module-level logger for this file; `log` is a shorthand bound to the
# logger's debug-level method.
logger = logging.getLogger("BitTorrent.Download")
log = logger.debug
  19. class BadDataGuard(object):
  20. def __init__(self, download):
  21. self.download = download
  22. self.ip = download.connector.ip
  23. self.multidownload = download.multidownload
  24. self.stats = self.multidownload.perip[self.ip]
  25. self.lastindex = None
  26. def bad(self, index, bump = False):
  27. self.stats.bad.setdefault(index, 0)
  28. self.stats.bad[index] += 1
  29. if self.ip not in self.multidownload.bad_peers:
  30. self.multidownload.bad_peers[self.ip] = (False, self.stats)
  31. if self.download is not None:
  32. self.multidownload.kick(self.download)
  33. self.download = None
  34. elif (len(self.stats.bad) > 1 and self.stats.numconnections == 1 and
  35. self.stats.lastdownload is not None):
  36. # kick new connection from same IP if previous one sent bad data,
  37. # mainly to give the algorithm time to find other bad pieces
  38. # in case the peer is sending a lot of bad data
  39. self.multidownload.kick(self.stats.lastdownload)
  40. if len(self.stats.bad) >= 3 and len(self.stats.bad) > \
  41. self.stats.numgood // 30:
  42. self.multidownload.ban(self.ip)
  43. elif bump:
  44. self.multidownload.active_requests_remove(index)
  45. self.multidownload.picker.bump(index)
  46. def good(self, index):
  47. # lastindex is a hack to only increase numgood for by one for each good
  48. # piece, however many chunks came from the connection(s) from this IP
  49. if index != self.lastindex:
  50. self.stats.numgood += 1
  51. self.lastindex = index
  52. class Download(object):
  53. """Implements BitTorrent protocol semantics for downloading over a single
  54. connection. See Upload for the protocol semantics in the upload
  55. direction. See Connector for the protocol syntax implementation."""
  56. def __init__(self, multidownload, connector):
  57. self.multidownload = multidownload
  58. self.connector = connector
  59. self.choked = True
  60. self.interested = False
  61. self.prefer_full = False
  62. self.active_requests = set()
  63. self.expecting_reject = set()
  64. self.intro_size = self.multidownload.chunksize * 4 # just a guess
  65. self.measure = Measure(multidownload.config['max_rate_period'])
  66. self.peermeasure = Measure(
  67. max(multidownload.storage.piece_size / 10000, 20))
  68. self.have = Bitfield(multidownload.numpieces)
  69. self.last = 0
  70. self.example_interest = None
  71. self.guard = BadDataGuard(self)
  72. self.suggested_pieces = []
  73. self.allowed_fast_pieces = []
  74. self._useful_received_listeners = set()
  75. self._raw_received_listeners = set()
  76. self.add_useful_received_listener(self.measure.update_rate)
  77. self.total_bytes = 0
  78. self.add_useful_received_listener(self.accumulate_total)
  79. def accumulate_total(self, x):
  80. self.total_bytes += x
  81. def add_useful_received_listener(self, listener):
  82. # "useful received bytes are used in measuring goodput.
  83. self._useful_received_listeners.add(listener)
  84. def remove_useful_received_listener(self, listener):
  85. self._useful_received_listeners.remove(listener)
  86. def fire_useful_received_listeners(self, bytes):
  87. for f in self._useful_received_listeners:
  88. f(bytes)
  89. def add_raw_received_listener(self, listener):
  90. self._raw_received_listeners.add(listener)
  91. def remove_raw_received_listener(self, listener):
  92. self._raw_received_listeners.remove(listener)
  93. def fire_raw_received_listeners(self, bytes):
  94. for f in self._raw_received_listeners:
  95. f(bytes)
  96. def _backlog(self):
  97. # Dave's suggestion:
  98. # backlog = 2 + thruput delay product in chunks.
  99. # Assume one-way download propagation delay is always less than 200ms.
  100. # backlog = 2 + int(0.2 * self.measure.get_rate() /
  101. # self.multidownload.chunksize
  102. # Then eliminate the cap of 50 and the 0.075*backlog.
  103. backlog = 2 + int(4 * self.measure.get_rate() /
  104. self.multidownload.chunksize)
  105. if self.total_bytes < self.intro_size:
  106. # optimistic backlog to get things started
  107. backlog = max(10, backlog)
  108. if backlog > 50:
  109. backlog = max(50, int(.075 * backlog))
  110. if self.multidownload.rm.endgame:
  111. # OPTIONAL: zero pipelining during endgame
  112. #b = 1
  113. pass
  114. return backlog
  115. def disconnected(self):
  116. self.multidownload.lost_peer(self)
  117. if self.have.numfalse == 0:
  118. self.multidownload.lost_have_all()
  119. else:
  120. # arg, slow
  121. count = 0
  122. target = len(self.have) - self.have.numfalse
  123. for i in xrange(len(self.have)):
  124. if count == target:
  125. break
  126. if self.have[i]:
  127. self.multidownload.lost_have(i)
  128. count += 1
  129. self._letgo()
  130. self.guard.download = None
  131. def _letgo(self):
  132. if not self.active_requests:
  133. return
  134. if self.multidownload.rm.endgame:
  135. self.active_requests.clear()
  136. return
  137. lost = []
  138. for index, begin, length in self.active_requests:
  139. self.multidownload.rm.request_lost(index, begin, length)
  140. self.multidownload.active_requests_remove(index)
  141. if index not in lost:
  142. lost.append(index)
  143. self.active_requests.clear()
  144. ds = [d for d in self.multidownload.downloads if not d.choked]
  145. random.shuffle(ds)
  146. for d in ds:
  147. d._request_more(lost)
  148. for d in self.multidownload.downloads:
  149. if d.choked and not d.interested:
  150. for l in lost:
  151. if d._want(l):
  152. d.interested = True
  153. d.connector.send_interested()
  154. break
  155. def got_choke(self):
  156. if not self.choked:
  157. self.choked = True
  158. # ugly. instead, it should move all the requests to expecting_reject
  159. if not self.connector.uses_fast_extension:
  160. self._letgo()
  161. def got_unchoke(self):
  162. if self.choked:
  163. self.choked = False
  164. if self.interested:
  165. self._request_more()
  166. def got_piece(self, index, begin, piece):
  167. req = (index, begin, len(piece))
  168. if req not in self.active_requests:
  169. self.multidownload.discarded_bytes += len(piece)
  170. if self.connector.uses_fast_extension:
  171. # getting a piece we sent a cancel for
  172. # is just like receiving a reject
  173. self.got_reject_request(*req)
  174. return
  175. self.active_requests.remove(req)
  176. # we still give the peer credit in endgame, since we did request
  177. # the piece (it was in active_requests)
  178. self.fire_useful_received_listeners(len(piece))
  179. if self.multidownload.rm.endgame:
  180. if req not in self.multidownload.all_requests:
  181. self.multidownload.discarded_bytes += len(piece)
  182. return
  183. self.multidownload.all_requests.remove(req)
  184. for d in self.multidownload.downloads:
  185. if d.interested:
  186. if not d.choked and req in d.active_requests:
  187. d.connector.send_cancel(*req)
  188. d.active_requests.remove(req)
  189. if d.connector.uses_fast_extension:
  190. d.expecting_reject.add(req)
  191. d.fix_download_endgame()
  192. else:
  193. self._request_more()
  194. self.last = bttime()
  195. df = self.multidownload.storage.write(index, begin, piece, self.guard)
  196. df.addCallback(self._got_piece, index)
  197. df.addErrback(self.multidownload.errorfunc)
  198. def _got_piece(self, hashchecked, index):
  199. if hashchecked:
  200. self.multidownload.hashchecked(index)
  201. def _want(self, index):
  202. return (self.have[index] and
  203. self.multidownload.rm.want_requests(index))
  204. def send_request(self, index, begin, length):
  205. piece_size = self.multidownload.storage.piece_size
  206. if begin + length > piece_size:
  207. raise ValueError("Issuing request that exceeds piece size: "
  208. "(%d + %d == %d) > %d" %
  209. (begin, length, begin + length, piece_size))
  210. self.multidownload.active_requests_add(index)
  211. self.active_requests.add((index, begin, length))
  212. self.connector.send_request(index, begin, length)
  213. def _request_more(self, indices = []):
  214. if self.choked:
  215. self._request_when_choked()
  216. return
  217. #log( "_request_more.active_requests=%s" % self.active_requests )
  218. b = self._backlog()
  219. if len(self.active_requests) >= b:
  220. return
  221. if self.multidownload.rm.endgame:
  222. self.fix_download_endgame()
  223. return
  224. self.suggested_pieces = [i for i in self.suggested_pieces
  225. if not self.multidownload.storage.do_I_have(i)]
  226. lost_interests = []
  227. while len(self.active_requests) < b:
  228. if not indices:
  229. interest = self.multidownload.picker.next(self.have,
  230. self.multidownload.rm.active_requests,
  231. self.multidownload.rm.fully_active,
  232. self.suggested_pieces)
  233. else:
  234. interest = None
  235. for i in indices:
  236. if self._want(i):
  237. interest = i
  238. break
  239. if interest is None:
  240. break
  241. if not self.interested:
  242. self.interested = True
  243. self.connector.send_interested()
  244. # an example interest created by from_behind is preferable
  245. if self.example_interest is None:
  246. self.example_interest = interest
  247. # request as many chunks of interesting piece as fit in backlog.
  248. while len(self.active_requests) < b:
  249. begin, length = self.multidownload.rm.new_request(interest,
  250. self.prefer_full)
  251. self.send_request(interest, begin, length)
  252. if not self.multidownload.rm.want_requests(interest):
  253. lost_interests.append(interest)
  254. break
  255. if not self.active_requests and self.interested:
  256. self.interested = False
  257. self.connector.send_not_interested()
  258. self._check_lost_interests(lost_interests)
  259. self.multidownload.check_enter_endgame()
  260. def _check_lost_interests(self, lost_interests):
  261. """
  262. Notify other downloads that these pieces are no longer interesting.
  263. @param lost_interests: list of pieces that have been fully
  264. requested.
  265. """
  266. if not lost_interests:
  267. return
  268. for d in self.multidownload.downloads:
  269. if d.active_requests or not d.interested:
  270. continue
  271. if (d.example_interest is not None and
  272. self.multidownload.rm.want_requests(d.example_interest)):
  273. continue
  274. # any() does not exist until python 2.5
  275. #if not any([d.have[lost] for lost in lost_interests]):
  276. # continue
  277. for lost in lost_interests:
  278. if d.have[lost]:
  279. break
  280. else:
  281. continue
  282. interest = self.multidownload.picker.from_behind(d.have,
  283. self.multidownload.rm.fully_active)
  284. if interest is None:
  285. d.interested = False
  286. d.connector.send_not_interested()
  287. else:
  288. d.example_interest = interest
  289. def _request_when_choked(self):
  290. self.allowed_fast_pieces = [i for i in self.allowed_fast_pieces
  291. if not self.multidownload.storage.do_I_have(i)]
  292. if not self.allowed_fast_pieces:
  293. return
  294. fast = list(self.allowed_fast_pieces)
  295. b = self._backlog()
  296. lost_interests = []
  297. while len(self.active_requests) < b:
  298. while fast:
  299. piece = fast.pop()
  300. if self._want(piece):
  301. break
  302. else:
  303. break # no unrequested pieces among allowed fast.
  304. # request chunks until no more chunks or no more room in backlog.
  305. while len(self.active_requests) < b:
  306. begin, length = self.multidownload.rm.new_request(piece,
  307. self.prefer_full)
  308. self.send_request(piece, begin, length)
  309. if not self.multidownload.rm.want_requests(piece):
  310. lost_interests.append(piece)
  311. break
  312. self._check_lost_interests(lost_interests)
  313. self.multidownload.check_enter_endgame()
  314. def fix_download_endgame(self):
  315. want = []
  316. for a in self.multidownload.all_requests:
  317. if not self.have[a[0]]:
  318. continue
  319. if a in self.active_requests:
  320. continue
  321. want.append(a)
  322. if self.interested and not self.active_requests and not want:
  323. self.interested = False
  324. self.connector.send_not_interested()
  325. return
  326. if not self.interested and want:
  327. self.interested = True
  328. self.connector.send_interested()
  329. if self.choked:
  330. return
  331. random.shuffle(want)
  332. for req in want[:self._backlog() - len(self.active_requests)]:
  333. self.send_request(*req)
  334. def got_have(self, index):
  335. if self.have[index]:
  336. return
  337. if index == self.multidownload.numpieces-1:
  338. self.peermeasure.update_rate(self.multidownload.storage.total_length-
  339. (self.multidownload.numpieces-1)*self.multidownload.storage.piece_size)
  340. else:
  341. self.peermeasure.update_rate(self.multidownload.storage.piece_size)
  342. self.have[index] = True
  343. self.multidownload.got_have(index)
  344. if (self.multidownload.storage.get_amount_left() == 0 and
  345. self.have.numfalse == 0):
  346. self.connector.close()
  347. return
  348. if self.multidownload.rm.endgame:
  349. self.fix_download_endgame()
  350. elif self.multidownload.rm.want_requests(index):
  351. self._request_more([index]) # call _request_more whether choked.
  352. if self.choked and not self.interested:
  353. self.interested = True
  354. self.connector.send_interested()
  355. def got_have_bitfield(self, have):
  356. if have.numfalse == 0:
  357. self._got_have_all(have)
  358. return
  359. self.have = have
  360. # arg, slow
  361. count = 0
  362. target = len(self.have) - self.have.numfalse
  363. for i in xrange(len(self.have)):
  364. if count == target:
  365. break
  366. if self.have[i]:
  367. self.multidownload.got_have(i)
  368. count += 1
  369. if self.multidownload.rm.endgame:
  370. for piece, begin, length in self.multidownload.all_requests:
  371. if self.have[piece]:
  372. self.interested = True
  373. self.connector.send_interested()
  374. return
  375. for piece in self.multidownload.rm.iter_want():
  376. if self.have[piece]:
  377. self.interested = True
  378. self.connector.send_interested()
  379. return
  380. def _got_have_all(self, have=None):
  381. if self.multidownload.storage.get_amount_left() == 0:
  382. self.connector.close()
  383. return
  384. if have is None:
  385. # bleh
  386. n = self.multidownload.numpieces
  387. rlen, extra = divmod(n, 8)
  388. if extra:
  389. extra = chr((0xFF << (8 - extra)) & 0xFF)
  390. else:
  391. extra = ''
  392. s = (chr(0xFF) * rlen) + extra
  393. have = Bitfield(n, s)
  394. self.have = have
  395. self.multidownload.got_have_all()
  396. if self.multidownload.rm.endgame:
  397. for piece, begin, length in self.multidownload.all_requests:
  398. self.interested = True
  399. self.connector.send_interested()
  400. return
  401. for i in self.multidownload.rm.iter_want():
  402. self.interested = True
  403. self.connector.send_interested()
  404. return
  405. def get_rate(self):
  406. return self.measure.get_rate()
  407. def is_snubbed(self):
  408. return bttime() - self.last > self.multidownload.snub_time
  409. def got_have_none(self):
  410. pass # currently no action is taken when have_none is received.
  411. # The picker already assumes the local peer has none of the
  412. # pieces until got_have is called.
  413. def got_have_all(self):
  414. assert self.connector.uses_fast_extension
  415. self._got_have_all()
  416. def got_suggest_piece(self, piece):
  417. assert self.connector.uses_fast_extension
  418. if not self.multidownload.storage.do_I_have(piece):
  419. self.suggested_pieces.append(piece)
  420. self._request_more() # try to request more. Just returns if choked.
  421. def got_allowed_fast(self,piece):
  422. """Upon receiving this message, the multidownload knows that it is
  423. allowed to download the specified piece even when choked."""
  424. #log( "got_allowed_fast %d" % piece )
  425. assert self.connector.uses_fast_extension
  426. if not self.multidownload.storage.do_I_have(piece):
  427. if piece not in self.allowed_fast_pieces:
  428. self.allowed_fast_pieces.append(piece)
  429. random.shuffle(self.allowed_fast_pieces) # O(n) but n is small.
  430. self._request_more() # will try to request. Handles cases like
  431. # whether neighbor has "allowed fast" piece.
  432. def got_reject_request(self, piece, begin, length):
  433. assert self.connector.uses_fast_extension
  434. req = (piece, begin, length)
  435. if req not in self.expecting_reject:
  436. if req not in self.active_requests:
  437. self.connector.protocol_violation("Reject received for "
  438. "piece not pending")
  439. self.connector.close()
  440. return
  441. self.active_requests.remove(req)
  442. else:
  443. self.expecting_reject.remove(req)
  444. if self.multidownload.rm.endgame:
  445. return
  446. self.multidownload.rm.request_lost(*req)
  447. if not self.choked:
  448. self._request_more()
  449. ds = [d for d in self.multidownload.downloads if not d.choked]
  450. random.shuffle(ds)
  451. for d in ds:
  452. d._request_more([piece])
  453. for d in self.multidownload.downloads:
  454. if d.choked and not d.interested:
  455. if d._want(piece):
  456. d.interested = True
  457. d.connector.send_interested()
  458. break