| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093 |
- # The contents of this file are subject to the BitTorrent Open Source License
- # Version 1.1 (the License). You may not copy or use this file, in either
- # source code or executable form, except in compliance with the License. You
- # may obtain a copy of the License at http://www.bittorrent.com/license/.
- #
- # Software distributed under the License is distributed on an AS IS basis,
- # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
- # for the specific language governing rights and limitations under the
- # License.
- # By Greg Hazel and David Harrison
- import sys
- if __name__ == "__main__":
- sys.path = [".."] + sys.path
-
- debug = True
- stats = True # collect statistics and dump to files.
- import os # for stats
- import platform # for stats
- import time # for stats
- import math
- import socket
- import itertools
- from BitTorrent.obsoletepythonsupport import set
- from BitTorrent.RTTMonitor import RTTMonitor, Win32Icmp, UnixIcmp
- from BitTorrent.HostIP import get_host_ips
- from BitTorrent.platform import bttime, get_temp_subdir
- from BitTorrent.Lists import SizedList, LargestDropSizedList, \
- RandomDropSizedList
- stats_dir = None
- if stats:
- stats_dir = os.path.join(platform.get_temp_subdir(), "stats")
- os.mkdir(stats_dir)
- if debug: print "BandwidthManager: stats_dir = %s" % stats_dir
- class NodeFeeder(object):
- """Every few minutes, this will obtain the set of known peers from
- the running torrents and then tell the RTTMonitor to retraceroute
- to these peers to reidentify the common path."""
- def __init__(self, add_task, get_remote_endpoints, rttmonitor):
- self.add_task = add_task
- self.get_remote_endpoints = get_remote_endpoints
- self.rttmonitor = rttmonitor
- self.add_task(3, self._collect_nodes)
- def _collect_nodes(self):
- addrs = self.get_remote_endpoints()
- print "_collect_nodes: addrs=",addrs
-
- local_ips = get_host_ips()
-
- ips = set()
- for (ip, port) in addrs:
- if ip is not None and ip != "0.0.0.0" and ip not in local_ips:
- ips.add(ip)
-
- self.rttmonitor.set_nodes_restart(ips)
- delay = 5
- if len(ips) > 0:
- delay = 300
-
- self.add_task(delay, self._collect_nodes)
- def _copy_gnuplot(fname):
- if os.getenv("BT"):
- src = os.path.join(os.getenv("BT"), "test",
- "test_BandwdithManager", fname )
- dst = os.path.join( stats_dir, fname )
- shutil.copy(src,dst)
- def ratio_sum_lists(a, b):
- tx = float(sum(a))
- ty = float(sum(b))
- return tx / max(ty, 0.0001)
- def mean(l):
- N = len(l)
- total = float(sum(l))
- return total/N
- def median(l):
- assert len(l)> 0
- if len(l) == 1:
- return l[0]
- elif len(l) % 2 == 0: # even number of elements.
- return (l[len(l)/2-1] + l[len(l)/2])/2.
- else: # odd number of elements.
- return l[len(l)/2]
-
-
- def variance(l):
- N = len(l)
- x = 0
-
- total = float(sum(l))
-
- mean = total/N
-
- # Subtract the mean from each data item and square the difference.
- # Sum all the squared deviations.
- for i in l:
- x += (i - mean)**2.0
-
- try:
- if False:
- # Divide sum of squares by N-1 (sample variance).
- variance = x / (N - 1)
- else:
- # Divide sum of squares by N (population variance).
- variance = x / N
- except ZeroDivisionError:
- variance = 0
- return variance
- def standard_deviation(l):
- return math.sqrt(variance)
- class Tracer(object):
- """Baseclass for sampler wrapper objects."""
- def __init__( self, func ):
- self._func = func
- def __call__(self, *args, **kwargs):
- return self._func(*args, **kwargs)
- class StreamTracer(Tracer):
- """outputs monitored samples to a stream."""
- def __init__( self, func, stream ):
- Tracer.__init__(self,func)
- self._stream = stream
- self._i = 0
- def __call__(self, *args, **kwargs):
- val = self._func( *args, **kwargs)
- self._stream.write("%f\t%f\n" % (bttime(),val))
- self._i = (self._i+1) % 8 # flush a little more often.
- if self._i == 0:
- self._stream.flush()
- return val
- class MinWindow(object):
- """Maintains a list of the 'window_size' smallest
- samples seen. After every_nth sample, the smallest is dropped.
- Dropping allows the sampler to eventually recover from spurious
- outliers. This class is meant to be derived. See
- MedianOfMinWindow, etc.
- """
- def __init__(self, window_size, every_nth ):
- self._window = LargestDropSizedList( window_size )
- self._i = 0
- self._every_nth = every_nth
- def __getitem__(self, i):
- return self._window[i]
-
- def __len__( self ):
- return len(self._window)
- def __str__(self):
- return str(self._window)
-
- def __iter__(self):
- return self._window.__iter__()
-
- def update(self, sample):
- self._i = (self._i+1)%self._every_nth
- if self._i == 0: # drop smallest sample.
- self._window.popleft()
- self._window.insort( sample )
-
- def timeout(self):
- """If a timeout occurs then it is possible that our propagation
- estimate is too large. We therefore drop the top sample."""
- if len(self._window) > 1:
- self._window.pop() # drop largest sample.
-
- class MedianOfMinWindow(MinWindow):
- """Computes the median of a MinWindow.
- By using the median of the smallest, we add some
- additional resistance to outliers.
- If fp is provided then the medians of the min window
- are output to a file."""
- def __init__( self, window_size, every_nth ):
- MinWindow.__init__(self,window_size, every_nth)
-
- def __call__( self, sample = None):
- if sample is not None: self.update(sample)
- #print "MedianOfMinWindow:"
- #for i in xrange(len(self._window)):
- # print " ", self[i]
- return median(self)
- class AverageOfMinWindow(MinWindow):
- """Computes the average of a MinWindow. By using the average of
- the smallest, we add some additional resistance to outliers."""
- def __init__( self, window_size, every_nth ):
- MinWindow.__init__(self, window_size, every_nth)
-
- def __call__( self, sample = None):
- if sample is not None: self.update(sample)
- return mean(self._window)
-
- class MaxWindow(object):
- """Maintains a list of the 'window_size' largest
- samples seen. After every_nth sample, the largest is dropped.
- Dropping allows the sampler to eventually recover from spurious
- outliers. This class is meant to be derived. See
- MedianOfMaxWindow, etc."""
- def __init__(self, window_size, every_nth ):
- self._window = LargestDropSizedList( window_size ) # reverse w/ -sample
- self._i = 0
- self._every_nth = every_nth
- def __len__( self ):
- return len(self._window)
-
- def __str__(self):
- l = list(self._window)
- l.reverse()
- l = [-x for x in l]
- return str(l)
-
- def __iter__(self):
- class Iterator:
- def __init__(self,minwindow):
- self._minwindow = minwindow
- self._i = 0
- def next(self):
- if self._i < len(self._minwindow):
- val = self._minwindow[self._i]
- self._i += 1
- return val
- raise StopIteration()
- return Iterator(self)
- def __eq__(self,l):
- if isinstance(l,MaxWindow):
- return l._window == self._window
- elif isinstance(l,list):
- return l == [x for x in self]
- else:
- return False
- def __ne__(self,l):
- return not l == self
-
- def __getitem__(self,i):
- """retrieves the ith element in the window where element at index 0
- is the smallest."""
- return -self._window[len(self._window)-i-1]
-
- def update(self, sample):
- self._i = (self._i+1)%self._every_nth
- if self._i == 0: # drop largest sample.
- self._window.popleft()
- # maintain order. last element should be smallest so insert -sample.
- self._window.insort(-sample)
- class MedianOfMaxWindow(MaxWindow):
- """Computes the median of a MaxWindow.
- By using the median of the largest, we add some
- resistance to outliers."""
- def __init__( self, window_size, every_nth ):
- MaxWindow.__init__(self,window_size, every_nth)
-
- def __call__( self, sample = None ):
- if sample is not None: self.update(sample)
- return median(self)
-
- class AverageOfMaxWindow(MaxWindow):
- """Computes the average of a MaxWindow. By using the average of
- the largest, we add some resistance to outliers."""
- def __init__( self, window_size, every_nth ):
- MaxWindow.__init__(self, window_size, every_nth)
-
- def __call__( self, sample = None ):
- if sample is not None: self.update(sample)
- return mean(self._window)
- class AverageOfLastWindow(object):
- def __init__( self, window_size ):
- self._window = SizedList(window_size)
- def __call__( self, sample = None ):
- if sample is not None:
- self._window.append(sample)
- return mean(self._window)
- class MedianOfLastWindow(object):
- def __init__( self, window_size ):
- self._window = RandomDropSizedList(window_size)
- def __call__( self, sample = None ):
- """If passed no sample then returns current median."""
- if sample is not None:
- # maintain order.
- self._window.insort(sample)
- # randomly expels one sample to keep size. Dropping a random
- # sample exhibits no bias (as opposed to dropping smallest or
- # largest as done with MinWindow and MaxWindow)
- return median(self._window)
- class EWMA(object):
- def __init__( self, alpha, init_avg = None ):
- """Exponentially Weighted Moving Average (EWMA) functor.
- @param alpha: the weight used in the EWMA using 'smaller is
- slower' convention.
- @param init_avg: starting value of the average. If None then
- the average is only defined after the first sample and in
- the first call is set to the sample.
- """
- self._alpha = alpha
- self._avg = init_avg
- def __call__( self, sample = None ):
- """Computes the moving average after taking into account the passed
- sample. If passed nothing then it just returns the current average
- value."""
- if sample == None:
- if self._avg == None:
- raise ValueError( "Tried to retrieve value from EWMA before "
- "first sample." )
- else:
- if self._avg == None:
- self._avg = sample
- else:
- self._avg = (1-self._alpha) * self._avg + self._alpha * sample
- return self._avg
- class BinaryCongestionEstimator(object):
- """Abstract baseclass for congestion estimators that measure congestion
- based on rate, rtt, and/or loss. It's up to the
- subclass to decide whether congestion is occuring."""
- def timeout(self):
- """Called when a timeout occurs."""
- pass
- def __call__( self, rtt, uprate ):
- pass
- class VarianceCongestionEstimator(BinaryCongestionEstimator):
- """Congestion is assumed iff the stddev exceeds a threshold
- fraction of the maximum standard deviation."""
- # OBJECTION: Variance maximization works so long as the rate remains
- # below (to the left) of the peak in the variance versus rate curve.
- # In this regime, increasing rate causes an increase in variance.
- # When measured variance exceeds a threshold the system backs off causing
- # the variance to diminish. The system oscillates about this
- # optimal point.
- #
- # HOWEVER, the system behaves quite differently when rates are high, i.e.,
- # close to the bottleneck capacity. The graph of delay versus send rate
- # looks similar to a bellcurve. Our system increases the send rate
- # whenever the variance is below a threshold and decreases when
- # above the threshold. This is the correct behavior when on the
- # left-hand side of the bell curve. However, it is the OPPOSITE
- # of the desired behavior when to the right of the peak of the
- # bell curve. As a result, when rates get too high the system
- # continues to increase send rate until loss occurs.
- #
- # When loss occurs, the system backs off. When the control law is
- # AIMDControlLaw, the system backs off multiplicatively. This backoff
- # moves the system to the left on the bell curve. If the move is large
- # enough the system climbs over the hump and the system resumes
- # the proper behavior of increasing rate whenever variance is below
- # a threshold. If however, the backoff is insufficient to reach
- # the peak of the bell curve then the system slides back to the right
- # and resumes increasing send rate until loss occurs.
- #
- # This system is not guaranteed to converge on the equilibrium from all
- # feasible points.
- # -- David Harrison
- def __init__(self, window_size):
- self._window = SizedList(window_size)
- self._window_size = window_size
- self._max_var = 0.0
- if stats:
- delay_var_vs_time = os.path.join( stats_dir,
- "delay_var_vs_time.plotdata" )
- self._var_fp = open( delay_var_vs_time, "w" )
- max_var_vs_time = os.path.join( stats_dir,
- "max_var_vs_time.plotdata" )
- self._max_var_fp = open( max_var_vs_time, "w" )
- _copy_gnuplot( "var_vs_time.gnuplot" )
- def timeout(self):
- self._max_var *= 0.64 # FUDGE. same as using 0.8 * max stddev.
- if stats:
- self._max_var_fp.write( "%f\t%f\n" % (bttime(), self._max_var) )
-
- def __call__( self, rtt, rate ):
- self._window.append(rtt)
- var = variance(self._window)
- if stats:
- self._var_fp.write( "%f\t%f\n" % (bttime(), var) )
- if var > self._max_var:
- self._max_var = var
- if stats:
- self._max_var_fp.write( "%f\t%f\n" % (bttime(), self._max_var))
- # won't signal congestion until we have at least a full window's
- # worth of samples.
- if self._window < self._window_size:
- return False
- if var > (self._max_var * 0.64): # FUDGE
- return True
- else:
- return False
- class ChebyshevCongestionEstimator(BinaryCongestionEstimator):
- """This congestion estimator first estimates the variance
- and mean of the conditional delay distribution for the condition
- when the bottleneck is uncongested. We then test for the
- congested state based on delay samples exceeding a threshold.
- We set the threshold based on the Chebyshev Inequality:
-
- Chebysev Inequality:
- P[(X-u)^2 >= k^2] <= sigma^2 / k^2 (1)
- More details are given in source code comments."""
- #
- # Here u and sigma are the conditional mean and stddev, X is a single
- # sample. We thus can bound the probability of a single sample exceeding
- # a threshold. If we set the delay threshold to k such that we trigger
- # congestion detection whenever the delay threshold is exceeded then
- # this inequality can be interpreted as an UPPER BOUND ON THE PROBABILITY
- # OF A FALSE POSITIVE CONGESTION DETECTION.
- #
- # Because we are dealing with a single bottleneck, we can know the
- # min (propagation) delay and max (buffer full) delay. We thus know
- # the worst case variance occurs when half the samples are at the upper
- # bound and half at the lower bound resulting in
- #
- # 2
- # (max-min)
- # var_sample approx ------- (2)
- # 4
- #
- # Substituting (2) into (1) yields
- # 2
- # (max-min)
- # P[(X-u)^2 >= k^2] <= --------- (3)
- # 4 k^2
- #
- # When the worst-case variance is achieved, the mean u = (max-min)/2.
- # If we then set the threshold equal to max then k = (max-min)/2 and
- # (3) becomes
- #
- # P[(X-u)^2 >= k^2] <= 1 (4)
- #
- # This means that in the worst-case, the Chebyshev inequality provides
- # a useless bound. However, for anything less than the worst-case,
- # the false positive probability is less than 1. But wait, how is it
- # useful to have a false positive probability near 1?
- #
- # If delay samples are independent when in the uncongested state,
- # the probability that n consecutive samples exceed the theshold
- # becomes,
- #
- # P[For n consecutive samples, (X-u)^2 >= k^2] <= (sigma^2 / k^2)^n
- # (5)
- #
- # If we only signal congestion when n consecutive samples are
- # above the threshold and if sigma^2 / k^2 < 1, then we can set
- # the probability of a false positive to anything we like by
- # setting n sufficiently large.
- #
- # I argue that during the uncongested state, the samples should be
- # approximately independent, because when the network is
- # uncongested, delay is not driven by queueing or at least not
- # by queueing that persists across a significant portion of our
- # sample window.
- #
- # To allow for the most distance above and below the threshold,
- # we try to set the threshold to (max-min)/2, but we will increase
- # the threshold if necessary should the number of consecutive samples
- # needed become too large (i.e., exceed max_consecutive) or if
- # the conditional mean exceeds (max-min)/2.
- #
- # k = thresh - u
- # k = (max-min)/2 - u
- # let P = P[For n consecutive samples,(X-u)^2 >= k^2] = false_pos_prob
- #
- # 2n
- # sigma
- # P <= -----------------
- # max-min 2n
- # ( -------- - u )
- # 2
- #
- # log P <= 2n log ( sigma / ((max-min)/2 - u))
- #
- # 1 log P
- # n >= - ------------------------------ . (6)
- # 2 log (sigma / ((max-min)/2 - u)
- #
- # We then turn >= in (6) to an assignment to derive the n used in
- # detecting congestion. If n exceeds max_consecutive then we adjust
- # the threshold keeping n at max_consecutive.
- #
- # 2n
- # sigma
- # P >= ---------------
- # 2n
- # (thresh - u )
- #
- #
- # sigma
- # thresh >= ------- + u (7)
- # P^(1/2n)
- #
- # The threshold is not allowed to exceed max_thresh of the way
- # between min and max. When threshold reaches this point, the
- # thresholds become less meaningful and the performance of the
- # congestion estimator is likely to suffer.
- #
- # Implementation Complexity:
- # The computational burden of this congestion estimator is
- # significantly higher than the TCP Reno/Tahoe loss based or TCP Vegas
- # delay-based congestion estimators, but this algorithm is applied
- # on the aggregate of ALL connections passing through the access point.
- # State maintainence for the 100+ connections created by BitTorrent
- # dwarfs the computational burden of this estimator.
- def __init__(self, window_size, drop_every_nth,
- false_pos_prob, max_consecutive, max_thresh, ewma ):
- assert drop_every_nth > 0
- assert false_pos_prob > 0.0 and false_pos_prob < 1.0
- assert max_consecutive > 1
- assert max_thresh > 0.0 and max_thresh < 1.0
- assert ewma > 0.0 and ewma < 1.0
- # parameters
- self._window_size = window_size
- self._false_pos_prob = false_pos_prob
- self._max_consecutive = max_consecutive
- self._thresh = max_thresh
- # estimators
- self._window = SizedList(window_size)
- self._propagation_estimator = \
- MedianOfMinWindow(window_size, drop_every_nth)
- self._delay_on_full_estimator = \
- MedianOfMaxWindow(window_size, drop_every_nth)
- self._cond_var = EWMA(alpha = ewma) # variance when uncongested.
- self._cond_mean = EWMA(alpha = ewma) # mean when uncongested.
- # counters
- self._init_samples = 0 # count of first samples.
- self._consecutive = 0 # consecutive samples above the threshold.
- # computed thresholds
- self._n = None
- self._thresh = None
- if stats:
- prop_vs_time = os.path.join( stats_dir, "prop_vs_time.plotdata" )
- fp = open( prop_vs_time, "w" )
- self._propagation_estimator = \
- StreamTracer( self._propagation_estimator, fp )
- full_vs_time = os.path.join( stats_dir, "full_vs_time.plotdata" )
- fp = open( full_vs_time, "w" )
- self._delay_on_full_estimator = \
- StreamTracer( self._delay_on_full_estimator, fp )
- cmean_vs_time = os.path.join( stats_dir, "cmean_vs_time.plotdata" )
- fp = open( cmean_vs_time, "w" )
- self._cond_mean = StreamTracer( self._cond_mean, fp )
- cvar_vs_time = os.path.join( stats_dir, "cvar_vs_time.plotdata" )
- fp = open( cvar_vs_time, "w" )
- self._cond_var = StreamTracer( self._cond_var, fp )
- thresh_vs_time = os.path.join(stats_dir,"thresh_vs_time.plotdata")
- self._thfp = open( thresh_vs_time, "w" )
- n_v_time = os.path.join( stats_dir, "n_vs_time.plotdata" )
- self._nfp = open( n_vs_time, "w" )
-
- def timeout(self):
- self._delay_on_full_estimator.timeout()
- def __call__( self, rtt, rate ):
- self._window.append(rtt)
- full = self._delay_on_full_estimator(rtt)
- prop = self._propagation_estimator(rtt)
- if ( self._init_samples < self._window_size ):
- # too few samples to determine whether there is congestion...
- self._init_samples += 1
- return False
- # enough samples to initialize conditional estimators.
- elif self._init_samples == self._window_size:
- self._init_samples += 1
- self._update(rtt)
- return False
- assert self._n is not None and self._thresh is not None
- epsilon = ( full - prop ) * 0.05
- # if delay is within epsilon of the propagation delay then
- # assume that we are in the uncongested state. We use the
- # window's middle sample to reduce bias.
- if self._window[len(self._window)/2] < prop + epsilon:
- self._update(rtt) # updates thresholds.
- if rtt > self._thresh:
- self._consecutive += 1
- if self._consecutive >= self._n:
- self._consecutive = 0 # don't generate multiple detections
- # for single period of congestion unless
- # it persists across separate trials
- # of n samples each.
- return True # congestion detected
- else:
- self._consecutive = 0
- return False # no congestion detected
-
-
- def _update(self, rtt):
- """update thresholds when delay is within epsilon of the
- estimated propagation delay."""
-
- var = self._cond_var(variance(self._window))
- u = self._cond_mean(mean(self._window))
- # 1 log P
- # n >= - ------------------------------ . (6)
- # 2 log (sigma / ((max-min)/2 - u)
- sigma = math.sqrt(var)
- max = self._delay_on_full_estimator()
- min = self._propagation_estimator()
- p = self._false_pos_prob
- thresh = (max-min)/2
- if thresh > u:
- n = int(0.5 * math.log(p) / math.log(sigma/(thresh-u)))
- if n < 1:
- n = 1
-
- if thresh <= u or n > self._max_consecutive:
- n = self._max_consecutive
-
- # sigma
- # thresh >= ------- + u (7)
- # P^(1/2n)
- thresh = sigma / p**(0.5*n) + u
- if thresh > self._max_thresh:
- # this is a bad state. if we are forced to set thresh to
- # max thresh then the rate of false positives will
- # inexorably increase. What else to do?
- thresh = self._max_thresh
- self._thresh = thresh
- self._n = n
- if stats:
- self._thfp.write( "%f\t%f\n" % (bttime(), self._thresh) )
- self._nfp.write( "%f\t%d\n" % (bttime(), self._n) )
-
- class VegasishCongestionEstimator(BinaryCongestionEstimator):
- """Delay-based congestion control with static threshold
- set at 1/2 distance between propagation delay and delay on full
- buffer."""
- def __init__(self, window_size, drop_every_nth ):
- self._rtt_estimator = AverageOfLastWindow(window_size)
- self._propagation_estimator = \
- MedianOfMinWindow(window_size, drop_every_nth)
- self._delay_on_full_estimator = \
- MedianOfMaxWindow(window_size, drop_every_nth)
-
- def __call__(self, rtt, rate):
- middle_rtt = ((self._propagation_estimator(rtt) +
- self._delay_on_full_estimator(rtt)) / 2.0 )
- if rtt > middle_rtt:
- return True
- else:
- return False
- class BinaryControlLaw(object):
- """Increases or decreases rate limit based on a binary congestion
- indication"""
- def __call__( self, is_congested, rate ):
- """Passed rate is the averaged upload rate. Returned rate
- is a rate limit."""
- pass
- class AIADControlLaw(BinaryControlLaw):
- """Additive Increase with Additive Decrease"""
- def __init__( self, increase_delta, decrease_delta ):
- assert type(increase_delta) in [float,int,long] and \
- increase_delta > 0.0
- assert type(decrease_delta) in [float,int,long] and \
- increase_delta > 0.0
- self._increase_delta = increase_delta
- self._decrease_delta = decrease_delta
- self._ssthresh = 1.e50 # infinity
-
- def __call__( self, is_congested, rate ):
- """Passed rate is the averaged upload rate. Returned rate
- is a rate limit."""
- if is_congested:
- limit = rate - self._decrease_delta
- rate = max(rate, 1000)
- self._ssthresh = limit # congestion resets slow-start threshold
- elif rate > self._ssthresh - self._increase_delta:
- self._ssthresh += self._increase_delta
- # allow slow-start
- limit = min( self._ssthresh, 2.0 * rate )
- return rate
-
- class AIMDControlLaw(BinaryControlLaw):
- """Rate-based Additive Increase with Multiplicative Decrease w/
- multiplicative slow-start."""
- def __init__( self, increase_delta, decrease_factor ):
- assert type(increase_delta) in [float,int,long] and \
- increase_delta > 0.0
- assert type(decrease_factor) == float and decrease_factor > 0.0 and \
- decrease_factor < 1.0
- self._increase_delta = increase_delta
- self._decrease_factor = decrease_factor
- self._ssthresh = 1.e50 # infinity
-
- def __call__( self, is_congested, rate ):
- """Passed rate is the averaged upload rate. Returned rate
- is a rate limit."""
- if is_congested:
- print "CONGESTION!"
- limit = rate * self._decrease_factor
- self._ssthresh = limit # congestion resets slow-start threshold
- elif rate > self._ssthresh - self._increase_delta:
- self._ssthresh += self._increase_delta
- # allow slow-start
- limit = min( self._ssthresh, 2.0 * rate )
- if debug:
- print "AIMD: time=%f rate=%f ssthresh=%f limit=%f" % \
- (bttime(), rate, self._ssthresh, limit)
- return limit
- class StarvationPrevention(object):
- """Baseclass for objects that bound rate limit to prevent starvation."""
- def __call__( self, rate ): # may have to add additional args.
- """Passed rate is current average rate. Returned rate is a rate
- limit."""
- pass
-
- class FixedStarvationPrevention(StarvationPrevention):
- def __init__( self, lower_rate_bound ):
- self._lower_rate_bound = lower_rate_bound
- def __call__( self, rate ):
- print "starvation rate=", max(rate,self._lower_rate_bound)
- return max( rate, self._lower_rate_bound )
-
- class BandwidthManager(object):
- """Controls allocation of bandwidth between foreground and background
- traffic. Currently all BitTorrent traffic is considered background.
- Background traffic is subjected to a global rate limit that is
- reduced during congestion to allow foreground traffic to takeover.
- A 'starvation prevention' building block applies a lower bound.
- """
- def __init__(self, external_add_task, config, set_config,
- get_remote_endpoints, get_rates):
- if debug:
- if config['bandwidth_management']:
- print "bandwidth management is up."
- else:
- print "!@#!@#!@#!@#!@# bandwidth management is down."
-
- self.external_add_task = external_add_task
- self.config = config
- self.set_config = set_config
- self.get_rates = get_rates
- if os.name == 'nt':
- icmp_impl = Win32Icmp()
- elif os.name == 'posix':
- icmp_impl = UnixIcmp(external_add_task, config['xicmp_port'])
- def got_new_rtt(rtt):
- print "got_new_rtt, rtt=", rtt
- self.external_add_task(0, self._inspect_rates, rtt)
- self.rttmonitor = RTTMonitor(got_new_rtt, icmp_impl)
- self.nodefeeder = NodeFeeder(add_task=external_add_task,
- get_remote_endpoints=get_remote_endpoints,
- rttmonitor=self.rttmonitor)
- self.start_time = bttime()
- if config['control_law'] == 'aimd':
- self.control_law = AIMDControlLaw(config['increase_delta'],
- config['decrease_factor'])
- elif config['control_law'] == 'aiad':
- self.control_law = AIADControlLaw(config['increase_delta'],
- config['decrease_delta'])
- # This configurability is temporary during testing/tuning. --Dave
- if config['congestion_estimator'] == "chebyshev":
- self.congestion_estimator = ChebyshevCongestionEstimator(
- config['window_size'], config['drop_every_nth'],
- config['cheby_max_probability'],
- config['cheby_max_consecutive'],
- config['cheby_max_threshold'], config['ewma'])
-
- elif config['congestion_estimator'] == "variance":
- self.congestion_estimator = VarianceCongestionEstimator(
- config['window_size'])
- elif config['congestion_estimator'] == "vegasish":
- self.congestion_estimator = VegasishCongestionEstimator(
- config['window_size'], config['drop_every_nth'])
- else:
- raise BTFailure(_("Unrecognized congestion estimator '%s'.") %
- config['congestion_estimator'])
- self.starvation_prevention = FixedStarvationPrevention(
- config['min_upload_rate_limit'] )
- if stats:
- rlimit_vs_time = \
- os.path.join( stats_dir, "rlimit_vs_time.plotdata" )
- fp = open( rlimit_vs_time, "w" )
- self.control_law = StreamTracer(self.control_law, fp)
- _copy_gnuplot( "rlimit_vs_time.gnuplot" )
- # samples are max(min_upload_rate_limit,rate).
- slimit_vs_time = \
- os.path.join( stats_dir, "slimit_vs_time.plotdata" )
- fp = open( slimit_vs_time, "w" )
- self.starvation_prevention = StreamTracer(
- self.starvation_prevention, fp)
-
- delay_vs_time = os.path.join( stats_dir, "delay_vs_time.plotdata" )
- self.dfp = open( delay_vs_time, "w" )
- _copy_gnuplot( "delay_vs_time.gnuplot" )
- #def congestion_estimator_vegas_greg(self, rtt, rate):
- #
- # middle_rtt = ((self.propagation_estimator(rtt) +
- # self.delay_on_full_estimator(rtt)) / 2.0 )
- # if t > middle_rtt and c < 0.5:
- # rate *= 0.5
- # if debug:
- # print type, "down to", rate
- # else:
- # rate += 1000 # hmm
- # if debug:
- # print type, "up to", rate
- # return rate
- #def congestion_estimator_ratio(self, type, t, p, min_p, max_p, rate):
- # ratio = p / max_p
- # if debug:
- # print "RATIO", ratio
- # if ratio < 0.5:
- # rate = ratio * self.max_rates[type]
- # if debug:
- # print type.upper(), "SET to", rate
- # else:
- # rate += rate * (ratio/10.0) # hmm
- # if debug:
- # print type.upper(), "UP to", rate
- #
- # return max(rate, 1000)
- #def congestion_estimator_stddev(self, type, std, max_std, rate):
- # if std > (max_std * 0.80): # FUDGE
- # rate *= 0.80 # FUDGE
- # if debug:
- # print type.upper(), "DOWN to", rate
- # else:
- # rate += 1000 # FUDGE
- # if debug:
- # print type.upper(), "UP to", rate
- #
- # return max(rate, 1000) # FUDGE
-
- #def _affect_rate(self, type, std, max_std, rate, set):
- # rate = self._congestion_estimator_stddev(type, std, max_std, rate)
- #
- # rock_bottom = False
- # if rate <= 1000:
- # if debug:
- # print "Rock bottom"
- # rock_bottom = True
- # rate = 1000
- #
- # set(int(rate))
- # if stats:
- # print "BandwidthManager._affect_rate(%f)" % rate
- # self.rfp.write( "%d\t%d\n" % (bttime(),int(rate)) )
- # self.sdevfp.write( "%d\t%f\n" % (bttime(), std ) )
- #
- # return rock_bottom
- def _inspect_rates(self, t = None):
- """Called whenever an RTT sample arrives. If t == None then
- a timeout occurred."""
- if t == None:
- t = self.rttmonitor.get_current_rtt()
- if t == None:
- # this makes timeouts reduce the maximum std deviation
- self.congestion_estimator.timeout()
- return
- if debug:
- print "BandwidthManager._inspect_rates: %d" % t
- if stats:
- self.dfp.write( "%f\t%f\n" % (bttime(),t) )
- if not self.config['bandwidth_management']:
- return
- # TODO: slow start should be smarter than this
- #if self.start_time < bttime() + 20:
- # self.config['max_upload_rate'] = 10000000
- # self.config['max_dowload_rate'] = 10000000
- #if t < 3:
- # # I simply don't believe you. Go away.
- # return
- tup = self.get_rates()
- if tup == None:
- return
- uprate, downrate = tup
- # proceed through the building blocks. (We can swap in various
- # implementations of each based on config).
- is_congested = self.congestion_estimator(t,uprate)
- rate_limit = self.control_law(is_congested,uprate)
- rate_limit = self.starvation_prevention(rate_limit)
- self._set_rate_limit(rate_limit)
- def _set_rate_limit(self,rate_limit):
- self.set_config('max_upload_rate', rate_limit)
- if __name__ == "__main__":
- # perform unit tests.
- n_tests = 0
- n_tests_passed = 0
- n_tests += 1
- medmin = MedianOfMinWindow(3, 7)
- if medmin(5) != 5: # [5], i = 1
- print "FAILED. Median [5] %s should be 5, but it is %d." % \
- (medmin,medmin())
- else:
- n_tests_passed += 1
- n_tests += 1
- m = medmin(6) # [5,6], i = 2
- if medmin._window != [5,6] or m < 5.5-0.01 or m > 5.5 + 0.01:
- print ( "FAILED. medmin should be [5,6] with median 5.5, but it is "
- "%s with median %f." % (medmin,medmin()))
- else:
- n_tests_passed += 1
- n_tests += 1
- m = medmin(7) # [5,6,7], i = 3
- if medmin._window != [5,6,7] or m < 6-0.01 or m > 6+0.01:
- print ( "FAILED. medmin should be [5,6,7] with median 6, but it is %s"
- " with median %f." ) % (medmin,medmin())
- else:
- n_tests_passed += 1
- n_tests += 1
- medmin(8) # [5,6,7], discard 8, i = 4
- medmin(9) # [5,6,7], discard 9, i = 5
- m = medmin(10) # [5,6,7], discard 10, i = 6
- if medmin._window != [5,6,7] or m < 6-0.01 or m > 6+0.01:
- print ( "FAILED. After inserting [5..10], we should've dropped 8,9 "
- "and 10 leaving us with [5,6,7], but the list is %s with "
- "median %d." ) % (medmin,medmin())
- else:
- n_tests_passed += 1
- n_tests += 1
- m = medmin(11)
- if medmin._window != [6,7,11] or m < 7-0.01 or m > 7+0.01:
- print ( "FAILED. After inserting [5..11], we should've dropped "
- "8,9 and 10, but when 11 is added this is the 'every_nth'"
- " and thus we should've dropped 5 instead leaving us with "
- "[6,7,11] and thus a median of 7." )
- else:
- n_tests_passed += 1
-
- n_tests += 1
- avgmin = AverageOfMinWindow(3,7)
- m = avgmin(2)
- if m < 2-0.01 or m > 2+0.01: # [2], i = 1
- print ("FAILED. Average of [2] should be 2, but average of %s is "
- "%f." ) % (avgmin, avgmin(2))
- else:
- n_tests_passed += 1
- n_tests += 1
- m = avgmin() # [2], i = 1
- if m < 2-0.01 or m > 2+0.01:
- print ( "FAILED. Average after inserting no new elements should "
- "remain unchanged at 2, but it is %d." ) % val
- else:
- n_tests_passed += 1
- n_tests += 1
- avgmin(3) # [2,3], i = 2
- val = avgmin(5) # [2,3,5], i = 3
- if avgmin._window != [2,3,5] or val < 10/3.-0.01 or val > 10/3.+0.01:
- print ( "FAILED: after avgmin(3), avgmin's window should contain "
- "[2,3,5] and have average %f, but it is %s with average %f." %
- ( (2+3+5)/3., avgmin, avgmin() ) )
- else:
- n_tests_passed += 1
- n_tests += 1
- val = avgmin(0) # [0,2,3], i = 4
- if avgmin._window != [0,2,3] or val < 5/3.-.01 or val > 5/3.+0.01:
- print ( "FAILED: avgmin's window should contain [0,2,3] and have "
- "average %f, but it is %s with average %f." %
- ( (0+2+3)/3., avgmin._window, avgmin() ) )
- else:
- n_tests_passed += 1
- n_tests += 1
- avgmin(2) # [0,2,2], i = 5
- avgmin(4) # [0,2,2], i = 6, discarded 4
- avgmin(1) # [1,2,2], i = 7, dropped smallest 0
- if avgmin._window != [1,2,2] or val < 5/3.-0.01 or val > 5/.3+0.01:
- print ( "FAILED: avgmin's window should contain [1,2,2] and have "
- "average %f, but it is %s with average %f." %
- ( (1+2+2)/3., avgmin, avgmin() ) )
- else:
- n_tests_passed += 1
- n_tests += 1
- medmax = MedianOfMaxWindow(3,7)
- val = medmax(2) # i = 1
- if medmax != [2] or val < 2-0.01 or val > 2+0.01:
- print ( "FAILED: medmax should be [2] and median 2, but medmax is %s "
- "and median is %f." % (medmax, medmax()) )
- else:
- n_tests_passed += 1
- n_tests += 1
- medmax(4)
- val = medmax(1) # i = 3
- if medmax != [1,2,4] or val < 2-0.01 or val > 2+0.01:
- print ( "FAILED: medmax should be [1,2,4] and median 2, but medmax "
- "is %s and median is %f." % (medmax, medmax()) )
- else:
- n_tests_passed += 1
- n_tests += 1
- val = medmax(5) # i = 4
- if medmax != [2,4,5] or val < 4-0.01 or val > 4+0.01:
- print ( "FAILED: medmax should be [2] and median 2, but medmax is "
- "%s and median is %f." % (medmax, medmax()) )
- else:
- n_tests_passed += 1
- n_tests += 1
- medmax(1) # i = 5
- medmax(1) # i = 6
- val = medmax(3) # i = 7
- if medmax != [2,3,4] or val < 3-0.01 or val > 3+0.01:
- print ( "FAILED: medmax should be [3] and median 3, but medmax is "
- "%s and median is %f." % (medmax,medmax()) )
- else:
- n_tests_passed += 1
-
- if n_tests == n_tests_passed:
- print "Passed all %d tests." % n_tests
|