T2T.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. # Written by John Hoffman
  2. # see LICENSE.txt for license information
  3. from Rerequester import Rerequester
  4. from urllib import quote
  5. from threading import Event
  6. from random import randrange
  7. from string import lower
  8. import sys
  9. import __init__
  10. try:
  11. True
  12. except:
  13. True = 1
  14. False = 0
  15. DEBUG = True
  16. def excfunc(x):
  17. print x
  18. R_0 = lambda: 0
  19. R_1 = lambda: 1
  20. class T2TConnection:
  21. def __init__(self, myid, tracker, hash, interval, peers, timeout,
  22. rawserver, disallow, isdisallowed):
  23. self.tracker = tracker
  24. self.interval = interval
  25. self.hash = hash
  26. self.operatinginterval = interval
  27. self.peers = peers
  28. self.rawserver = rawserver
  29. self.disallow = disallow
  30. self.isdisallowed = isdisallowed
  31. self.active = True
  32. self.busy = False
  33. self.errors = 0
  34. self.rejected = 0
  35. self.trackererror = False
  36. self.peerlists = []
  37. cfg = { 'min_peers': peers,
  38. 'max_initiate': peers,
  39. 'rerequest_interval': interval,
  40. 'http_timeout': timeout }
  41. self.rerequester = Rerequester( 0, myid, hash, [[tracker]], cfg,
  42. rawserver.add_task, rawserver.add_task, self.errorfunc, excfunc,
  43. self.addtolist, R_0, R_1, R_0, R_0, R_0, R_0,
  44. Event() )
  45. if self.isactive():
  46. rawserver.add_task(self.refresh, randrange(int(self.interval/10), self.interval))
  47. # stagger announces
  48. def isactive(self):
  49. if self.isdisallowed(self.tracker): # whoops!
  50. self.deactivate()
  51. return self.active
  52. def deactivate(self):
  53. self.active = False
  54. def refresh(self):
  55. if not self.isactive():
  56. return
  57. self.lastsuccessful = True
  58. self.newpeerdata = []
  59. if DEBUG:
  60. print 'contacting %s for info_hash=%s' % (self.tracker, quote(self.hash))
  61. self.rerequester.snoop(self.peers, self.callback)
  62. def callback(self):
  63. self.busy = False
  64. if self.lastsuccessful:
  65. self.errors = 0
  66. self.rejected = 0
  67. if self.rerequester.announce_interval > (3*self.interval):
  68. # I think I'm stripping from a regular tracker; boost the number of peers requested
  69. self.peers = int(self.peers * (self.rerequester.announce_interval / self.interval))
  70. self.operatinginterval = self.rerequester.announce_interval
  71. if DEBUG:
  72. print ("%s with info_hash=%s returned %d peers" %
  73. (self.tracker, quote(self.hash), len(self.newpeerdata)))
  74. self.peerlists.append(self.newpeerdata)
  75. self.peerlists = self.peerlists[-10:] # keep up to the last 10 announces
  76. if self.isactive():
  77. self.rawserver.add_task(self.refresh, self.operatinginterval)
  78. def addtolist(self, peers):
  79. for peer in peers:
  80. self.newpeerdata.append((peer[1],peer[0][0],peer[0][1]))
  81. def errorfunc(self, r):
  82. self.lastsuccessful = False
  83. if DEBUG:
  84. print "%s with info_hash=%s gives error: '%s'" % (self.tracker, quote(self.hash), r)
  85. if r == self.rerequester.rejectedmessage + 'disallowed': # whoops!
  86. if DEBUG:
  87. print ' -- disallowed - deactivating'
  88. self.deactivate()
  89. self.disallow(self.tracker) # signal other torrents on this tracker
  90. return
  91. if lower(r[:8]) == 'rejected': # tracker rejected this particular torrent
  92. self.rejected += 1
  93. if self.rejected == 3: # rejected 3 times
  94. if DEBUG:
  95. print ' -- rejected 3 times - deactivating'
  96. self.deactivate()
  97. return
  98. self.errors += 1
  99. if self.errors >= 3: # three or more errors in a row
  100. self.operatinginterval += self.interval # lengthen the interval
  101. if DEBUG:
  102. print ' -- lengthening interval to '+str(self.operatinginterval)+' seconds'
  103. def harvest(self):
  104. x = []
  105. for list in self.peerlists:
  106. x += list
  107. self.peerlists = []
  108. return x
  109. class T2TList:
  110. def __init__(self, enabled, trackerid, interval, maxpeers, timeout, rawserver):
  111. self.enabled = enabled
  112. self.trackerid = trackerid
  113. self.interval = interval
  114. self.maxpeers = maxpeers
  115. self.timeout = timeout
  116. self.rawserver = rawserver
  117. self.list = {}
  118. self.torrents = {}
  119. self.disallowed = {}
  120. self.oldtorrents = []
  121. def parse(self, allowed_list):
  122. if not self.enabled:
  123. return
  124. # step 1: Create a new list with all tracker/torrent combinations in allowed_dir
  125. newlist = {}
  126. for hash, data in allowed_list.items():
  127. if data.has_key('announce-list'):
  128. for tier in data['announce-list']:
  129. for tracker in tier:
  130. self.disallowed.setdefault(tracker, False)
  131. newlist.setdefault(tracker, {})
  132. newlist[tracker][hash] = None # placeholder
  133. # step 2: Go through and copy old data to the new list.
  134. # if the new list has no place for it, then it's old, so deactivate it
  135. for tracker, hashdata in self.list.items():
  136. for hash, t2t in hashdata.items():
  137. if not newlist.has_key(tracker) or not newlist[tracker].has_key(hash):
  138. t2t.deactivate() # this connection is no longer current
  139. self.oldtorrents += [t2t]
  140. # keep it referenced in case a thread comes along and tries to access.
  141. else:
  142. newlist[tracker][hash] = t2t
  143. if not newlist.has_key(tracker):
  144. self.disallowed[tracker] = False # reset when no torrents on it left
  145. self.list = newlist
  146. newtorrents = {}
  147. # step 3: If there are any entries that haven't been initialized yet, do so.
  148. # At the same time, copy all entries onto the by-torrent list.
  149. for tracker, hashdata in newlist.items():
  150. for hash, t2t in hashdata.items():
  151. if t2t is None:
  152. hashdata[hash] = T2TConnection(self.trackerid, tracker, hash,
  153. self.interval, self.maxpeers, self.timeout,
  154. self.rawserver, self._disallow, self._isdisallowed)
  155. newtorrents.setdefault(hash,[])
  156. newtorrents[hash] += [hashdata[hash]]
  157. self.torrents = newtorrents
  158. # structures:
  159. # list = {tracker: {hash: T2TConnection, ...}, ...}
  160. # torrents = {hash: [T2TConnection, ...]}
  161. # disallowed = {tracker: flag, ...}
  162. # oldtorrents = [T2TConnection, ...]
  163. def _disallow(self,tracker):
  164. self.disallowed[tracker] = True
  165. def _isdisallowed(self,tracker):
  166. return self.disallowed[tracker]
  167. def harvest(self,hash):
  168. harvest = []
  169. if self.enabled:
  170. for t2t in self.torrents[hash]:
  171. harvest += t2t.harvest()
  172. return harvest