Rerequester.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462
  1. # Written by Bram Cohen
  2. # modified for multitracker operation by John Hoffman
  3. # see LICENSE.txt for license information
  4. from BitTornado.zurllib import urlopen, quote
  5. from urlparse import urlparse, urlunparse
  6. from socket import gethostbyname
  7. from btformats import check_peers
  8. from BitTornado.bencode import bdecode
  9. from threading import Thread, Lock
  10. from cStringIO import StringIO
  11. from traceback import print_exc
  12. from socket import error, gethostbyname
  13. from random import shuffle
  14. from sha import sha
  15. from time import time
  16. try:
  17. from os import getpid
  18. except ImportError:
  19. def getpid():
  20. return 1
  21. try:
  22. True
  23. except:
  24. True = 1
  25. False = 0
  26. mapbase64 = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz.-'
  27. keys = {}
  28. basekeydata = str(getpid()) + repr(time()) + 'tracker'
  29. def add_key(tracker):
  30. key = ''
  31. for i in sha(basekeydata+tracker).digest()[-6:]:
  32. key += mapbase64[ord(i) & 0x3F]
  33. keys[tracker] = key
  34. def get_key(tracker):
  35. try:
  36. return "&key="+keys[tracker]
  37. except:
  38. add_key(tracker)
  39. return "&key="+keys[tracker]
  40. class fakeflag:
  41. def __init__(self, state=False):
  42. self.state = state
  43. def wait(self):
  44. pass
  45. def isSet(self):
  46. return self.state
  47. class Rerequester:
  48. def __init__( self, port, myid, infohash, trackerlist, config,
  49. sched, externalsched, errorfunc, excfunc, connect,
  50. howmany, amount_left, up, down, upratefunc, downratefunc,
  51. doneflag, unpauseflag = fakeflag(True),
  52. seededfunc = None, force_rapid_update = False ):
  53. self.sched = sched
  54. self.externalsched = externalsched
  55. self.errorfunc = errorfunc
  56. self.excfunc = excfunc
  57. self.connect = connect
  58. self.howmany = howmany
  59. self.amount_left = amount_left
  60. self.up = up
  61. self.down = down
  62. self.upratefunc = upratefunc
  63. self.downratefunc = downratefunc
  64. self.doneflag = doneflag
  65. self.unpauseflag = unpauseflag
  66. self.seededfunc = seededfunc
  67. self.force_rapid_update = force_rapid_update
  68. self.ip = config.get('ip','')
  69. self.minpeers = config['min_peers']
  70. self.maxpeers = config['max_initiate']
  71. self.interval = config['rerequest_interval']
  72. self.timeout = config['http_timeout']
  73. newtrackerlist = []
  74. for tier in trackerlist:
  75. if len(tier)>1:
  76. shuffle(tier)
  77. newtrackerlist += [tier]
  78. self.trackerlist = newtrackerlist
  79. self.lastsuccessful = ''
  80. self.rejectedmessage = 'rejected by tracker - '
  81. self.url = ('info_hash=%s&peer_id=%s' %
  82. (quote(infohash), quote(myid)))
  83. if not config.get('crypto_allowed'):
  84. self.url += "&port="
  85. else:
  86. self.url += "&supportcrypto=1"
  87. if not config.get('crypto_only'):
  88. self.url += "&port="
  89. else:
  90. self.url += "&requirecrypto=1"
  91. if not config.get('crypto_stealth'):
  92. self.url += "&port="
  93. else:
  94. self.url += "&port=0&cryptoport="
  95. self.url += str(port)
  96. seed_id = config.get('dedicated_seed_id')
  97. if seed_id:
  98. self.url += '&seed_id='+quote(seed_id)
  99. if self.seededfunc:
  100. self.url += '&check_seeded=1'
  101. self.last = None
  102. self.trackerid = None
  103. self.announce_interval = 30 * 60
  104. self.last_failed = True
  105. self.never_succeeded = True
  106. self.errorcodes = {}
  107. self.lock = SuccessLock()
  108. self.special = None
  109. self.stopped = False
  110. def start(self):
  111. self.sched(self.c, self.interval/2)
  112. self.d(0)
  113. def c(self):
  114. if self.stopped:
  115. return
  116. if not self.unpauseflag.isSet() and (
  117. self.howmany() < self.minpeers or self.force_rapid_update ):
  118. self.announce(3, self._c)
  119. else:
  120. self._c()
  121. def _c(self):
  122. self.sched(self.c, self.interval)
  123. def d(self, event = 3):
  124. if self.stopped:
  125. return
  126. if not self.unpauseflag.isSet():
  127. self._d()
  128. return
  129. self.announce(event, self._d)
  130. def _d(self):
  131. if self.never_succeeded:
  132. self.sched(self.d, 60) # retry in 60 seconds
  133. elif self.force_rapid_update:
  134. return
  135. else:
  136. self.sched(self.d, self.announce_interval)
  137. def hit(self, event = 3):
  138. if not self.unpauseflag.isSet() and (
  139. self.howmany() < self.minpeers or self.force_rapid_update ):
  140. self.announce(event)
  141. def announce(self, event = 3, callback = lambda: None, specialurl = None):
  142. if specialurl is not None:
  143. s = self.url+'&uploaded=0&downloaded=0&left=1' # don't add to statistics
  144. if self.howmany() >= self.maxpeers:
  145. s += '&numwant=0'
  146. else:
  147. s += '&no_peer_id=1&compact=1'
  148. self.last_failed = True # force true, so will display an error
  149. self.special = specialurl
  150. self.rerequest(s, callback)
  151. return
  152. else:
  153. s = ('%s&uploaded=%s&downloaded=%s&left=%s' %
  154. (self.url, str(self.up()), str(self.down()),
  155. str(self.amount_left())))
  156. if self.last is not None:
  157. s += '&last=' + quote(str(self.last))
  158. if self.trackerid is not None:
  159. s += '&trackerid=' + quote(str(self.trackerid))
  160. if self.howmany() >= self.maxpeers:
  161. s += '&numwant=0'
  162. else:
  163. s += '&no_peer_id=1&compact=1'
  164. if event != 3:
  165. s += '&event=' + ['started', 'completed', 'stopped'][event]
  166. if event == 2:
  167. self.stopped = True
  168. self.rerequest(s, callback)
  169. def snoop(self, peers, callback = lambda: None): # tracker call support
  170. self.rerequest(self.url
  171. +'&event=stopped&port=0&uploaded=0&downloaded=0&left=1&tracker=1&numwant='
  172. +str(peers), callback)
  173. def rerequest(self, s, callback):
  174. if not self.lock.isfinished(): # still waiting for prior cycle to complete??
  175. def retry(self = self, s = s, callback = callback):
  176. self.rerequest(s, callback)
  177. self.sched(retry,5) # retry in 5 seconds
  178. return
  179. self.lock.reset()
  180. rq = Thread(target = self._rerequest, args = [s, callback])
  181. rq.setDaemon(False)
  182. rq.start()
  183. def _rerequest(self, s, callback):
  184. try:
  185. def fail (self = self, callback = callback):
  186. self._fail(callback)
  187. if self.ip:
  188. try:
  189. s += '&ip=' + gethostbyname(self.ip)
  190. except:
  191. self.errorcodes['troublecode'] = 'unable to resolve: '+self.ip
  192. self.externalsched(fail)
  193. self.errorcodes = {}
  194. if self.special is None:
  195. for t in range(len(self.trackerlist)):
  196. for tr in range(len(self.trackerlist[t])):
  197. tracker = self.trackerlist[t][tr]
  198. if self.rerequest_single(tracker, s, callback):
  199. if not self.last_failed and tr != 0:
  200. del self.trackerlist[t][tr]
  201. self.trackerlist[t] = [tracker] + self.trackerlist[t]
  202. return
  203. else:
  204. tracker = self.special
  205. self.special = None
  206. if self.rerequest_single(tracker, s, callback):
  207. return
  208. # no success from any tracker
  209. self.externalsched(fail)
  210. except:
  211. self.exception(callback)
  212. def _fail(self, callback):
  213. if ( (self.upratefunc() < 100 and self.downratefunc() < 100)
  214. or not self.amount_left() ):
  215. for f in ['rejected', 'bad_data', 'troublecode']:
  216. if self.errorcodes.has_key(f):
  217. r = self.errorcodes[f]
  218. break
  219. else:
  220. r = 'Problem connecting to tracker - unspecified error'
  221. self.errorfunc(r)
  222. self.last_failed = True
  223. self.lock.give_up()
  224. self.externalsched(callback)
  225. def rerequest_single(self, t, s, callback):
  226. l = self.lock.set()
  227. rq = Thread(target = self._rerequest_single, args = [t, s+get_key(t), l, callback])
  228. rq.setDaemon(False)
  229. rq.start()
  230. self.lock.wait()
  231. if self.lock.success:
  232. self.lastsuccessful = t
  233. self.last_failed = False
  234. self.never_succeeded = False
  235. return True
  236. if not self.last_failed and self.lastsuccessful == t:
  237. # if the last tracker hit was successful, and you've just tried the tracker
  238. # you'd contacted before, don't go any further, just fail silently.
  239. self.last_failed = True
  240. self.externalsched(callback)
  241. self.lock.give_up()
  242. return True
  243. return False # returns true if it wants rerequest() to exit
  244. def _rerequest_single(self, t, s, l, callback):
  245. try:
  246. closer = [None]
  247. def timedout(self = self, l = l, closer = closer):
  248. if self.lock.trip(l):
  249. self.errorcodes['troublecode'] = 'Problem connecting to tracker - timeout exceeded'
  250. self.lock.unwait(l)
  251. try:
  252. closer[0]()
  253. except:
  254. pass
  255. self.externalsched(timedout, self.timeout)
  256. err = None
  257. try:
  258. url,q = t.split('?',1)
  259. q += '&'+s
  260. except:
  261. url = t
  262. q = s
  263. try:
  264. h = urlopen(url+'?'+q)
  265. closer[0] = h.close
  266. data = h.read()
  267. except (IOError, error), e:
  268. err = 'Problem connecting to tracker - ' + str(e)
  269. except:
  270. err = 'Problem connecting to tracker'
  271. try:
  272. h.close()
  273. except:
  274. pass
  275. if err:
  276. if self.lock.trip(l):
  277. self.errorcodes['troublecode'] = err
  278. self.lock.unwait(l)
  279. return
  280. if data == '':
  281. if self.lock.trip(l):
  282. self.errorcodes['troublecode'] = 'no data from tracker'
  283. self.lock.unwait(l)
  284. return
  285. try:
  286. r = bdecode(data, sloppy=1)
  287. check_peers(r)
  288. except ValueError, e:
  289. if self.lock.trip(l):
  290. self.errorcodes['bad_data'] = 'bad data from tracker - ' + str(e)
  291. self.lock.unwait(l)
  292. return
  293. if r.has_key('failure reason'):
  294. if self.lock.trip(l):
  295. self.errorcodes['rejected'] = self.rejectedmessage + r['failure reason']
  296. self.lock.unwait(l)
  297. return
  298. if self.lock.trip(l, True): # success!
  299. self.lock.unwait(l)
  300. else:
  301. callback = lambda: None # attempt timed out, don't do a callback
  302. # even if the attempt timed out, go ahead and process data
  303. def add(self = self, r = r, callback = callback):
  304. self.postrequest(r, callback)
  305. self.externalsched(add)
  306. except:
  307. self.exception(callback)
  308. def postrequest(self, r, callback):
  309. if r.has_key('warning message'):
  310. self.errorfunc('warning from tracker - ' + r['warning message'])
  311. self.announce_interval = r.get('interval', self.announce_interval)
  312. self.interval = r.get('min interval', self.interval)
  313. self.trackerid = r.get('tracker id', self.trackerid)
  314. self.last = r.get('last')
  315. # ps = len(r['peers']) + self.howmany()
  316. p = r['peers']
  317. peers = []
  318. if type(p) == type(''):
  319. lenpeers = len(p)/6
  320. else:
  321. lenpeers = len(p)
  322. cflags = r.get('crypto_flags')
  323. if type(cflags) != type('') or len(cflags) != lenpeers:
  324. cflags = None
  325. if cflags is None:
  326. cflags = [None for i in xrange(lenpeers)]
  327. else:
  328. cflags = [ord(x) for x in cflags]
  329. if type(p) == type(''):
  330. for x in xrange(0, len(p), 6):
  331. ip = '.'.join([str(ord(i)) for i in p[x:x+4]])
  332. port = (ord(p[x+4]) << 8) | ord(p[x+5])
  333. peers.append(((ip, port), 0, cflags[int(x/6)]))
  334. else:
  335. for i in xrange(len(p)):
  336. x = p[i]
  337. peers.append(((x['ip'].strip(), x['port']),
  338. x.get('peer id',0), cflags[i]))
  339. ps = len(peers) + self.howmany()
  340. if ps < self.maxpeers:
  341. if self.doneflag.isSet():
  342. if r.get('num peers', 1000) - r.get('done peers', 0) > ps * 1.2:
  343. self.last = None
  344. else:
  345. if r.get('num peers', 1000) > ps * 1.2:
  346. self.last = None
  347. if self.seededfunc and r.get('seeded'):
  348. self.seededfunc()
  349. elif peers:
  350. shuffle(peers)
  351. self.connect(peers)
  352. callback()
  353. def exception(self, callback):
  354. data = StringIO()
  355. print_exc(file = data)
  356. def r(s = data.getvalue(), callback = callback):
  357. if self.excfunc:
  358. self.excfunc(s)
  359. else:
  360. print s
  361. callback()
  362. self.externalsched(r)
  363. class SuccessLock:
  364. def __init__(self):
  365. self.lock = Lock()
  366. self.pause = Lock()
  367. self.code = 0L
  368. self.success = False
  369. self.finished = True
  370. def reset(self):
  371. self.success = False
  372. self.finished = False
  373. def set(self):
  374. self.lock.acquire()
  375. if not self.pause.locked():
  376. self.pause.acquire()
  377. self.first = True
  378. self.code += 1L
  379. self.lock.release()
  380. return self.code
  381. def trip(self, code, s = False):
  382. self.lock.acquire()
  383. try:
  384. if code == self.code and not self.finished:
  385. r = self.first
  386. self.first = False
  387. if s:
  388. self.finished = True
  389. self.success = True
  390. return r
  391. finally:
  392. self.lock.release()
  393. def give_up(self):
  394. self.lock.acquire()
  395. self.success = False
  396. self.finished = True
  397. self.lock.release()
  398. def wait(self):
  399. self.pause.acquire()
  400. def unwait(self, code):
  401. if code == self.code and self.pause.locked():
  402. self.pause.release()
  403. def isfinished(self):
  404. self.lock.acquire()
  405. x = self.finished
  406. self.lock.release()
  407. return x