| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136 |
- # The contents of this file are subject to the BitTorrent Open Source License
- # Version 1.1 (the License). You may not copy or use this file, in either
- # source code or executable form, except in compliance with the License. You
- # may obtain a copy of the License at http://www.bittorrent.com/license/.
- #
- # Software distributed under the License is distributed on an AS IS basis,
- # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
- # for the specific language governing rights and limitations under the
- # License.
- # Written by Bram Cohen and Uoti Urpala
- from __future__ import division
- from __future__ import generators
- import os
- import gc
- import sys
- import errno
- import shutil
- import random
- import socket
- import cPickle
- import logging
- import itertools
- from BTL.translation import _
- from BitTorrent.NamedMutex import NamedMutex
- import BTL.stackthreading as threading
- from BitTorrent.platform import is_path_too_long, no_really_makedirs
- from BTL.platform import bttime, get_filesystem_encoding
- from BitTorrent.ConnectionManager import ConnectionManager
- from BitTorrent import PeerID
- from BTL.exceptions import str_exc
- from BTL.defer import ThreadedDeferred, Failure, wrap_task
- from BTL.yielddefer import launch_coroutine
- from BitTorrent.TorrentStats import TorrentStats
- from BitTorrent.RateMeasure import RateMeasure
- from BitTorrent.PiecePicker import PiecePicker
- from BitTorrent.Rerequester import Rerequester, DHTRerequester
- from BitTorrent.CurrentRateMeasure import Measure
- from BitTorrent.Storage import Storage, UnregisteredFileException
- from BitTorrent.HTTPConnector import URLage
- from BitTorrent.StorageWrapper import StorageWrapper
- from BitTorrent.RequestManager import RequestManager
- from BitTorrent.Upload import Upload
- from BitTorrent.MultiDownload import MultiDownload
- from BitTorrent import BTFailure, UserFailure
- from BitTorrent.prefs import Preferences
- from khashmir import const
class Feedback(object):
    """Base class for the callbacks a Torrent uses to report state changes.

    Individual torrents (Torrent) perform callbacks regarding changes of
    state to the rest of the program via a Feedback object.  Every hook is
    a no-op here; subclasses override the ones they are interested in.
    """

    def finished(self, torrent):
        """Hook: `torrent` has been marked completed."""
        pass

    def finishing(self, torrent):
        """Hook: `torrent` is done downloading and is being wrapped up.

        Torrent._finished() invokes this hook, so the base class must
        provide it; otherwise subclasses that do not define it raise
        AttributeError when a download completes.
        """
        pass

    def failed(self, torrent):
        """Hook: `torrent` has failed."""
        pass

    def error(self, torrent, level, text):
        """Hook: `torrent` reports message `text` at logging level `level`."""
        pass

    def exception(self, torrent, text):
        """Hook: report `text` as an exception.

        By default this is forwarded to error() at logging.CRITICAL.
        """
        self.error(torrent, logging.CRITICAL, text)

    def started(self, torrent):
        """Hook: `torrent` has entered the running state."""
        pass
- class FeedbackMultiplier(object):
- def __init__(self, *a):
- self.chain = list(a)
- def __getattr__(self, attr):
- def multiply_calls(*a, **kw):
- exc_info = None
- for x in self.chain:
- try:
- getattr(x, attr)(*a, **kw)
- except:
- exc_info = sys.exc_info()
- if exc_info:
- raise exc_info[0], exc_info[1], exc_info[2]
- return multiply_calls
- class Torrent(object):
- """Represents a single file transfer or transfer for a batch of files
- in the case of a batch torrent. During the course of a single
- transfer, a Torrent may have many different connections to peers."""
- STATES = ["created", "initializing", "initialized", "running",
- "finishing", "failed"]
- POLICIES = ["stop", "start", "auto"]
- PRIORITIES = ["low", "normal", "high"]
def __init__(self, metainfo, working_path, destination_path, config,
             data_dir, rawserver, choker,
             singleport_listener, ratelimiter, down_ratelimiter,
             total_downmeasure,
             filepool, dht, feedback, log_root,
             hidden=False, is_auto_update=False):
    """Build a torrent in the "created" state.

    Only references and bookkeeping fields are stored here; the
    per-session components are set to None by _init() and are built
    later by _initialize().

    working_path and destination_path must be unicode when
    get_filesystem_encoding() returns None and str otherwise (checked by
    the assertions below).
    """
    # The passed working path and destination_path should be filesystem
    # encoded or should be unicode if the filesystem supports unicode.
    fs_encoding = get_filesystem_encoding()
    assert (
        (fs_encoding == None and isinstance(working_path, unicode)) or
        (fs_encoding != None and isinstance(working_path, str))
        ), "working path encoding problem"
    assert (
        (fs_encoding == None and isinstance(destination_path, unicode)) or
        (fs_encoding != None and isinstance(destination_path, str))
        ), "destination path encoding problem"
    self.state = "created"
    self.data_dir = data_dir
    # Wrapped so that more feedback objects can be chained on later
    # (see add_feedback / remove_feedback).
    self.feedback = FeedbackMultiplier(feedback)
    self.finished_this_session = False
    self._rawserver = rawserver
    self._singleport_listener = singleport_listener
    self._ratelimiter = ratelimiter
    self._down_ratelimiter = down_ratelimiter
    self._filepool = filepool
    self._dht = dht
    self._choker = choker
    self._total_downmeasure = total_downmeasure
    # Reset all per-session component references to None.
    self._init()
    self._announced = False
    self._listening = False
    self.reserved_ports = []
    self.reported_port = None
    self._myfiles = None
    self._last_myfiles = None
    self.total_bytes = None
    self._doneflag = threading.Event()
    self.finflag = threading.Event()
    self._contfunc = None
    # (activity text, fraction done)
    self._activity = (_("Initial startup"), 0)
    # (filename, priority) pairs queued until the picker exists.
    self._pending_file_priorities = []
    self._mutex = None
    self.time_created = bttime()
    self.time_started = None
    self.metainfo = metainfo
    self.infohash = metainfo.infohash
    self.log_root = log_root
    self.logger = logging.getLogger(log_root + '.' + repr(self.infohash))
    self.logger.setLevel(logging.DEBUG)
    self.total_bytes = metainfo.total_bytes
    if not metainfo.reported_errors:
        metainfo.show_encoding_errors(self._error)
    # self.config must exist before any of the config-backed properties
    # below (working_path, destination_path, priority, policy, hidden,
    # is_auto_update) are assigned.
    self.config = Preferences(config)#, persist_callback=self._dump_torrent_config)
    self.working_path = working_path #sets in config. See _set_working_path
    self.destination_path = destination_path # sets in config.
    self.priority = "normal"
    self.policy = "auto"
    self.hidden = hidden #sets in config
    self.is_auto_update = is_auto_update #sets in config
    self._completed = False
    self.config['finishtime'] = 0
    self.uptotal = 0
    self.uptotal_old = 0
    self.downtotal = 0
    self.downtotal_old = 0
    self.context_valid = True
