1
0

actions.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. # The contents of this file are subject to the BitTorrent Open Source License
  2. # Version 1.1 (the License). You may not copy or use this file, in either
  3. # source code or executable form, except in compliance with the License. You
  4. # may obtain a copy of the License at http://www.bittorrent.com/license/.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10. from BTL.platform import bttime as time
  11. import const
  12. from khash import intify
  13. from ktable import KTable, K
  14. from util import unpackNodes
  15. from krpc import KRPCProtocolError, KRPCSelfNodeError
  16. from bisect import insort
  17. class NodeWrap(object):
  18. def __init__(self, node, target):
  19. self.target = target
  20. self.node = node
  21. def __cmp__(self, o):
  22. """ this function is for sorting nodes relative to the ID we are looking for """
  23. y, x = self.target ^ o.node.num, self.target ^ self.node.num
  24. if x > y:
  25. return 1
  26. elif x < y:
  27. return -1
  28. return 0
  29. class ActionBase(object):
  30. """ base class for some long running asynchronous proccesses like finding nodes or values """
  31. def __init__(self, table, target, callback, callLater):
  32. self.table = table
  33. self.target = target
  34. self.callLater = callLater
  35. self.num = intify(target)
  36. self.found = {}
  37. self.foundq = []
  38. self.queried = {}
  39. self.queriedip = {}
  40. self.answered = {}
  41. self.answeredq = []
  42. self.callback = callback
  43. self.outstanding = 0
  44. self.finished = 0
  45. def sort(self, a, b):
  46. """ this function is for sorting nodes relative to the ID we are looking for """
  47. x, y = self.num ^ a.num, self.num ^ b.num
  48. if x > y:
  49. return 1
  50. elif x < y:
  51. return -1
  52. return 0
  53. def shouldQuery(self, node):
  54. if node.id == self.table.node.id:
  55. return False
  56. elif (node.host, node.port) not in self.queriedip and node.id not in self.queried:
  57. self.queriedip[(node.host, node.port)] = 1
  58. self.queried[node.id] = 1
  59. return True
  60. return False
  61. def _cleanup(self):
  62. self.foundq = None
  63. self.found = None
  64. self.queried = None
  65. self.queriedip = None
  66. def goWithNodes(self, t):
  67. pass
  68. FIND_NODE_TIMEOUT = 15
  69. class FindNode(ActionBase):
  70. """ find node action merits it's own class as it is a long running stateful process """
  71. def handleGotNodes(self, dict):
  72. _krpc_sender = dict['_krpc_sender']
  73. dict = dict['rsp']
  74. sender = {'id' : dict["id"]}
  75. sender['port'] = _krpc_sender[1]
  76. sender['host'] = _krpc_sender[0]
  77. sender = self.table.Node().initWithDict(sender)
  78. try:
  79. l = unpackNodes(dict.get("nodes", []))
  80. if not self.answered.has_key(sender.id):
  81. self.answered[sender.id] = sender
  82. insort(self.answeredq, NodeWrap(sender, self.num))
  83. except:
  84. l = []
  85. self.table.invalidateNode(sender)
  86. if self.finished:
  87. # a day late and a dollar short
  88. return
  89. self.outstanding = self.outstanding - 1
  90. for node in l:
  91. n = self.table.Node().initWithDict(node)
  92. if not self.found.has_key(n.id) and not self.queried.has_key(n.id):
  93. self.found[n.id] = n
  94. insort(self.foundq, NodeWrap(n, self.num))
  95. self.table.insertNode(n, contacted=0)
  96. self.schedule()
  97. def finish(self, result):
  98. self.finished=1
  99. self._cleanup()
  100. self.callLater(0, self.callback, result)
  101. def schedule(self):
  102. """
  103. send messages to new peers, if necessary
  104. """
  105. if self.finished:
  106. return
  107. for wrapper in self.foundq:
  108. node = wrapper.node
  109. if self.shouldQuery(node):
  110. if len(self.answeredq) >= K and self.answeredq[K-1] < wrapper:
  111. break
  112. #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
  113. try:
  114. df = node.findNode(self.target, self.table.node.id)
  115. except KRPCSelfNodeError:
  116. pass
  117. else:
  118. df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
  119. self.outstanding = self.outstanding + 1
  120. if self.outstanding >= const.CONCURRENT_REQS:
  121. break
  122. assert(self.outstanding) >=0
  123. if self.outstanding == 0:
  124. self.finish(self.answeredq[:K])
  125. def makeMsgFailed(self, node):
  126. return lambda err : self._defaultGotNodes(err, node)
  127. def _defaultGotNodes(self, err, node):
  128. self.outstanding = self.outstanding - 1
  129. self.schedule()
  130. def goWithNodes(self, nodes):
  131. """
  132. this starts the process, our argument is a transaction with t.extras being our list of nodes
  133. it's a transaction since we got called from the dispatcher
  134. """
  135. for node in nodes:
  136. if node.id == self.table.node.id:
  137. continue
  138. else:
  139. self.found[node.id] = node
  140. insort(self.foundq, NodeWrap(node, self.num))
  141. self.schedule()
  142. get_value_timeout = 15
  143. class GetValue(FindNode):
  144. def __init__(self, table, target, callback, callLater, find="findValue"):
  145. FindNode.__init__(self, table, target, callback, callLater)
  146. self.findValue = find
  147. """ get value task """
  148. def handleGotNodes(self, dict):
  149. _krpc_sender = dict['_krpc_sender']
  150. dict = dict['rsp']
  151. sender = {'id' : dict["id"]}
  152. sender['port'] = _krpc_sender[1]
  153. sender['host'] = _krpc_sender[0]
  154. sender = self.table.Node().initWithDict(sender)
  155. if self.finished or self.answered.has_key(sender.id):
  156. # a day late and a dollar short
  157. return
  158. self.outstanding = self.outstanding - 1
  159. answered = True
  160. # go through nodes
  161. # if we have any closer than what we already got, query them
  162. if dict.has_key('nodes'):
  163. try:
  164. l = unpackNodes(dict.get('nodes',[]))
  165. except:
  166. # considered an incorrect answer
  167. answered = False
  168. l = []
  169. for node in l:
  170. n = self.table.Node().initWithDict(node)
  171. if not self.found.has_key(n.id):
  172. self.table.insertNode(n)
  173. self.found[n.id] = n
  174. insort(self.foundq, NodeWrap(n, self.num))
  175. elif dict.has_key('values'):
  176. def x(y, z=self.results):
  177. if not z.has_key(y):
  178. z[y] = 1
  179. return y
  180. else:
  181. return None
  182. z = len(dict.get('values', []))
  183. v = filter(None, map(x, dict.get('values',[])))
  184. if(len(v)):
  185. self.callLater(0, self.callback, v)
  186. if answered:
  187. self.answered[sender.id] = sender
  188. insort(self.answeredq, NodeWrap(sender, self.num))
  189. self.schedule()
  190. ## get value
  191. def schedule(self):
  192. if self.finished:
  193. return
  194. for wrapper in self.foundq:
  195. node = wrapper.node
  196. if self.shouldQuery(node):
  197. if len(self.answeredq) >= K and self.answeredq[K-1] < wrapper:
  198. # done searching
  199. break
  200. #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
  201. try:
  202. f = getattr(node, self.findValue)
  203. except AttributeError:
  204. print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
  205. else:
  206. try:
  207. df = f(self.target, self.table.node.id)
  208. df.addCallback(self.handleGotNodes)
  209. df.addErrback(self.makeMsgFailed(node))
  210. self.outstanding = self.outstanding + 1
  211. self.queried[node.id] = 1
  212. except KRPCSelfNodeError:
  213. pass
  214. if self.outstanding >= const.CONCURRENT_REQS:
  215. break
  216. assert(self.outstanding) >=0
  217. if self.outstanding == 0:
  218. ## all done
  219. self.finish([])
  220. ## get value
  221. def goWithNodes(self, nodes, found=None):
  222. self.results = {}
  223. if found:
  224. for n in found:
  225. self.results[n] = 1
  226. for node in nodes:
  227. if node.id == self.table.node.id:
  228. continue
  229. else:
  230. self.found[node.id] = node
  231. insort(self.foundq, NodeWrap(node, self.num))
  232. self.schedule()
  233. class StoreValue(ActionBase):
  234. def __init__(self, table, target, value, callback, callLater, store="storeValue"):
  235. ActionBase.__init__(self, table, target, callback, callLater)
  236. self.value = value
  237. self.stored = []
  238. self.store = store
  239. def storedValue(self, t, node):
  240. self.outstanding -= 1
  241. if self.finished:
  242. return
  243. self.stored.append(t)
  244. if len(self.stored) >= const.STORE_REDUNDANCY:
  245. self.finished=1
  246. self.callback(self.stored)
  247. else:
  248. if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY:
  249. self.schedule()
  250. return t
  251. def storeFailed(self, t, node):
  252. self.outstanding -= 1
  253. if self.finished:
  254. return t
  255. self.schedule()
  256. return t
  257. def schedule(self):
  258. if self.finished:
  259. return
  260. num = const.CONCURRENT_REQS - self.outstanding
  261. if num > const.STORE_REDUNDANCY - len(self.stored):
  262. num = const.STORE_REDUNDANCY - len(self.stored)
  263. if num == 0 and not self.finished:
  264. self.finished=1
  265. self.callback(self.stored)
  266. while num > 0:
  267. try:
  268. node = self.nodes.pop()
  269. except IndexError:
  270. if self.outstanding == 0:
  271. self.finished = 1
  272. self._cleanup()
  273. self.callback(self.stored)
  274. return
  275. else:
  276. if not node.id == self.table.node.id:
  277. try:
  278. f = getattr(node, self.store)
  279. except AttributeError:
  280. print ">>> %s doesn't have a %s method!" % (node, self.store)
  281. else:
  282. try:
  283. df = f(self.target, self.value, self.table.node.id)
  284. except KRPCProtocolError:
  285. self.table.table.invalidateNode(node)
  286. except KRPCSelfNodeError:
  287. pass
  288. else:
  289. df.addCallback(self.storedValue, node=node)
  290. df.addErrback(self.storeFailed, node=node)
  291. self.outstanding += 1
  292. num -= 1
  293. def goWithNodes(self, nodes):
  294. self.nodes = nodes
  295. self.nodes.sort(self.sort)
  296. self.schedule()
  297. class GetAndStore(GetValue):
  298. def __init__(self, table, target, value, callback, storecallback, callLater, find="findValue", store="storeValue"):
  299. self.store = store
  300. self.value = value
  301. self.cb2 = callback
  302. self.storecallback = storecallback
  303. def cb(res):
  304. self.cb2(res)
  305. if not(res):
  306. n = StoreValue(self.table, self.target, self.value, self.doneStored, self.callLater, self.store)
  307. n.goWithNodes(self.answered.values())
  308. GetValue.__init__(self, table, target, cb, callLater, find)
  309. def doneStored(self, dict):
  310. self.storecallback(dict)
  311. class KeyExpirer:
  312. def __init__(self, store, callLater):
  313. self.store = store
  314. self.callLater = callLater
  315. self.callLater(const.KEINITIAL_DELAY, self.doExpire)
  316. def doExpire(self):
  317. self.cut = time() - const.KE_AGE
  318. self.store.expire(self.cut)
  319. self.callLater(const.KE_DELAY, self.doExpire)