# BandwidthManager2.py
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License). You may not copy or use this file, in either
# source code or executable form, except in compliance with the License. You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.
# By Greg Hazel and David Harrison
  11. import sys
  12. if __name__ == "__main__":
  13. sys.path = [".."] + sys.path
  14. debug = True
  15. stats = True # collect statistics and dump to files.
  16. import os # for stats
  17. import platform # for stats
  18. import time # for stats
  19. import math
  20. import socket
  21. import itertools
  22. from BitTorrent.obsoletepythonsupport import set
  23. from BitTorrent.RTTMonitor import RTTMonitor, Win32Icmp, UnixIcmp
  24. from BitTorrent.HostIP import get_host_ips
  25. from BitTorrent.platform import bttime, get_temp_subdir
  26. from BitTorrent.Lists import SizedList, LargestDropSizedList, \
  27. RandomDropSizedList
  28. stats_dir = None
  29. if stats:
  30. stats_dir = os.path.join(platform.get_temp_subdir(), "stats")
  31. os.mkdir(stats_dir)
  32. if debug: print "BandwidthManager: stats_dir = %s" % stats_dir
  33. class NodeFeeder(object):
  34. """Every few minutes, this will obtain the set of known peers from
  35. the running torrents and then tell the RTTMonitor to retraceroute
  36. to these peers to reidentify the common path."""
  37. def __init__(self, add_task, get_remote_endpoints, rttmonitor):
  38. self.add_task = add_task
  39. self.get_remote_endpoints = get_remote_endpoints
  40. self.rttmonitor = rttmonitor
  41. self.add_task(3, self._collect_nodes)
  42. def _collect_nodes(self):
  43. addrs = self.get_remote_endpoints()
  44. print "_collect_nodes: addrs=",addrs
  45. local_ips = get_host_ips()
  46. ips = set()
  47. for (ip, port) in addrs:
  48. if ip is not None and ip != "0.0.0.0" and ip not in local_ips:
  49. ips.add(ip)
  50. self.rttmonitor.set_nodes_restart(ips)
  51. delay = 5
  52. if len(ips) > 0:
  53. delay = 300
  54. self.add_task(delay, self._collect_nodes)
  55. def _copy_gnuplot(fname):
  56. if os.getenv("BT"):
  57. src = os.path.join(os.getenv("BT"), "test",
  58. "test_BandwdithManager", fname )
  59. dst = os.path.join( stats_dir, fname )
  60. shutil.copy(src,dst)
  61. def ratio_sum_lists(a, b):
  62. tx = float(sum(a))
  63. ty = float(sum(b))
  64. return tx / max(ty, 0.0001)
  65. def mean(l):
  66. N = len(l)
  67. total = float(sum(l))
  68. return total/N
  69. def median(l):
  70. assert len(l)> 0
  71. if len(l) == 1:
  72. return l[0]
  73. elif len(l) % 2 == 0: # even number of elements.
  74. return (l[len(l)/2-1] + l[len(l)/2])/2.
  75. else: # odd number of elements.
  76. return l[len(l)/2]
  77. def variance(l):
  78. N = len(l)
  79. x = 0
  80. total = float(sum(l))
  81. mean = total/N
  82. # Subtract the mean from each data item and square the difference.
  83. # Sum all the squared deviations.
  84. for i in l:
  85. x += (i - mean)**2.0
  86. try:
  87. if False:
  88. # Divide sum of squares by N-1 (sample variance).
  89. variance = x / (N - 1)
  90. else:
  91. # Divide sum of squares by N (population variance).
  92. variance = x / N
  93. except ZeroDivisionError:
  94. variance = 0
  95. return variance
  96. def standard_deviation(l):
  97. return math.sqrt(variance)
  98. class Tracer(object):
  99. """Baseclass for sampler wrapper objects."""
  100. def __init__( self, func ):
  101. self._func = func
  102. def __call__(self, *args, **kwargs):
  103. return self._func(*args, **kwargs)
  104. class StreamTracer(Tracer):
  105. """outputs monitored samples to a stream."""
  106. def __init__( self, func, stream ):
  107. Tracer.__init__(self,func)
  108. self._stream = stream
  109. self._i = 0
  110. def __call__(self, *args, **kwargs):
  111. val = self._func( *args, **kwargs)
  112. self._stream.write("%f\t%f\n" % (bttime(),val))
  113. self._i = (self._i+1) % 8 # flush a little more often.
  114. if self._i == 0:
  115. self._stream.flush()
  116. return val
  117. class MinWindow(object):
  118. """Maintains a list of the 'window_size' smallest
  119. samples seen. After every_nth sample, the smallest is dropped.
  120. Dropping allows the sampler to eventually recover from spurious
  121. outliers. This class is meant to be derived. See
  122. MedianOfMinWindow, etc.
  123. """
  124. def __init__(self, window_size, every_nth ):
  125. self._window = LargestDropSizedList( window_size )
  126. self._i = 0
  127. self._every_nth = every_nth
  128. def __getitem__(self, i):
  129. return self._window[i]
  130. def __len__( self ):
  131. return len(self._window)
  132. def __str__(self):
  133. return str(self._window)
  134. def __iter__(self):
  135. return self._window.__iter__()
  136. def update(self, sample):
  137. self._i = (self._i+1)%self._every_nth
  138. if self._i == 0: # drop smallest sample.
  139. self._window.popleft()
  140. self._window.insort( sample )
  141. def timeout(self):
  142. """If a timeout occurs then it is possible that our propagation
  143. estimate is too large. We therefore drop the top sample."""
  144. if len(self._window) > 1:
  145. self._window.pop() # drop largest sample.
  146. class MedianOfMinWindow(MinWindow):
  147. """Computes the median of a MinWindow.
  148. By using the median of the smallest, we add some
  149. additional resistance to outliers.
  150. If fp is provided then the medians of the min window
  151. are output to a file."""
  152. def __init__( self, window_size, every_nth ):
  153. MinWindow.__init__(self,window_size, every_nth)
  154. def __call__( self, sample = None):
  155. if sample is not None: self.update(sample)
  156. #print "MedianOfMinWindow:"
  157. #for i in xrange(len(self._window)):
  158. # print " ", self[i]
  159. return median(self)
  160. class AverageOfMinWindow(MinWindow):
  161. """Computes the average of a MinWindow. By using the average of
  162. the smallest, we add some additional resistance to outliers."""
  163. def __init__( self, window_size, every_nth ):
  164. MinWindow.__init__(self, window_size, every_nth)
  165. def __call__( self, sample = None):
  166. if sample is not None: self.update(sample)
  167. return mean(self._window)
  168. class MaxWindow(object):
  169. """Maintains a list of the 'window_size' largest
  170. samples seen. After every_nth sample, the largest is dropped.
  171. Dropping allows the sampler to eventually recover from spurious
  172. outliers. This class is meant to be derived. See
  173. MedianOfMaxWindow, etc."""
  174. def __init__(self, window_size, every_nth ):
  175. self._window = LargestDropSizedList( window_size ) # reverse w/ -sample
  176. self._i = 0
  177. self._every_nth = every_nth
  178. def __len__( self ):
  179. return len(self._window)
  180. def __str__(self):
  181. l = list(self._window)
  182. l.reverse()
  183. l = [-x for x in l]
  184. return str(l)
  185. def __iter__(self):
  186. class Iterator:
  187. def __init__(self,minwindow):
  188. self._minwindow = minwindow
  189. self._i = 0
  190. def next(self):
  191. if self._i < len(self._minwindow):
  192. val = self._minwindow[self._i]
  193. self._i += 1
  194. return val
  195. raise StopIteration()
  196. return Iterator(self)
  197. def __eq__(self,l):
  198. if isinstance(l,MaxWindow):
  199. return l._window == self._window
  200. elif isinstance(l,list):
  201. return l == [x for x in self]
  202. else:
  203. return False
  204. def __ne__(self,l):
  205. return not l == self
  206. def __getitem__(self,i):
  207. """retrieves the ith element in the window where element at index 0
  208. is the smallest."""
  209. return -self._window[len(self._window)-i-1]
  210. def update(self, sample):
  211. self._i = (self._i+1)%self._every_nth
  212. if self._i == 0: # drop largest sample.
  213. self._window.popleft()
  214. # maintain order. last element should be smallest so insert -sample.
  215. self._window.insort(-sample)
  216. class MedianOfMaxWindow(MaxWindow):
  217. """Computes the median of a MaxWindow.
  218. By using the median of the largest, we add some
  219. resistance to outliers."""
  220. def __init__( self, window_size, every_nth ):
  221. MaxWindow.__init__(self,window_size, every_nth)
  222. def __call__( self, sample = None ):
  223. if sample is not None: self.update(sample)
  224. return median(self)
  225. class AverageOfMaxWindow(MaxWindow):
  226. """Computes the average of a MaxWindow. By using the average of
  227. the largest, we add some resistance to outliers."""
  228. def __init__( self, window_size, every_nth ):
  229. MaxWindow.__init__(self, window_size, every_nth)
  230. def __call__( self, sample = None ):
  231. if sample is not None: self.update(sample)
  232. return mean(self._window)
  233. class AverageOfLastWindow(object):
  234. def __init__( self, window_size ):
  235. self._window = SizedList(window_size)
  236. def __call__( self, sample = None ):
  237. if sample is not None:
  238. self._window.append(sample)
  239. return mean(self._window)
  240. class MedianOfLastWindow(object):
  241. def __init__( self, window_size ):
  242. self._window = RandomDropSizedList(window_size)
  243. def __call__( self, sample = None ):
  244. """If passed no sample then returns current median."""
  245. if sample is not None:
  246. # maintain order.
  247. self._window.insort(sample)
  248. # randomly expels one sample to keep size. Dropping a random
  249. # sample exhibits no bias (as opposed to dropping smallest or
  250. # largest as done with MinWindow and MaxWindow)
  251. return median(self._window)
  252. class EWMA(object):
  253. def __init__( self, alpha, init_avg = None ):
  254. """Exponentially Weighted Moving Average (EWMA) functor.
  255. @param alpha: the weight used in the EWMA using 'smaller is
  256. slower' convention.
  257. @param init_avg: starting value of the average. If None then
  258. the average is only defined after the first sample and in
  259. the first call is set to the sample.
  260. """
  261. self._alpha = alpha
  262. self._avg = init_avg
  263. def __call__( self, sample = None ):
  264. """Computes the moving average after taking into account the passed
  265. sample. If passed nothing then it just returns the current average
  266. value."""
  267. if sample == None:
  268. if self._avg == None:
  269. raise ValueError( "Tried to retrieve value from EWMA before "
  270. "first sample." )
  271. else:
  272. if self._avg == None:
  273. self._avg = sample
  274. else:
  275. self._avg = (1-self._alpha) * self._avg + self._alpha * sample
  276. return self._avg
  277. class BinaryCongestionEstimator(object):
  278. """Abstract baseclass for congestion estimators that measure congestion
  279. based on rate, rtt, and/or loss. It's up to the
  280. subclass to decide whether congestion is occuring."""
  281. def timeout(self):
  282. """Called when a timeout occurs."""
  283. pass
  284. def __call__( self, rtt, uprate ):
  285. pass
  286. class VarianceCongestionEstimator(BinaryCongestionEstimator):
  287. """Congestion is assumed iff the stddev exceeds a threshold
  288. fraction of the maximum standard deviation."""
  289. # OBJECTION: Variance maximization works so long as the rate remains
  290. # below (to the left) of the peak in the variance versus rate curve.
  291. # In this regime, increasing rate causes an increase in variance.
  292. # When measured variance exceeds a threshold the system backs off causing
  293. # the variance to diminish. The system oscillates about this
  294. # optimal point.
  295. #
  296. # HOWEVER, the system behaves quite differently when rates are high, i.e.,
  297. # close to the bottleneck capacity. The graph of delay versus send rate
  298. # looks similar to a bellcurve. Our system increases the send rate
  299. # whenever the variance is below a threshold and decreases when
  300. # above the threshold. This is the correct behavior when on the
  301. # left-hand side of the bell curve. However, it is the OPPOSITE
  302. # of the desired behavior when to the right of the peak of the
  303. # bell curve. As a result, when rates get too high the system
  304. # continues to increase send rate until loss occurs.
  305. #
  306. # When loss occurs, the system backs off. When the control law is
  307. # AIMDControlLaw, the system backs off multiplicatively. This backoff
  308. # moves the system to the left on the bell curve. If the move is large
  309. # enough the system climbs over the hump and the system resumes
  310. # the proper behavior of increasing rate whenever variance is below
  311. # a threshold. If however, the backoff is insufficient to reach
  312. # the peak of the bell curve then the system slides back to the right
  313. # and resumes increasing send rate until loss occurs.
  314. #
  315. # This system is not guaranteed to converge on the equilibrium from all
  316. # feasible points.
  317. # -- David Harrison
  318. def __init__(self, window_size):
  319. self._window = SizedList(window_size)
  320. self._window_size = window_size
  321. self._max_var = 0.0
  322. if stats:
  323. delay_var_vs_time = os.path.join( stats_dir,
  324. "delay_var_vs_time.plotdata" )
  325. self._var_fp = open( delay_var_vs_time, "w" )
  326. max_var_vs_time = os.path.join( stats_dir,
  327. "max_var_vs_time.plotdata" )
  328. self._max_var_fp = open( max_var_vs_time, "w" )
  329. _copy_gnuplot( "var_vs_time.gnuplot" )
  330. def timeout(self):
  331. self._max_var *= 0.64 # FUDGE. same as using 0.8 * max stddev.
  332. if stats:
  333. self._max_var_fp.write( "%f\t%f\n" % (bttime(), self._max_var) )
  334. def __call__( self, rtt, rate ):
  335. self._window.append(rtt)
  336. var = variance(self._window)
  337. if stats:
  338. self._var_fp.write( "%f\t%f\n" % (bttime(), var) )
  339. if var > self._max_var:
  340. self._max_var = var
  341. if stats:
  342. self._max_var_fp.write( "%f\t%f\n" % (bttime(), self._max_var))
  343. # won't signal congestion until we have at least a full window's
  344. # worth of samples.
  345. if self._window < self._window_size:
  346. return False
  347. if var > (self._max_var * 0.64): # FUDGE
  348. return True
  349. else:
  350. return False
  351. class ChebyshevCongestionEstimator(BinaryCongestionEstimator):
  352. """This congestion estimator first estimates the variance
  353. and mean of the conditional delay distribution for the condition
  354. when the bottleneck is uncongested. We then test for the
  355. congested state based on delay samples exceeding a threshold.
  356. We set the threshold based on the Chebyshev Inequality:
  357. Chebysev Inequality:
  358. P[(X-u)^2 >= k^2] <= sigma^2 / k^2 (1)
  359. More details are given in source code comments."""
  360. #
  361. # Here u and sigma are the conditional mean and stddev, X is a single
  362. # sample. We thus can bound the probability of a single sample exceeding
  363. # a threshold. If we set the delay threshold to k such that we trigger
  364. # congestion detection whenever the delay threshold is exceeded then
  365. # this inequality can be interpreted as an UPPER BOUND ON THE PROBABILITY
  366. # OF A FALSE POSITIVE CONGESTION DETECTION.
  367. #
  368. # Because we are dealing with a single bottleneck, we can know the
  369. # min (propagation) delay and max (buffer full) delay. We thus know
  370. # the worst case variance occurs when half the samples are at the upper
  371. # bound and half at the lower bound resulting in
  372. #
  373. # 2
  374. # (max-min)
  375. # var_sample approx ------- (2)
  376. # 4
  377. #
  378. # Substituting (2) into (1) yields
  379. # 2
  380. # (max-min)
  381. # P[(X-u)^2 >= k^2] <= --------- (3)
  382. # 4 k^2
  383. #
  384. # When the worst-case variance is achieved, the mean u = (max-min)/2.
  385. # If we then set the threshold equal to max then k = (max-min)/2 and
  386. # (3) becomes
  387. #
  388. # P[(X-u)^2 >= k^2] <= 1 (4)
  389. #
  390. # This means that in the worst-case, the Chebyshev inequality provides
  391. # a useless bound. However, for anything less than the worst-case,
  392. # the false positive probability is less than 1. But wait, how is it
  393. # useful to have a false positive probability near 1?
  394. #
  395. # If delay samples are independent when in the uncongested state,
  396. # the probability that n consecutive samples exceed the theshold
  397. # becomes,
  398. #
  399. # P[For n consecutive samples, (X-u)^2 >= k^2] <= (sigma^2 / k^2)^n
  400. # (5)
  401. #
  402. # If we only signal congestion when n consecutive samples are
  403. # above the threshold and if sigma^2 / k^2 < 1, then we can set
  404. # the probability of a false positive to anything we like by
  405. # setting n sufficiently large.
  406. #
  407. # I argue that during the uncongested state, the samples should be
  408. # approximately independent, because when the network is
  409. # uncongested, delay is not driven by queueing or at least not
  410. # by queueing that persists across a significant portion of our
  411. # sample window.
  412. #
  413. # To allow for the most distance above and below the threshold,
  414. # we try to set the threshold to (max-min)/2, but we will increase
  415. # the threshold if necessary should the number of consecutive samples
  416. # needed become too large (i.e., exceed max_consecutive) or if
  417. # the conditional mean exceeds (max-min)/2.
  418. #
  419. # k = thresh - u
  420. # k = (max-min)/2 - u
  421. # let P = P[For n consecutive samples,(X-u)^2 >= k^2] = false_pos_prob
  422. #
  423. # 2n
  424. # sigma
  425. # P <= -----------------
  426. # max-min 2n
  427. # ( -------- - u )
  428. # 2
  429. #
  430. # log P <= 2n log ( sigma / ((max-min)/2 - u))
  431. #
  432. # 1 log P
  433. # n >= - ------------------------------ . (6)
  434. # 2 log (sigma / ((max-min)/2 - u)
  435. #
  436. # We then turn >= in (6) to an assignment to derive the n used in
  437. # detecting congestion. If n exceeds max_consecutive then we adjust
  438. # the threshold keeping n at max_consecutive.
  439. #
  440. # 2n
  441. # sigma
  442. # P >= ---------------
  443. # 2n
  444. # (thresh - u )
  445. #
  446. #
  447. # sigma
  448. # thresh >= ------- + u (7)
  449. # P^(1/2n)
  450. #
  451. # The threshold is not allowed to exceed max_thresh of the way
  452. # between min and max. When threshold reaches this point, the
  453. # thresholds become less meaningful and the performance of the
  454. # congestion estimator is likely to suffer.
  455. #
  456. # Implementation Complexity:
  457. # The computational burden of this congestion estimator is
  458. # significantly higher than the TCP Reno/Tahoe loss based or TCP Vegas
  459. # delay-based congestion estimators, but this algorithm is applied
  460. # on the aggregate of ALL connections passing through the access point.
  461. # State maintainence for the 100+ connections created by BitTorrent
  462. # dwarfs the computational burden of this estimator.
  463. def __init__(self, window_size, drop_every_nth,
  464. false_pos_prob, max_consecutive, max_thresh, ewma ):
  465. assert drop_every_nth > 0
  466. assert false_pos_prob > 0.0 and false_pos_prob < 1.0
  467. assert max_consecutive > 1
  468. assert max_thresh > 0.0 and max_thresh < 1.0
  469. assert ewma > 0.0 and ewma < 1.0
  470. # parameters
  471. self._window_size = window_size
  472. self._false_pos_prob = false_pos_prob
  473. self._max_consecutive = max_consecutive
  474. self._thresh = max_thresh
  475. # estimators
  476. self._window = SizedList(window_size)
  477. self._propagation_estimator = \
  478. MedianOfMinWindow(window_size, drop_every_nth)
  479. self._delay_on_full_estimator = \
  480. MedianOfMaxWindow(window_size, drop_every_nth)
  481. self._cond_var = EWMA(alpha = ewma) # variance when uncongested.
  482. self._cond_mean = EWMA(alpha = ewma) # mean when uncongested.
  483. # counters
  484. self._init_samples = 0 # count of first samples.
  485. self._consecutive = 0 # consecutive samples above the threshold.
  486. # computed thresholds
  487. self._n = None
  488. self._thresh = None
  489. if stats:
  490. prop_vs_time = os.path.join( stats_dir, "prop_vs_time.plotdata" )
  491. fp = open( prop_vs_time, "w" )
  492. self._propagation_estimator = \
  493. StreamTracer( self._propagation_estimator, fp )
  494. full_vs_time = os.path.join( stats_dir, "full_vs_time.plotdata" )
  495. fp = open( full_vs_time, "w" )
  496. self._delay_on_full_estimator = \
  497. StreamTracer( self._delay_on_full_estimator, fp )
  498. cmean_vs_time = os.path.join( stats_dir, "cmean_vs_time.plotdata" )
  499. fp = open( cmean_vs_time, "w" )
  500. self._cond_mean = StreamTracer( self._cond_mean, fp )
  501. cvar_vs_time = os.path.join( stats_dir, "cvar_vs_time.plotdata" )
  502. fp = open( cvar_vs_time, "w" )
  503. self._cond_var = StreamTracer( self._cond_var, fp )
  504. thresh_vs_time = os.path.join(stats_dir,"thresh_vs_time.plotdata")
  505. self._thfp = open( thresh_vs_time, "w" )
  506. n_v_time = os.path.join( stats_dir, "n_vs_time.plotdata" )
  507. self._nfp = open( n_vs_time, "w" )
  508. def timeout(self):
  509. self._delay_on_full_estimator.timeout()
  510. def __call__( self, rtt, rate ):
  511. self._window.append(rtt)
  512. full = self._delay_on_full_estimator(rtt)
  513. prop = self._propagation_estimator(rtt)
  514. if ( self._init_samples < self._window_size ):
  515. # too few samples to determine whether there is congestion...
  516. self._init_samples += 1
  517. return False
  518. # enough samples to initialize conditional estimators.
  519. elif self._init_samples == self._window_size:
  520. self._init_samples += 1
  521. self._update(rtt)
  522. return False
  523. assert self._n is not None and self._thresh is not None
  524. epsilon = ( full - prop ) * 0.05
  525. # if delay is within epsilon of the propagation delay then
  526. # assume that we are in the uncongested state. We use the
  527. # window's middle sample to reduce bias.
  528. if self._window[len(self._window)/2] < prop + epsilon:
  529. self._update(rtt) # updates thresholds.
  530. if rtt > self._thresh:
  531. self._consecutive += 1
  532. if self._consecutive >= self._n:
  533. self._consecutive = 0 # don't generate multiple detections
  534. # for single period of congestion unless
  535. # it persists across separate trials
  536. # of n samples each.
  537. return True # congestion detected
  538. else:
  539. self._consecutive = 0
  540. return False # no congestion detected
  541. def _update(self, rtt):
  542. """update thresholds when delay is within epsilon of the
  543. estimated propagation delay."""
  544. var = self._cond_var(variance(self._window))
  545. u = self._cond_mean(mean(self._window))
  546. # 1 log P
  547. # n >= - ------------------------------ . (6)
  548. # 2 log (sigma / ((max-min)/2 - u)
  549. sigma = math.sqrt(var)
  550. max = self._delay_on_full_estimator()
  551. min = self._propagation_estimator()
  552. p = self._false_pos_prob
  553. thresh = (max-min)/2
  554. if thresh > u:
  555. n = int(0.5 * math.log(p) / math.log(sigma/(thresh-u)))
  556. if n < 1:
  557. n = 1
  558. if thresh <= u or n > self._max_consecutive:
  559. n = self._max_consecutive
  560. # sigma
  561. # thresh >= ------- + u (7)
  562. # P^(1/2n)
  563. thresh = sigma / p**(0.5*n) + u
  564. if thresh > self._max_thresh:
  565. # this is a bad state. if we are forced to set thresh to
  566. # max thresh then the rate of false positives will
  567. # inexorably increase. What else to do?
  568. thresh = self._max_thresh
  569. self._thresh = thresh
  570. self._n = n
  571. if stats:
  572. self._thfp.write( "%f\t%f\n" % (bttime(), self._thresh) )
  573. self._nfp.write( "%f\t%d\n" % (bttime(), self._n) )
  574. class VegasishCongestionEstimator(BinaryCongestionEstimator):
  575. """Delay-based congestion control with static threshold
  576. set at 1/2 distance between propagation delay and delay on full
  577. buffer."""
  578. def __init__(self, window_size, drop_every_nth ):
  579. self._rtt_estimator = AverageOfLastWindow(window_size)
  580. self._propagation_estimator = \
  581. MedianOfMinWindow(window_size, drop_every_nth)
  582. self._delay_on_full_estimator = \
  583. MedianOfMaxWindow(window_size, drop_every_nth)
  584. def __call__(self, rtt, rate):
  585. middle_rtt = ((self._propagation_estimator(rtt) +
  586. self._delay_on_full_estimator(rtt)) / 2.0 )
  587. if rtt > middle_rtt:
  588. return True
  589. else:
  590. return False
  591. class BinaryControlLaw(object):
  592. """Increases or decreases rate limit based on a binary congestion
  593. indication"""
  594. def __call__( self, is_congested, rate ):
  595. """Passed rate is the averaged upload rate. Returned rate
  596. is a rate limit."""
  597. pass
  598. class AIADControlLaw(BinaryControlLaw):
  599. """Additive Increase with Additive Decrease"""
  600. def __init__( self, increase_delta, decrease_delta ):
  601. assert type(increase_delta) in [float,int,long] and \
  602. increase_delta > 0.0
  603. assert type(decrease_delta) in [float,int,long] and \
  604. increase_delta > 0.0
  605. self._increase_delta = increase_delta
  606. self._decrease_delta = decrease_delta
  607. self._ssthresh = 1.e50 # infinity
  608. def __call__( self, is_congested, rate ):
  609. """Passed rate is the averaged upload rate. Returned rate
  610. is a rate limit."""
  611. if is_congested:
  612. limit = rate - self._decrease_delta
  613. rate = max(rate, 1000)
  614. self._ssthresh = limit # congestion resets slow-start threshold
  615. elif rate > self._ssthresh - self._increase_delta:
  616. self._ssthresh += self._increase_delta
  617. # allow slow-start
  618. limit = min( self._ssthresh, 2.0 * rate )
  619. return rate
  620. class AIMDControlLaw(BinaryControlLaw):
  621. """Rate-based Additive Increase with Multiplicative Decrease w/
  622. multiplicative slow-start."""
  623. def __init__( self, increase_delta, decrease_factor ):
  624. assert type(increase_delta) in [float,int,long] and \
  625. increase_delta > 0.0
  626. assert type(decrease_factor) == float and decrease_factor > 0.0 and \
  627. decrease_factor < 1.0
  628. self._increase_delta = increase_delta
  629. self._decrease_factor = decrease_factor
  630. self._ssthresh = 1.e50 # infinity
  631. def __call__( self, is_congested, rate ):
  632. """Passed rate is the averaged upload rate. Returned rate
  633. is a rate limit."""
  634. if is_congested:
  635. print "CONGESTION!"
  636. limit = rate * self._decrease_factor
  637. self._ssthresh = limit # congestion resets slow-start threshold
  638. elif rate > self._ssthresh - self._increase_delta:
  639. self._ssthresh += self._increase_delta
  640. # allow slow-start
  641. limit = min( self._ssthresh, 2.0 * rate )
  642. if debug:
  643. print "AIMD: time=%f rate=%f ssthresh=%f limit=%f" % \
  644. (bttime(), rate, self._ssthresh, limit)
  645. return limit
  646. class StarvationPrevention(object):
  647. """Baseclass for objects that bound rate limit to prevent starvation."""
  648. def __call__( self, rate ): # may have to add additional args.
  649. """Passed rate is current average rate. Returned rate is a rate
  650. limit."""
  651. pass
  652. class FixedStarvationPrevention(StarvationPrevention):
  653. def __init__( self, lower_rate_bound ):
  654. self._lower_rate_bound = lower_rate_bound
  655. def __call__( self, rate ):
  656. print "starvation rate=", max(rate,self._lower_rate_bound)
  657. return max( rate, self._lower_rate_bound )
