# StorageWrapper.py

# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License). You may not copy or use this file, in either
# source code or executable form, except in compliance with the License. You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.

# Written by Bram Cohen and Greg Hazel

from __future__ import division
from __future__ import generators

import os
import sys
import struct
import cPickle
import logging
from array import array

from BTL.translation import _
from BTL.obsoletepythonsupport import set
from BTL.sparse_set import SparseSet
from BTL.bitfield import Bitfield
from BTL import defer
from BTL.defer import wrap_task
from BTL.yielddefer import launch_coroutine
from BitTorrent import BTFailure
from BTL.exceptions import str_exc
from BTL.hash import sha

NO_PLACE = -1

ALLOCATED = -1
UNALLOCATED = -2
FASTRESUME_PARTIAL = -3

global_logger = logging.getLogger('StorageWrapper')
#global_logger.setLevel(logging.DEBUG)
#global_logger.addHandler(logging.StreamHandler(sys.stdout))
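

# DataPig tracks which peer ("source") supplied each block of each piece so
# that, when a completed piece fails its hash check, blame can be assigned:
# good()/bad() are called on the recorded source objects once a piece's fate
# is known.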
class DataPig(object):

    def __init__(self, read, add_task):
        self.add_task = add_task
        self.read = read
        self.failed_pieces = {}
        self.download_history = {}

    def got_piece(self, index, begin, piece, source):
        if index in self.failed_pieces:
            df = launch_coroutine(wrap_task(self.add_task),
                                  self._got_piece,
                                  index, begin, piece, source)
            return df
        self.download_history.setdefault(index, {})
        self.download_history[index][begin] = source
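
    # Only reached for pieces that already failed a hash check: compare the
    # block currently on disk with the freshly received one and, if they
    # differ, add the peer previously recorded for that offset to the
    # suspects for this piece; then record the new sender.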
    def _got_piece(self, index, begin, piece, source):
        df = self.read(index, len(piece), offset=begin)
        yield df
        data = df.getResult()
        if data != piece:
            if (index in self.download_history and
                begin in self.download_history[index]):
                d = self.download_history[index][begin]
                self.failed_pieces[index].add(d)
        self.download_history.setdefault(index, {})
        self.download_history[index][begin] = source
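
    # The piece hashed correctly: credit every peer that contributed a
    # block, and penalize any peers previously caught sending data that
    # mismatched a re-received block.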
    def finished_piece(self, index):
        for d in self.download_history[index].itervalues():
            d.good(index)
        del self.download_history[index]
        if index in self.failed_pieces:
            for d in self.failed_pieces[index]:
                d.bad(index)
            del self.failed_pieces[index]
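
    # The piece failed its hash check. If every block came from a single
    # peer, that peer must be the culprit, so penalize it immediately and
    # stop tracking the piece; otherwise keep a suspect set for later.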
    def failed_piece(self, index):
        self.failed_pieces[index] = set()
        allsenders = {}
        for d in self.download_history[index].itervalues():
            allsenders[d] = None
        if len(allsenders) == 1:
            culprit = allsenders.keys()[0]
            culprit.bad(index, bump=True)
            del self.failed_pieces[index]  # found the culprit already


current_version = 2
resume_prefix = 'BitTorrent resume state file, version '
version_string = resume_prefix + str(current_version)
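

# StorageWrapper maps piece numbers to on-disk positions (places/rplaces),
# verifies pieces against their SHA-1 hashes, and saves and restores
# download state through the fastresume file.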
class StorageWrapper(object):

    READ_AHEAD_BUFFER_SIZE = 2**22  # 4 MiB

    def __init__(self, storage, rm, config, hashes, piece_size,
                 statusfunc, doneflag, data_flunked,
                 infohash,  # needed for partials
                 is_batch, errorfunc, working_path, destination_path,
                 resumefile, add_task, external_add_task):
        assert len(hashes) > 0
        assert piece_size > 0
        self.initialized = False
        self.numpieces = len(hashes)
        self.infohash = infohash
        self.is_batch = is_batch
        self.add_task = add_task
        self.external_add_task = external_add_task
        self.storage = storage
        self.config = config
        self.doneflag = doneflag
        self.hashes = hashes
        self.piece_size = piece_size
        self.data_flunked = data_flunked
        self.errorfunc = errorfunc
        self.statusfunc = statusfunc
        self.total_length = storage.get_total_length()
        # a brief explanation of the mildly confusing amount_ variables:
        # amount_left: total_length - fully_written_pieces
        # amount_inactive: amount_left - blocks_written -
        #                  requests_pending_on_network
        # amount_left_with_partials (only correct during startup):
        #                  amount_left + blocks_written
        self.amount_left = self.total_length
        if self.total_length <= piece_size * (self.numpieces - 1):
            raise BTFailure(_("bad data in torrent file - total too small"))
        if self.total_length > piece_size * self.numpieces:
            raise BTFailure(_("bad data in torrent file - total too big"))
        self.have_callbacks = {}
        # an index => df dict for locking pieces
        self.blocking_pieces = {}
        self.have = Bitfield(self.numpieces)
        self.have_set = SparseSet()
        self.checked_pieces = SparseSet()
        self.fastresume = False
        self.fastresume_dirty = False
        self._pieces_in_buf = []
        self._piece_buf = None
        self.partial_mark = None
        if self.numpieces < 32768:
            self.typecode = 'h'
        else:
            self.typecode = 'l'
        self.rm = rm
        self.rm.amount_inactive = self.total_length
        read = lambda i, l, offset: self._storage_read(self.places[i], l,
                                                       offset=offset)
        self.datapig = DataPig(read, self.add_task)
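
        # places[piece] -> disk position holding that piece (or NO_PLACE);
        # rplaces[pos] -> piece stored at that disk position, or one of the
        # negative sentinels ALLOCATED / UNALLOCATED / FASTRESUME_PARTIAL.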
        self.places = array(self.typecode, [NO_PLACE] * self.numpieces)
        check_hashes = self.config['check_hashes']
        self.done_checking_df = defer.Deferred()
        self.lastlen = self._piecelen(self.numpieces - 1)
        global_logger.debug("Loading fastresume...")
        if not check_hashes:
            self.rplaces = array(self.typecode, range(self.numpieces))
            self.places = self.rplaces
            self.amount_left = 0
            self.rm.amount_inactive = self.amount_left
            self.amount_left_with_partials = self.rm.amount_inactive
            self.have.numfalse = 0
            self.have.bits = None
            self.have_set.add(0, self.numpieces)
            self._initialized(True)
        else:
            try:
                result = self.read_fastresume(resumefile, working_path,
                                              destination_path)
                # if the resume file doesn't apply to this destination or
                # working path then start over.
                if not result:
                    self.rplaces = array(self.typecode,
                                         [UNALLOCATED] * self.numpieces)
                    # full hashcheck
                    df = self.hashcheck_pieces()
                    df.addCallback(self._initialized)
            except:
                #if resumefile is not None:
                #    global_logger.warning("Failed to read fastresume",
                #                          exc_info=sys.exc_info())
                self.rplaces = array(self.typecode,
                                     [UNALLOCATED] * self.numpieces)
                # full hashcheck
                df = self.hashcheck_pieces()
                df.addCallback(self._initialized)

    def _initialized(self, v):
        self._pieces_in_buf = []
        self._piece_buf = None
        self.initialized = v
        global_logger.debug('Initialized')
        self.done_checking_df.callback(v)

    ## fastresume
    ############################################################################
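
    # A resume file begins with a version line ("BitTorrent resume state
    # file, version N"). Version 1 continues with plain-text lines and a raw
    # array dump; version 2 continues with a single pickled dict.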
    def read_fastresume(self, f, working_path, destination_path):
        version_line = f.readline().strip()
        try:
            resume_version = version_line.split(resume_prefix)[1]
        except Exception, e:
            raise BTFailure(_("Unsupported fastresume file format, "
                              "probably corrupted: %s on (%s)") %
                            (str_exc(e), repr(version_line)))
        global_logger.debug('Reading fastresume v' + resume_version)
        if resume_version == '1':
            return self._read_fastresume_v1(f, working_path, destination_path)
        elif resume_version == '2':
            return self._read_fastresume_v2(f, working_path, destination_path)
        else:
            raise BTFailure(_("Unsupported fastresume file format, "
                              "maybe from another client version?"))

    def _read_fastresume_v1(self, f, working_path, destination_path):
        # skip a bunch of lines
        amount_done = int(f.readline())
        for b, e, filename in self.storage.ranges:
            line = f.readline()
        # now for the good stuff
        r = array(self.typecode)
        r.fromfile(f, self.numpieces)
        self.rplaces = r
        df = self.checkPieces_v1()
        df.addCallback(self._initialized)
        # the resume data applied; tell the caller not to start a second,
        # full hashcheck on top of checkPieces_v1
        return True

    def checkPieces_v1(self):
        df = launch_coroutine(wrap_task(self.add_task),
                              self._checkPieces_v1)
        return df
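
    # Walk the v1 rplaces table: entries >= 0 name a piece verified at that
    # position, ALLOCATED/UNALLOCATED need no work now, and
    # FASTRESUME_PARTIAL marks a position whose data must be scanned for
    # partial-piece markers. Anything else is corrupt and forces a full
    # hashcheck.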
    def _checkPieces_v1(self):
        partials = {}
        needs_full_hashcheck = False
        for i in xrange(self.numpieces):
            piece_len = self._piecelen(i)
            t = self.rplaces[i]
            if t >= 0:
                self._markgot(t, i)
            elif t in (ALLOCATED, UNALLOCATED):
                pass
            elif t == FASTRESUME_PARTIAL:
                df = self._storage_read(i, piece_len)
                yield df
                try:
                    data = df.getResult()
                except:
                    global_logger.error(_("Bad fastresume info "
                                          "(truncation at piece %d)") % i)
                    needs_full_hashcheck = True
                    i -= 1
                    break
                self._check_partial(i, partials, data)
                self.rplaces[i] = ALLOCATED
                # we're shutting down, abort.
                if self.doneflag.isSet():
                    yield False
            else:
                global_logger.error(_("Bad fastresume info (illegal value at "
                                      "piece %d)") % i)
                needs_full_hashcheck = True
                i -= 1
                break
        if needs_full_hashcheck:
            df = self.hashcheck_pieces(i)
            yield df
            r = df.getResult()
            if r == False:
                yield False
        self._realize_partials(partials)
        yield True

    def _read_fastresume_v2(self, f, working_path, destination_path):
        # The working and destination paths are "save_as" paths, meaning
        # that they refer to the entire path for a single-file torrent and
        # the name of the directory containing the files for a batch
        # torrent. Paths read from the resume file should reside in/at
        # either the working_path or the destination_path.
        d = cPickle.loads(f.read())
        try:
            snapshot = d['snapshot']
            work_or_dest = 0
            for filename, s in snapshot.iteritems():
                # all files should reside in either the working path or the
                # destination path. For batch torrents, a file may have a
                # relative path, so compare the common path.
                if self.is_batch:
                    commonw = os.path.commonprefix((filename, working_path))
                    commond = os.path.commonprefix((filename,
                                                    destination_path))
                else:
                    commonw = commond = filename
                # the first file determines whether all are in the working
                # or the destination path.
                if work_or_dest == 0:
                    if commonw == working_path:
                        work_or_dest = -1
                    elif commond == destination_path:
                        work_or_dest = 1
                    else:
                        return False
                elif work_or_dest == -1 and commonw != working_path:
                    return False
                elif work_or_dest == 1 and commond != destination_path:
                    return False
                # this could be a lot smarter, like punching holes in the
                # ranges on failed files in a batch torrent.
                if not os.path.exists(filename):
                    raise ValueError("No such file or directory: %s" %
                                     filename)
                if os.path.getsize(filename) < s['size']:
                    raise ValueError("File sizes do not match.")
                if os.path.getmtime(filename) < (s['mtime'] - 5):
                    raise ValueError("File modification times do not match.")
            self.places = array(self.typecode)
            self.places.fromstring(d['places'])
            self.rplaces = array(self.typecode)
            self.rplaces.fromstring(d['rplaces'])
            self.have = d['have']
            self.have_set = d['have_set']
            # We are reading the undownloaded section from the fastresume.
            # We should check whether the file exists; if it doesn't, we
            # should not read from fastresume.
            self.storage.undownloaded = d['undownloaded']
            self.amount_left = d['amount_left']
            assert self.amount_left >= 0
            self.rm.amount_inactive = self.amount_left
            # all unwritten partials are now inactive
            for k, v in d['unwritten_partials'].iteritems():
                self.rm.add_inactive(k, v)
            # these are equal at startup, because nothing has been requested
            self.amount_left_with_partials = self.rm.amount_inactive
            if self.amount_left_with_partials < 0:
                raise ValueError("Amount left < 0: %d" %
                                 self.amount_left_with_partials)
            if self.amount_left_with_partials > self.total_length:
                raise ValueError("Amount left > total length: %d > %d" %
                                 (self.amount_left_with_partials,
                                  self.total_length))
            self._initialized(True)
        except:
            self.amount_left = self.total_length
            self.have = Bitfield(self.numpieces)
            self.have_set = SparseSet()
            self.rm.inactive_requests = {}
            self.rm.active_requests = {}
            self.places = array(self.typecode, [NO_PLACE] * self.numpieces)
            self.rplaces = array(self.typecode, range(self.numpieces))
            raise
        return True
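
    # The v2 writer snapshots each file's size and mtime so that a later
    # read_fastresume() can detect files changed behind the client's back
    # and fall back to a full hash check.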
    def write_fastresume(self, resumefile):
        try:
            self._write_fastresume_v2(resumefile)
        except:
            global_logger.exception("write_fastresume failed")

    def _write_fastresume_v2(self, resumefile):
        if not self.initialized:
            return
        global_logger.debug('Writing fast resume: %s' % version_string)
        resumefile.write(version_string + '\n')
        d = {}
        snapshot = {}
        for filename in self.storage.range_by_name.iterkeys():
            if not os.path.exists(filename):
                continue
            s = {}
            s['size'] = os.path.getsize(filename)
            s['mtime'] = os.path.getmtime(filename)
            snapshot[filename] = s
        d['snapshot'] = snapshot
        d['places'] = self.places.tostring()
        d['rplaces'] = self.rplaces.tostring()
        d['have'] = self.have
        d['have_set'] = self.have_set
        d['undownloaded'] = self.storage.undownloaded
        d['amount_left'] = self.amount_left
        d['unwritten_partials'] = self.rm.get_unwritten_requests()
        resumefile.write(cPickle.dumps(d))
        self.fastresume_dirty = False

    ############################################################################
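
    # Record that the data for `piece` was found at disk position `pos`
    # during a check: update the place tables, the have set and the byte
    # counters, and fire any callbacks waiting on this piece.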
    def _markgot(self, piece, pos):
        if self.have[piece]:
            if piece != pos:
                return
            self.rplaces[self.places[pos]] = ALLOCATED
            self.places[pos] = self.rplaces[pos] = pos
            return
        self.places[piece] = pos
        self.rplaces[pos] = piece
        self.have[piece] = True
        self.have_set.add(piece)
        plen = self._piecelen(piece)
        self.storage.downloaded(self.piece_size * piece, plen)
        self.amount_left -= plen
        assert self.amount_left >= 0
        self.rm.amount_inactive -= plen
        if piece in self.have_callbacks:
            for c in self.have_callbacks.pop(piece):
                c.callback(None)
        assert piece not in self.rm.inactive_requests

    ## hashcheck
    ############################################################################
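
    # Hash checking reads ahead READ_AHEAD_BUFFER_SIZE bytes at a time:
    # _get_data() serves piece i straight from the current buffer when it
    # can, and otherwise launches _get_data_gen() to refill the buffer with
    # a single large read.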
    def _get_data(self, i):
        if i in self._pieces_in_buf:
            p = i - self._pieces_in_buf[0]
            return buffer(self._piece_buf, p * self.piece_size,
                          self._piecelen(i))
        df = launch_coroutine(wrap_task(self.add_task),
                              self._get_data_gen, i)
        return df

    def _get_data_gen(self, i):
        num_pieces = int(max(1, self.READ_AHEAD_BUFFER_SIZE / self.piece_size))
        if i + num_pieces >= self.numpieces:
            size = self.total_length - (i * self.piece_size)
            num_pieces = self.numpieces - i
        else:
            size = num_pieces * self.piece_size
        self._pieces_in_buf = range(i, i + num_pieces)
        df = self._storage_read(i, size)
        yield df
        try:
            self._piece_buf = df.getResult()
        except BTFailure:  # short read
            self._piece_buf = ''
        p = i - self._pieces_in_buf[0]
        yield buffer(self._piece_buf, p * self.piece_size, self._piecelen(i))

    def hashcheck_pieces(self, begin=0, end=None):
        df = launch_coroutine(wrap_task(self.add_task),
                              self._hashcheck_pieces,
                              begin, end)
        return df
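
    # Two digests are computed per piece: `s` covers the whole piece, while
    # `sp` covers only the first lastlen bytes so that a short final piece
    # written at an earlier disk position can still be recognized.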
    def _hashcheck_pieces(self, begin=0, end=None):
        # we need a full reverse-lookup of hashes for out-of-order
        # compatibility
        targets = {}
        for i in xrange(self.numpieces):
            targets[self.hashes[i]] = i
        partials = {}
        if end is None:
            end = self.numpieces
        global_logger.debug('Hashcheck from %d to %d' % (begin, end))
        # TODO: make this work with more than one running at a time
        for i in xrange(begin, end):
            # we're shutting down, abort.
            if self.doneflag.isSet():
                yield False
            piece_len = self._piecelen(i)
            global_logger.debug("i=%d, piece_len=%d" % (i, piece_len))
            if not self._waspre(i, piece_len):
                # hole in the file
                continue
            r = self._get_data(i)
            if isinstance(r, defer.Deferred):
                yield r
                data = r.getResult()
            else:
                data = r
            sh = sha(buffer(data, 0, self.lastlen))
            sp = sh.digest()
            sh.update(buffer(data, self.lastlen))
            s = sh.digest()
            # handle out-of-order pieces
            if s in targets and piece_len == self._piecelen(targets[s]):
                # handle one or more pieces with identical hashes properly
                piece_found = i
                if s != self.hashes[i]:
                    piece_found = targets[s]
                self.checked_pieces.add(piece_found)
                self._markgot(piece_found, i)
            # last piece junk. I'm not even sure this is right.
            elif (not self.have[self.numpieces - 1] and
                  sp == self.hashes[-1] and
                  (i == self.numpieces - 1 or
                   not self._waspre(self.numpieces - 1))):
                self.checked_pieces.add(self.numpieces - 1)
                self._markgot(self.numpieces - 1, i)
            else:
                self._check_partial(i, partials, data)
            self.statusfunc(fractionDone=1 - self.amount_left /
                            self.total_length)
        global_logger.debug('Hashcheck from %d to %d complete.' % (begin, end))
        self._realize_partials(partials)
        self.fastresume_dirty = True
        yield True

    def hashcheck_piece(self, index, data=None):
        df = launch_coroutine(wrap_task(self.add_task),
                              self._hashcheck_piece,
                              index, data=data)
        return df

    def _hashcheck_piece(self, index, data=None):
        if not data:
            df = self._storage_read(index, self._piecelen(index))
            yield df
            data = df.getResult()
        if sha(data).digest() != self.hashes[index]:
            yield False
        self.checked_pieces.add(index)
        yield True

    ############################################################################
    ## out of order compatibility
    ############################################################################
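
    # Older BitTorrent versions wrote pieces to disk out of order, so a
    # resumed download may find a piece stored at the wrong position;
    # _move_piece() relocates the data and re-verifies it afterwards.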
    def _initalloc(self, pos, piece):
        assert self.rplaces[pos] < 0
        assert self.places[piece] == NO_PLACE
        p = self.piece_size * pos
        length = self._piecelen(pos)
        self.places[piece] = pos
        self.rplaces[pos] = piece

    def _move_piece(self, oldpos, newpos):
        assert self.rplaces[newpos] < 0
        assert self.rplaces[oldpos] >= 0
        df = self._storage_read(oldpos, self._piecelen(newpos))
        yield df
        data = df.getResult()
        df = self._storage_write(newpos, data)
        yield df
        df.getResult()
        piece = self.rplaces[oldpos]
        self.places[piece] = newpos
        self.rplaces[oldpos] = ALLOCATED
        self.rplaces[newpos] = piece
        if not self.have[piece]:
            return
        data = buffer(data, 0, self._piecelen(piece))
        if sha(data).digest() != self.hashes[piece]:
            raise BTFailure(_("data corrupted on disk - "
                              "maybe you have two copies running?"))

    ############################################################################

    def get_piece_range_for_filename(self, filename):
        begin, end = self.storage.get_byte_range_for_filename(filename)
        begin = int(begin / self.piece_size)
        end = int(end / self.piece_size)
        return begin, end

    def _waspre(self, piece, piece_len=None):
        if piece_len is None:
            piece_len = self._piecelen(piece)
        return self.storage.was_preallocated(piece * self.piece_size,
                                             piece_len)

    def _piecelen(self, piece):
        if piece < self.numpieces - 1:
            return self.piece_size
        else:
            return self.total_length - piece * self.piece_size

    def get_total_length(self):
        """Returns the total length of the torrent in bytes."""
        return self.total_length

    def get_num_pieces(self):
        """Returns the total number of pieces in this torrent."""
        return self.numpieces

    def get_amount_left(self):
        """Returns the number of bytes left to download."""
        return self.amount_left

    def do_I_have_anything(self):
        return self.amount_left < self.total_length

    def get_have_list(self):
        return self.have.tostring()

    def do_I_have(self, index):
        return self.have[index]

    def _block_piece(self, index, df):
        self.blocking_pieces[index] = df
        df.addCallback(lambda x: self.blocking_pieces.pop(index))
        return df
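
    # The write path: wait for any in-progress relocation of this piece,
    # move a misplaced piece out of the way if necessary, let the DataPig
    # vet the block, write it to disk, and hashcheck the piece once all of
    # its blocks have been received.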
    def write(self, index, begin, piece, source):
        df = launch_coroutine(wrap_task(self.add_task),
                              self._write,
                              index, begin, piece, source)
        return df

    def _write(self, index, begin, piece, source):
        if index in self.blocking_pieces:
            df = self.blocking_pieces[index]
            yield df
            df.getResult()
        if self.places[index] < 0:
            # since old versions of BT wrote out-of-order, we could
            # come across a piece which is misplaced. move it to the
            # correct place.
            if self.rplaces[index] >= 0:
                new_pos = self.rplaces[index]
                df = launch_coroutine(wrap_task(self.add_task),
                                      self._move_piece, index, new_pos)
                yield self._block_piece(index, df)
                df.getResult()
            self._initalloc(index, index)
        df = self.datapig.got_piece(index, begin, piece, source)
        if df is not None:
            yield df
            df.getResult()
        df = self._storage_write(self.places[index], piece, offset=begin)
        yield df
        df.getResult()
        self.rm.request_received(index, begin, len(piece))
        hashcheck = self.rm.is_piece_received(index)
        if hashcheck:
            df = self.hashcheck_piece(self.places[index])
            yield df
            passed = df.getResult()
            self.rm.piece_finished(index)
            length = self._piecelen(index)
            if passed:
                self.have[index] = True
                self.have_set.add(index)
                self.storage.downloaded(index * self.piece_size, length)
                self.amount_left -= length
                assert self.amount_left >= 0
                self.datapig.finished_piece(index)
                if index in self.have_callbacks:
                    for c in self.have_callbacks.pop(index):
                        c.callback(None)
            else:
                self.data_flunked(length, index)
                self.rm.amount_inactive += length
                self.datapig.failed_piece(index)
            self.fastresume_dirty = True
        yield hashcheck

    def get_piece(self, index):
        if not self.have[index]:
            df = defer.Deferred()
            self.have_callbacks.setdefault(index, []).append(df)
            yield df
            df.getResult()
        assert self.have[index]
        df = self.read(index, 0, self._piecelen(index))
        yield df
        r = df.getResult()
        yield r

    def read(self, index, begin, length):
        if not self.have[index]:
            raise IndexError("Do not have piece %d of %d" %
                             (index, self.numpieces))
        df = launch_coroutine(wrap_task(self.add_task),
                              self._read, index, begin, length)
        return df

    def _read(self, index, begin, length):
        if index in self.blocking_pieces:
            df = self.blocking_pieces[index]
            yield df
            df.getResult()
        if index not in self.checked_pieces:
            df = self.hashcheck_piece(self.places[index])
            yield df
            passed = df.getResult()
            if not passed:
                # TODO: this case should cause a total file hash check and
                # reconnect when done.
                raise BTFailure, _("told file complete on start-up, but "
                                   "piece failed hash check")
        if begin + length > self._piecelen(index):
            #yield None
            raise ValueError("incorrect size: (%d + %d ==) %d > %d" %
                             (begin, length,
                              begin + length, self._piecelen(index)))
        df = self._storage_read(self.places[index], length, offset=begin)
        yield df
        data = df.getResult()
        yield data

    def _storage_read(self, index, amount, offset=0):
        assert index >= 0
        return self.storage.read(index * self.piece_size + offset, amount)

    def _storage_write(self, index, data, offset=0):
        return self.storage.write(index * self.piece_size + offset, data)

    ## partials
    ############################################################################
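
    # Unwritten chunks of a preallocated piece hold a marker string (built
    # from the infohash and the chunk size) followed by a packed piece
    # index; _check_partial() scans for those markers to work out which
    # chunks of a partially downloaded piece still need to be fetched.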
    def _realize_partials(self, partials):
        self.amount_left_with_partials = self.amount_left
        for piece in partials:
            if self.places[piece] < 0:
                pos = partials[piece][0]
                self.places[piece] = pos
                self.rplaces[pos] = piece

    def _check_partial(self, pos, partials, data):
        index = None
        missing = False
        request_size = self.config['download_chunk_size']
        if self.partial_mark is None:
            i = struct.pack('>i', request_size)
            self.partial_mark = ("BitTorrent - this part has not been " +
                                 "downloaded yet." + self.infohash + i)
        marklen = len(self.partial_mark) + 4
        for i in xrange(0, len(data) - marklen, request_size):
            if data[i:i + marklen - 4] == self.partial_mark:
                ind = struct.unpack('>i', data[i + marklen - 4:i + marklen])[0]
                if index is None:
                    index = ind
                    parts = []
                if ind >= self.numpieces or ind != index:
                    return
                parts.append(i)
            else:
                missing = True
        if index is not None and missing:
            i += request_size
            if i < len(data):
                parts.append(i)
            partials[index] = (pos, parts)

    def _make_pending(self, index, parts):
        length = self._piecelen(index)
        x = 0
        request_size = self.config['download_chunk_size']
        for x in xrange(0, length, request_size):
            if x not in parts:
                partlen = min(request_size, length - x)
                self.amount_left_with_partials -= partlen

    ############################################################################