1
0

krpc.py 8.9 KB


  1. # The contents of this file are subject to the BitTorrent Open Source License
  2. # Version 1.1 (the License). You may not copy or use this file, in either
  3. # source code or executable form, except in compliance with the License. You
  4. # may obtain a copy of the License at http://www.bittorrent.com/license/.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10. from defer import Deferred
  11. from BTL.bencode import bencode, bdecode
  12. import socket
  13. from BitTorrent.RawServer_twisted import Handler
  14. from BTL.platform import bttime
  15. from BTL.translation import _
  16. import time
  17. from math import log10
  18. import sys
  19. from traceback import print_exc
  20. from khash import distance
  21. from BTL.cache import Cache
  22. from KRateLimiter import KRateLimiter
  23. from hammerlock import Hammerlock
  24. from const import *
  25. # commands
  26. TID = 't'
  27. REQ = 'q'
  28. RSP = 'r'
  29. TYP = 'y'
  30. ARG = 'a'
  31. ERR = 'e'
  32. class KRPCFailSilently(Exception):
  33. pass
  34. class KRPCProtocolError(Exception):
  35. pass
  36. class KRPCServerError(Exception):
  37. pass
  38. class KRPCSelfNodeError(Exception):
  39. pass
  40. class hostbroker(Handler):
  41. def __init__(self, server, addr, transport, call_later, max_ul_rate, config, rlcount):
  42. self.server = server
  43. self.addr = addr
  44. self.transport = transport
  45. self.rltransport = KRateLimiter(transport, max_ul_rate, call_later, rlcount, config['max_rate_period'])
  46. self.call_later = call_later
  47. self.connections = Cache(touch_on_access=True)
  48. self.hammerlock = Hammerlock(100, call_later)
  49. self.expire_connections(loop=True)
  50. self.config = config
  51. if not self.config.has_key('pause'):
  52. self.config['pause'] = False
  53. def expire_connections(self, loop=False):
  54. self.connections.expire(bttime() - KRPC_CONNECTION_CACHE_TIME)
  55. if loop:
  56. self.call_later(KRPC_CONNECTION_CACHE_TIME, self.expire_connections, True)
  57. def data_came_in(self, addr, datagram):
  58. #if addr != self.addr:
  59. if not self.config['pause'] and self.hammerlock.check(addr):
  60. c = self.connectionForAddr(addr)
  61. c.datagramReceived(datagram, addr)
  62. def connection_lost(self, socket):
  63. ## this is like, bad
  64. print ">>> connection lost!", socket
  65. def connectionForAddr(self, addr):
  66. if addr == self.addr:
  67. raise KRPCSelfNodeError()
  68. if not self.connections.has_key(addr):
  69. conn = KRPC(addr, self.server, self.transport, self.rltransport, self.call_later)
  70. self.connections[addr] = conn
  71. else:
  72. conn = self.connections[addr]
  73. return conn
  74. ## connection
  75. class KRPC(object):
  76. __slots__ = ('noisy','call_later','transport','rltransport','factory','addr','tids','mtid','pinging')
  77. noisy = 0
  78. def __init__(self, addr, server, transport, rltransport, call_later):
  79. self.call_later = call_later
  80. self.transport = transport
  81. self.rltransport = rltransport
  82. self.factory = server
  83. self.addr = addr
  84. self.tids = {}
  85. self.mtid = 0
  86. self.pinging = False
  87. def sendErr(self, addr, tid, code, msg):
  88. ## send error
  89. out = bencode({TID:tid, TYP:ERR, ERR :(code, msg)})
  90. olen = len(out)
  91. self.rltransport.sendto(out, 0, addr)
  92. return olen
  93. def datagramReceived(self, str, addr):
  94. # bdecode
  95. try:
  96. msg = bdecode(str)
  97. except Exception, e:
  98. if self.noisy:
  99. print "response decode error: " + `e`, `str`
  100. else:
  101. #if self.noisy:
  102. # print msg
  103. # look at msg type
  104. if msg[TYP] == REQ:
  105. ilen = len(str)
  106. # if request
  107. # tell factory to handle
  108. f = getattr(self.factory ,"krpc_" + msg[REQ], None)
  109. msg[ARG]['_krpc_sender'] = self.addr
  110. if f and callable(f):
  111. try:
  112. ret = apply(f, (), msg[ARG])
  113. except KRPCFailSilently:
  114. pass
  115. except KRPCServerError, e:
  116. olen = self.sendErr(addr, msg[TID], 202, "Server Error: %s" % e.args[0])
  117. except KRPCProtocolError, e:
  118. olen = self.sendErr(addr, msg[TID], 204, "Protocol Error: %s" % e.args[0])
  119. except Exception, e:
  120. print_exc(20)
  121. olen = self.sendErr(addr, msg[TID], 202, "Server Error")
  122. else:
  123. if ret:
  124. # make response
  125. out = bencode({TID : msg[TID], TYP : RSP, RSP : ret})
  126. else:
  127. out = bencode({TID : msg[TID], TYP : RSP, RSP : {}})
  128. # send response
  129. olen = len(out)
  130. self.rltransport.sendto(out, 0, addr)
  131. else:
  132. if self.noisy:
  133. #print "don't know about method %s" % msg[REQ]
  134. pass
  135. # unknown method
  136. olen = self.sendErr(addr, msg[TID], *KERR_METHOD_UNKNOWN)
  137. if self.noisy:
  138. try:
  139. ndist = 10 * log10(2**160 * 1.0 / distance(self.factory.node.id, msg[ARG]['id']))
  140. ndist = int(ndist)
  141. except OverflowError:
  142. ndist = 999
  143. h = None
  144. if msg[ARG].has_key('target'):
  145. h = msg[ARG]['target']
  146. elif msg[ARG].has_key('info_hash'):
  147. h = msg[ARG]['info_hash']
  148. else:
  149. tdist = '-'
  150. if h != None:
  151. try:
  152. tdist = 10 * log10(2**160 * 1.0 / distance(self.factory.node.id, h))
  153. tdist = int(tdist)
  154. except OverflowError:
  155. tdist = 999
  156. t = time.localtime()
  157. t = "%2d-%2d-%2d %2d:%2d:%2d" % (t[0], t[1], t[2], t[3], t[4], t[5])
  158. print "%s %s %s >>> %s - %s %s %s - %s %s" % (t,
  159. msg[ARG]['id'].encode('base64')[:4],
  160. addr,
  161. self.factory.node.port,
  162. ilen,
  163. msg[REQ],
  164. olen,
  165. ndist,
  166. tdist)
  167. elif msg[TYP] == RSP:
  168. # if response
  169. # lookup tid
  170. if self.tids.has_key(msg[TID]):
  171. df = self.tids[msg[TID]]
  172. # callback
  173. del(self.tids[msg[TID]])
  174. df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
  175. else:
  176. # no tid, this transaction timed out already...
  177. pass
  178. elif msg[TYP] == ERR:
  179. # if error
  180. # lookup tid
  181. if self.tids.has_key(msg[TID]):
  182. df = self.tids[msg[TID]]
  183. # callback
  184. df.errback(msg[ERR])
  185. del(self.tids[msg[TID]])
  186. else:
  187. # day late and dollar short
  188. pass
  189. else:
  190. # unknown message type
  191. df = self.tids[msg[TID]]
  192. # callback
  193. df.errback((KRPC_ERROR_RECEIVED_UNKNOWN, _("received unknown message type")))
  194. del(self.tids[msg[TID]])
  195. def sendRequest(self, method, args):
  196. # make message
  197. # send it
  198. msg = {TID : chr(self.mtid), TYP : REQ, REQ : method, ARG : args}
  199. self.mtid = (self.mtid + 1) % 256
  200. s = bencode(msg)
  201. d = Deferred()
  202. self.tids[msg[TID]] = d
  203. self.call_later(KRPC_TIMEOUT, self.timeOut, msg[TID])
  204. self.call_later(0, self._send, s, d)
  205. return d
  206. def timeOut(self, id):
  207. if self.tids.has_key(id):
  208. df = self.tids[id]
  209. del(self.tids[id])
  210. df.errback((KRPC_ERROR_TIMEOUT, _("timeout")))
  211. def _send(self, s, d):
  212. try:
  213. self.transport.sendto(s, 0, self.addr)
  214. except socket.error:
  215. d.errback((KRPC_SOCKET_ERROR, _("socket error")))