MultiDownload.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. # The contents of this file are subject to the BitTorrent Open Source License
  2. # Version 1.1 (the License). You may not copy or use this file, in either
  3. # source code or executable form, except in compliance with the License. You
  4. # may obtain a copy of the License at http://www.bittorrent.com/license/.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10. # Written by Bram Cohen, Uoti Urpala
  11. import array
  12. import random
  13. from BTL.sparse_set import SparseSet
  14. from BTL.obsoletepythonsupport import set
  15. from BitTorrent.Download import Download
  16. SPARSE_SET = True
  17. if SPARSE_SET:
  18. from BitTorrent.PieceSetBuckets import PieceSetBuckets
  19. else:
  20. from BitTorrent.PieceSetBuckets import SortedPieceBuckets, resolve_typecode
  21. RENDER = False
  22. class PerIPStats(object):
  23. def __init__(self):
  24. self.numgood = 0
  25. self.bad = {}
  26. self.numconnections = 0
  27. self.lastdownload = None
  28. self.peerid = None
  29. class MultiDownload(object):
  30. def __init__(self, config, storage, rm, urlage, picker, numpieces,
  31. finished, errorfunc, kickfunc, banfunc, get_downrate):
  32. self.config = config
  33. self.storage = storage
  34. self.rm = rm
  35. self.urlage = urlage
  36. self.picker = picker
  37. self.errorfunc = errorfunc
  38. self.rerequester = None
  39. self.entered_endgame = False
  40. self.connection_manager = None
  41. self.chunksize = config['download_chunk_size']
  42. self.numpieces = numpieces
  43. self.finished = finished
  44. self.snub_time = config['snub_time']
  45. self.kickfunc = kickfunc
  46. self.banfunc = banfunc
  47. self.get_downrate = get_downrate
  48. self.downloads = []
  49. self.perip = {}
  50. self.bad_peers = {}
  51. self.discarded_bytes = 0
  52. self.useful_received_listeners = set()
  53. self.raw_received_listeners = set()
  54. if SPARSE_SET:
  55. self.piece_states = PieceSetBuckets()
  56. nothing = SparseSet()
  57. nothing.add(0, self.numpieces)
  58. self.piece_states.buckets.append(nothing)
  59. # I hate this
  60. nowhere = [(i, 0) for i in xrange(self.numpieces)]
  61. self.piece_states.place_in_buckets = dict(nowhere)
  62. else:
  63. typecode = resolve_typecode(self.numpieces)
  64. self.piece_states = SortedPieceBuckets(typecode)
  65. nothing = array.array(typecode, range(self.numpieces))
  66. self.piece_states.buckets.append(nothing)
  67. # I hate this
  68. nowhere = [(i, (0, i)) for i in xrange(self.numpieces)]
  69. self.piece_states.place_in_buckets = dict(nowhere)
  70. self.last_update = 0
  71. self.all_requests = set()
  72. def attach_connection_manager(self, connection_manager):
  73. self.connection_manager = connection_manager
  74. def aggregate_piece_states(self):
  75. d = {}
  76. d['h'] = self.storage.have_set
  77. d['t'] = set(self.rm.active_requests.iterkeys())
  78. for i, bucket in enumerate(self.piece_states.buckets):
  79. d[i] = bucket
  80. r = (self.numpieces, self.last_update, d)
  81. return r
  82. def get_unchoked_seed_count(self):
  83. seed_count = 0
  84. for d in self.downloads:
  85. if d.have.numfalse == 0 and not d.choked:
  86. seed_count += 1
  87. return seed_count
  88. def get_adjusted_distributed_copies(self):
  89. # compensate for the fact that piece picker does no
  90. # contain all the pieces
  91. num = self.picker.get_distributed_copies()
  92. percent_have = (float(len(self.storage.have_set)) /
  93. float(self.numpieces))
  94. num += percent_have
  95. if self.rerequester and self.rerequester.tracker_num_seeds:
  96. num += self.rerequester.tracker_num_seeds
  97. return num
  98. def active_requests_add(self, r):
  99. self.last_update += 1
  100. def active_requests_remove(self, r):
  101. self.last_update += 1
  102. def got_have(self, piece):
  103. self.picker.got_have(piece)
  104. self.last_update += 1
  105. p = self.piece_states
  106. p.add(piece, p.remove(piece) + 1)
  107. def got_have_all(self):
  108. self.picker.got_have_all()
  109. self.last_update += 1
  110. self.piece_states.prepend_bucket()
  111. def lost_have(self, piece):
  112. self.picker.lost_have(piece)
  113. self.last_update += 1
  114. p = self.piece_states
  115. p.add(piece, p.remove(piece) - 1)
  116. def lost_have_all(self):
  117. self.picker.lost_have_all()
  118. self.last_update += 1
  119. self.piece_states.popleft_bucket()
  120. def check_enter_endgame(self):
  121. if not self.entered_endgame:
  122. if self.rm.endgame:
  123. self.entered_endgame = True
  124. self.all_requests = set()
  125. for d in self.downloads:
  126. self.all_requests.update(d.active_requests)
  127. for d in self.downloads:
  128. d.fix_download_endgame()
  129. def hashchecked(self, index):
  130. if not self.storage.do_I_have(index):
  131. if self.rm.endgame:
  132. while self.rm.want_requests(index):
  133. nb, nl = self.rm.new_request(index)
  134. self.all_requests.add((index, nb, nl))
  135. for d in self.downloads:
  136. d.fix_download_endgame()
  137. else:
  138. ds = [d for d in self.downloads if not d.choked]
  139. random.shuffle(ds)
  140. for d in ds:
  141. d._request_more([index])
  142. return
  143. self.picker.complete(index)
  144. self.active_requests_remove(index)
  145. self.connection_manager.hashcheck_succeeded(index)
  146. if self.storage.have.numfalse == 0:
  147. for d in self.downloads:
  148. if d.have.numfalse == 0:
  149. d.connector.close()
  150. self.finished()
  151. def make_download(self, connector):
  152. ip = connector.ip
  153. perip = self.perip.setdefault(ip, PerIPStats())
  154. perip.numconnections += 1
  155. d = Download(self, connector)
  156. d.add_useful_received_listener(self.fire_useful_received_listeners)
  157. d.add_raw_received_listener(self.fire_raw_received_listeners)
  158. perip.lastdownload = d
  159. perip.peerid = connector.id
  160. self.downloads.append(d)
  161. return d
  162. def add_useful_received_listener(self,listener):
  163. """Listeners are called for useful arrivals to any of the downloaders
  164. managed by this MultiDownload object.
  165. (see Download.add_useful_received_listener for which listeners are
  166. called for bytes received by that particular Download."""
  167. self.useful_received_listeners.add(listener)
  168. def add_raw_received_listener(self, listener):
  169. """Listers are called whenever bytes arrive (i.e., to Connector.data_came_in)
  170. regardless of whether those bytes are useful."""
  171. self.raw_received_listeners.add(listener)
  172. def remove_useful_received_listener(self,listener):
  173. self.useful_received_listeners.remove(listener)
  174. def fire_useful_received_listeners(self,bytes):
  175. for f in self.useful_received_listeners:
  176. f(bytes)
  177. def remove_raw_received_listener(self, listener):
  178. self.raw_received_listeners.remove(listener)
  179. def fire_raw_received_listeners(self,bytes):
  180. for f in self.raw_received_listeners:
  181. f(bytes)
  182. def lost_peer(self, download):
  183. if download.have.numfalse == 0:
  184. # lost seed...
  185. pass
  186. self.downloads.remove(download)
  187. ip = download.connector.ip
  188. self.perip[ip].numconnections -= 1
  189. if self.perip[ip].lastdownload == download:
  190. self.perip[ip].lastdownload = None
  191. def kick(self, download):
  192. download.connector.protocol_violation("peer sent bad data")
  193. if not self.config['retaliate_to_garbled_data']:
  194. return
  195. ip = download.connector.ip
  196. peerid = download.connector.id
  197. # kickfunc will schedule connection.close() to be executed later; we
  198. # might now be inside RawServer event loop with events from that
  199. # connection already queued, and trying to handle them after doing
  200. # close() now could cause problems.
  201. self.kickfunc(download.connector)
  202. def ban(self, ip):
  203. if not self.config['retaliate_to_garbled_data']:
  204. return
  205. self.banfunc(ip)
  206. self.bad_peers[ip] = (True, self.perip[ip])