- def _init(self):
- self._picker = None
- self._storage = None
- self._storagewrapper = None
- self._ratemeasure = None
- self._upmeasure = None
- self._downmeasure = None
- self._connection_manager = None
- self._rerequest = None
- self._dht_rerequest = None
- self._statuscollector = None
- self._rm = None
- self.multidownload = None
- def update_config(self, config):
- self.config.update(config)
- d = self.config.get('file_priorities', {})
- for k, v in d.iteritems():
- self.set_file_priority(k, v)
- if self.policy not in self.POLICIES:
- self.policy = "auto"
- if self.priority not in self.PRIORITIES:
- self.priority = "normal"
- def _set_state(self, value):
- assert value in self.STATES, ("value %s not in STATES %s" %
- (value, self.STATES))
- self._state = value
- def _get_state(self):
- return self._state
- state = property(_get_state, _set_state)
- def _set_policy(self, value):
- assert value in self.POLICIES, ("value %s not in POLICIES %s" %
- (value, self.POLICIES))
- self.config['policy'] = value
- def _get_policy(self):
- return self.config['policy']
- policy = property(_get_policy, _set_policy)
- def _set_priority(self, value):
- assert value in self.PRIORITIES, ("value %s not in PRIORITIES %s" %
- (value, self.PRIORITIES))
- self.config['priority'] = value
- def _get_priority(self):
- return self.config['priority']
- priority = property(_get_priority, _set_priority)
- def _set_hidden(self, value):
- self.config['hidden'] = value
- def _get_hidden(self):
- return self.config['hidden']
- hidden = property(_get_hidden, _set_hidden)
- def _set_is_auto_update(self, value):
- self.config['is_auto_update'] = value
- def _get_is_auto_update(self):
- return self.config['is_auto_update']
- is_auto_update = property(_get_is_auto_update, _set_is_auto_update)
- def _set_completed(self, val):
- self._completed = val
- if val:
- self.config['finishtime'] = bttime()
- def _get_completed(self):
- return self._completed
- completed = property(_get_completed, _set_completed)
- def _set_sent_completed(self, value):
- self.config['sent_completed'] = value
- def _get_sent_completed(self):
- return self.config['sent_completed']
- sent_completed = property(_get_sent_completed, _set_sent_completed)
- def _get_finishtime(self):
- return self.config['finishtime']
- finishtime = property(_get_finishtime)
- def _set_destination_path(self, value):
- # The following assertion will not always work. Consider
- # Torrent.py: self.working_path = self.destination_path
- # This assignment retrieves a unicode path from
- # config['destination_path'].
- #assert isinstance(value,str) # assume filesystem encoding.
- #
- # The following if statement is not necessary because config here
- # is not really a config file, but rather state that is pickled when
- # the Torrent shuts down.
- #if isinstance(value, str):
- # value = decode_from_filesystem(value)
- self.config['destination_path'] = value
- def _get_destination_path(self):
- return self.config['destination_path']
- destination_path = property(_get_destination_path, _set_destination_path)
- def _set_working_path(self, value):
- # See comments for _set_destination_path.
- self.config['working_path'] = value
- def _get_working_path(self):
- return self.config['working_path']
- working_path = property(_get_working_path, _set_working_path)
- def __cmp__(self, other):
- if not isinstance(other, Torrent):
- raise TypeError("Torrent.__cmp__(x,y) requires y to be a 'Torrent',"
- " not a '%s'" % type(other))
- return cmp(self.metainfo.infohash, other.metainfo.infohash)
- def is_initialized(self):
- return self.state not in ["created", "initializing", "failed"]
- def is_running(self):
- return self.state == "running"
- def is_context_valid(self):
- return self.context_valid
- def _context_wrap(self, _f, *a, **kw):
- # this filters out calls
- # to an invalid torrent
- # sloppy technique
- if not self.context_valid:
- return
- try:
- _f(*a, **kw)
- except KeyboardInterrupt:
- raise
- except:
- self.got_exception(Failure())
- # these wrappers add _context_wrap to the chain, so that calls on a dying
- # object are filtered, and errors on a valid call are logged.
- def add_task(self, delay, func, *a, **kw):
- return self._rawserver.add_task(delay, self._context_wrap,
- func, *a, **kw)
- def external_add_task(self, delay, func, *a, **kw):
- return self._rawserver.external_add_task(delay, self._context_wrap,
- func, *a, **kw)
def _register_files(self):
    """Work out this torrent's on-disk paths and add them to the file pool.

    Raises BTFailure when a destination path exceeds the platform limit.
    """
    def paths_under(root):
        # One path per file for a batch torrent, otherwise just the root.
        if self.metainfo.is_batch:
            return [os.path.join(root, f) for f in self.metainfo.files_fs]
        return [root, ]

    myfiles = paths_under(self.destination_path)
    for candidate in myfiles:
        if is_path_too_long(candidate):
            raise BTFailure("Filename path exceeds platform limit: %s" % candidate)

    # if the destination path contains any of the files in the torrent
    # then use the destination path instead of the working path.
    already_present = [p for p in myfiles if os.path.exists(p)]
    if already_present:
        self.working_path = self.destination_path
    else:
        myfiles = paths_under(self.working_path)

    assert self._myfiles is None, '_myfiles should be None!'
    self._filepool.add_files(myfiles, self)
    self._myfiles = myfiles