class BandwidthManager(object):
    """Controls allocation of bandwidth between foreground and background
       traffic.  Currently all BitTorrent traffic is considered background.
       Background traffic is subjected to a global rate limit that is
       reduced during congestion to allow foreground traffic to takeover.
       A 'starvation prevention' building block applies a lower bound.

       Per RTT sample the pipeline is:
           congestion_estimator -> control_law -> starvation_prevention
       and the resulting limit is applied via set_config (upload only).
    """
    def __init__(self, external_add_task, config, set_config,
                 get_remote_endpoints, get_rates):
        # external_add_task: scheduler callable (delay, func, *args).
        # config: mapping of tuning knobs (keys referenced below).
        # set_config: callable used to apply the computed rate limit.
        # get_remote_endpoints: forwarded to the NodeFeeder.
        # get_rates: callable returning (uprate, downrate) or None.
        if debug:
            if config['bandwidth_management']:
                print "bandwidth management is up."
            else:
                print "!@#!@#!@#!@#!@# bandwidth management is down."
        self.external_add_task = external_add_task
        self.config = config
        self.set_config = set_config
        self.get_rates = get_rates
        # Platform-specific ICMP implementation used for RTT probing.
        # NOTE(review): icmp_impl is left unbound when os.name is neither
        # 'nt' nor 'posix'; the RTTMonitor() call below would then raise
        # NameError -- confirm those are the only supported platforms.
        if os.name == 'nt':
            icmp_impl = Win32Icmp()
        elif os.name == 'posix':
            icmp_impl = UnixIcmp(external_add_task, config['xicmp_port'])
        def got_new_rtt(rtt):
            # Defer to the main task loop before inspecting rates.
            # NOTE(review): this print is unconditional (not debug-guarded).
            print "got_new_rtt, rtt=", rtt
            self.external_add_task(0, self._inspect_rates, rtt)
        self.rttmonitor = RTTMonitor(got_new_rtt, icmp_impl)
        self.nodefeeder = NodeFeeder(add_task=external_add_task,
                                     get_remote_endpoints=get_remote_endpoints,
                                     rttmonitor=self.rttmonitor)
        self.start_time = bttime()
        # Select the control law building block from config.
        # NOTE(review): an unrecognized 'control_law' leaves
        # self.control_law unset (unlike the estimator below, there is no
        # else-raise) -- presumably validated upstream; confirm.
        if config['control_law'] == 'aimd':
            self.control_law = AIMDControlLaw(config['increase_delta'],
                                              config['decrease_factor'])
        elif config['control_law'] == 'aiad':
            self.control_law = AIADControlLaw(config['increase_delta'],
                                              config['decrease_delta'])
        # This configurability is temporary during testing/tuning. --Dave
        if config['congestion_estimator'] == "chebyshev":
            self.congestion_estimator = ChebyshevCongestionEstimator(
                config['window_size'], config['drop_every_nth'],
                config['cheby_max_probability'],
                config['cheby_max_consecutive'],
                config['cheby_max_threshold'], config['ewma'])
        elif config['congestion_estimator'] == "variance":
            self.congestion_estimator = VarianceCongestionEstimator(
                config['window_size'])
        elif config['congestion_estimator'] == "vegasish":
            self.congestion_estimator = VegasishCongestionEstimator(
                config['window_size'], config['drop_every_nth'])
        else:
            raise BTFailure(_("Unrecognized congestion estimator '%s'.") %
                            config['congestion_estimator'])
        self.starvation_prevention = FixedStarvationPrevention(
            config['min_upload_rate_limit'] )
        # When stats gathering is enabled, wrap the control law and
        # starvation prevention in StreamTracers that log their outputs
        # to gnuplot-ready files in stats_dir.
        if stats:
            rlimit_vs_time = \
                os.path.join( stats_dir, "rlimit_vs_time.plotdata" )
            fp = open( rlimit_vs_time, "w" )
            self.control_law = StreamTracer(self.control_law, fp)
            _copy_gnuplot( "rlimit_vs_time.gnuplot" )
            # samples are max(min_upload_rate_limit,rate).
            slimit_vs_time = \
                os.path.join( stats_dir, "slimit_vs_time.plotdata" )
            fp = open( slimit_vs_time, "w" )
            self.starvation_prevention = StreamTracer(
                self.starvation_prevention, fp)
            delay_vs_time = os.path.join( stats_dir, "delay_vs_time.plotdata" )
            self.dfp = open( delay_vs_time, "w" )
            _copy_gnuplot( "delay_vs_time.gnuplot" )

    # The following experimental estimators are retained, commented out,
    # for reference only.
    #def congestion_estimator_vegas_greg(self, rtt, rate):
    #
    #    middle_rtt = ((self.propagation_estimator(rtt) +
    #                   self.delay_on_full_estimator(rtt)) / 2.0 )
    #    if t > middle_rtt and c < 0.5:
    #        rate *= 0.5
    #        if debug:
    #            print type, "down to", rate
    #    else:
    #        rate += 1000 # hmm
    #        if debug:
    #            print type, "up to", rate
    #    return rate
    #def congestion_estimator_ratio(self, type, t, p, min_p, max_p, rate):
    #    ratio = p / max_p
    #    if debug:
    #        print "RATIO", ratio
    #    if ratio < 0.5:
    #        rate = ratio * self.max_rates[type]
    #        if debug:
    #            print type.upper(), "SET to", rate
    #    else:
    #        rate += rate * (ratio/10.0) # hmm
    #        if debug:
    #            print type.upper(), "UP to", rate
    #
    #    return max(rate, 1000)
    #def congestion_estimator_stddev(self, type, std, max_std, rate):
    #    if std > (max_std * 0.80): # FUDGE
    #        rate *= 0.80 # FUDGE
    #        if debug:
    #            print type.upper(), "DOWN to", rate
    #    else:
    #        rate += 1000 # FUDGE
    #        if debug:
    #            print type.upper(), "UP to", rate
    #
    #    return max(rate, 1000) # FUDGE
    #def _affect_rate(self, type, std, max_std, rate, set):
    #    rate = self._congestion_estimator_stddev(type, std, max_std, rate)
    #
    #    rock_bottom = False
    #    if rate <= 1000:
    #        if debug:
    #            print "Rock bottom"
    #        rock_bottom = True
    #        rate = 1000
    #
    #    set(int(rate))
    #    if stats:
    #        print "BandwidthManager._affect_rate(%f)" % rate
    #        self.rfp.write( "%d\t%d\n" % (bttime(),int(rate)) )
    #        self.sdevfp.write( "%d\t%f\n" % (bttime(), std ) )
    #
    #    return rock_bottom

    def _inspect_rates(self, t = None):
        """Run one pass of the control pipeline for one RTT sample.

           Called whenever an RTT sample arrives.  If t == None the
           current RTT is fetched from the monitor; if that is also
           None, the sample is treated as a timeout."""
        if t == None:
            t = self.rttmonitor.get_current_rtt()
        if t == None:
            # this makes timeouts reduce the maximum std deviation
            self.congestion_estimator.timeout()
            return
        if debug:
            print "BandwidthManager._inspect_rates: %d" % t
        if stats:
            self.dfp.write( "%f\t%f\n" % (bttime(),t) )
        if not self.config['bandwidth_management']:
            return
        # TODO: slow start should be smarter than this
        #if self.start_time < bttime() + 20:
        #    self.config['max_upload_rate'] = 10000000
        #    self.config['max_dowload_rate'] = 10000000
        #if t < 3:
        #    # I simply don't believe you. Go away.
        #    return
        tup = self.get_rates()
        if tup == None:
            return
        # downrate is currently unused; only the upload side is limited.
        uprate, downrate = tup
        # proceed through the building blocks. (We can swap in various
        # implementations of each based on config).
        is_congested = self.congestion_estimator(t,uprate)
        rate_limit = self.control_law(is_congested,uprate)
        rate_limit = self.starvation_prevention(rate_limit)
        self._set_rate_limit(rate_limit)

    def _set_rate_limit(self,rate_limit):
        # Apply the computed limit; upload only for now.
        self.set_config('max_upload_rate', rate_limit)
  816. if __name__ == "__main__":
  817. # perform unit tests.
  818. n_tests = 0
  819. n_tests_passed = 0
  820. n_tests += 1
  821. medmin = MedianOfMinWindow(3, 7)
  822. if medmin(5) != 5: # [5], i = 1
  823. print "FAILED. Median [5] %s should be 5, but it is %d." % \
  824. (medmin,medmin())
  825. else:
  826. n_tests_passed += 1
  827. n_tests += 1
  828. m = medmin(6) # [5,6], i = 2
  829. if medmin._window != [5,6] or m < 5.5-0.01 or m > 5.5 + 0.01:
  830. print ( "FAILED. medmin should be [5,6] with median 5.5, but it is "
  831. "%s with median %f." % (medmin,medmin()))
  832. else:
  833. n_tests_passed += 1
  834. n_tests += 1
  835. m = medmin(7) # [5,6,7], i = 3
  836. if medmin._window != [5,6,7] or m < 6-0.01 or m > 6+0.01:
  837. print ( "FAILED. medmin should be [5,6,7] with median 6, but it is %s"
  838. " with median %f." ) % (medmin,medmin())
  839. else:
  840. n_tests_passed += 1
  841. n_tests += 1
  842. medmin(8) # [5,6,7], discard 8, i = 4
  843. medmin(9) # [5,6,7], discard 9, i = 5
  844. m = medmin(10) # [5,6,7], discard 10, i = 6
  845. if medmin._window != [5,6,7] or m < 6-0.01 or m > 6+0.01:
  846. print ( "FAILED. After inserting [5..10], we should've dropped 8,9 "
  847. "and 10 leaving us with [5,6,7], but the list is %s with "
  848. "median %d." ) % (medmin,medmin())
  849. else:
  850. n_tests_passed += 1
  851. n_tests += 1
  852. m = medmin(11)
  853. if medmin._window != [6,7,11] or m < 7-0.01 or m > 7+0.01:
  854. print ( "FAILED. After inserting [5..11], we should've dropped "
  855. "8,9 and 10, but when 11 is added this is the 'every_nth'"
  856. " and thus we should've dropped 5 instead leaving us with "
  857. "[6,7,11] and thus a median of 7." )
  858. else:
  859. n_tests_passed += 1
  860. n_tests += 1
  861. avgmin = AverageOfMinWindow(3,7)
  862. m = avgmin(2)
  863. if m < 2-0.01 or m > 2+0.01: # [2], i = 1
  864. print ("FAILED. Average of [2] should be 2, but average of %s is "
  865. "%f." ) % (avgmin, avgmin(2))
  866. else:
  867. n_tests_passed += 1
  868. n_tests += 1
  869. m = avgmin() # [2], i = 1
  870. if m < 2-0.01 or m > 2+0.01:
  871. print ( "FAILED. Average after inserting no new elements should "
  872. "remain unchanged at 2, but it is %d." ) % val
  873. else:
  874. n_tests_passed += 1
  875. n_tests += 1
  876. avgmin(3) # [2,3], i = 2
  877. val = avgmin(5) # [2,3,5], i = 3
  878. if avgmin._window != [2,3,5] or val < 10/3.-0.01 or val > 10/3.+0.01:
  879. print ( "FAILED: after avgmin(3), avgmin's window should contain "
  880. "[2,3,5] and have average %f, but it is %s with average %f." %
  881. ( (2+3+5)/3., avgmin, avgmin() ) )
  882. else:
  883. n_tests_passed += 1
  884. n_tests += 1
  885. val = avgmin(0) # [0,2,3], i = 4
  886. if avgmin._window != [0,2,3] or val < 5/3.-.01 or val > 5/3.+0.01:
  887. print ( "FAILED: avgmin's window should contain [0,2,3] and have "
  888. "average %f, but it is %s with average %f." %
  889. ( (0+2+3)/3., avgmin._window, avgmin() ) )
  890. else:
  891. n_tests_passed += 1
  892. n_tests += 1
  893. avgmin(2) # [0,2,2], i = 5
  894. avgmin(4) # [0,2,2], i = 6, discarded 4
  895. avgmin(1) # [1,2,2], i = 7, dropped smallest 0
  896. if avgmin._window != [1,2,2] or val < 5/3.-0.01 or val > 5/.3+0.01:
  897. print ( "FAILED: avgmin's window should contain [1,2,2] and have "
  898. "average %f, but it is %s with average %f." %
  899. ( (1+2+2)/3., avgmin, avgmin() ) )
  900. else:
  901. n_tests_passed += 1
  902. n_tests += 1
  903. medmax = MedianOfMaxWindow(3,7)
  904. val = medmax(2) # i = 1
  905. if medmax != [2] or val < 2-0.01 or val > 2+0.01:
  906. print ( "FAILED: medmax should be [2] and median 2, but medmax is %s "
  907. "and median is %f." % (medmax, medmax()) )
  908. else:
  909. n_tests_passed += 1
  910. n_tests += 1
  911. medmax(4)
  912. val = medmax(1) # i = 3
  913. if medmax != [1,2,4] or val < 2-0.01 or val > 2+0.01:
  914. print ( "FAILED: medmax should be [1,2,4] and median 2, but medmax "
  915. "is %s and median is %f." % (medmax, medmax()) )
  916. else:
  917. n_tests_passed += 1
  918. n_tests += 1
  919. val = medmax(5) # i = 4
  920. if medmax != [2,4,5] or val < 4-0.01 or val > 4+0.01:
  921. print ( "FAILED: medmax should be [2] and median 2, but medmax is "
  922. "%s and median is %f." % (medmax, medmax()) )
  923. else:
  924. n_tests_passed += 1
  925. n_tests += 1
  926. medmax(1) # i = 5
  927. medmax(1) # i = 6
  928. val = medmax(3) # i = 7
  929. if medmax != [2,3,4] or val < 3-0.01 or val > 3+0.01:
  930. print ( "FAILED: medmax should be [3] and median 3, but medmax is "
  931. "%s and median is %f." % (medmax,medmax()) )
  932. else:
  933. n_tests_passed += 1
  934. if n_tests == n_tests_passed:
  935. print "Passed all %d tests." % n_tests