1
0

khashmir.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. # The contents of this file are subject to the BitTorrent Open Source License
  2. # Version 1.1 (the License). You may not copy or use this file, in either
  3. # source code or executable form, except in compliance with the License. You
  4. # may obtain a copy of the License at http://www.bittorrent.com/license/.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10. import const
  11. from socket import gethostbyname
  12. from BTL.platform import bttime as time
  13. from sha import sha
  14. import re
  15. from BitTorrent.defaultargs import common_options, rare_options
  16. from BitTorrent.RawServer_twisted import RawServer
  17. from ktable import KTable, K
  18. from knode import *
  19. from kstore import KStore
  20. from khash import newID, newIDInRange
  21. from util import packNodes
  22. from actions import FindNode, GetValue, KeyExpirer, StoreValue
  23. import krpc
  24. import sys
  25. import os
  26. import traceback
  27. from BTL.bencode import bencode, bdecode
  28. from defer import Deferred
  29. from random import randrange
  30. from kstore import sample
  31. from BTL.stackthreading import Event, Thread
  32. ip_pat = re.compile('[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}')
  33. class KhashmirDBExcept(Exception):
  34. pass
  35. def foo(bytes):
  36. pass
  37. # this is the base class, has base functionality and find node, no key-value mappings
  38. class KhashmirBase:
  39. _Node = KNodeBase
  40. def __init__(self, host, port, data_dir, rawserver=None, max_ul_rate=1024, checkpoint=True, errfunc=None, rlcount=foo, config={'pause':False, 'max_rate_period':20}):
  41. if rawserver:
  42. self.rawserver = rawserver
  43. else:
  44. self.flag = Event()
  45. d = dict([(x[0],x[1]) for x in common_options + rare_options])
  46. self.rawserver = RawServer(self.flag, d)
  47. self.max_ul_rate = max_ul_rate
  48. self.socket = None
  49. self.config = config
  50. self.setup(host, port, data_dir, rlcount, checkpoint)
  51. def setup(self, host, port, data_dir, rlcount, checkpoint=True):
  52. self.host = host
  53. self.port = port
  54. self.ddir = data_dir
  55. self.store = KStore()
  56. self.pingcache = {}
  57. self.socket = self.rawserver.create_udpsocket(self.port, self.host)
  58. self.udp = krpc.hostbroker(self, (self.host, self.port), self.socket, self.rawserver.add_task, self.max_ul_rate, self.config, rlcount)
  59. self._load()
  60. self.rawserver.start_listening_udp(self.socket, self.udp)
  61. self.last = time()
  62. KeyExpirer(self.store, self.rawserver.add_task)
  63. self.refreshTable(force=1)
  64. if checkpoint:
  65. self.rawserver.add_task(30, self.findCloseNodes, lambda a: a, True)
  66. self.rawserver.add_task(60, self.checkpoint, 1)
  67. def Node(self):
  68. n = self._Node(self.udp.connectionForAddr)
  69. n.table = self
  70. return n
  71. def __del__(self):
  72. if self.socket is not None:
  73. self.rawserver.stop_listening_udp(self.socket)
  74. self.socket.close()
  75. def _load(self):
  76. do_load = False
  77. try:
  78. s = open(os.path.join(self.ddir, "routing_table"), 'r').read()
  79. dict = bdecode(s)
  80. except:
  81. id = newID()
  82. else:
  83. id = dict['id']
  84. do_load = True
  85. self.node = self._Node(self.udp.connectionForAddr).init(id, self.host, self.port)
  86. self.table = KTable(self.node)
  87. if do_load:
  88. self._loadRoutingTable(dict['rt'])
  89. def checkpoint(self, auto=0):
  90. d = {}
  91. d['id'] = self.node.id
  92. d['rt'] = self._dumpRoutingTable()
  93. try:
  94. f = open(os.path.join(self.ddir, "routing_table"), 'wb')
  95. f.write(bencode(d))
  96. f.close()
  97. except Exception, e:
  98. #XXX real error here
  99. print ">>> unable to dump routing table!", str(e)
  100. pass
  101. if auto:
  102. self.rawserver.add_task(randrange(int(const.CHECKPOINT_INTERVAL * .9),
  103. int(const.CHECKPOINT_INTERVAL * 1.1)),
  104. self.checkpoint, 1)
  105. def _loadRoutingTable(self, nodes):
  106. """
  107. load routing table nodes from database
  108. it's usually a good idea to call refreshTable(force=1) after loading the table
  109. """
  110. for rec in nodes:
  111. n = self.Node().initWithDict(rec)
  112. self.table.insertNode(n, contacted=0, nocheck=True)
  113. def _dumpRoutingTable(self):
  114. """
  115. save routing table nodes to the database
  116. """
  117. l = []
  118. for bucket in self.table.buckets:
  119. for node in bucket.l:
  120. l.append({'id':node.id, 'host':node.host, 'port':node.port, 'age':int(node.age)})
  121. return l
  122. def _addContact(self, host, port, callback=None):
  123. """
  124. ping this node and add the contact info to the table on pong!
  125. """
  126. n =self.Node().init(const.NULL_ID, host, port)
  127. try:
  128. self.sendPing(n, callback=callback)
  129. except krpc.KRPCSelfNodeError:
  130. # our own node
  131. pass
  132. #######
  133. ####### LOCAL INTERFACE - use these methods!
  134. def addContact(self, ip, port, callback=None):
  135. """
  136. ping this node and add the contact info to the table on pong!
  137. """
  138. if ip_pat.match(ip):
  139. self._addContact(ip, port)
  140. else:
  141. def go(ip=ip, port=port):
  142. ip = gethostbyname(ip)
  143. self.rawserver.external_add_task(0, self._addContact, ip, port)
  144. t = Thread(target=go)
  145. t.start()
  146. ## this call is async!
  147. def findNode(self, id, callback, errback=None):
  148. """ returns the contact info for node, or the k closest nodes, from the global table """
  149. # get K nodes out of local table/cache, or the node we want
  150. nodes = self.table.findNodes(id, invalid=True)
  151. l = [x for x in nodes if x.invalid]
  152. if len(l) > 4:
  153. nodes = sample(l , 4) + self.table.findNodes(id, invalid=False)[:4]
  154. d = Deferred()
  155. if errback:
  156. d.addCallbacks(callback, errback)
  157. else:
  158. d.addCallback(callback)
  159. if len(nodes) == 1 and nodes[0].id == id :
  160. d.callback(nodes)
  161. else:
  162. # create our search state
  163. state = FindNode(self, id, d.callback, self.rawserver.add_task)
  164. self.rawserver.external_add_task(0, state.goWithNodes, nodes)
  165. def insertNode(self, n, contacted=1):
  166. """
  167. insert a node in our local table, pinging oldest contact in bucket, if necessary
  168. If all you have is a host/port, then use addContact, which calls this method after
  169. receiving the PONG from the remote node. The reason for the seperation is we can't insert
  170. a node into the table without it's peer-ID. That means of course the node passed into this
  171. method needs to be a properly formed Node object with a valid ID.
  172. """
  173. old = self.table.insertNode(n, contacted=contacted)
  174. if old and old != n:
  175. if not old.inPing():
  176. self.checkOldNode(old, n, contacted)
  177. else:
  178. l = self.pingcache.get(old.id, [])
  179. if (len(l) < 10 or contacted) and len(l) < 15:
  180. l.append((n, contacted))
  181. self.pingcache[old.id] = l
  182. def checkOldNode(self, old, new, contacted=False):
  183. ## these are the callbacks used when we ping the oldest node in a bucket
  184. def cmp(a, b):
  185. if a[1] == 1 and b[1] == 0:
  186. return -1
  187. elif b[1] == 1 and a[1] == 0:
  188. return 1
  189. else:
  190. return 0
  191. def _staleNodeHandler(dict, old=old, new=new, contacted=contacted):
  192. """ called if the pinged node never responds """
  193. if old.fails >= 2:
  194. l = self.pingcache.get(old.id, [])
  195. l.sort(cmp)
  196. if l:
  197. n, nc = l[0]
  198. if (not contacted) and nc:
  199. l = l[1:] + [(new, contacted)]
  200. new = n
  201. contacted = nc
  202. o = self.table.replaceStaleNode(old, new)
  203. if o and o != new:
  204. self.checkOldNode(o, new)
  205. try:
  206. self.pingcache[o.id] = self.pingcache[old.id]
  207. del(self.pingcache[old.id])
  208. except KeyError:
  209. pass
  210. else:
  211. if l:
  212. del(self.pingcache[old.id])
  213. l.sort(cmp)
  214. l = l[:5]
  215. for node in l:
  216. self.insertNode(node[0], node[1])
  217. else:
  218. l = self.pingcache.get(old.id, [])
  219. if l:
  220. del(self.pingcache[old.id])
  221. self.insertNode(new, contacted)
  222. l = l[:5]
  223. for node in l:
  224. self.insertNode(node[0], node[1])
  225. def _notStaleNodeHandler(dict, old=old, new=new, contacted=contacted):
  226. """ called when we get a pong from the old node """
  227. self.table.insertNode(old, True)
  228. self.insertNode(new, contacted)
  229. l = self.pingcache.get(old.id, [])
  230. l.sort(cmp)
  231. l = l[:5]
  232. for node in l:
  233. self.insertNode(node[0], node[1])
  234. try:
  235. del(self.pingcache[old.id])
  236. except KeyError:
  237. pass
  238. try:
  239. df = old.ping(self.node.id)
  240. except krpc.KRPCSelfNodeError:
  241. pass
  242. df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
  243. def sendPing(self, node, callback=None):
  244. """
  245. ping a node
  246. """
  247. try:
  248. df = node.ping(self.node.id)
  249. except krpc.KRPCSelfNodeError:
  250. pass
  251. else:
  252. ## these are the callbacks we use when we issue a PING
  253. def _pongHandler(dict, node=node, table=self.table, callback=callback):
  254. _krpc_sender = dict['_krpc_sender']
  255. dict = dict['rsp']
  256. sender = {'id' : dict['id']}
  257. sender['host'] = _krpc_sender[0]
  258. sender['port'] = _krpc_sender[1]
  259. n = self.Node().initWithDict(sender)
  260. table.insertNode(n)
  261. if callback:
  262. callback()
  263. def _defaultPong(err, node=node, table=self.table, callback=callback):
  264. if callback:
  265. callback()
  266. df.addCallbacks(_pongHandler,_defaultPong)
  267. def findCloseNodes(self, callback=lambda a: a, auto=False):
  268. """
  269. This does a findNode on the ID one away from our own.
  270. This will allow us to populate our table with nodes on our network closest to our own.
  271. This is called as soon as we start up with an empty table
  272. """
  273. if not self.config['pause']:
  274. id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
  275. self.findNode(id, callback)
  276. if auto:
  277. if not self.config['pause']:
  278. self.refreshTable()
  279. self.rawserver.external_add_task(randrange(int(const.FIND_CLOSE_INTERVAL *0.9),
  280. int(const.FIND_CLOSE_INTERVAL *1.1)),
  281. self.findCloseNodes, lambda a: True, True)
  282. def refreshTable(self, force=0):
  283. """
  284. force=1 will refresh table regardless of last bucket access time
  285. """
  286. def callback(nodes):
  287. pass
  288. refresh = [bucket for bucket in self.table.buckets if force or (len(bucket.l) < K) or len(filter(lambda a: a.invalid, bucket.l)) or (time() - bucket.lastAccessed > const.BUCKET_STALENESS)]
  289. for bucket in refresh:
  290. id = newIDInRange(bucket.min, bucket.max)
  291. self.findNode(id, callback)
  292. def stats(self):
  293. """
  294. Returns (num_contacts, num_nodes)
  295. num_contacts: number contacts in our routing table
  296. num_nodes: number of nodes estimated in the entire dht
  297. """
  298. num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
  299. num_nodes = const.K * (2**(len(self.table.buckets) - 1))
  300. return {'num_contacts':num_contacts, 'num_nodes':num_nodes}
  301. def krpc_ping(self, id, _krpc_sender):
  302. sender = {'id' : id}
  303. sender['host'] = _krpc_sender[0]
  304. sender['port'] = _krpc_sender[1]
  305. n = self.Node().initWithDict(sender)
  306. self.insertNode(n, contacted=0)
  307. return {"id" : self.node.id}
  308. def krpc_find_node(self, target, id, _krpc_sender):
  309. nodes = self.table.findNodes(target, invalid=False)
  310. nodes = map(lambda node: node.senderDict(), nodes)
  311. sender = {'id' : id}
  312. sender['host'] = _krpc_sender[0]
  313. sender['port'] = _krpc_sender[1]
  314. n = self.Node().initWithDict(sender)
  315. self.insertNode(n, contacted=0)
  316. return {"nodes" : packNodes(nodes), "id" : self.node.id}
  317. ## This class provides read-only access to the DHT, valueForKey
  318. ## you probably want to use this mixin and provide your own write methods
  319. class KhashmirRead(KhashmirBase):
  320. _Node = KNodeRead
  321. def retrieveValues(self, key):
  322. try:
  323. l = self.store[key]
  324. except KeyError:
  325. l = []
  326. return l
  327. ## also async
  328. def valueForKey(self, key, callback, searchlocal = 1):
  329. """ returns the values found for key in global table
  330. callback will be called with a list of values for each peer that returns unique values
  331. final callback will be an empty list - probably should change to 'more coming' arg
  332. """
  333. nodes = self.table.findNodes(key)
  334. # get locals
  335. if searchlocal:
  336. l = self.retrieveValues(key)
  337. if len(l) > 0:
  338. self.rawserver.external_add_task(0, callback, l)
  339. else:
  340. l = []
  341. # create our search state
  342. state = GetValue(self, key, callback, self.rawserver.add_task)
  343. self.rawserver.external_add_task(0, state.goWithNodes, nodes, l)
  344. def krpc_find_value(self, key, id, _krpc_sender):
  345. sender = {'id' : id}
  346. sender['host'] = _krpc_sender[0]
  347. sender['port'] = _krpc_sender[1]
  348. n = self.Node().initWithDict(sender)
  349. self.insertNode(n, contacted=0)
  350. l = self.retrieveValues(key)
  351. if len(l) > 0:
  352. return {'values' : l, "id": self.node.id}
  353. else:
  354. nodes = self.table.findNodes(key, invalid=False)
  355. nodes = map(lambda node: node.senderDict(), nodes)
  356. return {'nodes' : packNodes(nodes), "id": self.node.id}
  357. ### provides a generic write method, you probably don't want to deploy something that allows
  358. ### arbitrary value storage
  359. class KhashmirWrite(KhashmirRead):
  360. _Node = KNodeWrite
  361. ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
  362. def storeValueForKey(self, key, value, callback=None):
  363. """ stores the value for key in the global table, returns immediately, no status
  364. in this implementation, peers respond but don't indicate status to storing values
  365. a key can have many values
  366. """
  367. def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
  368. if not response:
  369. # default callback
  370. def _storedValueHandler(sender):
  371. pass
  372. response=_storedValueHandler
  373. action = StoreValue(self, key, value, response, self.rawserver.add_task)
  374. self.rawserver.external_add_task(0, action.goWithNodes, nodes)
  375. # this call is asynch
  376. self.findNode(key, _storeValueForKey)
  377. def krpc_store_value(self, key, value, id, _krpc_sender):
  378. t = "%0.6f" % time()
  379. self.store[key] = value
  380. sender = {'id' : id}
  381. sender['host'] = _krpc_sender[0]
  382. sender['port'] = _krpc_sender[1]
  383. n = self.Node().initWithDict(sender)
  384. self.insertNode(n, contacted=0)
  385. return {"id" : self.node.id}
  386. # the whole shebang, for testing
  387. class Khashmir(KhashmirWrite):
  388. _Node = KNodeWrite