- def _build_url_mapping(self):
- # TODO: support non [-1] == '/' urls
- url_suffixes = []
- if self.metainfo.is_batch:
- for filename in self.metainfo.orig_files:
- path = '%s/%s' % (self.metainfo.name, filename)
- # am I right that orig_files could have windows paths?
- path = path.replace('\\', '/')
- url_suffixes.append(path)
- else:
- url_suffixes = [self.metainfo.name, ]
- self._url_suffixes = url_suffixes
- total = 0
- piece_size = self.metainfo.piece_length
- self._urls = zip(self._url_suffixes, self.metainfo.sizes)
- def _unregister_files(self):
- if self._myfiles is not None:
- self._filepool.remove_files(self._myfiles)
- self._last_myfiles = self._myfiles
- self._myfiles = None
def initialize(self):
    """Start initialization as a coroutine and return its deferred.

    Allowed only from the created, failed or finishing states.  Errors
    from the coroutine are routed to got_exception().
    """
    self.context_valid = True
    startable = ["created", "failed", "finishing"]
    assert self.state in startable, "state not in set"
    self.state = "initializing"
    scheduler = wrap_task(self.add_task)
    deferred = launch_coroutine(scheduler, self._initialize)
    deferred.addErrback(self.got_exception)
    return deferred
