# RTTMonitor.py
#
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License). You may not copy or use this file, in either
# source code or executable form, except in compliance with the License. You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.

# By Greg Hazel and a smidgen by Dave Harrison
  11. debug = False
import os
import Queue
import random
import socket
from pprint import pprint

import BTL.stackthreading as threading
from BTL.exceptions import str_exc
from BTL.HostIP import get_host_ip, get_host_ips
from BTL.platform import bttime

# The ICMP helper is only imported on Windows.
if os.name == 'nt':
    from BTL import win32icmp
  23. def daemon_thread(target, args=()):
  24. t = threading.Thread(target=target, args=args)
  25. t.setDaemon(True)
  26. return t
  27. def izip_some(p, *a):
  28. for i in a:
  29. if len(i) > p:
  30. yield i[p]
  31. def izip_any(*a):
  32. m = max([len(i) for i in a])
  33. for x in xrange(m):
  34. yield izip_some(x, *a)
  35. def in_common(routes):
  36. """routes is a list of lists, each containing a route to a peer."""
  37. r = []
  38. branch = False
  39. for n in izip_any(*routes): #itertools.izip(*routes):
  40. # strip dead nodes
  41. f = [i for i in n if i != '*']
  42. # ignore all dead nodes
  43. if len(f) == 0:
  44. continue
  45. c = [ (f.count(x), x) for x in f ]
  46. c.sort()
  47. if debug:
  48. pprint(c)
  49. top = c[-1][0]
  50. # majority wins
  51. if top > 2 and top > (len(f) * 0.50):
  52. f = [c[-1][1]]
  53. if len(set(f)) == 1:
  54. r.append(f[0])
  55. else:
  56. # more than one unique node, the tree has branched
  57. branch = True
  58. break
  59. return (branch, r)
  60. class RTTMonitorBase(object):
  61. def __init__(self, new_rtt=None):
  62. self.instantanious_rtt = None
  63. self.min_rtt = None
  64. self.max_rtt = None
  65. def f(rtt):
  66. pass
  67. if new_rtt:
  68. self.new_rtt = new_rtt
  69. else:
  70. self.new_rtt = f
  71. def set_nodes_restart(self, nodes):
  72. pass
  73. def get_min_rtt(self):
  74. return self.min_rtt
  75. def get_max_rtt(self):
  76. return self.max_rtt
  77. def get_current_rtt(self):
  78. return self.instantanious_rtt
  79. class RTTMonitorUnix(RTTMonitorBase):
  80. # I assume this will have a unix implementation some day
  81. pass
  82. def _set_min(x, y):
  83. if x is None:
  84. return y
  85. if y is None:
  86. return x
  87. return min(x, y)
  88. _set_max = max
  89. class RTTMonitorWin32(RTTMonitorBase):
  90. def __init__(self, new_rtt, interval = 0.5, timeout = 6.0):
  91. self.timeout = int(1000 * timeout)
  92. self.interval = interval
  93. self.stop_event = threading.Event()
  94. self.abort_traceroute = threading.Event()
  95. self.finished_event = threading.Event()
  96. # the thread is finished because it hasn't started
  97. self.finished_event.set()
  98. RTTMonitorBase.__init__(self, new_rtt)
  99. def set_nodes_restart(self, nodes):
  100. if len(nodes) > 10:
  101. nodes = random.sample(nodes, 10)
  102. else:
  103. nodes = list(nodes)
  104. t = threading.Thread(target=self.run, args=(nodes,))
  105. t.setDaemon(True)
  106. t.start()
  107. def get_route(self, q, dst):
  108. try:
  109. dst = socket.gethostbyname(dst)
  110. self.traceroute(dst, self.timeout, lambda n : q.put((dst, n)))
  111. except socket.gaierror:
  112. # if hostbyname lookup fails, it's not a node we can use.
  113. # maybe this should be a warning or something, but a downed
  114. # internet connection will cause a lot of these
  115. pass
  116. def run(self, nodes):
  117. q = Queue.Queue()
  118. dst = None
  119. # handy for hard-coding common node
  120. #dst = '68.87.195.50'; nodes = [dst,]; common = nodes
  121. if not dst:
  122. threads = []
  123. for i in nodes:
  124. t = daemon_thread(target=self.get_route, args=(q, i, ))
  125. threads.append(t)
  126. t.start()
  127. waiter_done_event = threading.Event()
  128. def waiter(threads, waiter_done_event):
  129. try:
  130. for thread in threads:
  131. thread.join() # blocks until thread terminates.
  132. except Exception, e:
  133. print "waiter hiccupped", e
  134. waiter_done_event.set()
  135. waiting_thread = daemon_thread(target=waiter,
  136. args=(threads, waiter_done_event, ))
  137. waiting_thread.start()
  138. common = []
  139. routes = {}
  140. while not waiter_done_event.isSet():
  141. try:
  142. msg = q.get(True, 1.0)
  143. except Queue.Empty:
  144. pass
  145. else:
  146. dst = msg[0]
  147. # nodes appear in the queue in
  148. # increasing order of TTL.
  149. new_node = msg[1]
  150. routes.setdefault(dst, []).append(new_node)
  151. branch, common = in_common(routes.values())
  152. if branch:
  153. break
  154. self.abort_traceroute.set()
  155. waiter_done_event.wait()
  156. self.abort_traceroute.clear()
  157. local_ips = get_host_ips()
  158. new_common = []
  159. for c in common:
  160. if c not in local_ips:
  161. new_common.append(c)
  162. common = new_common
  163. if debug:
  164. pprint(common)
  165. if len(common) == 0:
  166. # this should be inspected, it's not a simple debug message
  167. if debug:
  168. print "No common node"
  169. pprint(routes)
  170. return
  171. del routes
  172. dst = common[-1]
  173. # kill the old thread
  174. self.stop_event.set()
  175. # wait for it to finish
  176. self.finished_event.wait()
  177. # clear to indicate we're running
  178. self.finished_event.clear()
  179. self.stop_event.clear()
  180. if debug:
  181. print "Farthest common hop [%s]" % dst
  182. # range can change if the node in common changes
  183. self.min_rtt = None
  184. self.max_rtt = None
  185. # Ping a representative peer but set the ttl to the length of the
  186. # common path so that the farthest common hop responds with
  187. # ICMP time exceeded. (Some routers will send time exceeded
  188. # messages, but they won't respond to ICMP pings directly)
  189. representative = random.sample(nodes, 1)[0]
  190. if debug:
  191. print ("pinging representative %s ttl=%d" %
  192. (representative, len(common)))
  193. try:
  194. while not self.stop_event.isSet():
  195. start = bttime()
  196. rtt = self.ping(representative, 5000, ttl=len(common))
  197. self.instantanious_rtt = rtt
  198. self.min_rtt = _set_min(self.min_rtt, rtt)
  199. self.max_rtt = _set_max(self.max_rtt, rtt)
  200. delta = bttime() - start
  201. self.stop_event.wait(self.interval - delta)
  202. if debug: print "RTTMonitor.py: new_rtt %s" % rtt
  203. self.new_rtt(self.instantanious_rtt)
  204. except Exception, e:
  205. import traceback
  206. traceback.print_exc()
  207. print "ABORTING", e
  208. self.finished_event.set()
  209. def traceroute(self, dst, timeout, report=None):
  210. """If report is None then this returns the route as a list of IP
  211. addresses. If report is not None then this calls report as each
  212. node is discovered in the path (e.g., if there are 6 hops in the
  213. path then report gets called 6 times)."""
  214. if debug:
  215. print "Tracing route to [%s]" % dst
  216. i = win32icmp.IcmpCreateFile()
  217. o = win32icmp.Options()
  218. route = None
  219. if report == None:
  220. route = []
  221. def add_node(node):
  222. route.append(node)
  223. report = add_node
  224. for ttl in xrange(64):
  225. o.Ttl = ttl
  226. try:
  227. if ttl == 0:
  228. addr = get_host_ip()
  229. status = -1
  230. rtt = 0
  231. else:
  232. addr, status, rtt = win32icmp.IcmpSendEcho(i, dst, None, o,
  233. timeout)
  234. if debug:
  235. print "ttl", ttl, "\t", rtt, "ms\t", addr
  236. report(addr)
  237. if status == win32icmp.IP_SUCCESS:
  238. if debug:
  239. print "Traceroute complete in", ttl, "hops"
  240. break
  241. except Exception, e:
  242. report('*')
  243. if debug:
  244. print "Hop", ttl, "failed:", str_exc(e)
  245. if self.abort_traceroute.isSet():
  246. break
  247. win32icmp.IcmpCloseHandle(i)
  248. if route:
  249. return route
  250. def ping(self, dst, timeout, ttl = None):
  251. """Returns ICMP echo round-trip time to dst or returns None if a
  252. timeout occurs. timeout is measured in milliseconds.
  253. The TTL is useful if the caller wants to ping the router that
  254. is a number of hops in the direction of the dst, e.g., when a
  255. router will not respond to pings directly but will generate
  256. ICMP Time Exceeded messages."""
  257. i = win32icmp.IcmpCreateFile()
  258. rtt = None
  259. try:
  260. o = None
  261. if ttl is not None:
  262. o = win32icmp.Options()
  263. o.Ttl = ttl
  264. addr, status, rtt = win32icmp.IcmpSendEcho(i, dst, None, o, timeout)
  265. if debug:
  266. if status == win32icmp.IP_SUCCESS:
  267. print "Reply from", addr + ":", "time=" + str(rtt)
  268. elif status == win32icmp.IP_TTL_EXPIRED_TRANSIT:
  269. print "Ping ttl expired %d: from %s time=%s" %(
  270. status, str(addr), str(rtt))
  271. else:
  272. print "Ping failed", win32icmp.status[status]
  273. except KeyboardInterrupt:
  274. raise
  275. except Exception, e:
  276. if debug:
  277. print "Ping failed:", str_exc(e)
  278. win32icmp.IcmpCloseHandle(i)
  279. return rtt
  280. if os.name == 'nt':
  281. RTTMonitor = RTTMonitorWin32
  282. else:
  283. RTTMonitor = RTTMonitorUnix