Rerequester.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478
  1. # The contents of this file are subject to the BitTorrent Open Source License
  2. # Version 1.1 (the License). You may not copy or use this file, in either
  3. # source code or executable form, except in compliance with the License. You
  4. # may obtain a copy of the License at http://www.bittorrent.com/license/.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10. # Written by Greg Hazel
  11. # based on code by Bram Cohen, Uoti Urpala
  12. import sys
  13. import random
  14. import logging
  15. from binascii import b2a_hex
  16. from BTL.translation import _
  17. from BitTorrent import version
  18. from BTL.platform import bttime
  19. from urllib import quote
  20. from BitTorrent.btformats import check_peers
  21. from BTL.bencode import bencode, bdecode
  22. from BTL.exceptions import str_exc
  23. from BTL.defer import Failure
  24. from BTL import IPTools
  25. from BitTorrent import BTFailure
  26. from BitTorrent.prefs import Preferences
  27. from BitTorrent.HTTPDownloader import getPageFactory
  28. import twisted.internet.error
  29. LOG_RESPONSE = False
  30. class Rerequester(object):
  31. STATES = ['started', 'completed', 'stopped']
  32. def __init__(self, url, announce_list, config, sched, externalsched, rawserver,
  33. howmany, connect,
  34. amount_left, up, down, port, myid, infohash, errorfunc, doneflag,
  35. upratefunc, downratefunc, ever_got_incoming, diefunc, sfunc,
  36. has_dht):
  37. """
  38. @param url: tracker's announce URL.
  39. @param announce_list: ?
  40. @param config: preferences obj storing BitTorrent-wide
  41. configuration.
  42. @param sched: used to schedule events from inside rawserver's
  43. thread. (Oh boy. externalsched and sched.
  44. We expect Rerequester to
  45. recognize the difference between rawserver's
  46. thread and yet we go through the trouble of
  47. abstracting away rawserver using a callback...
  48. So what was the point? --Dave)
  49. @param externalsched: see sched. This one is called from outside
  50. rawserver's thread.
  51. @param howmany: callback to get the number of complete connections.
  52. @param connect: callback to establish a connection to a peer
  53. obtained from the tracker.
  54. @param amount_left: callback to obtain the number of bytes left to
  55. download for this torrent.
  56. @param up: callback to obtain the total number of bytes sent
  57. for this torrent.
  58. @param down: callback to obtain the total number of bytes
  59. received for this torrent.
  60. @param port: port to report to the tracker. If the local peer
  61. is behind a NAT then this is the local peer's port
  62. on the NAT facing the outside world.
  63. @param myid: local peer's unique (self-generated) id.
  64. @param infohash: hash of the info section of the metainfo file.
  65. @param errorfunc: callback to report errors.
  66. @param doneflag: when set all threads cleanup and then terminate.
  67. @param upratefunc: callback to obtain moving average rate on the
  68. uplink for this torrent.
  69. @param downratefunc: callback to obtain moving average rate on the
  70. downlink for this torrent.
  71. @param ever_got_incoming: callback to determine if this torrent
  72. has ever received any mesages from other peers.
  73. @param diefunc: callback that is called when announce fails to find
  74. any peers.
  75. @param sfunc: success function? With regard to what? --Dave
  76. """
  77. assert isinstance(url, str)
  78. assert isinstance(config, Preferences)
  79. assert type(port) in (int,long) and port > 0 and port < 65536, "Port: %s" % repr(port)
  80. assert callable(connect)
  81. assert callable(amount_left)
  82. assert callable(errorfunc)
  83. assert callable(upratefunc)
  84. assert callable(downratefunc)
  85. assert callable(ever_got_incoming)
  86. assert callable(diefunc)
  87. assert callable(sfunc)
  88. self.rawserver = rawserver
  89. self.dead = False
  90. self.baseurl = url
  91. self.announce_list = None
  92. if announce_list:
  93. # shuffle a new copy of the whole set only once
  94. shuffled_announce_list = []
  95. for tier in announce_list:
  96. # strip out udp urls
  97. shuffled_tier = [ u for u in tier if not u.lower().startswith("udp://") ]
  98. if not shuffled_tier:
  99. # strip blank lists
  100. continue
  101. random.shuffle(shuffled_tier)
  102. shuffled_announce_list.append(shuffled_tier)
  103. if shuffled_announce_list:
  104. self.announce_list = shuffled_announce_list
  105. self.tier = 0
  106. self.announce_i = 0
  107. self.baseurl = self.announce_list_next()
  108. self.announce_infohash = infohash
  109. self.peerid = None
  110. self.wanted_peerid = myid
  111. self.port = port
  112. self.url = None
  113. self.config = config
  114. self.last = None
  115. self.trackerid = None
  116. self.announce_interval = 30 * 60
  117. self.sched = sched
  118. self.howmany = howmany
  119. self.connect = connect
  120. self.amount_left = amount_left
  121. self.up = up
  122. self.down = down
  123. self.errorfunc = errorfunc
  124. self.doneflag = doneflag
  125. self.upratefunc = upratefunc
  126. self.downratefunc = downratefunc
  127. self.ever_got_incoming = ever_got_incoming
  128. self.diefunc = diefunc
  129. self.successfunc = sfunc
  130. self.finish = False
  131. self.current_started = None
  132. self.fail_wait = None
  133. self.last_time = bttime()
  134. self.previous_down = 0
  135. self.previous_up = 0
  136. self.tracker_num_peers = None
  137. self.tracker_num_seeds = None
  138. self.has_dht = has_dht
  139. def _makeurl(self, peerid, port):
  140. return ('%s?info_hash=%s&peer_id=%s&port=%s&key=%s' %
  141. (self.baseurl, quote(self.announce_infohash), quote(peerid), str(port),
  142. b2a_hex(''.join([chr(random.randrange(256)) for i in xrange(4)]))))
  143. def change_port(self, peerid, port):
  144. self.wanted_peerid = peerid
  145. self.port = port
  146. self.last = None
  147. self.trackerid = None
  148. self._check()
  149. def begin(self):
  150. if self.sched:
  151. self.sched(10, self.begin)
  152. self._check()
  153. def announce_list_success(self):
  154. tmp = self.announce_list[self.tier].pop(self.announce_i)
  155. self.announce_list[self.tier].insert(0, tmp)
  156. self.tier = 0
  157. self.announce_i = 0
  158. def announce_list_fail(self):
  159. """returns True if the announce-list was restarted"""
  160. self.announce_i += 1
  161. if self.announce_i == len(self.announce_list[self.tier]):
  162. self.announce_i = 0
  163. self.tier += 1
  164. if self.tier == len(self.announce_list):
  165. self.tier = 0
  166. return True
  167. return False
  168. def announce_list_next(self):
  169. return self.announce_list[self.tier][self.announce_i]
  170. def announce_finish(self):
  171. if self.dead:
  172. return
  173. self.finish = True
  174. self._check()
  175. def announce_stop(self):
  176. if self.dead:
  177. return
  178. self._announce('stopped')
  179. def _check(self):
  180. assert not self.dead
  181. #self.errorfunc(logging.INFO, 'check: ' + str(self.current_started))
  182. if self.current_started is not None:
  183. if (bttime() - self.current_started) >= 58:
  184. self.errorfunc(logging.WARNING,
  185. _("Tracker announce still not complete "
  186. "%d seconds after starting it") %
  187. int(bttime() - self.current_started))
  188. return
  189. if self.peerid is None:
  190. self.peerid = self.wanted_peerid
  191. self.url = self._makeurl(self.peerid, self.port)
  192. self._announce('started')
  193. return
  194. if self.peerid != self.wanted_peerid:
  195. # _announce will clean up these
  196. up = self.up
  197. down = self.down
  198. self._announce('stopped')
  199. self.peerid = None
  200. self.previous_up = up()
  201. self.previous_down = down()
  202. return
  203. if self.finish:
  204. self.finish = False
  205. self._announce('completed')
  206. return
  207. if self.fail_wait is not None:
  208. if self.last_time + self.fail_wait <= bttime():
  209. self._announce()
  210. return
  211. if self.last_time > bttime() - self.config['rerequest_interval']:
  212. return
  213. if self.ever_got_incoming():
  214. getmore = self.howmany() <= self.config['min_peers'] / 3
  215. else:
  216. getmore = self.howmany() < self.config['min_peers']
  217. if getmore or bttime() - self.last_time > self.announce_interval:
  218. self._announce()
  219. def get_next_announce_time_est(self):
  220. # I'm sure this is wrong, but _check is confusing
  221. return bttime() - (self.last_time + self.announce_interval)
  222. def _announce(self, event=None):
  223. assert not self.dead
  224. self.current_started = bttime()
  225. #self.errorfunc(logging.INFO, 'announce: ' +
  226. # str(bttime() - self.current_started))
  227. s = ('%s&uploaded=%s&downloaded=%s&left=%s' %
  228. (self.url, str(self.up()*self.config.get('lie',1) - self.previous_up),
  229. str(self.down() - self.previous_down), str(self.amount_left())))
  230. if self.last is not None:
  231. s += '&last=' + quote(str(self.last))
  232. if self.trackerid is not None:
  233. s += '&trackerid=' + quote(str(self.trackerid))
  234. if self.howmany() >= self.config['max_initiate']:
  235. s += '&numwant=0'
  236. else:
  237. s += '&compact=1'
  238. if event is not None:
  239. s += '&event=' + event
  240. self._rerequest(s)
  241. # Must destroy all references that could cause reference circles
  242. def cleanup(self):
  243. self.dead = True
  244. self.sched = None
  245. self.howmany = None
  246. self.connect = None
  247. self.amount_left = None
  248. self.up = None
  249. self.down = None
  250. self.upratefunc = None
  251. self.downratefunc = None
  252. self.ever_got_incoming = None
  253. self.diefunc = None
  254. self.successfunc = None
  255. def _rerequest(self, url):
  256. if self.config['ip']:
  257. df = self.rawserver.reactor.resolve(self.config['ip'])
  258. def error(f):
  259. self.errorfunc(logging.WARNING,
  260. _("Problem resolving config ip (%r), "
  261. "gethostbyname failed") % self.config['ip'],
  262. exc_info=f.exc_info())
  263. df.addErrback(error)
  264. df.addCallback(lambda ip : self._rerequest2(url, ip=ip))
  265. else:
  266. self._rerequest2(url)
  267. def _rerequest2(self, url, ip=None):
  268. proxy = self.config.get('tracker_proxy', None)
  269. if not proxy:
  270. proxy = None
  271. bind = self.config.get('bind', None)
  272. if not bind:
  273. bind = None
  274. factory = getPageFactory(url,
  275. agent='BitTorrent/' + version,
  276. bindAddress=bind,
  277. proxy=proxy,
  278. timeout=60)
  279. df = factory.deferred
  280. df.addCallback(lambda d : self._postrequest(data=d))
  281. df.addErrback(lambda f : self._postrequest(failure=f))
  282. def _give_up(self):
  283. if self.howmany() == 0 and self.amount_left() > 0 and not self.has_dht:
  284. # sched shouldn't be strictly necessary
  285. def die():
  286. self.diefunc(logging.CRITICAL,
  287. _("Aborting the torrent as it could not "
  288. "connect to the tracker while not "
  289. "connected to any peers. "))
  290. self.sched(0, die)
  291. def _fail(self, exc=None, rejected=False):
  292. if self.announce_list:
  293. restarted = self.announce_list_fail()
  294. if restarted:
  295. self.fail_wait = None
  296. if rejected:
  297. self._give_up()
  298. else:
  299. self.baseurl = self.announce_list_next()
  300. self.peerid = None
  301. # If it was a dns error, try the new url right away. it's
  302. # probably not abusive since there was no one there to abuse.
  303. if isinstance(exc, twisted.internet.error.DNSLookupError):
  304. self._check()
  305. return
  306. else:
  307. if rejected:
  308. self._give_up()
  309. if self.fail_wait is None:
  310. self.fail_wait = 50
  311. else:
  312. self.fail_wait *= 1.4 + random.random() * .2
  313. self.fail_wait = min(self.fail_wait,
  314. self.config['max_announce_retry_interval'])
  315. def _make_errormsg(self, msg):
  316. proxy = self.config.get('tracker_proxy', None)
  317. url = self.baseurl
  318. if proxy:
  319. url = _("%s through proxy %s") % (url, proxy)
  320. return (_("Problem connecting to tracker (%s): %s") %
  321. (url, msg))
  322. def _postrequest(self, data=None, failure=None):
  323. #self.errorfunc(logging.INFO, 'postrequest(%s): %s d:%s f:%s' %
  324. # (self.__class__.__name__, self.current_started,
  325. # bool(data), bool(failure)))
  326. self.current_started = None
  327. self.last_time = bttime()
  328. if self.dead:
  329. return
  330. if failure is not None:
  331. if failure.type == twisted.internet.error.TimeoutError:
  332. m = _("Timeout while contacting server.")
  333. else:
  334. m = failure.getErrorMessage()
  335. self.errorfunc(logging.WARNING, self._make_errormsg(m))
  336. self._fail(failure.exc_info())
  337. return
  338. try:
  339. r = bdecode(data)
  340. if LOG_RESPONSE:
  341. self.errorfunc(logging.INFO, 'tracker said: %r' % r)
  342. check_peers(r)
  343. except BTFailure, e:
  344. if data:
  345. self.errorfunc(logging.ERROR,
  346. _("bad data from tracker (%r)") % data,
  347. exc_info=sys.exc_info())
  348. self._fail()
  349. return
  350. if (isinstance(r.get('complete'), (int, long)) and
  351. isinstance(r.get('incomplete'), (int, long))):
  352. self.tracker_num_seeds = r['complete']
  353. self.tracker_num_peers = r['incomplete']
  354. else:
  355. self.tracker_num_seeds = None
  356. self.tracker_num_peers = None
  357. if 'failure reason' in r:
  358. self.errorfunc(logging.ERROR,
  359. _("rejected by tracker - %s") % r['failure reason'])
  360. self._fail(rejected=True)
  361. return
  362. self.fail_wait = None
  363. if 'warning message' in r:
  364. self.errorfunc(logging.ERROR,
  365. _("warning from tracker - %s") % r['warning message'])
  366. self.announce_interval = r.get('interval', self.announce_interval)
  367. if 'min interval' in r:
  368. self.config['rerequest_interval'] = r['min interval']
  369. self.trackerid = r.get('tracker id', self.trackerid)
  370. self.last = r.get('last')
  371. p = r['peers']
  372. peers = {}
  373. if isinstance(p, str):
  374. for (ip, port) in IPTools.uncompact_sequence(p):
  375. peers[(ip, port)] = None
  376. else:
  377. for x in p:
  378. peers[(x['ip'], x['port'])] = x.get('peer id')
  379. ps = len(peers) + self.howmany()
  380. if ps < self.config['max_initiate']:
  381. if self.doneflag.isSet():
  382. if r.get('num peers', 1000) - r.get('done peers', 0) > ps * 1.2:
  383. self.last = None
  384. else:
  385. if r.get('num peers', 1000) > ps * 1.2:
  386. self.last = None
  387. if self.config['log_tracker_info']:
  388. self.errorfunc(logging.INFO, '%s tracker response: %d peers' %
  389. (self.__class__.__name__, len(peers)))
  390. for addr, pid in peers.iteritems():
  391. self.connect(addr, pid)
  392. if self.peerid == self.wanted_peerid:
  393. self.successfunc()
  394. if self.announce_list:
  395. self.announce_list_success()
  396. self._check()
  397. class DHTRerequester(Rerequester):
  398. def __init__(self, config, sched, howmany, connect, externalsched, rawserver,
  399. amount_left, up, down, port, myid, infohash, errorfunc, doneflag,
  400. upratefunc, downratefunc, ever_got_incoming, diefunc, sfunc, dht):
  401. self.dht = dht
  402. Rerequester.__init__(self, "http://localhost/announce", [], config, sched, externalsched, rawserver,
  403. howmany, connect,
  404. amount_left, up, down, port, myid, infohash, errorfunc, doneflag,
  405. upratefunc, downratefunc, ever_got_incoming, diefunc, sfunc, True)
  406. def _announce(self, event=None):
  407. self.current_started = bttime()
  408. self._rerequest("")
  409. def _make_errormsg(self, msg):
  410. return _("Trackerless lookup failed: %s") % msg
  411. def _rerequest(self, url):
  412. self.peers = ""
  413. try:
  414. self.dht.getPeersAndAnnounce(str(self.announce_infohash), self.port,
  415. self._got_peers)
  416. except Exception, e:
  417. self._postrequest(failure=Failure())
  418. def _got_peers(self, peers):
  419. if not self.howmany:
  420. return
  421. if not peers:
  422. self.peerid = self.wanted_peerid
  423. self._postrequest(bencode({'peers':''}))
  424. else:
  425. self.peerid = None
  426. self._postrequest(bencode({'peers':peers[0]}))
  427. def _announced_peers(self, nodes):
  428. pass
  429. def announce_stop(self):
  430. # don't do anything
  431. pass