- # this function is so nasty!
def _initialize(self):
    """Coroutine body of initialize(): build all per-session components.

    Yields deferreds (storage startup, hash checking, tracker IP lookup)
    and returns early, without reaching the "initialized" state, when
    the per-torrent mutex cannot be acquired or when storage startup or
    hash checking reports a result other than True.
    """
    self._doneflag = threading.Event()
    # only one torrent object for a particular infohash at a time.
    # Note: This must be done after doneflag is created if shutdown()
    # is to be called from got_exception().
    if self.config["one_download_per_torrent"]:
        self._mutex = NamedMutex(self.infohash.encode("hex"))
        if not self._mutex.acquire(False):
            # Raise and immediately catch so that Failure() can capture
            # the active exception.
            try:
                raise UserFailure(_("Torrent already being downloaded or "
                                    "seeded." ))
            except UserFailure, e:
                # perform exception handling including shutting down
                # the torrent.
                self.got_exception(Failure(),
                                   cannot_shutdown=True)
                return
    self.reported_port = self.config['forwarded_port']
    if not self.reported_port:
        self.reported_port = \
            self._singleport_listener.get_port(self.change_port)
        if self.reported_port:
            # Remembered so stop_download() can release it again.
            self.reserved_ports.append(self.reported_port)
    # backward compatibility with older 5.0 development versions
    if self.destination_path == "":
        try:
            self.destination_path = self.config['save_as']
        except:
            pass
    if self.working_path == "":
        self.working_path = self.destination_path
    self._myid = self._make_id()
    random.seed(self._myid)
    self._build_url_mapping()
    self._urlage = URLage(self._urls)
    self._register_files()
    self.logger.debug("_initialize: self.working_path=%s", self.working_path)
    self._storage = Storage(self.config, self._filepool, self.working_path,
                            zip(self._myfiles, self.metainfo.sizes),
                            self.add_task, self.external_add_task,
                            self._doneflag)
    df = self._storage.startup_df
    yield df
    if df.getResult() != True:
        # initialization was aborted
        self.logger.debug("_initialize: initialization aborted")
        return
    self.logger.debug("_initialize: returned from Storage startup.")
    # Open the fast-resume file if there is one; on failure only warn.
    resumefile = None
    if self.data_dir:
        filename = os.path.join(self.data_dir, 'resume',
                                self.infohash.encode('hex'))
        if os.path.exists(filename):
            try:
                resumefile = file(filename, 'rb')
            except Exception, e:
                self._error(logging.WARNING,
                    _("Could not load fastresume data: %s") % str_exc(e)
                    + ' ' + _("Will perform full hash check."))
                if resumefile is not None:
                    resumefile.close()
                resumefile = None
    def data_flunked(amount, index):
        # Callback handed to StorageWrapper for a piece failing its hash.
        self._ratemeasure.data_rejected(amount)
        self._error(logging.INFO,
                    _("piece %d failed hash check, re-downloading it")
                    % index)
    def errorfunc(level, text):
        # Report the error through external_add_task rather than
        # calling _error directly.
        def e():
            self._error(level, text)
        self.external_add_task(0, e)
    def statusfunc(activity = None, fractionDone = 0):
        # Update the (activity, fraction) pair; keep the old text if
        # none is given.
        if activity is None:
            activity = self._activity[0]
        self._activity = (activity, fractionDone)
    numpieces = len(self.metainfo.hashes)
    self._rm = RequestManager(self.config['download_chunk_size'],
                              self.metainfo.piece_length, numpieces,
                              self._storage.get_total_length())
    self._storagewrapper = StorageWrapper(self._storage, self._rm,
                                          self.config,
                                          self.metainfo.hashes,
                                          self.metainfo.piece_length,
                                          statusfunc, self._doneflag,
                                          data_flunked, self.infohash,
                                          self.metainfo.is_batch,
                                          errorfunc, self.working_path,
                                          self.destination_path,
                                          resumefile,
                                          self.add_task,
                                          self.external_add_task)
    self._rm.set_storage(self._storagewrapper)
    df = self._storagewrapper.done_checking_df
    yield df
    if df.getResult() != True:
        # initialization was aborted
        # NOTE(review): resumefile is not closed on this path -- confirm
        # whether StorageWrapper takes ownership of it.
        return
    if resumefile is not None:
        resumefile.close()
    self._upmeasure = Measure(self.config['max_rate_period'])
    self._downmeasure = Measure(self.config['max_rate_period'])
    self._ratemeasure = RateMeasure(self._storagewrapper.amount_left_with_partials)
    self._picker = PiecePicker(self.config, numpieces,
                               self._storagewrapper.have_set.iterneg(0, numpieces))
    self._periodic_save_fastresume()
    # Apply priorities queued by set_file_priority() before the picker
    # existed.
    while self._pending_file_priorities:
        self.set_file_priority(*self._pending_file_priorities.pop())
    def kickpeer(connection):
        # Close the connection from a scheduled task, not inline.
        def kick():
            connection.close()
        self.add_task(0, kick)
    def banpeer(ip):
        self._connection_manager.ban(ip)
    md = MultiDownload(self.config, self._storagewrapper, self._rm,
                       self._urlage, self._picker, numpieces,
                       self.finished, self.got_exception, kickpeer, banpeer,
                       self._downmeasure.get_rate)
    md.add_useful_received_listener(self._total_downmeasure.update_rate)
    md.add_useful_received_listener(self._downmeasure.update_rate)
    md.add_useful_received_listener(self._ratemeasure.data_came_in)
    md.add_raw_received_listener(self._down_ratelimiter.update_rate)
    self.multidownload = md
    # HERE. Yipee! Uploads are created by callback while Download
    # objects are created by MultiDownload. --Dave
    def make_upload(connector):
        up = Upload(self.multidownload, connector, self._ratelimiter,
                    self._choker,
                    self._storagewrapper, self.config['max_chunk_length'],
                    self.config['max_rate_period'],
                    self.config['num_fast'], self.infohash)
        connector.add_sent_listener(self._upmeasure.update_rate)
        return up
    if self._dht:
        addContact = self._dht.addContact
    else:
        addContact = None
    df = self.metainfo.get_tracker_ips(wrap_task(self.external_add_task))
    yield df
    tracker_ips = df.getResult()
    self._connection_manager = \
        ConnectionManager(make_upload, self.multidownload, self._choker,
             numpieces, self._ratelimiter,
             self._rawserver, self.config, self.metainfo.is_private,
             self._myid, self.add_task, self.infohash, self, addContact,
             0, tracker_ips, self.log_root)
    self.multidownload.attach_connection_manager(self._connection_manager)
    self._statuscollector = TorrentStats(self.logger, self._choker,
        self.get_uprate, self.get_downrate, self._upmeasure.get_total,
        self._downmeasure.get_total, self._ratemeasure.get_time_left,
        self.get_percent_complete, self.multidownload.aggregate_piece_states,
        self.finflag, self._connection_manager, self.multidownload,
        self.get_file_priorities, self._myfiles,
        self._connection_manager.ever_got_incoming, None)
    self.state = "initialized"
