RTTMonitor2.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. # The contents of this file are subject to the BitTorrent Open Source License
  2. # Version 1.1 (the License). You may not copy or use this file, in either
  3. # source code or executable form, except in compliance with the License. You
  4. # may obtain a copy of the License at http://www.bittorrent.com/license/.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10. # By Greg Hazel and David Harrison
  11. debug = False
  12. #debug = True
  13. import os
  14. import Queue
  15. import socket
  16. import itertools
  17. import random
  18. from pprint import pprint
  19. from BTL.platform import bttime
  20. import BTL.stackthreading as threading
  21. from BTL.HostIP import get_host_ip, get_host_ips
  22. from BitTorrent.platform import spawn, app_root
  23. #from twisted.web.xmlrpc import Proxy
  24. if os.name == 'nt':
  25. import win32icmp
  26. IP_TTL_EXPIRED_TRANSIT = 11013
  27. IP_SUCCESS = 0
  28. def daemon_thread(target, args=()):
  29. t = threading.Thread(target=target, args=args)
  30. t.setDaemon(True)
  31. return t
  32. def in_common(routes):
  33. """routes is a list of lists, each containing a route to a peer."""
  34. # Greg: This is a little weird. Nodes appear in the queue in
  35. # increasing order of TTL among nodes in the path to
  36. # a given IP. However, there is no guarantee of ordering
  37. # between IP's. As a result, a closer branching
  38. # may be missed if the traceroute following that branch
  39. # is delayed longer than the traceroutes to other IPs.
  40. # --Dave
  41. r = []
  42. branch = False
  43. for n in itertools.izip(*routes):
  44. # strip dead nodes
  45. f = [i for i in n if i !='*']
  46. # ignore all dead nodes
  47. if len(f) == 0:
  48. continue
  49. if len(set(f)) == 1:
  50. r.append(f[0])
  51. else:
  52. # more than one unique node, the tree has branched
  53. branch = True
  54. break
  55. return (branch, r)
  56. class RTTMonitorBase(object):
  57. def __init__(self, new_rtt=None):
  58. self.instantanious_rtt = None
  59. def f(rtt):
  60. pass
  61. if new_rtt:
  62. self.new_rtt = new_rtt
  63. else:
  64. self.new_rtt = f
  65. def set_nodes_restart(self, nodes):
  66. pass
  67. def get_current_rtt(self):
  68. return self.instantanious_rtt
  69. # someday I'll write this using twisted. --D. Harrison
  70. #class RTTMonitorUnix(RTTMonitorBase):
  71. import xmlrpclib # blech. Better with twisted, but RTTMonitorWin32
  72. # was already written to handle blocking Icmp calls.
  73. class Icmp(object):
  74. """Implements ICMP."""
  75. def create_file(self):
  76. return 0
  77. def ping(self, fid, addr, ttl, timeout):
  78. # returns addr, status, rtt.
  79. pass
  80. def close(self,fid):
  81. pass
  82. class UnixIcmp(Icmp):
  83. def __init__(self, external_add_task, port):
  84. assert callable(external_add_task) # rawserver's
  85. assert type(port) in (int,long) and port > 0 and port <= 65535
  86. #pid = os.spawnl(os.P_NOWAIT, "xicmp", str(port))
  87. print "Spawning xicmp on port ", port
  88. xicmp = os.path.join( app_root, "icmp", "xicmp" )
  89. spawn( xicmp, str(port) )
  90. def _start_proxy(port):
  91. self.proxy = xmlrpclib.ServerProxy('http://localhost:%d' % port)
  92. external_add_task(4.0, _start_proxy, port) # allow time to spawn.
  93. def create_file(self):
  94. return self.proxy.IcmpCreateFile()
  95. def ping(self, fid, addr, ttl, timeout):
  96. try:
  97. return self.proxy.IcmpSendEcho( fid, addr, ttl, timeout )
  98. except xmlrpclib.Fault:
  99. return None
  100. def close(self,fid):
  101. #print "UnixIcmp: close: fid=", fid
  102. return self.proxy.IcmpCloseHandle( fid )
  103. class Options(object):
  104. pass
  105. class Win32Icmp(Icmp):
  106. def __init__(self):
  107. self.o = Options()
  108. def create_file(self):
  109. i = win32icmp.IcmpCreateFile()
  110. def ping(self, fid, addr, ttl, timeout):
  111. self.o.Ttl = ttl
  112. return win32icmp.IcmpSendEcho(fid, addr, None, self.o, timeout)
  113. def close(self,fid):
  114. win32icmp.IcmpCloseHandle(fid)
  115. def _set_min(x, y):
  116. if x is None:
  117. return y
  118. if y is None:
  119. return x
  120. return min(x, y)
  121. _set_max = max
  122. #class RTTMonitorWin32(RTTMonitorBase):
  123. class RTTMonitorBlocking(RTTMonitorBase):
  124. def __init__(self, new_rtt, icmp, interval = 0.5, timeout = 6.0 ):
  125. """
  126. @param new_rtt called every time a ping arrives.
  127. @param icmp is the ICMP implementation.
  128. @param timeout (currently uses a static timeout threshold)
  129. """
  130. assert callable(new_rtt)
  131. assert isinstance(icmp, Icmp)
  132. self.icmp = icmp
  133. self.nodes = []
  134. self.timeout = int(1000 * timeout) # in ms.
  135. self.interval = interval
  136. self.stop_event = threading.Event()
  137. self.abort_traceroute = threading.Event()
  138. self.finished_event = threading.Event()
  139. # the thread is finished because it hasn't started
  140. self.finished_event.set()
  141. RTTMonitorBase.__init__(self, new_rtt)
  142. def set_nodes_restart(self, nodes):
  143. if debug:
  144. pprint( "set_nodes_restart: nodes=%s" % str(nodes))
  145. self.nodes = []
  146. for node in nodes:
  147. self.add_node(node)
  148. t = threading.Thread(target=self.run, args=(list(self.nodes),))
  149. t.setDaemon(True)
  150. t.start()
  151. def add_node(self, node):
  152. self.nodes.append(node)
  153. def get_route(self, q, dst):
  154. try:
  155. dst = socket.gethostbyname(dst)
  156. self.traceroute(dst, self.timeout, lambda n : q.put((dst, n)))
  157. except socket.gaierror:
  158. # if hostbyname lookup fails, it's not a node we can use
  159. # maybe this should be a warning or something, but a downed
  160. # internet connection will cause a lot of these
  161. pass
  162. def run(self, nodes):
  163. q = Queue.Queue()
  164. dst = None
  165. # handy for hard-coding common node
  166. #dst = '68.87.195.50'
  167. if not dst:
  168. threads = []
  169. for i in nodes:
  170. t = daemon_thread(target=self.get_route, args=(q, i, ))
  171. threads.append(t)
  172. t.start()
  173. waiter_done_event = threading.Event()
  174. def waiter(threads, waiter_done_event):
  175. try:
  176. for thread in threads:
  177. thread.join() # blocks until thread terminates.
  178. except Exception, e:
  179. print "waiter hiccupped", e
  180. waiter_done_event.set()
  181. waiting_thread = daemon_thread(target=waiter,
  182. args=(threads, waiter_done_event, ))
  183. waiting_thread.start()
  184. common = []
  185. routes = {}
  186. #print "tracerouting..."
  187. hop_check = 0 # distance (in hops) being checked for branch.
  188. hop_cnt = {} # number responses received at the given distance.
  189. farthest_possible = 1000 # farthest distance possible as
  190. # determined by the shortest number of
  191. # hops to a node in the passed nodes.
  192. branch = False
  193. while not waiter_done_event.isSet():
  194. try:
  195. msg = q.get(True, 1.0)
  196. except Queue.Empty:
  197. pass
  198. else:
  199. dst = msg[0]
  200. # nodes appear in the queue in
  201. # increasing order of TTL.
  202. new_node = msg[1]
  203. if dst not in routes:
  204. l = []
  205. routes[dst] = l
  206. else:
  207. l = routes[dst]
  208. l.append(new_node)
  209. #print "calling in_common with ", routes.values()
  210. # BEGIN replaces in_common
  211. #hops_so_far = len(routes[dst])
  212. ## It is not possible for the common path to be longer
  213. ## than the closest node.
  214. #if dst == new_node and hops_so_far < farthest_possible:
  215. # farthest_possible = hops_so_far
  216. #if hop_cnt.has_key(hops_so_far):
  217. # hop_cnt[hops_so_far] += 1
  218. #else:
  219. # hop_cnt[hops_so_far] = 1
  220. #
  221. #if hops_so_far == hop_check:
  222. # # if got all pings for a given distance then see if
  223. # # there is a branch.
  224. # while hop_cnt[hop_check] == len(nodes) and \
  225. # hop_check <= farthest_possible:
  226. # n = None
  227. # for r in routes:
  228. # if n is not None and n != routes[d]:
  229. # branch = True
  230. # break
  231. # if routes[hop_check] != '*':
  232. # n = routes[hop_check]
  233. # else:
  234. # common.append(n)
  235. # hop_check += 1
  236. # if hop_check > farthest_possible:
  237. # branch = True
  238. ## END
  239. branch, common = in_common(routes.values())
  240. if branch:
  241. break
  242. #print "done tracerouting..."
  243. self.abort_traceroute.set()
  244. waiter_done_event.wait()
  245. self.abort_traceroute.clear()
  246. local_ips = get_host_ips()
  247. new_common = []
  248. for c in common:
  249. if c not in local_ips:
  250. new_common.append(c)
  251. common = new_common
  252. if debug:
  253. pprint(common)
  254. if len(common) == 0:
  255. # this should be inspected, it's not a simple debug message
  256. if debug:
  257. print "No common node", routes
  258. return
  259. del routes
  260. dst = common[-1]
  261. # kill the old thread
  262. self.stop_event.set()
  263. # wait for it to finish
  264. self.finished_event.wait()
  265. # clear to indicate we're running
  266. self.finished_event.clear()
  267. self.stop_event.clear()
  268. if debug:
  269. print "Farthest common hop [%s]" % dst
  270. # Ping a representative peer but set the ttl to the length of the
  271. # common path so that the farthest common hop responds with
  272. # ICMP time exceeded. (Some routers will send time exceeded
  273. # messages, but they won't respond to ICMP pings directly)
  274. representative = nodes[random.randrange(0, len(nodes))]
  275. if debug:
  276. print "pinging representative %s ttl=%d" % (
  277. representative,len(common))
  278. try:
  279. while not self.stop_event.isSet():
  280. start = bttime()
  281. rtt = self.ping(representative, 5000, ttl=len(common))
  282. self.instantanious_rtt = rtt
  283. delta = bttime() - start
  284. self.stop_event.wait(self.interval - delta)
  285. if debug: print "RTTMonitor.py: new_rtt %s" % rtt
  286. self.new_rtt(self.instantanious_rtt)
  287. except Exception, e:
  288. import traceback
  289. traceback.print_exc()
  290. print "ABORTING", e
  291. self.finished_event.set()
  292. def traceroute(self, dst, timeout, report=None):
  293. """If report is None then this returns the route as a list of IP
  294. addresses. If report is not None then this calls report as each
  295. node is discovered in the path (e.g., if there are 6 hops in the
  296. path then report gets called 6 times)."""
  297. if debug:
  298. print "Tracing route to [%s]" % dst
  299. i = self.icmp.create_file()
  300. o = Options()
  301. route = None
  302. if report == None:
  303. route = []
  304. def add_node(node):
  305. route.append(node)
  306. report = add_node
  307. for ttl in xrange(64):
  308. try:
  309. if ttl == 0:
  310. addr = get_host_ip()
  311. status = -1
  312. rtt = 0
  313. else:
  314. addr, status, rtt = self.icmp.ping(i,dst,ttl,timeout)
  315. if debug:
  316. print "ttl", ttl, "\t", rtt, "ms\t", addr
  317. report(addr)
  318. if status == IP_SUCCESS:
  319. if debug:
  320. print "Traceroute complete in", ttl, "hops"
  321. break
  322. except Exception, e:
  323. report('*')
  324. if debug:
  325. print "Hop", ttl, "failed:", str(e)
  326. if self.abort_traceroute.isSet():
  327. break
  328. self.icmp.close(i)
  329. if route:
  330. return route
  331. def ping(self, dst, timeout, ttl = None):
  332. """Returns ICMP echo round-trip time to dst or returns None if a
  333. timeout occurs. timeout is measured in milliseconds.
  334. The TTL is useful if the caller wants to ping the router that
  335. is a number of hops in the direction of the dst, e.g., when a
  336. router will not respond to pings directly but will generate
  337. ICMP Time Exceeded messages."""
  338. i = self.icmp.create_file()
  339. rtt = None
  340. try:
  341. addr, status, rtt = self.icmp.ping(i, dst, ttl, timeout)
  342. if debug:
  343. if status == IP_SUCCESS:
  344. print "Reply from", addr + ":", "time=" + str(rtt)
  345. elif status == IP_TTL_EXPIRED_TRANSIT:
  346. print "Ping ttl expired %d: from %s time=%s" %(
  347. status, str(addr), str(rtt))
  348. else:
  349. print "Ping failed", status
  350. except Exception, e:
  351. if debug:
  352. print "Ping failed:", str(e)
  353. self.icmp.close(i)
  354. return rtt
  355. #if os.name == 'nt':
  356. # RTTMonitor = RTTMonitorWin32
  357. #else:
  358. # RTTMonitor = RTTMonitorUnix
  359. RTTMonitor = RTTMonitorBlocking