BandwidthManager.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. # The contents of this file are subject to the BitTorrent Open Source License
  2. # Version 1.1 (the License). You may not copy or use this file, in either
  3. # source code or executable form, except in compliance with the License. You
  4. # may obtain a copy of the License at http://www.bittorrent.com/license/.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10. # By Greg Hazel
  11. from __future__ import division
  12. debug = False
  13. stats = False # collect statistics and dump to files.
  14. import os # for stats
  15. from BitTorrent import platform # for stats
  16. import time # for stats
  17. import math
  18. import itertools
  19. from BTL.obsoletepythonsupport import set
  20. from BitTorrent.RTTMonitor import RTTMonitor
  21. #from BitTorrent.RTTMonitor2 import RTTMonitor, Win32Icmp, UnixIcmp
  22. from BTL.HostIP import get_deferred_host_ips
  23. from BTL.platform import bttime
  24. from BTL.Lists import SizedList
  25. def fp(flt):
  26. return "%.2f" % flt
  27. class NodeFeeder(object):
  28. """Every few minutes, this will obtain the set of known peers from
  29. the running torrents and then tell the RTTMonitor to retraceroute
  30. to these peers to reidentify the common path."""
  31. def __init__(self, external_add_task,
  32. get_remote_endpoints, rttmonitor):
  33. self.external_add_task = external_add_task
  34. self.get_remote_endpoints = get_remote_endpoints
  35. self.rttmonitor = rttmonitor
  36. self.external_add_task(3, self._start)
  37. def _start(self):
  38. df = get_deferred_host_ips()
  39. df.addCallback(self._collect_nodes)
  40. def _collect_nodes(self, local_ips):
  41. addrs = self.get_remote_endpoints()
  42. ips = set()
  43. for (ip, port) in addrs:
  44. if ip is not None and ip != "0.0.0.0" and ip not in local_ips:
  45. assert isinstance(ip, str)
  46. assert isinstance(port, int)
  47. ips.add(ip)
  48. self.rttmonitor.set_nodes_restart(ips)
  49. delay = 5
  50. if len(ips) > 0:
  51. delay = 300
  52. self.external_add_task(delay, self._collect_nodes, local_ips)
  53. def ratio_sum_lists(a, b):
  54. tx = float(sum(a))
  55. ty = float(sum(b))
  56. return tx / max(ty, 0.0001)
  57. def standard_deviation(l):
  58. N = len(l)
  59. x = 0
  60. total = float(sum(l))
  61. mean = total/N
  62. # Subtract the mean from each data item and square the difference.
  63. # Sum all the squared deviations.
  64. for i in l:
  65. x += (i - mean)**2.0
  66. try:
  67. if False:
  68. # Divide sum of squares by N-1 (sample variance).
  69. variance = x / (N - 1)
  70. else:
  71. # Divide sum of squares by N (population variance).
  72. variance = x / N
  73. except ZeroDivisionError:
  74. variance = 0
  75. stddev = math.sqrt(variance)
  76. return stddev
  77. class BandwidthManager(object):
  78. def __init__(self, external_add_task, config, set_option,
  79. get_remote_endpoints, get_rates):
  80. if debug:
  81. if config['bandwidth_management']:
  82. print "bandwidth management is up."
  83. else:
  84. print "!@#!@#!@#!@#!@# bandwidth management is down."
  85. self.external_add_task = external_add_task
  86. self.config = config
  87. self.set_option = set_option
  88. self.get_rates = get_rates
  89. # Next few lines were added by David Harrison to use RTTMonitor2
  90. #if os.name == 'nt':
  91. # icmp_impl = Win32Icmp()
  92. #elif os.name == 'posix':
  93. # icmp_impl = UnixIcmp(external_add_task, config['xicmp_port'])
  94. def got_new_rtt(rtt):
  95. self.external_add_task(0, self._inspect_rates, rtt)
  96. #self.rttmonitor = RTTMonitor(got_new_rtt, icmp_impl)
  97. self.rttmonitor = RTTMonitor(got_new_rtt)
  98. self.nodefeeder = NodeFeeder(external_add_task=external_add_task,
  99. get_remote_endpoints=get_remote_endpoints,
  100. rttmonitor=self.rttmonitor)
  101. self.start_time = None
  102. self.max_samples = 10 # hmm...
  103. self.u = SizedList(self.max_samples)
  104. self.d = SizedList(self.max_samples)
  105. self.t = SizedList(self.max_samples * 2)
  106. self.ur = SizedList(self.max_samples)
  107. self.dr = SizedList(self.max_samples)
  108. self.current_std = 0.001
  109. self.max_std = 0.001
  110. self.last_max = bttime()
  111. self.max_rates = {}
  112. self.max_rates["upload"] = 1.0
  113. self.max_rates["download"] = 1.0
  114. self.max_p = 1.0
  115. self.min_p = 2**500
  116. self.mid_p = ((self.max_p - self.min_p) / 2.0) + self.min_p
  117. self.old_p = None
  118. # I pulled these numbers out of my ass.
  119. if stats:
  120. tmp_dir = platform.get_temp_dir()
  121. timestr = "%d_%d_%d_%d_%d" % time.localtime()[1:6]
  122. stats_dir = os.path.join( tmp_dir, "bittorrent%s_%d" %
  123. (timestr, os.getpid()) )
  124. os.mkdir(stats_dir)
  125. if debug: print "BandwidthManager: stats_dir = %s" % stats_dir
  126. rate_vs_time = os.path.join( stats_dir, "rate_vs_time.plotdata" )
  127. self.rfp = open( rate_vs_time, "w" )
  128. delay_vs_time = os.path.join( stats_dir, "delay_vs_time.plotdata" )
  129. self.dfp = open( delay_vs_time, "w" )
  130. sdev_vs_time = os.path.join( stats_dir,
  131. "stddev_vs_time.plotdata" )
  132. self.sdevfp = open( sdev_vs_time, "w" )
  133. def _method_1(self, type, t, c, old_c, rate):
  134. # This concept is:
  135. # if the correlation is high and the latency is high
  136. # then lower the bandwidth limit.
  137. # otherwise, raise it.
  138. if ((c > 0.96) and (t > 100)):
  139. rate /= 2.0
  140. if debug:
  141. print type, "down to", rate
  142. else:
  143. rate += 500 # hmm
  144. if debug:
  145. print type, "up to", rate
  146. return rate
  147. def _method_2(self, type, t, c, old_c, rate):
  148. # This concept is:
  149. # if the correlation is low and the latency is high, lower the limit
  150. # otherwise raise it
  151. if ((c < 0.60) and (t > 100)):
  152. rate /= 2.0
  153. if debug:
  154. print type, "down to", rate
  155. else:
  156. rate += 500 # hmm
  157. if debug:
  158. print type, "up to", rate
  159. return rate
  160. def _method_vegasish(self, type, t, c, old_c, rate):
  161. middle_rtt = ((self.rttmonitor.get_min_rtt() +
  162. self.rttmonitor.get_max_rtt()) / 2.0)
  163. if t > middle_rtt:
  164. rate *= 1.0/8.0
  165. if debug:
  166. print type, "down to", rate
  167. else:
  168. rate += 1000 # hmm
  169. if debug:
  170. print type, "up to", rate
  171. return rate
  172. def _method_vegas_greg(self, type, t, c, old_c, rate):
  173. middle_rtt = ((self.rttmonitor.get_min_rtt() +
  174. self.rttmonitor.get_max_rtt()) / 2.0)
  175. if t > middle_rtt and c < 0.5:
  176. rate *= 1.0/8.0
  177. if debug:
  178. print type, "down to", rate
  179. else:
  180. rate += 1000 # hmm
  181. if debug:
  182. print type, "up to", rate
  183. return rate
  184. def _method_ratio(self, type, t, p, min_p, max_p, rate):
  185. ratio = p / max_p
  186. if debug:
  187. print "RATIO", ratio
  188. if ratio < 0.5:
  189. rate = ratio * self.max_rates[type]
  190. if debug:
  191. print type.upper(), "SET to", rate
  192. else:
  193. rate += rate * (ratio/10.0) # hmm
  194. if debug:
  195. print type.upper(), "UP to", rate
  196. return max(rate, 1000)
  197. def _method_stddev(self, type, std, max_std, rate):
  198. top = 0.80 # FUDGE
  199. if std > (max_std * top):
  200. center = 1.0 + top - ((1.0 - top) * 0.5)
  201. s = min(std/max(0.0001, max_std), center)
  202. s = center - s
  203. rate *= s
  204. if debug:
  205. print type.upper(), "DOWN *", s, "to", rate
  206. else:
  207. s = 1000.0 # FUDGE
  208. s *= min(max_std/max(0.0001, std), 4) / 4.0
  209. s = int(s)
  210. rate += s
  211. if debug:
  212. print type.upper(), "UP +", s, "to", rate
  213. return max(rate, 1000) # FUDGE
  214. def _affect_rate(self, type, std, max_std, rate, set):
  215. rate = self._method_stddev(type, std, max_std, rate)
  216. rock_bottom = False
  217. if rate <= 4096:
  218. if debug:
  219. print "Rock bottom"
  220. rock_bottom = True
  221. rate = 4096
  222. set(int(rate))
  223. if stats:
  224. print "BandwidthManager._affect_rate(%f)" % rate
  225. self.rfp.write( "%d %d" % (bttime(),int(rate)) )
  226. self.sdevfp.write( "%d %f" % (bttime(), std ) )
  227. return rock_bottom
  228. def _inspect_rates(self, t = None):
  229. if t == None:
  230. t = self.rttmonitor.get_current_rtt()
  231. if t == None:
  232. # this makes timeouts reduce the maximum std deviation
  233. self.max_std *= 0.80 # FUDGE
  234. return
  235. if self.start_time == None:
  236. self.start_time = bttime()
  237. if debug:
  238. print "BandwidthManager._inspect_rates rtt: %d" % t
  239. if stats:
  240. self.dfp.write( "%d %d" % (bttime(),t) )
  241. def set_if_enabled(option, value):
  242. if not self.config['bandwidth_management']:
  243. return
  244. if debug:
  245. print "Setting %s to: %s" % (option, value)
  246. self.set_option(option, value)
  247. # TODO: slow start should be smarter than this
  248. if self.start_time + 20 > bttime():
  249. if debug:
  250. print 'SLOW START', fp(self.start_time + 20), fp(bttime())
  251. set_if_enabled('max_upload_rate', 10000000)
  252. set_if_enabled('max_download_rate', 10000000)
  253. if t < 3:
  254. # I simply don't believe you. Go away.
  255. return
  256. tup = self.get_rates()
  257. if tup == None:
  258. return
  259. u, d = tup
  260. #print "udt", u, d, t
  261. #print "uprate, downrate=", u, d
  262. self.u.append(u)
  263. self.d.append(d)
  264. self.t.append(t)
  265. self.ur.append(self.config['max_upload_rate'])
  266. self.dr.append(self.config['max_download_rate'])
  267. #s = time.time()
  268. #cu = correlation(self.u, self.t)
  269. #cd = correlation(self.d, self.t)
  270. #cur = correlation(self.u, self.ur)
  271. #cdr = correlation(self.d, self.dr)
  272. #e = time.time()
  273. self.current_std = standard_deviation(self.t)
  274. pu = ratio_sum_lists(self.u, self.t)
  275. pd = ratio_sum_lists(self.d, self.t)
  276. if len(self.u) > 2:
  277. lp = [ x/y for x, y in itertools.izip(self.u, self.t) ]
  278. min_pu = min(*lp)
  279. max_pu = max(*lp)
  280. else:
  281. min_pu = u / t
  282. max_pu = u / t
  283. pu = u / t
  284. self.max_rates["upload"] = max(max(self.u), self.max_rates["upload"])
  285. self.max_rates["download"] = max(max(self.d), self.max_rates["download"])
  286. if debug:
  287. print 'urate:', fp(u), 'umax:', self.config['max_upload_rate'], \
  288. 'maxstd:', fp(self.max_std), 'std:', fp(self.current_std), \
  289. 'pu:', fp(pu), 'pd:', fp(pd)
  290. rb = self._affect_rate("upload", self.current_std, self.max_std,
  291. self.config['max_upload_rate'],
  292. lambda r : set_if_enabled('max_upload_rate', r))
  293. # don't adjust download rate, it's not nearly correlated enough
  294. ## if rb:
  295. ## v = int(self.config['max_download_rate'] * 0.90) # FUDGE
  296. ## v = max(v, 2000) # FUDGE
  297. ## set_if_enabled('max_download_rate', v)
  298. ## else:
  299. ## v = int(self.config['max_download_rate'] + 6000) # FUDGE
  300. ## set_if_enabled('max_download_rate', v)
  301. ## if debug:
  302. ## print "DOWNLOAD SET to", v
  303. #self._affect_rate("download", t, cd, self.last_cd, pd,
  304. # self.config['max_download_rate'],
  305. # lambda r : self.set_option('max_download_rate', r))
  306. if self.current_std > self.max_std:
  307. self.max_std = self.current_std
  308. self.last_max = bttime()
  309. elif bttime() - self.last_max > 10:
  310. # decay old maximums, to recover from flakey connections
  311. self.max_std *= 0.90 # FUDGE
  312. self.last_max = bttime()
  313. #self.last_cu = cu
  314. #self.last_cd = cd
  315. # we're re-called by the pinging thing
  316. #self.external_add_task(0.1, self._inspect_rates)