- def _rerequest_op(self):
- # weee hee hee
- class Caller(object):
- def __getattr__(s, attr):
- def rerequest_function(*a, **kw):
- if self._rerequest:
- f = getattr(self._rerequest, attr)
- f(*a, **kw)
- if self._dht_rerequest:
- f = getattr(self._dht_rerequest, attr)
- f(*a, **kw)
- return rerequest_function
- return Caller()
def start_download(self):
    """Move an initialized torrent to the running state.

    Registers with the listener, creates the DHT and/or tracker
    rerequesters, begins announcing, starts any HTTP connections listed
    in the metainfo, and notifies feedback.  For a trackerless torrent
    with no DHT available, a critical error is reported and the method
    returns before the state becomes "running".
    """
    assert self.state == "initialized", "state not initialized"
    self.time_started = bttime()
    self._down_ratelimiter.add_throttle_listener(self._connection_manager)
    self._connection_manager.reopen(self.reported_port)
    self._singleport_listener.add_torrent(self.infohash,
                                          self._connection_manager)
    self._listening = True
    # the DHT is broken
    if self.metainfo.is_trackerless or not self.metainfo.is_private:
    #if self.metainfo.is_trackerless:
        if not self._dht and self.metainfo.is_trackerless:
            # _error() takes (level, text); passing self explicitly as
            # well would shift every argument by one.
            self._error(logging.CRITICAL,
               _("Attempt to download a trackerless torrent "
                 "with trackerless client turned off."))
            return
        else:
            if self._dht:
                nodes = self._dht.table.findNodes(self.metainfo.infohash,
                                                  invalid=False)
                # Too few known nodes: also resolve and add the nodes
                # listed in the metainfo.
                if len(nodes) < const.K:
                    for host, port in self.metainfo.nodes:
                        df = self._rawserver.gethostbyname(host)
                        df.addCallback(self._dht.addContact, port)
                        df.addLogback(self.logger.warning, "Resolve failed")
                self._dht_rerequest = DHTRerequester(self.config,
                    self.add_task,
                    self._connection_manager.how_many_connections,
                    self._connection_manager.start_connection,
                    self.external_add_task,
                    self._rawserver,
                    self._storagewrapper.get_amount_left,
                    self._upmeasure.get_total,
                    self._downmeasure.get_total, self.reported_port,
                    self._myid,
                    self.infohash, self._error, self.finflag,
                    self._upmeasure.get_rate,
                    self._downmeasure.get_rate,
                    self._connection_manager.ever_got_incoming,
                    self._no_announce_shutdown, self._announce_done,
                    self._dht)
    if not self.metainfo.is_trackerless:
        self._rerequest = Rerequester(self.metainfo.announce,
            self.metainfo.announce_list, self.config,
            self.add_task, self.external_add_task, self._rawserver,
            self._connection_manager.how_many_connections,
            self._connection_manager.start_connection,
            self._storagewrapper.get_amount_left,
            self._upmeasure.get_total, self._downmeasure.get_total,
            self.reported_port, self._myid,
            self.infohash, self._error, self.finflag,
            self._upmeasure.get_rate,
            self._downmeasure.get_rate,
            self._connection_manager.ever_got_incoming,
            self._no_announce_shutdown, self._announce_done,
            bool(self._dht_rerequest))
    # The tracker rerequester is preferred when both exist.
    self._statuscollector.rerequester = self._rerequest or self._dht_rerequest
    self.multidownload.rerequester = self._rerequest or self._dht_rerequest
    self._announced = True
    if self._dht and len(self._dht.table.findNodes(self.infohash)) == 0:
        self.add_task(5, self._dht.findCloseNodes)
    self._rerequest_op().begin()
    for url_prefix in self.metainfo.url_list:
        self._connection_manager.start_http_connection(url_prefix)
    self.state = "running"
    if not self.finflag.isSet():
        self._activity = (_("downloading"), 0)
    self.feedback.started(self)
    if self._storagewrapper.amount_left == 0 and not self.completed:
        # By default, self.finished() resets the policy to "auto",
        # but if we discover on startup that we are already finished,
        # we don't want to reset it.
        # Also, if we discover on startup that we are already finished,
        # don't set finished_this_session.
        self.finished(policy=self.policy, finished_this_session=False)
def stop_download(self, pause=False):
    """Take a running torrent back to the "initialized" state.

    Announces the stop if an announce was made, removes the torrent from
    the listener, releases reserved ports, and then either throttles
    (pause=True) or closes (pause=False) the peer connections.
    """
    assert self.state == "running", "state not running"
    self.state = "initialized"
    if not self.finflag.isSet():
        self._activity = (_("stopped"), 0)
    if self._announced:
        self._rerequest_op().announce_stop()
        self._announced = False
        self._statuscollector.rerequester = None
        self.multidownload.rerequester = None
    if self._listening:
        self._singleport_listener.remove_torrent(self.infohash)
        self._listening = False
    for port in self.reserved_ports:
        self._singleport_listener.release_port(port, self.change_port)
    # Empty the list in place so other references to it see the change.
    del self.reserved_ports[:]
    if self._connection_manager is not None:
        if pause:
            self._down_ratelimiter.remove_throttle_listener(
                self._connection_manager )
            self._connection_manager.throttle_connections()
        else:
            self._connection_manager.close_connections()
    if self.config['check_hashes']:
        self._save_fastresume()
def shutdown(self):
    """Start the shutdown coroutine and return its deferred."""
    # use _rawserver.add_task directly here, because we want the callbacks
    # to happen even though _shutdown is about to invalidate this torrent's
    # context
    scheduler = wrap_task(self._rawserver.add_task)
    deferred = launch_coroutine(scheduler, self._shutdown)
    deferred.addErrback(self.got_exception, cannot_shutdown=True)
    return deferred
def _shutdown(self):
    """Coroutine body of shutdown(): tear everything down.

    Stops the download if running, waits for hash checking and storage
    close, unregisters files, cleans up the connection manager,
    invalidates the context, resets the components via _init(), returns
    to the "created" state and releases the per-torrent mutex.
    """
    self._doneflag.set()
    if self.state == "running":
        self.stop_download()
    # above is the last thing to set.
    if self._storagewrapper is not None:
        df = self._storagewrapper.done_checking_df
        yield df
        df.getResult()
    if self._storage is not None:
        df = self._storage.close()
        if df is not None:
            yield df
            df.getResult()
    self._unregister_files()
    if self._connection_manager is not None:
        self._down_ratelimiter.remove_throttle_listener(
            self._connection_manager )
        self._connection_manager.cleanup()
    # From here on, tasks scheduled through add_task/external_add_task
    # are dropped by _context_wrap.
    self.context_valid = False
    self._init()
    self.state = "created"
    # release mutex on this torrent.
    if self.config["one_download_per_torrent"]:
        if self._mutex is not None and self._mutex.owner():
            self._mutex.release()
    self._rawserver.add_task(0, gc.collect)
def _no_announce_shutdown(self, level, text):
    """Report `text` at `level` and fail the torrent without a stop announce."""
    # This is only called when announce fails with no peers,
    # don't try to announce again telling we're leaving the torrent
    self._announced = False
    self._error(level, text)
    self.failed()
def set_file_priority(self, filename, priority):
    """Set the download priority of `filename` and persist it.

    Before the storage wrapper and picker exist the request is queued in
    _pending_file_priorities; either way the priority is written to
    config['file_priorities'] and the config is dumped.
    """
    picker_ready = (self._storagewrapper is not None and
                    self._picker is not None)
    if picker_ready:
        first, last = self._storagewrapper.get_piece_range_for_filename(filename)
        self._picker.set_priority(xrange(first, last + 1), priority)
    else:
        self._pending_file_priorities.append((filename, priority))
    self.config.setdefault('file_priorities', {})
    self.config['file_priorities'][filename] = priority
    self._dump_torrent_config()
