Torrent.py 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136
  1. # The contents of this file are subject to the BitTorrent Open Source License
  2. # Version 1.1 (the License). You may not copy or use this file, in either
  3. # source code or executable form, except in compliance with the License. You
  4. # may obtain a copy of the License at http://www.bittorrent.com/license/.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10. # Written by Bram Cohen and Uoti Urpala
  11. from __future__ import division
  12. from __future__ import generators
  13. import os
  14. import gc
  15. import sys
  16. import errno
  17. import shutil
  18. import random
  19. import socket
  20. import cPickle
  21. import logging
  22. import itertools
  23. from BTL.translation import _
  24. from BitTorrent.NamedMutex import NamedMutex
  25. import BTL.stackthreading as threading
  26. from BitTorrent.platform import is_path_too_long, no_really_makedirs
  27. from BTL.platform import bttime, get_filesystem_encoding
  28. from BitTorrent.ConnectionManager import ConnectionManager
  29. from BitTorrent import PeerID
  30. from BTL.exceptions import str_exc
  31. from BTL.defer import ThreadedDeferred, Failure, wrap_task
  32. from BTL.yielddefer import launch_coroutine
  33. from BitTorrent.TorrentStats import TorrentStats
  34. from BitTorrent.RateMeasure import RateMeasure
  35. from BitTorrent.PiecePicker import PiecePicker
  36. from BitTorrent.Rerequester import Rerequester, DHTRerequester
  37. from BitTorrent.CurrentRateMeasure import Measure
  38. from BitTorrent.Storage import Storage, UnregisteredFileException
  39. from BitTorrent.HTTPConnector import URLage
  40. from BitTorrent.StorageWrapper import StorageWrapper
  41. from BitTorrent.RequestManager import RequestManager
  42. from BitTorrent.Upload import Upload
  43. from BitTorrent.MultiDownload import MultiDownload
  44. from BitTorrent import BTFailure, UserFailure
  45. from BitTorrent.prefs import Preferences
  46. from khashmir import const
  47. class Feedback(object):
  48. """Inidivual torrents (Torrent) perform callbacks regarding
  49. changes of state to the rest of the program via a Feedback
  50. object."""
  51. def finished(self, torrent):
  52. pass
  53. def failed(self, torrent):
  54. pass
  55. def error(self, torrent, level, text):
  56. pass
  57. def exception(self, torrent, text):
  58. self.error(torrent, logging.CRITICAL, text)
  59. def started(self, torrent):
  60. pass
  61. class FeedbackMultiplier(object):
  62. def __init__(self, *a):
  63. self.chain = list(a)
  64. def __getattr__(self, attr):
  65. def multiply_calls(*a, **kw):
  66. exc_info = None
  67. for x in self.chain:
  68. try:
  69. getattr(x, attr)(*a, **kw)
  70. except:
  71. exc_info = sys.exc_info()
  72. if exc_info:
  73. raise exc_info[0], exc_info[1], exc_info[2]
  74. return multiply_calls
  75. class Torrent(object):
  76. """Represents a single file transfer or transfer for a batch of files
  77. in the case of a batch torrent. During the course of a single
  78. transfer, a Torrent may have many different connections to peers."""
  79. STATES = ["created", "initializing", "initialized", "running",
  80. "finishing", "failed"]
  81. POLICIES = ["stop", "start", "auto"]
  82. PRIORITIES = ["low", "normal", "high"]
  83. def __init__(self, metainfo, working_path, destination_path, config,
  84. data_dir, rawserver, choker,
  85. singleport_listener, ratelimiter, down_ratelimiter,
  86. total_downmeasure,
  87. filepool, dht, feedback, log_root,
  88. hidden=False, is_auto_update=False):
  89. # The passed working path and destination_path should be filesystem
  90. # encoded or should be unicode if the filesystem supports unicode.
  91. fs_encoding = get_filesystem_encoding()
  92. assert (
  93. (fs_encoding == None and isinstance(working_path, unicode)) or
  94. (fs_encoding != None and isinstance(working_path, str))
  95. ), "working path encoding problem"
  96. assert (
  97. (fs_encoding == None and isinstance(destination_path, unicode)) or
  98. (fs_encoding != None and isinstance(destination_path, str))
  99. ), "destination path encoding problem"
  100. self.state = "created"
  101. self.data_dir = data_dir
  102. self.feedback = FeedbackMultiplier(feedback)
  103. self.finished_this_session = False
  104. self._rawserver = rawserver
  105. self._singleport_listener = singleport_listener
  106. self._ratelimiter = ratelimiter
  107. self._down_ratelimiter = down_ratelimiter
  108. self._filepool = filepool
  109. self._dht = dht
  110. self._choker = choker
  111. self._total_downmeasure = total_downmeasure
  112. self._init()
  113. self._announced = False
  114. self._listening = False
  115. self.reserved_ports = []
  116. self.reported_port = None
  117. self._myfiles = None
  118. self._last_myfiles = None
  119. self.total_bytes = None
  120. self._doneflag = threading.Event()
  121. self.finflag = threading.Event()
  122. self._contfunc = None
  123. self._activity = (_("Initial startup"), 0)
  124. self._pending_file_priorities = []
  125. self._mutex = None
  126. self.time_created = bttime()
  127. self.time_started = None
  128. self.metainfo = metainfo
  129. self.infohash = metainfo.infohash
  130. self.log_root = log_root
  131. self.logger = logging.getLogger(log_root + '.' + repr(self.infohash))
  132. self.logger.setLevel(logging.DEBUG)
  133. self.total_bytes = metainfo.total_bytes
  134. if not metainfo.reported_errors:
  135. metainfo.show_encoding_errors(self._error)
  136. self.config = Preferences(config)#, persist_callback=self._dump_torrent_config)
  137. self.working_path = working_path #sets in config. See _set_working_path
  138. self.destination_path = destination_path # sets in config.
  139. self.priority = "normal"
  140. self.policy = "auto"
  141. self.hidden = hidden #sets in config
  142. self.is_auto_update = is_auto_update #sets in config
  143. self._completed = False
  144. self.config['finishtime'] = 0
  145. self.uptotal = 0
  146. self.uptotal_old = 0
  147. self.downtotal = 0
  148. self.downtotal_old = 0
  149. self.context_valid = True
  150. def _init(self):
  151. self._picker = None
  152. self._storage = None
  153. self._storagewrapper = None
  154. self._ratemeasure = None
  155. self._upmeasure = None
  156. self._downmeasure = None
  157. self._connection_manager = None
  158. self._rerequest = None
  159. self._dht_rerequest = None
  160. self._statuscollector = None
  161. self._rm = None
  162. self.multidownload = None
  163. def update_config(self, config):
  164. self.config.update(config)
  165. d = self.config.get('file_priorities', {})
  166. for k, v in d.iteritems():
  167. self.set_file_priority(k, v)
  168. if self.policy not in self.POLICIES:
  169. self.policy = "auto"
  170. if self.priority not in self.PRIORITIES:
  171. self.priority = "normal"
  172. def _set_state(self, value):
  173. assert value in self.STATES, ("value %s not in STATES %s" %
  174. (value, self.STATES))
  175. self._state = value
  176. def _get_state(self):
  177. return self._state
  178. state = property(_get_state, _set_state)
  179. def _set_policy(self, value):
  180. assert value in self.POLICIES, ("value %s not in POLICIES %s" %
  181. (value, self.POLICIES))
  182. self.config['policy'] = value
  183. def _get_policy(self):
  184. return self.config['policy']
  185. policy = property(_get_policy, _set_policy)
  186. def _set_priority(self, value):
  187. assert value in self.PRIORITIES, ("value %s not in PRIORITIES %s" %
  188. (value, self.PRIORITIES))
  189. self.config['priority'] = value
  190. def _get_priority(self):
  191. return self.config['priority']
  192. priority = property(_get_priority, _set_priority)
  193. def _set_hidden(self, value):
  194. self.config['hidden'] = value
  195. def _get_hidden(self):
  196. return self.config['hidden']
  197. hidden = property(_get_hidden, _set_hidden)
  198. def _set_is_auto_update(self, value):
  199. self.config['is_auto_update'] = value
  200. def _get_is_auto_update(self):
  201. return self.config['is_auto_update']
  202. is_auto_update = property(_get_is_auto_update, _set_is_auto_update)
  203. def _set_completed(self, val):
  204. self._completed = val
  205. if val:
  206. self.config['finishtime'] = bttime()
  207. def _get_completed(self):
  208. return self._completed
  209. completed = property(_get_completed, _set_completed)
  210. def _set_sent_completed(self, value):
  211. self.config['sent_completed'] = value
  212. def _get_sent_completed(self):
  213. return self.config['sent_completed']
  214. sent_completed = property(_get_sent_completed, _set_sent_completed)
  215. def _get_finishtime(self):
  216. return self.config['finishtime']
  217. finishtime = property(_get_finishtime)
  218. def _set_destination_path(self, value):
  219. # The following assertion will not always work. Consider
  220. # Torrent.py: self.working_path = self.destination_path
  221. # This assignment retrieves a unicode path from
  222. # config['destination_path'].
  223. #assert isinstance(value,str) # assume filesystem encoding.
  224. #
  225. # The following if statement is not necessary because config here
  226. # is not really a config file, but rather state that is pickled when
  227. # the Torrent shuts down.
  228. #if isinstance(value, str):
  229. # value = decode_from_filesystem(value)
  230. self.config['destination_path'] = value
  231. def _get_destination_path(self):
  232. return self.config['destination_path']
  233. destination_path = property(_get_destination_path, _set_destination_path)
  234. def _set_working_path(self, value):
  235. # See comments for _set_destination_path.
  236. self.config['working_path'] = value
  237. def _get_working_path(self):
  238. return self.config['working_path']
  239. working_path = property(_get_working_path, _set_working_path)
  240. def __cmp__(self, other):
  241. if not isinstance(other, Torrent):
  242. raise TypeError("Torrent.__cmp__(x,y) requires y to be a 'Torrent',"
  243. " not a '%s'" % type(other))
  244. return cmp(self.metainfo.infohash, other.metainfo.infohash)
  245. def is_initialized(self):
  246. return self.state not in ["created", "initializing", "failed"]
  247. def is_running(self):
  248. return self.state == "running"
  249. def is_context_valid(self):
  250. return self.context_valid
  251. def _context_wrap(self, _f, *a, **kw):
  252. # this filters out calls
  253. # to an invalid torrent
  254. # sloppy technique
  255. if not self.context_valid:
  256. return
  257. try:
  258. _f(*a, **kw)
  259. except KeyboardInterrupt:
  260. raise
  261. except:
  262. self.got_exception(Failure())
  263. # these wrappers add _context_wrap to the chain, so that calls on a dying
  264. # object are filtered, and errors on a valid call are logged.
  265. def add_task(self, delay, func, *a, **kw):
  266. return self._rawserver.add_task(delay, self._context_wrap,
  267. func, *a, **kw)
  268. def external_add_task(self, delay, func, *a, **kw):
  269. return self._rawserver.external_add_task(delay, self._context_wrap,
  270. func, *a, **kw)
  271. def _register_files(self):
  272. if self.metainfo.is_batch:
  273. myfiles = [os.path.join(self.destination_path, f) for f in
  274. self.metainfo.files_fs]
  275. else:
  276. myfiles = [self.destination_path, ]
  277. for filename in myfiles:
  278. if is_path_too_long(filename):
  279. raise BTFailure("Filename path exceeds platform limit: %s" % filename)
  280. # if the destination path contains any of the files in the torrent
  281. # then use the destination path instead of the working path.
  282. if len([x for x in myfiles if os.path.exists(x)]) > 0:
  283. self.working_path = self.destination_path
  284. else:
  285. if self.metainfo.is_batch:
  286. myfiles = [os.path.join(self.working_path, f) for f in
  287. self.metainfo.files_fs]
  288. else:
  289. myfiles = [self.working_path, ]
  290. assert self._myfiles == None, '_myfiles should be None!'
  291. self._filepool.add_files(myfiles, self)
  292. self._myfiles = myfiles
  293. def _build_url_mapping(self):
  294. # TODO: support non [-1] == '/' urls
  295. url_suffixes = []
  296. if self.metainfo.is_batch:
  297. for filename in self.metainfo.orig_files:
  298. path = '%s/%s' % (self.metainfo.name, filename)
  299. # am I right that orig_files could have windows paths?
  300. path = path.replace('\\', '/')
  301. url_suffixes.append(path)
  302. else:
  303. url_suffixes = [self.metainfo.name, ]
  304. self._url_suffixes = url_suffixes
  305. total = 0
  306. piece_size = self.metainfo.piece_length
  307. self._urls = zip(self._url_suffixes, self.metainfo.sizes)
  308. def _unregister_files(self):
  309. if self._myfiles is not None:
  310. self._filepool.remove_files(self._myfiles)
  311. self._last_myfiles = self._myfiles
  312. self._myfiles = None
  313. def initialize(self):
  314. self.context_valid = True
  315. assert self.state in ["created", "failed", "finishing"], "state not in set"
  316. self.state = "initializing"
  317. df = launch_coroutine(wrap_task(self.add_task), self._initialize)
  318. df.addErrback(self.got_exception)
  319. return df
  320. # this function is so nasty!
    def _initialize(self):
        """Coroutine body of initialize(): builds every per-session helper
        and ends in the "initialized" state.

        It yields three deferreds in turn (storage startup, the storage
        wrapper's checking pass, tracker IP lookup) and returns early,
        leaving the state at "initializing", when either of the first
        two reports a result other than True."""
        self._doneflag = threading.Event()
        # only one torrent object for a particular infohash at a time.
        # Note: This must be done after doneflag is created if shutdown()
        # is to be called from got_exception().
        if self.config["one_download_per_torrent"]:
            self._mutex = NamedMutex(self.infohash.encode("hex"))
            if not self._mutex.acquire(False):
                # raise-and-catch so that Failure() below can capture a
                # live exception.
                try:
                    raise UserFailure(_("Torrent already being downloaded or "
                                        "seeded." ))
                except UserFailure, e:
                    # perform exception handling including shutting down
                    # the torrent.
                    self.got_exception(Failure(),
                                       cannot_shutdown=True)
                    return
        # Prefer the configured forwarded port; otherwise ask the listener
        # for one and remember it so stop_download() can release it.
        self.reported_port = self.config['forwarded_port']
        if not self.reported_port:
            self.reported_port = \
                self._singleport_listener.get_port(self.change_port)
            if self.reported_port:
                self.reserved_ports.append(self.reported_port)
        # backward compatibility with older 5.0 development versions
        if self.destination_path == "":
            # NOTE(review): the bare except hides every error, not just a
            # missing 'save_as' key.
            try:
                self.destination_path = self.config['save_as']
            except:
                pass
        if self.working_path == "":
            self.working_path = self.destination_path
        self._myid = self._make_id()
        random.seed(self._myid)
        self._build_url_mapping()
        self._urlage = URLage(self._urls)
        self._register_files()
        self.logger.debug("_initialize: self.working_path=%s", self.working_path)
        self._storage = Storage(self.config, self._filepool, self.working_path,
                                zip(self._myfiles, self.metainfo.sizes),
                                self.add_task, self.external_add_task,
                                self._doneflag)
        # wait for storage startup to complete
        df = self._storage.startup_df
        yield df
        if df.getResult() != True:
            # initialization was aborted
            self.logger.debug("_initialize: initialization aborted")
            return
        self.logger.debug("_initialize: returned from Storage startup.")
        # Open the fastresume file, if one exists for this infohash.
        resumefile = None
        if self.data_dir:
            filename = os.path.join(self.data_dir, 'resume',
                                    self.infohash.encode('hex'))
            if os.path.exists(filename):
                try:
                    resumefile = file(filename, 'rb')
                except Exception, e:
                    self._error(logging.WARNING,
                        _("Could not load fastresume data: %s") % str_exc(e)
                        + ' ' + _("Will perform full hash check."))
                    if resumefile is not None:
                        resumefile.close()
                    resumefile = None
        # Callbacks handed to StorageWrapper below.
        def data_flunked(amount, index):
            self._ratemeasure.data_rejected(amount)
            self._error(logging.INFO,
                        _("piece %d failed hash check, re-downloading it")
                        % index)
        def errorfunc(level, text):
            # report through external_add_task rather than calling _error
            # directly
            def e():
                self._error(level, text)
            self.external_add_task(0, e)
        def statusfunc(activity = None, fractionDone = 0):
            # keep the previous activity text when none is given
            if activity is None:
                activity = self._activity[0]
            self._activity = (activity, fractionDone)
        numpieces = len(self.metainfo.hashes)
        self._rm = RequestManager(self.config['download_chunk_size'],
                                  self.metainfo.piece_length, numpieces,
                                  self._storage.get_total_length())
        self._storagewrapper = StorageWrapper(self._storage, self._rm,
                                              self.config,
                                              self.metainfo.hashes,
                                              self.metainfo.piece_length,
                                              statusfunc, self._doneflag,
                                              data_flunked, self.infohash,
                                              self.metainfo.is_batch,
                                              errorfunc, self.working_path,
                                              self.destination_path,
                                              resumefile,
                                              self.add_task,
                                              self.external_add_task)
        self._rm.set_storage(self._storagewrapper)
        # wait for the storage wrapper's checking pass to complete
        df = self._storagewrapper.done_checking_df
        yield df
        if df.getResult() != True:
            # initialization was aborted
            # NOTE(review): resumefile is not closed on this path (nor if
            # getResult() raises) -- confirm whether that is intended.
            return
        if resumefile is not None:
            resumefile.close()
        self._upmeasure = Measure(self.config['max_rate_period'])
        self._downmeasure = Measure(self.config['max_rate_period'])
        self._ratemeasure = RateMeasure(self._storagewrapper.amount_left_with_partials)
        self._picker = PiecePicker(self.config, numpieces,
                                   self._storagewrapper.have_set.iterneg(0, numpieces))
        self._periodic_save_fastresume()
        # Replay priorities that set_file_priority() queued while the
        # picker and storage wrapper did not exist yet.
        while self._pending_file_priorities:
            self.set_file_priority(*self._pending_file_priorities.pop())
        def kickpeer(connection):
            # close the connection from a scheduled task
            def kick():
                connection.close()
            self.add_task(0, kick)
        def banpeer(ip):
            self._connection_manager.ban(ip)
        md = MultiDownload(self.config, self._storagewrapper, self._rm,
                           self._urlage, self._picker, numpieces,
                           self.finished, self.got_exception, kickpeer, banpeer,
                           self._downmeasure.get_rate)
        md.add_useful_received_listener(self._total_downmeasure.update_rate)
        md.add_useful_received_listener(self._downmeasure.update_rate)
        md.add_useful_received_listener(self._ratemeasure.data_came_in)
        md.add_raw_received_listener(self._down_ratelimiter.update_rate)
        self.multidownload = md
        # HERE. Yipee! Uploads are created by callback while Download
        # objects are created by MultiDownload. --Dave
        def make_upload(connector):
            up = Upload(self.multidownload, connector, self._ratelimiter,
                        self._choker,
                        self._storagewrapper, self.config['max_chunk_length'],
                        self.config['max_rate_period'],
                        self.config['num_fast'], self.infohash)
            connector.add_sent_listener(self._upmeasure.update_rate)
            return up
        if self._dht:
            addContact = self._dht.addContact
        else:
            addContact = None
        # wait for the tracker IP lookup
        df = self.metainfo.get_tracker_ips(wrap_task(self.external_add_task))
        yield df
        tracker_ips = df.getResult()
        self._connection_manager = \
            ConnectionManager(make_upload, self.multidownload, self._choker,
                              numpieces, self._ratelimiter,
                              self._rawserver, self.config, self.metainfo.is_private,
                              self._myid, self.add_task, self.infohash, self, addContact,
                              0, tracker_ips, self.log_root)
        self.multidownload.attach_connection_manager(self._connection_manager)
        self._statuscollector = TorrentStats(self.logger, self._choker,
            self.get_uprate, self.get_downrate, self._upmeasure.get_total,
            self._downmeasure.get_total, self._ratemeasure.get_time_left,
            self.get_percent_complete, self.multidownload.aggregate_piece_states,
            self.finflag, self._connection_manager, self.multidownload,
            self.get_file_priorities, self._myfiles,
            self._connection_manager.ever_got_incoming, None)
        self.state = "initialized"
  475. def _rerequest_op(self):
  476. # weee hee hee
  477. class Caller(object):
  478. def __getattr__(s, attr):
  479. def rerequest_function(*a, **kw):
  480. if self._rerequest:
  481. f = getattr(self._rerequest, attr)
  482. f(*a, **kw)
  483. if self._dht_rerequest:
  484. f = getattr(self._dht_rerequest, attr)
  485. f(*a, **kw)
  486. return rerequest_function
  487. return Caller()
  488. def start_download(self):
  489. assert self.state == "initialized", "state not initialized"
  490. self.time_started = bttime()
  491. self._down_ratelimiter.add_throttle_listener(self._connection_manager)
  492. self._connection_manager.reopen(self.reported_port)
  493. self._singleport_listener.add_torrent(self.infohash,
  494. self._connection_manager)
  495. self._listening = True
  496. # the DHT is broken
  497. if self.metainfo.is_trackerless or not self.metainfo.is_private:
  498. #if self.metainfo.is_trackerless:
  499. if not self._dht and self.metainfo.is_trackerless:
  500. self._error(self, logging.CRITICAL,
  501. _("Attempt to download a trackerless torrent "
  502. "with trackerless client turned off."))
  503. return
  504. else:
  505. if self._dht:
  506. nodes = self._dht.table.findNodes(self.metainfo.infohash,
  507. invalid=False)
  508. if len(nodes) < const.K:
  509. for host, port in self.metainfo.nodes:
  510. df = self._rawserver.gethostbyname(host)
  511. df.addCallback(self._dht.addContact, port)
  512. df.addLogback(self.logger.warning, "Resolve failed")
  513. self._dht_rerequest = DHTRerequester(self.config,
  514. self.add_task,
  515. self._connection_manager.how_many_connections,
  516. self._connection_manager.start_connection,
  517. self.external_add_task,
  518. self._rawserver,
  519. self._storagewrapper.get_amount_left,
  520. self._upmeasure.get_total,
  521. self._downmeasure.get_total, self.reported_port,
  522. self._myid,
  523. self.infohash, self._error, self.finflag,
  524. self._upmeasure.get_rate,
  525. self._downmeasure.get_rate,
  526. self._connection_manager.ever_got_incoming,
  527. self._no_announce_shutdown, self._announce_done,
  528. self._dht)
  529. if not self.metainfo.is_trackerless:
  530. self._rerequest = Rerequester(self.metainfo.announce,
  531. self.metainfo.announce_list, self.config,
  532. self.add_task, self.external_add_task, self._rawserver,
  533. self._connection_manager.how_many_connections,
  534. self._connection_manager.start_connection,
  535. self._storagewrapper.get_amount_left,
  536. self._upmeasure.get_total, self._downmeasure.get_total,
  537. self.reported_port, self._myid,
  538. self.infohash, self._error, self.finflag,
  539. self._upmeasure.get_rate,
  540. self._downmeasure.get_rate,
  541. self._connection_manager.ever_got_incoming,
  542. self._no_announce_shutdown, self._announce_done,
  543. bool(self._dht_rerequest))
  544. self._statuscollector.rerequester = self._rerequest or self._dht_rerequest
  545. self.multidownload.rerequester = self._rerequest or self._dht_rerequest
  546. self._announced = True
  547. if self._dht and len(self._dht.table.findNodes(self.infohash)) == 0:
  548. self.add_task(5, self._dht.findCloseNodes)
  549. self._rerequest_op().begin()
  550. for url_prefix in self.metainfo.url_list:
  551. self._connection_manager.start_http_connection(url_prefix)
  552. self.state = "running"
  553. if not self.finflag.isSet():
  554. self._activity = (_("downloading"), 0)
  555. self.feedback.started(self)
  556. if self._storagewrapper.amount_left == 0 and not self.completed:
  557. # By default, self.finished() resets the policy to "auto",
  558. # but if we discover on startup that we are already finished,
  559. # we don't want to reset it.
  560. # Also, if we discover on startup that we are already finished,
  561. # don't set finished_this_session.
  562. self.finished(policy=self.policy, finished_this_session=False)
  563. def stop_download(self, pause=False):
  564. assert self.state == "running", "state not running"
  565. self.state = "initialized"
  566. if not self.finflag.isSet():
  567. self._activity = (_("stopped"), 0)
  568. if self._announced:
  569. self._rerequest_op().announce_stop()
  570. self._announced = False
  571. self._statuscollector.rerequester = None
  572. self.multidownload.rerequester = None
  573. if self._listening:
  574. self._singleport_listener.remove_torrent(self.infohash)
  575. self._listening = False
  576. for port in self.reserved_ports:
  577. self._singleport_listener.release_port(port, self.change_port)
  578. del self.reserved_ports[:]
  579. if self._connection_manager is not None:
  580. if pause:
  581. self._down_ratelimiter.remove_throttle_listener(
  582. self._connection_manager )
  583. self._connection_manager.throttle_connections()
  584. else:
  585. self._connection_manager.close_connections()
  586. if self.config['check_hashes']:
  587. self._save_fastresume()
  588. def shutdown(self):
  589. # use _rawserver.add_task directly here, because we want the callbacks
  590. # to happen even though _shutdown is about to invalidate this torrent's
  591. # context
  592. df = launch_coroutine(wrap_task(self._rawserver.add_task), self._shutdown)
  593. df.addErrback(self.got_exception, cannot_shutdown=True)
  594. return df
    def _shutdown(self):
        """Coroutine body of shutdown(): stops the download if it is
        running, waits for the storage wrapper's checking deferred and
        for storage to close, unregisters the files, cleans up the
        connection manager, invalidates the context and returns the
        torrent to the "created" state."""
        self._doneflag.set()
        if self.state == "running":
            self.stop_download()
        # above is the last thing to set.
        if self._storagewrapper is not None:
            df = self._storagewrapper.done_checking_df
            yield df
            df.getResult()
        if self._storage is not None:
            df = self._storage.close()
            # close() may return None instead of a deferred
            if df is not None:
                yield df
                df.getResult()
        self._unregister_files()
        if self._connection_manager is not None:
            self._down_ratelimiter.remove_throttle_listener(
                self._connection_manager )
            self._connection_manager.cleanup()
        # From here on, tasks scheduled through add_task() and
        # external_add_task() are dropped by _context_wrap.
        self.context_valid = False
        self._init()
        self.state = "created"
        # release mutex on this torrent.
        if self.config["one_download_per_torrent"]:
            if self._mutex is not None and self._mutex.owner():
                self._mutex.release()
        # scheduled on the rawserver directly: the context is now invalid
        self._rawserver.add_task(0, gc.collect)
  622. def _no_announce_shutdown(self, level, text):
  623. # This is only called when announce fails with no peers,
  624. # don't try to announce again telling we're leaving the torrent
  625. self._announced = False
  626. self._error(level, text)
  627. self.failed()
    def set_file_priority(self, filename, priority):
        """Apply `priority` to every piece of `filename`.

        While the storage wrapper or the piece picker does not exist
        yet, the request is only queued in _pending_file_priorities
        (_initialize replays that queue)."""
        if self._storagewrapper is None or self._picker is None:
            self._pending_file_priorities.append((filename, priority))
        else:
            begin, end = self._storagewrapper.get_piece_range_for_filename(filename)
            # end + 1: the returned range is treated as inclusive
            self._picker.set_priority(xrange(begin, end + 1), priority)
            # NOTE(review): nesting reconstructed -- confirm that the
            # config update belongs inside this else branch.
            self.config.setdefault('file_priorities', {})
            self.config['file_priorities'][filename] = priority
            self._dump_torrent_config()
  637. def get_file_priorities(self):
  638. return self.config.get('file_priorities', {})
  639. def get_file_priority(self, filename):
  640. fp = self.get_file_priorities()
  641. return fp.get(filename, 0)
  642. def add_feedback(self, feedback):
  643. self.feedback.chain.append(feedback)
  644. def remove_feedback(self, feedback):
  645. self.feedback.chain.remove(feedback)
  def got_exception(self, failure, cannot_shutdown=False):
      """Log a failure raised on behalf of this torrent and fail the torrent.

      ``failure`` must expose ``exc_info()``.  An
      ``UnregisteredFileException`` is ignored entirely.  For anything
      else ``_activity`` is set to a message chosen by exception class,
      the problem is logged at CRITICAL severity and ``failed()`` is
      called with ``cannot_shutdown``.
      """
      exc_class, exc = failure.exc_info()[0:2]
      severity = logging.CRITICAL
      msg = "Torrent got exception: %s" % exc_class
      e_str = str_exc(exc)

      if isinstance(exc, UnregisteredFileException):
          # not an error, a pending disk op was aborted because the torrent
          # has unregistered files.
          return

      if isinstance(exc, BTFailure):
          reason = _("download failed: ")
      elif isinstance(exc, IOError):
          if exc.errno == errno.ENOSPC:
              msg = _("IO Error: No space left on disk, "
                      "or cannot create a file that large")
          reason = _("killed by IO error: ")
      elif isinstance(exc, OSError):
          reason = _("killed by OS error: ")
      else:
          reason = _("killed by internal exception: ")
      self._activity = (reason + e_str, 0)

      if isinstance(exc, UserFailure):
          # user failures are logged as the bare message, no traceback
          self.logger.log(severity, e_str)
      else:
          self.logger.log(severity, msg, exc_info=failure.exc_info())

      self.failed(cannot_shutdown)
  def failed(self, cannot_shutdown=False):
      """Put the torrent in the "failed" state and notify the feedback object.

      With ``cannot_shutdown`` true the state change and notification
      happen immediately.  Otherwise ``shutdown()`` is started and both
      happen when its deferred fires, whether it succeeds or fails.  If
      starting the shutdown raises, the error is logged and the feedback
      object is still notified.
      """
      def mark_failed(result=None):
          self.state = "failed"
          self.feedback.failed(self)

      if cannot_shutdown:
          mark_failed()
          return

      try:
          # this could complete later.  sorry that's just the way it is.
          df = self.shutdown()
          df.addBoth(mark_failed)
      except Exception:
          # Only ordinary errors are handled here; KeyboardInterrupt and
          # SystemExit are allowed to propagate.
          self.logger.exception(_("Additional error when closing down due"
                                  " to error: "))
          # NOTE(review): the state is not set to "failed" on this path,
          # unlike the two paths above -- confirm that is intended.
          self.feedback.failed(self)
  def _error(self, level, text, exception=False, exc_info=None):
      """Report a problem to the feedback object, logging the serious ones.

      Levels above ``logging.WARNING`` are also written to the torrent
      logger, prefixed with the torrent's name.  ``exception`` selects
      ``feedback.exception`` instead of ``feedback.error``.
      """
      if level > logging.WARNING:
          header = _('Error regarding "%s":\n') % self.metainfo.name
          self.logger.log(level, header + text, exc_info=exc_info)

      if not exception:
          self.feedback.error(self, level, text)
          return
      self.feedback.exception(self, text)
  def finished(self, policy="auto", finished_this_session=True):
      """Start the asynchronous wrap-up of a completed download.

      Asserts that the torrent is "running", launches ``_finished`` as a
      coroutine and returns its deferred.  Errors from the coroutine are
      routed to ``got_exception``.
      """
      assert self.state == "running", "state not running"
      self.logger.debug("done downloading, preparing to wrap up")
      # because _finished() calls shutdown(), which invalidates the torrent
      # context, we need to use _rawserver.add_task directly here
      scheduler = wrap_task(self._rawserver.add_task)
      df = launch_coroutine(scheduler, self._finished,
                            policy=policy,
                            finished_this_session=finished_this_session)
      df.addErrback(self.got_exception)
      return df
  def _finished(self, policy="auto", finished_this_session=True):
      """Coroutine that wraps up a torrent whose download has completed.

      Returns immediately unless the torrent is "running".  Otherwise it
      sets ``finflag``, closes storage, announces completion at most once,
      marks the torrent as seeding and adopts ``policy``.  When the
      working path differs from the destination path the torrent is
      paused, its data is moved on a worker thread, storage is
      re-initialized at the new location and the download is restarted.
      The torrent config is dumped last.  Each ``yield`` hands a deferred
      to the coroutine driver; ``getResult()`` is called on it afterwards.
      """
      self.logger.debug("wrapping up")
      if self.state != "running":
          return

      self.finflag.set()

      # Call self._storage.close() to flush buffers and change files to
      # read-only mode (when they're possibly reopened).  Let exceptions
      # from self._storage.close() kill the torrent since files might not
      # be correct on disk if file.close() failed.
      self._storage.close()

      # don't bother trailing off the rate when we know we're done downloading
      self._downmeasure.rate = 0.0

      # If we haven't announced yet, normal first announce done later will
      # tell the tracker about seed status.
      # Only send completed the first time!  Torrents transition to
      # finished every time.
      if self._announced and not self.sent_completed:
          self._rerequest_op().announce_finish()
          self.sent_completed = True

      self._activity = (_("seeding"), 1)
      if self.config['check_hashes']:
          self._save_fastresume()

      # the old policy applied to downloading -- now that we are finished,
      # optionally reset it
      self.policy = policy
      self.feedback.finishing(self)
      if finished_this_session:
          self.finished_this_session = True

      def move(working_path, destination_path):
          # this function is called from another thread, so don't do anything
          # that isn't thread safe in here
          self.logger.debug("deleting any file that might be in the way")
          try:
              os.remove(destination_path)
              self.logger.debug("successfully deleted file " +
                                destination_path)
          except Exception as e:
              # only worth mentioning if something is still in the way
              if os.path.exists(destination_path):
                  self.logger.debug(str_exc(e))

          self.logger.debug("deleting any directory that might be in the way")
          try:
              shutil.rmtree(destination_path)
              self.logger.debug("successfully deleted directory " +
                                destination_path)
          except Exception as e:
              if os.path.exists(destination_path):
                  self.logger.debug(str_exc(e))

          self.logger.debug("ensuring destination exists")
          parent_dir = os.path.split(destination_path)[0]
          no_really_makedirs(parent_dir)

          self.logger.debug("actually moving file")
          shutil.move(working_path, destination_path)
          self.logger.debug("returned from move")

      if self.working_path == self.destination_path:
          # data is already where it belongs
          self.completed = True
          self.feedback.finished(self)
      else:
          self.logger.debug("torrent finishing: pausing, moving file, and restarting")
          self.stop_download(pause=True)
          self._unregister_files()
          self.logger.debug("successfully paused torrent, moving file")
          self.state = "finishing"

          df = ThreadedDeferred(wrap_task(self._rawserver.external_add_task),
                                move, self.working_path, self.destination_path)
          yield df
          df.getResult()

          self.logger.debug("moved file, restarting")
          assert self.state == "finishing", "state not finishing"
          self.working_path = self.destination_path

          self.completed = True
          self.feedback.finished(self)

          self.state = "initializing"
          self._register_files()
          df = self._storage.initialize(self.working_path,
                                        zip(self._myfiles,
                                            self.metainfo.sizes))
          yield df
          df.getResult()

          # so we store new path names
          self._storagewrapper.fastresume_dirty = True
          self._statuscollector.files = self._myfiles
          self.state = "initialized"

          self.logger.debug("attempting restart")
          self.start_download()
          self.logger.debug("re-started torrent")

      self._dump_torrent_config()
  def fastresume_file_path(self):
      """Return the path of this torrent's fastresume file.

      The path is ``<config data_dir>/resume/<hex-encoded infohash>``.
      """
      # HEREDAVE: should probably be self.data_dir?
      base_dir = self.config['data_dir']
      hex_name = self.infohash.encode('hex')
      return os.path.join(base_dir, 'resume', hex_name)
  def config_file_path(self):
      """Return the path of this torrent's saved config file.

      The path is ``<self.data_dir>/torrents/<hex-encoded metainfo infohash>``.
      """
      base_dir = self.data_dir
      hex_name = self.metainfo.infohash.encode('hex')
      return os.path.join(base_dir, 'torrents', hex_name)
  def _periodic_save_fastresume(self):
      """Save fastresume data now and re-schedule until ``finflag`` is set.

      The next run is queued through ``add_task`` with a delay of 30
      (presumably seconds -- confirm against ``add_task``).
      """
      self._save_fastresume()
      if self.finflag.isSet():
          return
      self.add_task(30, self._periodic_save_fastresume)
  def _save_fastresume(self):
      """Write fastresume data to ``fastresume_file_path()`` if needed.

      Nothing is written when the torrent is not initialized, when the
      config has no ``data_dir``, or when the file already exists and the
      storage wrapper reports no unsaved changes.  Ordinary errors while
      writing are reported through ``_error`` at WARNING level rather
      than raised.
      """
      if not self.is_initialized():
          return
      # HEREDAVE: should probably be self.data_dir?
      if not self.config['data_dir']:
          return

      filename = self.fastresume_file_path()
      if os.path.exists(filename) and not self._storagewrapper.fastresume_dirty:
          return

      try:
          resumefile = None
          try:
              resumefile = open(filename, 'wb')
              self._storagewrapper.write_fastresume(resumefile)
          finally:
              # Close in every case, so the handle is released even when
              # the write fails; a failing close() is reported below too.
              if resumefile is not None:
                  resumefile.close()
      except Exception as e:
          self._error(logging.WARNING, _("Could not write fastresume data: ") +
                      str_exc(e))
  def _dump_torrent_config(self):
      """Pickle the torrent's config dict into ``config_file_path()``.

      The data is written to ``<path>.new`` first and then moved over
      the real path, so the old file is only replaced once the new one
      has been written and closed.
      """
      state = self.config.getDict()
      # An earlier bencode-based dump was disabled in favour of pickle;
      # its notes say bencode cannot encode floats and that unicode
      # values needed special handling.
      blob = cPickle.dumps(state)

      path = self.config_file_path()
      tmp_path = path + '.new'
      f = open(tmp_path, 'wb')
      try:
          f.write(blob)
      finally:
          # release the handle even if the write fails
          f.close()
      shutil.move(tmp_path, path)
  def remove_state_files(self, del_files=False):
      """Delete the saved config and fastresume files of this torrent.

      Asserts that the torrent is in the "created" state.  Failures to
      remove either state file are only logged at debug level.  With
      ``del_files`` true, every path in ``_last_myfiles`` is removed as
      well, followed by its parent directory and finally the working
      path; directories that cannot be removed are silently left behind.
      """
      assert self.state == "created", "state not created"

      try:
          os.remove(self.config_file_path())
      except Exception as e:
          self.logger.debug("error removing config file: %s", str_exc(e))

      try:
          os.remove(self.fastresume_file_path())
      except Exception as e:
          self.logger.debug("error removing fastresume file: %s", str_exc(e))

      if not del_files:
          return

      try:
          for data_path in self._last_myfiles:
              try:
                  os.remove(data_path)
              except OSError:
                  pass
              # rmdir only succeeds once the directory is empty
              parent_dir = os.path.split(data_path)[0]
              try:
                  os.rmdir(parent_dir)
              except OSError:
                  pass
          try:
              os.rmdir(self.working_path)
          except OSError:
              pass
      except Exception as e:
          self.logger.debug("error removing incomplete files: %s", str_exc(e))
  def get_downrate(self):
      """Return the current download rate, or None if the torrent is not running."""
      if not self.is_running():
          return None
      return self._downmeasure.get_rate()
  def get_uprate(self):
      """Return the current upload rate, or None if the torrent is not running."""
      if not self.is_running():
          return None
      return self._upmeasure.get_rate()
  def get_rates(self):
      """Return ``(upload rate, download rate)`` -- upload comes first."""
      up = self.get_uprate()
      down = self.get_downrate()
      return (up, down)
  def get_downtotal(self):
      """Return the download measure's total, or None if the torrent is not running."""
      if not self.is_running():
          return None
      return self._downmeasure.get_total()
  def get_uptotal(self):
      """Return the upload measure's total, or None if the torrent is not running."""
      if not self.is_running():
          return None
      return self._upmeasure.get_total()
  def get_percent_complete(self):
      """Return the completed share of the torrent as a fraction.

      Despite the name the value is not scaled to 100: it is 0.0 before
      the torrent is initialized, 1.0 when ``total_bytes`` is not
      positive, and ``1 - size_left / total_bytes`` otherwise.
      """
      if not self.is_initialized():
          return 0.0
      if self.total_bytes > 0:
          # NOTE(review): needs true division to give a fraction for
          # integer sizes -- confirm the module enables
          # ``from __future__ import division``.
          return 1 - self._ratemeasure.get_size_left() / self.total_bytes
      return 1.0
  def get_num_connections(self):
      """Return the connection manager's connection count, or 0 without a manager."""
      if not self._connection_manager:
          return 0
      return self._connection_manager.how_many_connections()
  def get_connections(self):
      """Return the connection manager's ``complete_connectors`` collection.

      The manager's own object is returned, not a copy.
      """
      manager = self._connection_manager
      return manager.complete_connectors
  def get_avg_peer_downrate(self):
      """Return the mean peer download rate over all complete connections.

      The rate of each connector is read from its download's
      ``peermeasure``.  Returns 0.0 when there is no connection manager
      yet (matching ``get_num_connections``) or no complete connection.
      """
      manager = self._connection_manager
      if not manager:
          # no manager yet: report "no rate" instead of raising
          return 0.0
      connectors = manager.complete_connectors
      if len(connectors) == 0:
          return 0.0
      total = 0.0
      for c in connectors:
          total += c.download.connector.download.peermeasure.get_rate()
      return total / len(connectors)
  def get_status(self, spew = False, fileinfo=False):
      """Return a dict describing the torrent's current status.

      Before initialization the dict holds only ``activity`` and
      ``fractionDone`` (taken from ``_activity``), a placeholder
      ``pieceStates`` and ``priority``.  Afterwards it is the status
      collector's statistics plus ``activity`` and ``priority``, with
      ``timeEst`` forced to None while the torrent is not running.
      """
      if not self.is_initialized():
          status = dict(zip(('activity', 'fractionDone'), self._activity))
          status['pieceStates'] = (0, 0, {})
          status['priority'] = self.priority
          return status

      status = self._statuscollector.get_statistics(spew, fileinfo)
      status['activity'] = self._activity[0]
      status['priority'] = self.priority
      if not self.is_running():
          status['timeEst'] = None
      return status
  def get_total_transfer(self):
      """Return ``(uploaded total, downloaded total)``.

      ``(0, 0)`` is returned while no upload measure exists.
      """
      up = self._upmeasure
      if up is None:
          return (0, 0)
      return (up.get_total(), self._downmeasure.get_total())
  def set_option(self, option, value):
      """Store ``value`` under ``option`` in the config unless it is already set to it."""
      unchanged = self.config.has_key(option) and self.config[option] == value
      if not unchanged:
          self.config[option] = value
  def change_port(self, new_port = None):
      """Switch the port this torrent reports and propagate the change.

      The port is chosen in this order:

      * a truthy ``config['forwarded_port']``: every reserved port is
        released and the forwarded port is used (nothing more happens
        if it is already the reported port);
      * an explicit ``new_port``: it replaces the reported port in the
        reserved list;
      * otherwise, if the listener's port differs from the reported
        one, a port is requested from the listener and reserved;
      * otherwise nothing changes.

      When the port does change, a new peer id is made, handed to the
      connection manager if there is one, and -- if the torrent has
      already announced -- sent on through ``_rerequest_op().change_port``.
      """
      r = self.config['forwarded_port']
      if r:
          for port in self.reserved_ports:
              self._singleport_listener.release_port(port)
          del self.reserved_ports[:]
          # was ``self.rescrewedported_port``, a mangled attribute name
          if self.reported_port == r:
              return
      elif new_port is not None:
          r = new_port
          self.reserved_ports.remove(self.reported_port)
          self.reserved_ports.append(r)
      elif self._singleport_listener.port != self.reported_port:
          r = self._singleport_listener.get_port(self.change_port)
          self.reserved_ports.append(r)
      else:
          return

      self.reported_port = r
      self._myid = self._make_id()
      if self._connection_manager:
          self._connection_manager.my_id = self._myid
      if self._announced:
          self._rerequest_op().change_port(self._myid, r)
  def _announce_done(self):
      """Release every reserved port except the last one in the list.

      Each release is passed ``self.change_port`` as its second
      argument; the released ports are then dropped from
      ``reserved_ports``.
      """
      stale_ports = self.reserved_ports[:-1]
      for port in stale_ports:
          self._singleport_listener.release_port(port, self.change_port)
      del self.reserved_ports[:-1]
  def _make_id(self):
      """Return the peer id produced by ``PeerID.make_id()``."""
      peer_id = PeerID.make_id()
      return peer_id