| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388 |
- # The contents of this file are subject to the BitTorrent Open Source License
- # Version 1.1 (the License). You may not copy or use this file, in either
- # source code or executable form, except in compliance with the License. You
- # may obtain a copy of the License at http://www.bittorrent.com/license/.
- #
- # Software distributed under the License is distributed on an AS IS basis,
- # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
- # for the specific language governing rights and limitations under the
- # License.
- # By Greg Hazel
- from __future__ import division
- debug = False
- stats = False # collect statistics and dump to files.
- import os # for stats
- from BitTorrent import platform # for stats
- import time # for stats
- import math
- import itertools
- from BTL.obsoletepythonsupport import set
- from BitTorrent.RTTMonitor import RTTMonitor
- #from BitTorrent.RTTMonitor2 import RTTMonitor, Win32Icmp, UnixIcmp
- from BTL.HostIP import get_deferred_host_ips
- from BTL.platform import bttime
- from BTL.Lists import SizedList
- def fp(flt):
- return "%.2f" % flt
-
- class NodeFeeder(object):
- """Every few minutes, this will obtain the set of known peers from
- the running torrents and then tell the RTTMonitor to retraceroute
- to these peers to reidentify the common path."""
- def __init__(self, external_add_task,
- get_remote_endpoints, rttmonitor):
- self.external_add_task = external_add_task
- self.get_remote_endpoints = get_remote_endpoints
- self.rttmonitor = rttmonitor
- self.external_add_task(3, self._start)
- def _start(self):
- df = get_deferred_host_ips()
- df.addCallback(self._collect_nodes)
- def _collect_nodes(self, local_ips):
- addrs = self.get_remote_endpoints()
-
- ips = set()
- for (ip, port) in addrs:
- if ip is not None and ip != "0.0.0.0" and ip not in local_ips:
- assert isinstance(ip, str)
- assert isinstance(port, int)
- ips.add(ip)
-
- self.rttmonitor.set_nodes_restart(ips)
- delay = 5
- if len(ips) > 0:
- delay = 300
-
- self.external_add_task(delay, self._collect_nodes, local_ips)
- def ratio_sum_lists(a, b):
- tx = float(sum(a))
- ty = float(sum(b))
- return tx / max(ty, 0.0001)
- def standard_deviation(l):
- N = len(l)
- x = 0
-
- total = float(sum(l))
-
- mean = total/N
-
- # Subtract the mean from each data item and square the difference.
- # Sum all the squared deviations.
- for i in l:
- x += (i - mean)**2.0
-
- try:
- if False:
- # Divide sum of squares by N-1 (sample variance).
- variance = x / (N - 1)
- else:
- # Divide sum of squares by N (population variance).
- variance = x / N
- except ZeroDivisionError:
- variance = 0
- stddev = math.sqrt(variance)
-
- return stddev
- class BandwidthManager(object):
-
- def __init__(self, external_add_task, config, set_option,
- get_remote_endpoints, get_rates):
- if debug:
- if config['bandwidth_management']:
- print "bandwidth management is up."
- else:
- print "!@#!@#!@#!@#!@# bandwidth management is down."
- self.external_add_task = external_add_task
- self.config = config
- self.set_option = set_option
- self.get_rates = get_rates
- # Next few lines were added by David Harrison to use RTTMonitor2
- #if os.name == 'nt':
- # icmp_impl = Win32Icmp()
- #elif os.name == 'posix':
- # icmp_impl = UnixIcmp(external_add_task, config['xicmp_port'])
- def got_new_rtt(rtt):
- self.external_add_task(0, self._inspect_rates, rtt)
- #self.rttmonitor = RTTMonitor(got_new_rtt, icmp_impl)
- self.rttmonitor = RTTMonitor(got_new_rtt)
- self.nodefeeder = NodeFeeder(external_add_task=external_add_task,
- get_remote_endpoints=get_remote_endpoints,
- rttmonitor=self.rttmonitor)
- self.start_time = None
- self.max_samples = 10 # hmm...
- self.u = SizedList(self.max_samples)
- self.d = SizedList(self.max_samples)
- self.t = SizedList(self.max_samples * 2)
- self.ur = SizedList(self.max_samples)
- self.dr = SizedList(self.max_samples)
- self.current_std = 0.001
- self.max_std = 0.001
- self.last_max = bttime()
- self.max_rates = {}
- self.max_rates["upload"] = 1.0
- self.max_rates["download"] = 1.0
- self.max_p = 1.0
- self.min_p = 2**500
- self.mid_p = ((self.max_p - self.min_p) / 2.0) + self.min_p
- self.old_p = None
- # I pulled these numbers out of my ass.
- if stats:
- tmp_dir = platform.get_temp_dir()
- timestr = "%d_%d_%d_%d_%d" % time.localtime()[1:6]
- stats_dir = os.path.join( tmp_dir, "bittorrent%s_%d" %
- (timestr, os.getpid()) )
- os.mkdir(stats_dir)
- if debug: print "BandwidthManager: stats_dir = %s" % stats_dir
- rate_vs_time = os.path.join( stats_dir, "rate_vs_time.plotdata" )
- self.rfp = open( rate_vs_time, "w" )
- delay_vs_time = os.path.join( stats_dir, "delay_vs_time.plotdata" )
- self.dfp = open( delay_vs_time, "w" )
- sdev_vs_time = os.path.join( stats_dir,
- "stddev_vs_time.plotdata" )
- self.sdevfp = open( sdev_vs_time, "w" )
- def _method_1(self, type, t, c, old_c, rate):
- # This concept is:
- # if the correlation is high and the latency is high
- # then lower the bandwidth limit.
- # otherwise, raise it.
- if ((c > 0.96) and (t > 100)):
- rate /= 2.0
- if debug:
- print type, "down to", rate
- else:
- rate += 500 # hmm
- if debug:
- print type, "up to", rate
- return rate
-
- def _method_2(self, type, t, c, old_c, rate):
- # This concept is:
- # if the correlation is low and the latency is high, lower the limit
- # otherwise raise it
- if ((c < 0.60) and (t > 100)):
- rate /= 2.0
- if debug:
- print type, "down to", rate
- else:
- rate += 500 # hmm
- if debug:
- print type, "up to", rate
- return rate
- def _method_vegasish(self, type, t, c, old_c, rate):
- middle_rtt = ((self.rttmonitor.get_min_rtt() +
- self.rttmonitor.get_max_rtt()) / 2.0)
- if t > middle_rtt:
- rate *= 1.0/8.0
- if debug:
- print type, "down to", rate
- else:
- rate += 1000 # hmm
- if debug:
- print type, "up to", rate
- return rate
- def _method_vegas_greg(self, type, t, c, old_c, rate):
- middle_rtt = ((self.rttmonitor.get_min_rtt() +
- self.rttmonitor.get_max_rtt()) / 2.0)
- if t > middle_rtt and c < 0.5:
- rate *= 1.0/8.0
- if debug:
- print type, "down to", rate
- else:
- rate += 1000 # hmm
- if debug:
- print type, "up to", rate
- return rate
- def _method_ratio(self, type, t, p, min_p, max_p, rate):
- ratio = p / max_p
- if debug:
- print "RATIO", ratio
- if ratio < 0.5:
- rate = ratio * self.max_rates[type]
- if debug:
- print type.upper(), "SET to", rate
- else:
- rate += rate * (ratio/10.0) # hmm
- if debug:
- print type.upper(), "UP to", rate
-
- return max(rate, 1000)
- def _method_stddev(self, type, std, max_std, rate):
- top = 0.80 # FUDGE
- if std > (max_std * top):
- center = 1.0 + top - ((1.0 - top) * 0.5)
- s = min(std/max(0.0001, max_std), center)
- s = center - s
- rate *= s
- if debug:
- print type.upper(), "DOWN *", s, "to", rate
- else:
- s = 1000.0 # FUDGE
- s *= min(max_std/max(0.0001, std), 4) / 4.0
- s = int(s)
- rate += s
- if debug:
- print type.upper(), "UP +", s, "to", rate
- return max(rate, 1000) # FUDGE
-
-
- def _affect_rate(self, type, std, max_std, rate, set):
- rate = self._method_stddev(type, std, max_std, rate)
- rock_bottom = False
- if rate <= 4096:
- if debug:
- print "Rock bottom"
- rock_bottom = True
- rate = 4096
-
- set(int(rate))
- if stats:
- print "BandwidthManager._affect_rate(%f)" % rate
- self.rfp.write( "%d %d" % (bttime(),int(rate)) )
- self.sdevfp.write( "%d %f" % (bttime(), std ) )
- return rock_bottom
-
- def _inspect_rates(self, t = None):
- if t == None:
- t = self.rttmonitor.get_current_rtt()
- if t == None:
- # this makes timeouts reduce the maximum std deviation
- self.max_std *= 0.80 # FUDGE
- return
- if self.start_time == None:
- self.start_time = bttime()
- if debug:
- print "BandwidthManager._inspect_rates rtt: %d" % t
- if stats:
- self.dfp.write( "%d %d" % (bttime(),t) )
- def set_if_enabled(option, value):
- if not self.config['bandwidth_management']:
- return
- if debug:
- print "Setting %s to: %s" % (option, value)
- self.set_option(option, value)
- # TODO: slow start should be smarter than this
- if self.start_time + 20 > bttime():
- if debug:
- print 'SLOW START', fp(self.start_time + 20), fp(bttime())
- set_if_enabled('max_upload_rate', 10000000)
- set_if_enabled('max_download_rate', 10000000)
- if t < 3:
- # I simply don't believe you. Go away.
- return
- tup = self.get_rates()
- if tup == None:
- return
- u, d = tup
- #print "udt", u, d, t
- #print "uprate, downrate=", u, d
- self.u.append(u)
- self.d.append(d)
- self.t.append(t)
- self.ur.append(self.config['max_upload_rate'])
- self.dr.append(self.config['max_download_rate'])
- #s = time.time()
- #cu = correlation(self.u, self.t)
- #cd = correlation(self.d, self.t)
- #cur = correlation(self.u, self.ur)
- #cdr = correlation(self.d, self.dr)
- #e = time.time()
- self.current_std = standard_deviation(self.t)
-
- pu = ratio_sum_lists(self.u, self.t)
- pd = ratio_sum_lists(self.d, self.t)
- if len(self.u) > 2:
- lp = [ x/y for x, y in itertools.izip(self.u, self.t) ]
- min_pu = min(*lp)
- max_pu = max(*lp)
- else:
- min_pu = u / t
- max_pu = u / t
- pu = u / t
- self.max_rates["upload"] = max(max(self.u), self.max_rates["upload"])
- self.max_rates["download"] = max(max(self.d), self.max_rates["download"])
- if debug:
- print 'urate:', fp(u), 'umax:', self.config['max_upload_rate'], \
- 'maxstd:', fp(self.max_std), 'std:', fp(self.current_std), \
- 'pu:', fp(pu), 'pd:', fp(pd)
-
- rb = self._affect_rate("upload", self.current_std, self.max_std,
- self.config['max_upload_rate'],
- lambda r : set_if_enabled('max_upload_rate', r))
- # don't adjust download rate, it's not nearly correlated enough
- ## if rb:
- ## v = int(self.config['max_download_rate'] * 0.90) # FUDGE
- ## v = max(v, 2000) # FUDGE
- ## set_if_enabled('max_download_rate', v)
- ## else:
- ## v = int(self.config['max_download_rate'] + 6000) # FUDGE
- ## set_if_enabled('max_download_rate', v)
- ## if debug:
- ## print "DOWNLOAD SET to", v
-
- #self._affect_rate("download", t, cd, self.last_cd, pd,
- # self.config['max_download_rate'],
- # lambda r : self.set_option('max_download_rate', r))
- if self.current_std > self.max_std:
- self.max_std = self.current_std
- self.last_max = bttime()
- elif bttime() - self.last_max > 10:
- # decay old maximums, to recover from flakey connections
- self.max_std *= 0.90 # FUDGE
- self.last_max = bttime()
- #self.last_cu = cu
- #self.last_cd = cd
- # we're re-called by the pinging thing
- #self.external_add_task(0.1, self._inspect_rates)
-
-
|