- def get_file_priorities(self):
- return self.config.get('file_priorities', {})
- def get_file_priority(self, filename):
- fp = self.get_file_priorities()
- return fp.get(filename, 0)
- def add_feedback(self, feedback):
- self.feedback.chain.append(feedback)
- def remove_feedback(self, feedback):
- self.feedback.chain.remove(feedback)
def got_exception(self, failure, cannot_shutdown=False):
    """Record and log a failure, then fail the torrent.

    UnregisteredFileException is ignored.  Otherwise the activity text
    is set according to the exception class, the failure is logged at
    CRITICAL, and failed(cannot_shutdown) is called.
    """
    exc_type, exc = failure.exc_info()[0:2]
    level = logging.CRITICAL
    log_message = "Torrent got exception: %s" % exc_type
    description = str_exc(exc)
    if isinstance(exc, UnregisteredFileException):
        # not an error, a pending disk op was aborted because the torrent
        # has unregistered files.
        return
    if isinstance(exc, BTFailure):
        prefix = _("download failed: ")
    elif isinstance(exc, IOError):
        if exc.errno == errno.ENOSPC:
            log_message = _("IO Error: No space left on disk, "
                            "or cannot create a file that large")
        prefix = _("killed by IO error: ")
    elif isinstance(exc, OSError):
        prefix = _("killed by OS error: ")
    else:
        prefix = _("killed by internal exception: ")
    self._activity = (prefix + description, 0)
    # User-facing failures are logged without a traceback.
    if isinstance(exc, UserFailure):
        self.logger.log(level, description)
    else:
        self.logger.log(level, log_message, exc_info=failure.exc_info())
    self.failed(cannot_shutdown)
- def failed(self, cannot_shutdown=False):
- if cannot_shutdown:
- self.state = "failed"
- self.feedback.failed(self)
- return
- try:
- # this could complete later. sorry that's just the way it is.
- df = self.shutdown()
- def cb(r):
- self.state = "failed"
- self.feedback.failed(self)
- df.addBoth(cb)
- except:
- self.logger.exception(_("Additional error when closing down due"
- " to error: "))
- self.feedback.failed(self)
- def _error(self, level, text, exception=False, exc_info=None):
- if level > logging.WARNING:
- self.logger.log(level,
- _('Error regarding "%s":\n')%self.metainfo.name + text,
- exc_info=exc_info)
- if exception:
- self.feedback.exception(self, text)
- else:
- self.feedback.error(self, level, text)
def finished(self, policy="auto", finished_this_session=True):
    """Start the wrap-up coroutine for a running torrent.

    Returns the coroutine's deferred; its errors are routed to
    got_exception().
    """
    assert self.state == "running", "state not running"
    self.logger.debug("done downloading, preparing to wrap up")
    # because _finished() calls shutdown(), which invalidates the torrent
    # context, we need to use _rawserver.add_task directly here
    scheduler = wrap_task(self._rawserver.add_task)
    deferred = launch_coroutine(scheduler, self._finished,
                                policy=policy,
                                finished_this_session=finished_this_session)
    deferred.addErrback(self.got_exception)
    return deferred
def _finished(self, policy="auto", finished_this_session=True):
    """Coroutine body of finished(): wrap up a completed download.

    Closes storage, announces completion once, saves fast-resume data,
    applies `policy`, and notifies feedback.  When the working path
    differs from the destination path, the torrent is paused, its data
    is moved in a separate thread, storage is re-initialized on the new
    path, and the download is restarted.
    """
    self.logger.debug("wrapping up")
    if self.state != "running":
        return
    self.finflag.set()
    # Call self._storage.close() to flush buffers and change files to
    # read-only mode (when they're possibly reopened). Let exceptions
    # from self._storage.close() kill the torrent since files might not
    # be correct on disk if file.close() failed.
    self._storage.close()
    # don't bother trailing off the rate when we know we're done downloading
    self._downmeasure.rate = 0.0
    # If we haven't announced yet, normal first announce done later will
    # tell the tracker about seed status.
    # Only send completed the first time! Torrents transition to finished
    # everytime.
    if self._announced and not self.sent_completed:
        self._rerequest_op().announce_finish()
        self.sent_completed = True
    self._activity = (_("seeding"), 1)
    if self.config['check_hashes']:
        self._save_fastresume()
    # the old policy applied to downloading -- now that we are finished,
    # optionally reset it
    self.policy = policy
    self.feedback.finishing(self)
    # NOTE(review): `config` is not used again in this method.
    config = self.config
    if finished_this_session:
        self.finished_this_session = True
    def move(working_path, destination_path):
        # this function is called from another thread, so don't do anything
        # that isn't thread safe in here
        self.logger.debug("deleting any file that might be in the way")
        try:
            os.remove(destination_path)
            self.logger.debug("successfully deleted file " +
                              destination_path)
        except Exception, e:
            # Only worth logging if something is still in the way.
            if os.path.exists(destination_path):
                self.logger.debug(str_exc(e))
        self.logger.debug("deleting any directory that might be in the way")
        try:
            shutil.rmtree(destination_path)
            self.logger.debug("successfully deleted directory " +
                              destination_path)
        except Exception, e:
            if os.path.exists(destination_path):
                self.logger.debug(str_exc(e))
        self.logger.debug("ensuring destination exists")
        path, name = os.path.split(destination_path)
        no_really_makedirs(path)
        self.logger.debug("actually moving file")
        shutil.move(working_path, destination_path)
        self.logger.debug("returned from move")
    if self.working_path != self.destination_path:
        ## self.logger.debug("torrent finishing: shutting down, moving file, and restarting")
        ## df = self.shutdown()
        ## yield df
        ## df.getResult()
        self.logger.debug("torrent finishing: pausing, moving file, and restarting")
        self.stop_download(pause=True)
        self._unregister_files()
        self.logger.debug("successfully paused torrent, moving file")
        self.state = "finishing"
        # Run the move in a thread and resume here when it completes.
        df = ThreadedDeferred(wrap_task(self._rawserver.external_add_task),
                              move, self.working_path, self.destination_path)
        yield df
        df.getResult()
        self.logger.debug("moved file, restarting")
        assert self.state == "finishing", "state not finishing"
        self.working_path = self.destination_path
        ## self.state = "created"
        ## df = self.initialize()
        ## yield df
        ## df.getResult()
        self.completed = True
        self.feedback.finished(self)
        self.state = "initializing"
        self._register_files()
        df = self._storage.initialize(self.working_path,
                                      zip(self._myfiles,
                                          self.metainfo.sizes))
        yield df
        df.getResult()
        # so we store new path names
        self._storagewrapper.fastresume_dirty = True
        self._statuscollector.files = self._myfiles
        self.state = "initialized"
        self.logger.debug("attempting restart")
        self.start_download()
        self.logger.debug("re-started torrent")
    else:
        self.completed = True
        self.feedback.finished(self)
    self._dump_torrent_config()
