1
0

utkhashmir.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. # The contents of this file are subject to the BitTorrent Open Source License
  2. # Version 1.1 (the License). You may not copy or use this file, in either
  3. # source code or executable form, except in compliance with the License. You
  4. # may obtain a copy of the License at http://www.bittorrent.com/license/.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10. import khashmir, knode
  11. from actions import *
  12. from khash import newID
  13. from krpc import KRPCProtocolError, KRPCFailSilently
  14. from BTL.cache import Cache
  15. from sha import sha
  16. from util import *
  17. from BTL.stackthreading import Thread
  18. from socket import gethostbyname
  19. from const import *
  20. from kstore import sample
  21. TOKEN_UPDATE_INTERVAL = 5 * 60 # five minutes
  22. NUM_PEERS = 50 # number of peers to return
  23. class UTNode(knode.KNodeBase):
  24. def announcePeer(self, info_hash, port, khashmir_id):
  25. assert type(port) == type(1)
  26. assert type(info_hash) == type('')
  27. assert type(khashmir_id) == type('')
  28. assert len(info_hash) == 20
  29. assert len(khashmir_id) == 20
  30. try:
  31. token = self.table.tcache[self.id]
  32. except:
  33. token = None
  34. if token:
  35. assert type(token) == type(""), repr(token)
  36. # not true
  37. #assert len(token) == 20, repr(token)
  38. df = self.conn().sendRequest('announce_peer', {'info_hash':info_hash,
  39. 'port':port,
  40. 'id':khashmir_id,
  41. 'token':token})
  42. else:
  43. raise KRPCProtocolError("no write token for node")
  44. df.addErrback(self.errBack)
  45. df.addCallback(self.checkSender)
  46. return df
  47. def getPeers(self, info_hash, khashmir_id):
  48. df = self.conn().sendRequest('get_peers', {'info_hash':info_hash, 'id':khashmir_id})
  49. df.addErrback(self.errBack)
  50. df.addCallback(self.checkSender)
  51. return df
  52. def checkSender(self, dict):
  53. d = knode.KNodeBase.checkSender(self, dict)
  54. try:
  55. token = d['rsp']['token']
  56. assert type(token) == type(""), repr(token)
  57. # not true
  58. #assert len(token) == 20, repr(token)
  59. self.table.tcache[d['rsp']['id']] = token
  60. except KeyError:
  61. pass
  62. return d
  63. class UTStoreValue(StoreValue):
  64. def callNode(self, node, f):
  65. return f(self.target, self.value, node.token, self.table.node.id)
  66. class UTKhashmir(khashmir.KhashmirBase):
  67. _Node = UTNode
  68. def setup(self, host, port, data_dir, rlcount, checkpoint=True):
  69. khashmir.KhashmirBase.setup(self, host, port,data_dir, rlcount, checkpoint)
  70. self.cur_token = self.last_token = sha('')
  71. self.tcache = Cache()
  72. self.gen_token(loop=True)
  73. self.expire_cached_tokens(loop=True)
  74. def expire_cached_tokens(self, loop=False):
  75. self.tcache.expire(time() - TOKEN_UPDATE_INTERVAL)
  76. if loop:
  77. self.rawserver.external_add_task(TOKEN_UPDATE_INTERVAL,
  78. self.expire_cached_tokens, True)
  79. def gen_token(self, loop=False):
  80. self.last_token = self.cur_token
  81. self.cur_token = sha(newID())
  82. if loop:
  83. self.rawserver.external_add_task(TOKEN_UPDATE_INTERVAL,
  84. self.gen_token, True)
  85. def get_token(self, host, port):
  86. x = self.cur_token.copy()
  87. x.update("%s%s" % (host, port))
  88. h = x.digest()
  89. return h
  90. def val_token(self, token, host, port):
  91. x = self.cur_token.copy()
  92. x.update("%s%s" % (host, port))
  93. a = x.digest()
  94. if token == a:
  95. return True
  96. x = self.last_token.copy()
  97. x.update("%s%s" % (host, port))
  98. b = x.digest()
  99. if token == b:
  100. return True
  101. return False
  102. def addContact(self, host, port, callback=None):
  103. # use dns on host, then call khashmir.addContact
  104. Thread(target=self._get_host, args=[host, port, callback]).start()
  105. def _get_host(self, host, port, callback):
  106. # this exception catch can go away once we actually fix the bug
  107. try:
  108. ip = gethostbyname(host)
  109. except TypeError, e:
  110. raise TypeError(str(e) + (": host(%s) port(%s)" % (repr(host), repr(port))))
  111. self.rawserver.external_add_task(0, self._got_host, ip, port, callback)
  112. def _got_host(self, host, port, callback):
  113. khashmir.KhashmirBase.addContact(self, host, port, callback)
  114. def announcePeer(self, info_hash, port, callback=None):
  115. """ stores the value for key in the global table, returns immediately, no status
  116. in this implementation, peers respond but don't indicate status to storing values
  117. a key can have many values
  118. """
  119. def _storeValueForKey(nodes, key=info_hash, value=port, response=callback , table=self.table):
  120. if not response:
  121. # default callback
  122. def _storedValueHandler(sender):
  123. pass
  124. response=_storedValueHandler
  125. action = UTStoreValue(self, key, value, response, self.rawserver.add_task, "announcePeer")
  126. self.rawserver.external_add_task(0, action.goWithNodes, nodes)
  127. # this call is asynch
  128. self.findNode(info_hash, _storeValueForKey)
  129. def krpc_announce_peer(self, info_hash, port, id, token, _krpc_sender):
  130. sender = {'id' : id}
  131. sender['host'] = _krpc_sender[0]
  132. sender['port'] = _krpc_sender[1]
  133. if not self.val_token(token, sender['host'], sender['port']):
  134. raise KRPCProtocolError("Invalid Write Token")
  135. value = compact_peer_info(_krpc_sender[0], port)
  136. self.store[info_hash] = value
  137. n = self.Node().initWithDict(sender)
  138. self.insertNode(n, contacted=0)
  139. return {"id" : self.node.id}
  140. def retrieveValues(self, key):
  141. try:
  142. l = self.store.sample(key, NUM_PEERS)
  143. except KeyError:
  144. l = []
  145. return l
  146. def getPeers(self, info_hash, callback, searchlocal = 1):
  147. """ returns the values found for key in global table
  148. callback will be called with a list of values for each peer that returns unique values
  149. final callback will be an empty list - probably should change to 'more coming' arg
  150. """
  151. nodes = self.table.findNodes(info_hash, invalid=True)
  152. l = [x for x in nodes if x.invalid]
  153. if len(l) > 4:
  154. nodes = sample(l , 4) + self.table.findNodes(info_hash, invalid=False)[:4]
  155. # get locals
  156. if searchlocal:
  157. l = self.retrieveValues(info_hash)
  158. if len(l) > 0:
  159. self.rawserver.external_add_task(0, callback, [reducePeers(l)])
  160. else:
  161. l = []
  162. # create our search state
  163. state = GetValue(self, info_hash, callback, self.rawserver.add_task, 'getPeers')
  164. self.rawserver.external_add_task(0, state.goWithNodes, nodes, l)
  165. def getPeersAndAnnounce(self, info_hash, port, callback, searchlocal = 1):
  166. """ returns the values found for key in global table
  167. callback will be called with a list of values for each peer that returns unique values
  168. final callback will be an empty list - probably should change to 'more coming' arg
  169. """
  170. nodes = self.table.findNodes(info_hash, invalid=False)
  171. nodes += self.table.findNodes(info_hash, invalid=True)
  172. # get locals
  173. if searchlocal:
  174. l = self.retrieveValues(info_hash)
  175. if len(l) > 0:
  176. self.rawserver.external_add_task(0, callback, [reducePeers(l)])
  177. else:
  178. l = []
  179. # create our search state
  180. x = lambda a: a
  181. state = GetAndStore(self, info_hash, port, callback, x, self.rawserver.add_task, 'getPeers', "announcePeer")
  182. self.rawserver.external_add_task(0, state.goWithNodes, nodes, l)
  183. def krpc_get_peers(self, info_hash, id, _krpc_sender):
  184. sender = {'id' : id}
  185. sender['host'] = _krpc_sender[0]
  186. sender['port'] = _krpc_sender[1]
  187. n = self.Node().initWithDict(sender)
  188. self.insertNode(n, contacted=0)
  189. l = self.retrieveValues(info_hash)
  190. if len(l) > 0:
  191. return {'values' : [reducePeers(l)],
  192. "id": self.node.id,
  193. "token" : self.get_token(sender['host'], sender['port'])}
  194. else:
  195. nodes = self.table.findNodes(info_hash, invalid=False)
  196. nodes = [node.senderDict() for node in nodes]
  197. return {'nodes' : packNodes(nodes),
  198. "id": self.node.id,
  199. "token" : self.get_token(sender['host'], sender['port'])}