# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License). You may not copy or use this file, in either
# source code or executable form, except in compliance with the License. You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.

# Written by Bram Cohen and Greg Hazel

from __future__ import division
from __future__ import generators

import os
import sys
import struct
import cPickle
import logging
from array import array
from BTL.translation import _

from BTL.obsoletepythonsupport import set
from BTL.sparse_set import SparseSet
from BTL.bitfield import Bitfield
from BTL import defer
from BTL.defer import wrap_task
from BTL.yielddefer import launch_coroutine
from BitTorrent import BTFailure
from BTL.exceptions import str_exc
from BTL.hash import sha

NO_PLACE = -1

ALLOCATED = -1
UNALLOCATED = -2
FASTRESUME_PARTIAL = -3

global_logger = logging.getLogger('StorageWrapper')
#global_logger.setLevel(logging.DEBUG)
#global_logger.addHandler(logging.StreamHandler(sys.stdout))
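
# DataPig tracks which peer ("source") supplied each block of each piece so
# that blame can be assigned when a piece fails its hash check: the lone
# sender of a failed piece is flagged immediately in failed_piece(), and any
# sender whose re-downloaded block mismatches the data on disk is added to
# failed_pieces for later punishment in finished_piece().
#
# A note on the coroutine style used throughout this file: methods are
# written as generators driven by BTL.yielddefer.launch_coroutine. Yielding
# a Deferred suspends the coroutine until it fires; yielding a plain value
# (e.g. True/False) appears, by this file's convention, to terminate the
# coroutine with that value as its result.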
class DataPig(object):

    def __init__(self, read, add_task):
        self.add_task = add_task
        self.read = read
        self.failed_pieces = {}
        self.download_history = {}
    def got_piece(self, index, begin, piece, source):
        if index in self.failed_pieces:
            df = launch_coroutine(wrap_task(self.add_task),
                                  self._got_piece,
                                  index, begin, piece, source)
            return df
        self.download_history.setdefault(index, {})
        self.download_history[index][begin] = source
    def _got_piece(self, index, begin, piece, source):
        df = self.read(index, len(piece), offset=begin)
        yield df
        data = df.getResult()
        if data != piece:
            if (index in self.download_history and
                begin in self.download_history[index]):
                d = self.download_history[index][begin]
                self.failed_pieces[index].add(d)
        self.download_history.setdefault(index, {})
        self.download_history[index][begin] = source

    def finished_piece(self, index):
        for d in self.download_history[index].itervalues():
            d.good(index)
        del self.download_history[index]
        if index in self.failed_pieces:
            for d in self.failed_pieces[index]:
                d.bad(index)
            del self.failed_pieces[index]

    def failed_piece(self, index):
        self.failed_pieces[index] = set()
        allsenders = {}
        for d in self.download_history[index].itervalues():
            allsenders[d] = None
        if len(allsenders) == 1:
            culprit = allsenders.keys()[0]
            culprit.bad(index, bump = True)
            del self.failed_pieces[index] # found the culprit already
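
# A fastresume file starts with a one-line header naming the format version
# (parsed in read_fastresume below); the rest of the file is
# version-specific.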
current_version = 2
resume_prefix = 'BitTorrent resume state file, version '
version_string = resume_prefix + str(current_version)
class StorageWrapper(object):

    READ_AHEAD_BUFFER_SIZE = 2**22 # 4MB
    def __init__(self, storage, rm, config, hashes, piece_size,
                 statusfunc, doneflag, data_flunked,
                 infohash, # needed for partials
                 is_batch, errorfunc, working_path, destination_path,
                 resumefile, add_task, external_add_task):
        assert len(hashes) > 0
        assert piece_size > 0
        self.initialized = False
        self.numpieces = len(hashes)
        self.infohash = infohash
        self.is_batch = is_batch
        self.add_task = add_task
        self.external_add_task = external_add_task
        self.storage = storage
        self.config = config
        self.doneflag = doneflag
        self.hashes = hashes
        self.piece_size = piece_size
        self.data_flunked = data_flunked
        self.errorfunc = errorfunc
        self.statusfunc = statusfunc
        self.total_length = storage.get_total_length()

        # a brief explanation about the mildly confusing amount_ variables:
        # amount_left: total_length - fully_written_pieces
        # amount_inactive: amount_left - blocks_written -
        #     requests_pending_on_network
        # amount_left_with_partials (only correct during startup):
        #     amount_left + blocks_written
        self.amount_left = self.total_length

        if self.total_length <= piece_size * (self.numpieces - 1):
            raise BTFailure(_("bad data in torrent file - total too small"))
        if self.total_length > piece_size * self.numpieces:
            raise BTFailure(_("bad data in torrent file - total too big"))
        self.have_callbacks = {}
        # an index => df dict for locking pieces
        self.blocking_pieces = {}
        self.have = Bitfield(self.numpieces)
        self.have_set = SparseSet()
        self.checked_pieces = SparseSet()
        self.fastresume = False
        self.fastresume_dirty = False
        self._pieces_in_buf = []
        self._piece_buf = None
        self.partial_mark = None
        if self.numpieces < 32768:
            self.typecode = 'h'
        else:
            self.typecode = 'l'
        self.rm = rm
        self.rm.amount_inactive = self.total_length
        read = lambda i, l, offset : self._storage_read(self.places[i], l,
                                                        offset=offset)
        self.datapig = DataPig(read, self.add_task)
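
        # places maps piece number -> position (disk slot) where that piece
        # is stored; rplaces is the reverse map, position -> piece number.
        # Negative rplaces entries are the sentinels defined at the top of
        # this module: UNALLOCATED/ALLOCATED for slots holding no complete
        # piece, and FASTRESUME_PARTIAL for slots holding partially
        # downloaded data.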
        self.places = array(self.typecode, [NO_PLACE] * self.numpieces)

        check_hashes = self.config['check_hashes']

        self.done_checking_df = defer.Deferred()

        self.lastlen = self._piecelen(self.numpieces - 1)

        global_logger.debug("Loading fastresume...")
        if not check_hashes:
            self.rplaces = array(self.typecode, range(self.numpieces))
            self.places = self.rplaces
            self.amount_left = 0
            self.rm.amount_inactive = self.amount_left
            self.amount_left_with_partials = self.rm.amount_inactive
            self.have.numfalse = 0
            self.have.bits = None
            self.have_set.add(0, self.numpieces)
            self._initialized(True)
        else:
            try:
                result = self.read_fastresume(resumefile, working_path,
                                              destination_path)
                # if the resume file doesn't apply to this destination or
                # working path then start over.
                if not result:
                    self.rplaces = array(self.typecode,
                                         [UNALLOCATED] * self.numpieces)
                    # full hashcheck
                    df = self.hashcheck_pieces()
                    df.addCallback(self._initialized)
            except:
                #if resumefile is not None:
                #    global_logger.warning("Failed to read fastresume",
                #                          exc_info=sys.exc_info())
                self.rplaces = array(self.typecode,
                                     [UNALLOCATED] * self.numpieces)
                # full hashcheck
                df = self.hashcheck_pieces()
                df.addCallback(self._initialized)

    def _initialized(self, v):
        self._pieces_in_buf = []
        self._piece_buf = None
        self.initialized = v
        global_logger.debug('Initialized')
        self.done_checking_df.callback(v)

    ## fastresume
    ############################################################################
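
    # The first line of the resume file names the format version: v1 is the
    # old line-oriented format followed by an array of rplaces, v2 is a
    # pickled dict written by _write_fastresume_v2 below.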
    def read_fastresume(self, f, working_path, destination_path):
        version_line = f.readline().strip()
        try:
            resume_version = version_line.split(resume_prefix)[1]
        except Exception, e:
            raise BTFailure(_("Unsupported fastresume file format, "
                              "probably corrupted: %s on (%s)") %
                            (str_exc(e), repr(version_line)))
        global_logger.debug('Reading fastresume v' + resume_version)
        if resume_version == '1':
            return self._read_fastresume_v1(f, working_path, destination_path)
        elif resume_version == '2':
            return self._read_fastresume_v2(f, working_path, destination_path)
        else:
            raise BTFailure(_("Unsupported fastresume file format, "
                              "maybe from another client version?"))
    def _read_fastresume_v1(self, f, working_path, destination_path):
        # skip a bunch of lines
        amount_done = int(f.readline())
        for b, e, filename in self.storage.ranges:
            line = f.readline()
        # now for the good stuff
        r = array(self.typecode)
        r.fromfile(f, self.numpieces)
        self.rplaces = r
        df = self.checkPieces_v1()
        df.addCallback(self._initialized)
        # the resume data applied; without a true return here the caller
        # would schedule a second, redundant full hash check.
        return True
    def checkPieces_v1(self):
        df = launch_coroutine(wrap_task(self.add_task),
                              self._checkPieces_v1)
        return df

    def _checkPieces_v1(self):
        partials = {}
        needs_full_hashcheck = False
        for i in xrange(self.numpieces):
            piece_len = self._piecelen(i)
            t = self.rplaces[i]
            if t >= 0:
                self._markgot(t, i)
            elif t in (ALLOCATED, UNALLOCATED):
                pass
            elif t == FASTRESUME_PARTIAL:
                df = self._storage_read(i, piece_len)
                yield df
                try:
                    data = df.getResult()
                except:
                    global_logger.error(_("Bad fastresume info "
                                          "(truncation at piece %d)") % i)
                    needs_full_hashcheck = True
                    i -= 1
                    break
                self._check_partial(i, partials, data)
                self.rplaces[i] = ALLOCATED

                # we're shutting down, abort.
                if self.doneflag.isSet():
                    yield False
            else:
                global_logger.error(_("Bad fastresume info (illegal value at "
                                      "piece %d)") % i)
                needs_full_hashcheck = True
                i -= 1
                break

        if needs_full_hashcheck:
            df = self.hashcheck_pieces(i)
            yield df
            r = df.getResult()
            if r == False:
                yield False

        self._realize_partials(partials)
        yield True
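
    # The v2 payload is a pickled dict with the keys written by
    # _write_fastresume_v2: 'snapshot' (per-file size/mtime, used to detect
    # files changed behind our back), 'places', 'rplaces', 'have',
    # 'have_set', 'undownloaded', 'amount_left' and 'unwritten_partials'.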
    def _read_fastresume_v2(self, f, working_path, destination_path):
        # The working and destination paths are "save_as" paths, meaning
        # that they refer to the entire path for a single-file torrent and
        # the name of the directory containing the files for a batch
        # torrent. Paths read from the resume file should reside in/at
        # either the working_path or the destination_path.
        d = cPickle.loads(f.read())
        try:
            snapshot = d['snapshot']
            work_or_dest = 0
            for filename, s in snapshot.iteritems():
                # all files should reside in either the working path or the
                # destination path. For batch torrents, the file may have a
                # relative path, so compare the common prefix.
                if self.is_batch:
                    commonw = os.path.commonprefix((filename, working_path))
                    commond = os.path.commonprefix((filename,
                                                    destination_path))
                else:
                    commonw = commond = filename

                # the first file determines whether all are in the working
                # or the destination path.
                if work_or_dest == 0:
                    if commonw == working_path:
                        work_or_dest = -1
                    elif commond == destination_path:
                        work_or_dest = 1
                    else:
                        return False
                elif work_or_dest == -1 and commonw != working_path:
                    return False
                elif work_or_dest == 1 and commond != destination_path:
                    return False

                # this could be a lot smarter, like punching holes in the
                # ranges on failed files in a batch torrent.
                if not os.path.exists(filename):
                    raise ValueError("No such file or directory: %s" %
                                     filename)
                if os.path.getsize(filename) < s['size']:
                    raise ValueError("File sizes do not match.")
                if os.path.getmtime(filename) < (s['mtime'] - 5):
                    raise ValueError("File modification times do not match.")

            self.places = array(self.typecode)
            self.places.fromstring(d['places'])
            self.rplaces = array(self.typecode)
            self.rplaces.fromstring(d['rplaces'])
            self.have = d['have']
            self.have_set = d['have_set']
            # We are reading the undownloaded section from the fastresume.
            # We should check whether the file exists; if it doesn't, then
            # we should not read from fastresume.
            self.storage.undownloaded = d['undownloaded']
            self.amount_left = d['amount_left']
            assert self.amount_left >= 0
            self.rm.amount_inactive = self.amount_left
            # all unwritten partials are now inactive
            for k, v in d['unwritten_partials'].iteritems():
                self.rm.add_inactive(k, v)
            # these are equal at startup, because nothing has been requested
            self.amount_left_with_partials = self.rm.amount_inactive
            if self.amount_left_with_partials < 0:
                raise ValueError("Amount left < 0: %d" %
                                 self.amount_left_with_partials)
            if self.amount_left_with_partials > self.total_length:
                raise ValueError("Amount left > total length: %d > %d" %
                                 (self.amount_left_with_partials,
                                  self.total_length))
            self._initialized(True)
        except:
            self.amount_left = self.total_length
            self.have = Bitfield(self.numpieces)
            self.have_set = SparseSet()
            self.rm.inactive_requests = {}
            self.rm.active_requests = {}
            self.places = array(self.typecode, [NO_PLACE] * self.numpieces)
            self.rplaces = array(self.typecode, range(self.numpieces))
            raise
        return True
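
    # Writers set self.fastresume_dirty whenever in-memory state has
    # diverged from the last resume file written; a successful write
    # clears it.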
    def write_fastresume(self, resumefile):
        try:
            self._write_fastresume_v2(resumefile)
        except:
            global_logger.exception("write_fastresume failed")

    def _write_fastresume_v2(self, resumefile):
        if not self.initialized:
            return
        global_logger.debug('Writing fast resume: %s' % version_string)
        resumefile.write(version_string + '\n')
        d = {}
        snapshot = {}
        for filename in self.storage.range_by_name.iterkeys():
            if not os.path.exists(filename):
                continue
            s = {}
            s['size'] = os.path.getsize(filename)
            s['mtime'] = os.path.getmtime(filename)
            snapshot[filename] = s
        d['snapshot'] = snapshot
        d['places'] = self.places.tostring()
        d['rplaces'] = self.rplaces.tostring()
        d['have'] = self.have
        d['have_set'] = self.have_set
        d['undownloaded'] = self.storage.undownloaded
        d['amount_left'] = self.amount_left
        d['unwritten_partials'] = self.rm.get_unwritten_requests()
        resumefile.write(cPickle.dumps(d))
        self.fastresume_dirty = False

    ############################################################################
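
    # _markgot(piece, pos) records, during a check, that piece number
    # 'piece' was found stored at disk slot 'pos': it updates the
    # places/rplaces maps, the have bitfield, the byte accounting, and
    # fires any callbacks waiting on the piece.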
    def _markgot(self, piece, pos):
        if self.have[piece]:
            if piece != pos:
                return
            self.rplaces[self.places[pos]] = ALLOCATED
            self.places[pos] = self.rplaces[pos] = pos
            return
        self.places[piece] = pos
        self.rplaces[pos] = piece
        self.have[piece] = True
        self.have_set.add(piece)
        plen = self._piecelen(piece)
        self.storage.downloaded(self.piece_size * piece, plen)
        self.amount_left -= plen
        assert self.amount_left >= 0
        self.rm.amount_inactive -= plen
        if piece in self.have_callbacks:
            for c in self.have_callbacks.pop(piece):
                c.callback(None)
        assert piece not in self.rm.inactive_requests

    ## hashcheck
    ############################################################################
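
    # Hash checking reads ahead: _get_data fetches up to
    # READ_AHEAD_BUFFER_SIZE bytes of consecutive pieces into _piece_buf
    # with one storage read, then serves subsequent pieces out of that
    # buffer synchronously (returning a buffer object rather than a
    # Deferred).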
    def _get_data(self, i):
        if i in self._pieces_in_buf:
            p = i - self._pieces_in_buf[0]
            return buffer(self._piece_buf, p * self.piece_size,
                          self._piecelen(i))
        df = launch_coroutine(wrap_task(self.add_task),
                              self._get_data_gen, i)
        return df

    def _get_data_gen(self, i):
        num_pieces = int(max(1, self.READ_AHEAD_BUFFER_SIZE / self.piece_size))
        if i + num_pieces >= self.numpieces:
            size = self.total_length - (i * self.piece_size)
            num_pieces = self.numpieces - i
        else:
            size = num_pieces * self.piece_size
        self._pieces_in_buf = range(i, i + num_pieces)
        df = self._storage_read(i, size)
        yield df
        try:
            self._piece_buf = df.getResult()
        except BTFailure: # short read
            self._piece_buf = ''
        p = i - self._pieces_in_buf[0]
        yield buffer(self._piece_buf, p * self.piece_size, self._piecelen(i))
    def hashcheck_pieces(self, begin=0, end=None):
        df = launch_coroutine(wrap_task(self.add_task),
                              self._hashcheck_pieces,
                              begin, end)
        return df

    def _hashcheck_pieces(self, begin=0, end=None):
        # we need a full reverse lookup of hashes for out-of-order
        # compatibility
        targets = {}
        for i in xrange(self.numpieces):
            targets[self.hashes[i]] = i

        partials = {}
        if end is None:
            end = self.numpieces
        global_logger.debug('Hashcheck from %d to %d' % (begin, end))
        # TODO: make this work with more than one running at a time
        for i in xrange(begin, end):
            # we're shutting down, abort.
            if self.doneflag.isSet():
                yield False

            piece_len = self._piecelen(i)
            global_logger.debug("i=%d, piece_len=%d" % (i, piece_len))
            if not self._waspre(i, piece_len):
                # hole in the file
                continue
            r = self._get_data(i)
            if isinstance(r, defer.Deferred):
                yield r
                data = r.getResult()
            else:
                data = r
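
            # Two digests come out of one pass over the data: 'sp' covers
            # only the first lastlen bytes (the length of the final, short
            # piece) so that a copy of the last piece stored at this slot
            # can be recognized, while 's' covers the whole piece for the
            # normal reverse-hash lookup in targets.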
            sh = sha(buffer(data, 0, self.lastlen))
            sp = sh.digest()
            sh.update(buffer(data, self.lastlen))
            s = sh.digest()
            # handle out-of-order pieces
            if s in targets and piece_len == self._piecelen(targets[s]):
                # handle one or more pieces with identical hashes properly
                piece_found = i
                if s != self.hashes[i]:
                    piece_found = targets[s]
                self.checked_pieces.add(piece_found)
                self._markgot(piece_found, i)
            # last piece junk. I'm not even sure this is right.
            elif (not self.have[self.numpieces - 1] and
                  sp == self.hashes[-1] and
                  (i == self.numpieces - 1 or
                   not self._waspre(self.numpieces - 1))):
                self.checked_pieces.add(self.numpieces - 1)
                self._markgot(self.numpieces - 1, i)
            else:
                self._check_partial(i, partials, data)
            self.statusfunc(fractionDone = 1 - self.amount_left /
                            self.total_length)
        global_logger.debug('Hashcheck from %d to %d complete.' % (begin, end))
        self._realize_partials(partials)
        self.fastresume_dirty = True
        yield True
    def hashcheck_piece(self, index, data = None):
        df = launch_coroutine(wrap_task(self.add_task),
                              self._hashcheck_piece,
                              index, data = data)
        return df

    def _hashcheck_piece(self, index, data = None):
        if not data:
            df = self._storage_read(index, self._piecelen(index))
            yield df
            data = df.getResult()
        if sha(data).digest() != self.hashes[index]:
            yield False
        self.checked_pieces.add(index)
        yield True
    ############################################################################
    ## out-of-order compatibility
    ############################################################################
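
    # Older versions of BT wrote pieces to disk in download order rather
    # than at their final offsets, so a piece may be found in the wrong
    # slot (see the comment in _write). These helpers allocate slots and
    # relocate misplaced pieces.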
    def _initalloc(self, pos, piece):
        assert self.rplaces[pos] < 0
        assert self.places[piece] == NO_PLACE
        p = self.piece_size * pos
        length = self._piecelen(pos)
        self.places[piece] = pos
        self.rplaces[pos] = piece

    def _move_piece(self, oldpos, newpos):
        assert self.rplaces[newpos] < 0
        assert self.rplaces[oldpos] >= 0
        df = self._storage_read(oldpos, self._piecelen(newpos))
        yield df
        data = df.getResult()
        df = self._storage_write(newpos, data)
        yield df
        df.getResult()
        piece = self.rplaces[oldpos]
        self.places[piece] = newpos
        self.rplaces[oldpos] = ALLOCATED
        self.rplaces[newpos] = piece
        if not self.have[piece]:
            return
        data = buffer(data, 0, self._piecelen(piece))
        if sha(data).digest() != self.hashes[piece]:
            raise BTFailure(_("data corrupted on disk - "
                              "maybe you have two copies running?"))

    ############################################################################
    def get_piece_range_for_filename(self, filename):
        begin, end = self.storage.get_byte_range_for_filename(filename)
        begin = int(begin / self.piece_size)
        end = int(end / self.piece_size)
        return begin, end

    def _waspre(self, piece, piece_len=None):
        if piece_len is None:
            piece_len = self._piecelen(piece)
        return self.storage.was_preallocated(piece * self.piece_size,
                                             piece_len)

    def _piecelen(self, piece):
        if piece < self.numpieces - 1:
            return self.piece_size
        else:
            return self.total_length - piece * self.piece_size

    def get_total_length(self):
        """Returns the total length of the torrent in bytes."""
        return self.total_length

    def get_num_pieces(self):
        """Returns the total number of pieces in this torrent."""
        return self.numpieces

    def get_amount_left(self):
        """Returns the number of bytes left to download."""
        return self.amount_left

    def do_I_have_anything(self):
        return self.amount_left < self.total_length

    def get_have_list(self):
        return self.have.tostring()

    def do_I_have(self, index):
        return self.have[index]

    def _block_piece(self, index, df):
        self.blocking_pieces[index] = df
        df.addCallback(lambda x: self.blocking_pieces.pop(index))
        return df
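
    # Write path for a block arriving from the network: wait out any piece
    # currently being relocated, move a misplaced piece out of this slot if
    # necessary, record the block's sender with the DataPig, write the
    # block, and hash check the piece once self.rm reports all of its
    # blocks received.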
    def write(self, index, begin, piece, source):
        df = launch_coroutine(wrap_task(self.add_task),
                              self._write,
                              index, begin, piece, source)
        return df

    def _write(self, index, begin, piece, source):
        if index in self.blocking_pieces:
            df = self.blocking_pieces[index]
            yield df
            df.getResult()
        if self.places[index] < 0:
            # since old versions of BT wrote out-of-order, we could
            # come across a piece which is misplaced. move it to the
            # correct place.
            if self.rplaces[index] >= 0:
                new_pos = self.rplaces[index]
                df = launch_coroutine(wrap_task(self.add_task),
                                      self._move_piece, index, new_pos)
                yield self._block_piece(index, df)
                df.getResult()
            self._initalloc(index, index)

        df = self.datapig.got_piece(index, begin, piece, source)
        if df is not None:
            yield df
            df.getResult()

        df = self._storage_write(self.places[index], piece, offset=begin)
        yield df
        df.getResult()

        self.rm.request_received(index, begin, len(piece))
        hashcheck = self.rm.is_piece_received(index)
        if hashcheck:
            df = self.hashcheck_piece(self.places[index])
            yield df
            passed = df.getResult()

            self.rm.piece_finished(index)

            length = self._piecelen(index)
            if passed:
                self.have[index] = True
                self.have_set.add(index)
                self.storage.downloaded(index * self.piece_size, length)
                self.amount_left -= length
                assert self.amount_left >= 0
                self.datapig.finished_piece(index)
                if index in self.have_callbacks:
                    for c in self.have_callbacks.pop(index):
                        c.callback(None)
            else:
                self.data_flunked(length, index)
                self.rm.amount_inactive += length
                self.datapig.failed_piece(index)
            self.fastresume_dirty = True
        yield hashcheck
    def get_piece(self, index):
        if not self.have[index]:
            df = defer.Deferred()
            self.have_callbacks.setdefault(index, []).append(df)
            yield df
            df.getResult()
        assert self.have[index]
        df = self.read(index, 0, self._piecelen(index))
        yield df
        r = df.getResult()
        yield r

    def read(self, index, begin, length):
        if not self.have[index]:
            raise IndexError("Do not have piece %d of %d" %
                             (index, self.numpieces))
        df = launch_coroutine(wrap_task(self.add_task),
                              self._read, index, begin, length)
        return df
    def _read(self, index, begin, length):
        if index in self.blocking_pieces:
            df = self.blocking_pieces[index]
            yield df
            df.getResult()
        if index not in self.checked_pieces:
            df = self.hashcheck_piece(self.places[index])
            yield df
            passed = df.getResult()
            if not passed:
                # TODO: this case should cause a total file hash check and
                # reconnect when done.
                raise BTFailure, _("told file complete on start-up, but piece "
                                   "failed hash check")
        if begin + length > self._piecelen(index):
            #yield None
            raise ValueError("incorrect size: (%d + %d ==) %d > %d" %
                             (begin, length,
                              begin + length, self._piecelen(index)))
        df = self._storage_read(self.places[index], length, offset=begin)
        yield df
        data = df.getResult()
        yield data
    def _storage_read(self, index, amount, offset=0):
        assert index >= 0
        return self.storage.read(index * self.piece_size + offset, amount)

    def _storage_write(self, index, data, offset=0):
        return self.storage.write(index * self.piece_size + offset, data)

    ## partials
    ############################################################################
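
    # "Partials" let a hash check recover the blocks of pieces that were
    # only partly downloaded before shutdown: _check_partial detects them
    # from the on-disk marks, and _realize_partials assigns the surviving
    # pieces to the slots where their data was found.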
    def _realize_partials(self, partials):
        self.amount_left_with_partials = self.amount_left
        for piece in partials:
            if self.places[piece] < 0:
                pos = partials[piece][0]
                self.places[piece] = pos
                self.rplaces[pos] = piece
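
    # On-disk layout assumed by the scan below: each not-yet-downloaded
    # chunk of an allocated piece begins with the marker string
    #   "BitTorrent - this part has not been downloaded yet."
    #   + infohash + struct.pack('>i', request_size)
    # followed by the piece index packed as a 4-byte big-endian integer
    # (the extra 4 bytes counted in marklen).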
    def _check_partial(self, pos, partials, data):
        index = None
        missing = False
        request_size = self.config['download_chunk_size']
        if self.partial_mark is None:
            i = struct.pack('>i', request_size)
            self.partial_mark = ("BitTorrent - this part has not been " +
                                 "downloaded yet." + self.infohash + i)
        marklen = len(self.partial_mark) + 4
        for i in xrange(0, len(data) - marklen, request_size):
            if data[i:i + marklen - 4] == self.partial_mark:
                ind = struct.unpack('>i', data[i + marklen - 4:i + marklen])[0]
                if index is None:
                    index = ind
                    parts = []
                if ind >= self.numpieces or ind != index:
                    return
                parts.append(i)
            else:
                missing = True
        if index is not None and missing:
            i += request_size
            if i < len(data):
                parts.append(i)
            partials[index] = (pos, parts)
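
    # 'parts' lists the chunk offsets still carrying the partial mark (i.e.
    # not yet downloaded); every chunk absent from it is counted as already
    # written and deducted from amount_left_with_partials.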
    def _make_pending(self, index, parts):
        length = self._piecelen(index)
        x = 0
        request_size = self.config['download_chunk_size']
        for x in xrange(0, length, request_size):
            if x not in parts:
                partlen = min(request_size, length - x)
                self.amount_left_with_partials -= partlen

    ############################################################################