def fastresume_file_path(self):
    """Return the path of this torrent's fastresume file.

    The file sits in the 'resume' subdirectory of config['data_dir'] and
    is named after the hex-encoded infohash.
    """
    # HEREDAVE: should probably be self.data_dir?
    resume_dir = os.path.join(self.config['data_dir'], 'resume')
    return os.path.join(resume_dir, self.infohash.encode('hex'))
def config_file_path(self):
    """Return the path of this torrent's saved config file.

    The file sits in the 'torrents' subdirectory of self.data_dir and is
    named after the hex-encoded metainfo infohash.
    """
    torrents_dir = os.path.join(self.data_dir, 'torrents')
    return os.path.join(torrents_dir, self.metainfo.infohash.encode('hex'))
def _periodic_save_fastresume(self):
    """Save fastresume data and re-schedule this method.

    The method re-adds itself through add_task with a delay argument of
    30 until finflag is set.
    """
    self._save_fastresume()
    if self.finflag.isSet():
        # Finished: stop rescheduling.
        return
    self.add_task(30, self._periodic_save_fastresume)
def _save_fastresume(self):
    """Write fastresume data to fastresume_file_path() if it is needed.

    Nothing is written when the torrent is not initialized, when
    config['data_dir'] is empty, or when the file already exists and the
    storage wrapper's fastresume data is not marked dirty.  A failure to
    write is reported through self._error as a warning and is not raised.
    """
    # HEREDAVE: should probably be self.data_dir?
    if not self.is_initialized() or not self.config['data_dir']:
        return
    target = self.fastresume_file_path()
    up_to_date = (os.path.exists(target) and
                  not self._storagewrapper.fastresume_dirty)
    if up_to_date:
        return
    out = None
    try:
        out = open(target, 'wb')
        self._storagewrapper.write_fastresume(out)
        out.close()
    except Exception as e:
        self._error(logging.WARNING,
                    _("Could not write fastresume data: ") + str_exc(e))
        # The handle may have been opened before the failure.
        if out is not None:
            out.close()
def _dump_torrent_config(self):
    """Persist the torrent's config dictionary to config_file_path().

    The pickled config is first written to '<path>.new' and then moved
    over the real path, so an existing config file is only replaced once
    the new contents have been written out completely.

    Raises whatever pickling, file I/O or shutil.move raise.
    """
    # The config is pickled rather than bencoded because bencode cannot
    # represent floats.
    serialized = cPickle.dumps(self.config.getDict())
    path = self.config_file_path()
    tmp_path = path + '.new'
    f = open(tmp_path, 'wb')
    try:
        f.write(serialized)
    finally:
        # Close the handle even when the write fails so it is not leaked.
        f.close()
    shutil.move(tmp_path, path)
def remove_state_files(self, del_files=False):
    """Delete the saved config and fastresume files of this torrent.

    May only be called while self.state is "created".  Failures to remove
    either state file are logged at debug level and otherwise ignored.

    del_files -- when true, additionally try to remove every path in
        self._last_myfiles, each file's containing directory, and
        finally self.working_path.  OSError from any single removal is
        ignored; any other exception is logged at debug level.
    """
    assert self.state == "created", "state not created"
    state_files = (
        (self.config_file_path, "error removing config file: %s"),
        (self.fastresume_file_path, "error removing fastresume file: %s"),
    )
    for locate, failure_msg in state_files:
        try:
            os.remove(locate())
        except Exception as e:
            self.logger.debug(failure_msg, str_exc(e))
    if not del_files:
        return
    try:
        for data_path in self._last_myfiles:
            try:
                os.remove(data_path)
            except OSError:
                pass
            # rmdir refuses non-empty directories (OSError), so a
            # directory only goes away once its last file is gone.
            parent_dir = os.path.dirname(data_path)
            try:
                os.rmdir(parent_dir)
            except OSError:
                pass
        try:
            os.rmdir(self.working_path)
        except OSError:
            pass
    except Exception as e:
        self.logger.debug("error removing incomplete files: %s", str_exc(e))
def get_downrate(self):
    """Return self._downmeasure.get_rate() while running, else None."""
    if not self.is_running():
        return None
    return self._downmeasure.get_rate()
def get_uprate(self):
    """Return self._upmeasure.get_rate() while running, else None."""
    if not self.is_running():
        return None
    return self._upmeasure.get_rate()
def get_rates(self):
    """Return the (upload rate, download rate) pair.

    Each element is whatever get_uprate() / get_downrate() return.
    """
    up = self.get_uprate()
    down = self.get_downrate()
    return (up, down)
def get_downtotal(self):
    """Return self._downmeasure.get_total() while running, else None."""
    if not self.is_running():
        return None
    return self._downmeasure.get_total()
def get_uptotal(self):
    """Return self._upmeasure.get_total() while running, else None."""
    if not self.is_running():
        return None
    return self._upmeasure.get_total()
def get_percent_complete(self):
    """Return the completed fraction of the torrent.

    Returns 0.0 before the torrent is initialized and 1.0 when
    total_bytes is not positive.  Otherwise the result is one minus the
    ratio of self._ratemeasure.get_size_left() to self.total_bytes.
    """
    if not self.is_initialized():
        return 0.0
    if self.total_bytes > 0:
        # NOTE(review): with two ints this needs true division
        # (from __future__ import division) -- confirm the file header.
        return 1 - self._ratemeasure.get_size_left() / self.total_bytes
    return 1.0
def get_num_connections(self):
    """Return the connection manager's connection count.

    Returns 0 when self._connection_manager is falsy.
    """
    manager = self._connection_manager
    if not manager:
        return 0
    return manager.how_many_connections()
def get_connections(self):
    """Return the connection manager's complete_connectors collection."""
    manager = self._connection_manager
    return manager.complete_connectors
def get_avg_peer_downrate(self):
    """Return the mean peermeasure rate over all complete connectors.

    Returns 0.0 when there is no connection manager or when it has no
    complete connectors.
    """
    manager = self._connection_manager
    # Same falsy guard that get_num_connections() uses, so a missing
    # manager yields 0.0 instead of an AttributeError.
    if not manager:
        return 0.0
    connectors = manager.complete_connectors
    count = len(connectors)
    if count == 0:
        return 0.0
    total = sum((c.download.connector.download.peermeasure.get_rate()
                 for c in connectors), 0.0)
    return total / count
def get_status(self, spew=False, fileinfo=False):
    """Return a status dictionary for this torrent.

    Before initialization the dictionary is built from self._activity
    (keys 'activity' and 'fractionDone') plus an empty 'pieceStates'
    tuple.  Afterwards it comes from the status collector's
    get_statistics(spew, fileinfo), with 'activity' overridden and
    'timeEst' cleared when the torrent is not running.  'priority' is
    set in both cases.
    """
    if not self.is_initialized():
        status = dict(itertools.izip(('activity', 'fractionDone'),
                                     self._activity))
        status['pieceStates'] = (0, 0, {})
        status['priority'] = self.priority
        return status
    status = self._statuscollector.get_statistics(spew, fileinfo)
    status['activity'] = self._activity[0]
    status['priority'] = self.priority
    if not self.is_running():
        status['timeEst'] = None
    return status
def get_total_transfer(self):
    """Return the (uploaded total, downloaded total) pair.

    Returns (0, 0) while self._upmeasure is None.
    """
    if self._upmeasure is None:
        return (0, 0)
    uploaded = self._upmeasure.get_total()
    downloaded = self._downmeasure.get_total()
    return (uploaded, downloaded)
def set_option(self, option, value):
    """Store value under option in self.config unless already equal.

    The assignment is skipped when the option is present and already
    equal to value.
    """
    unchanged = self.config.has_key(option) and self.config[option] == value
    if not unchanged:
        self.config[option] = value
def change_port(self, new_port=None):
    """Switch the port this torrent reports and refresh its peer id.

    The new port is chosen in this order:
      * config['forwarded_port'], when set: every reserved port is
        released and the reserved list cleared; nothing more happens if
        that port is already the reported one.
      * new_port, when given: it replaces the reported port in
        self.reserved_ports.
      * otherwise, when the listener's port differs from the reported
        one, a port obtained from the listener's get_port(), which is
        handed self.change_port (presumably as a callback -- confirm
        against the listener).
    When none of these apply the method returns without changes.

    After a change, reported_port is updated, a new peer id is made and
    pushed to the connection manager (if any), and the change is sent
    through _rerequest_op() when an announce has already happened.
    """
    port = self.config['forwarded_port']
    if port:
        for reserved in self.reserved_ports:
            self._singleport_listener.release_port(reserved)
        del self.reserved_ports[:]
        # Was a corrupted attribute name ("rescrewedported_port") that
        # raised AttributeError whenever a forwarded port was configured.
        if self.reported_port == port:
            return
    elif new_port is not None:
        port = new_port
        self.reserved_ports.remove(self.reported_port)
        self.reserved_ports.append(port)
    elif self._singleport_listener.port != self.reported_port:
        port = self._singleport_listener.get_port(self.change_port)
        self.reserved_ports.append(port)
    else:
        return
    self.reported_port = port
    self._myid = self._make_id()
    if self._connection_manager:
        self._connection_manager.my_id = self._myid
    if self._announced:
        self._rerequest_op().change_port(self._myid, port)
def _announce_done(self):
    """Release every reserved port except the last one in the list.

    Each release_port() call is also passed self.change_port.
    """
    stale_ports = self.reserved_ports[:-1]
    for stale in stale_ports:
        self._singleport_listener.release_port(stale, self.change_port)
    del self.reserved_ports[:-1]
def _make_id(self):
    """Return a fresh peer id obtained from PeerID.make_id()."""
    fresh_id = PeerID.make_id()
    return fresh_id
|