# This was built to be like SFQ but turned out like round-robin.
# (Why didn't you just use Deficit Round Robin?) --Dave
# (Because of unitsize) --Greg
#
# I call it Hierarchical Round Robin Bucket Percentage Style
#
# by Greg Hazel
import time
import traceback
from BTL.platform import bttime
from BTL.DictWithLists import OrderedDictWithLists
# these are for logging and such
class GlobalRate(object):
    """Debug helper: accumulates a byte total and prints transfer rates.

    Used only for logging (see the __main__ test harness below).
    """
    def __init__(self):
        # total bytes reported so far
        self.total = 0.0
        # wall-clock anchors for the average rate and the per-call rate
        self.start_time = bttime()
        self.last_time = self.start_time
    def print_rate(self, size):
        """Add `size` bytes to the total and print per-call and average rates."""
        self.total += size
        this_time = bttime()
        start_delta = this_time - self.start_time
        this_delta = this_time - self.last_time
        # guard both deltas so we never divide by zero (or print garbage
        # for a negative delta from clock weirdness)
        if start_delta > 0 and this_delta > 0:
            print "UPLOAD: This:", size / this_delta, "Total:", self.total / start_delta
        self.last_time = this_time
# shared module-level rate tracker used by the debugging harness below
global_rate = GlobalRate()
# very simple.
# every call gives you the duration since the last call in tokens
- class DeltaTokens(object):
- def __init__(self, rate):
- self.set_rate(rate)
- def set_rate(self, rate):
- self.rate = rate
- # clear the history since the rate has changed and it could be way off
- self.last_time = bttime()
- # return the number of tokens you can have since the last call
- def __call__(self):
- new_time = bttime()
- delta_time = new_time - self.last_time
- # if last time was more than a second ago, we can't give a clear
- # approximation since rate is in tokens per second.
- delta_time = min(delta_time, 1.0)
- if delta_time <= 0:
- return 0
- tokens = self.rate * delta_time
- self.last_time = new_time
- return tokens
- # allows you to subtract tokens from DeltaTokens to compensate
- def remove_tokens(self, x):
- if self.rate == 0:
- # shit, I don't know.
- self.last_time += x
- else:
- self.last_time += x / self.rate
- # returns the time until you'll get tokens again
- def get_remaining_time(self):
- return max(0, self.last_time - bttime())
class Classifer(object):
    """Round-robin container of per-key queues of callbacks.

    (The name is a historical typo for "Classifier"; kept because callers
    reference it.)
    """

    def __init__(self):
        self.channels = OrderedDictWithLists()

    def add_data(self, keyable, func):
        # Ideally the key would rotate every ~10 seconds, but migrating the
        # old data is hard (can't write out-of-order), so each keyable is its
        # own channel -- which makes this technically round-robin.
        #key = sha.sha(id(o)).hexdigest()[0]
        self.channels.push_to_row(keyable, func)

    def rem_data(self, key):
        """Drop a channel and everything queued on it; no-op if absent."""
        try:
            dropped = self.channels.poprow(key)
        except KeyError:
            pass
        else:
            dropped.clear()

    def rotate_data(self):
        """Take one queued item from the oldest channel, then move that whole
        channel to the back so the next call services a different key."""
        key = self.channels.iterkeys().next()
        row = self.channels.poprow(key)
        item = row.popleft()
        # re-append the row at the bottom of the ordered dict
        self.channels.setrow(key, row)
        return item

    def __len__(self):
        # number of active channels (keys), not total queued items
        return len(self.channels)
class Scheduler(object):
    """Hierarchical round-robin rate scheduler.

    Drains callbacks from a Classifer at a token-bucket limited rate and
    propagates scaled rates down to child schedulers.

    NOTE(review): self.unitsize is read by _write() but never assigned here;
    callers (MultiRateLimiter.set_parameters, or the test harness) must set
    it before any data is scheduled -- TODO confirm this is intentional.
    """

    def __init__(self, rate, add_task):
        """@param rate: rate at which 'tokens' are generated.
        @param add_task: callback to schedule an event.
        """
        self.add_task = add_task
        self.classifier = Classifer()
        self.delta_tokens = DeltaTokens(rate)
        # pending loop event handle (a twisted-style delayed call), if any
        self.task = None
        # child Scheduler -> fraction of this scheduler's rate it receives
        self.children = {}

    def set_rate(self, rate, cascade=True):
        """Change the token rate; optionally cascade scaled rates to children."""
        self.delta_tokens.set_rate(rate)
        if cascade:
            for child, scale in self.children.iteritems():
                child.set_rate(rate * scale)
        # the rate changed, so it's possible the loop is
        # running slower than it needs to
        self.restart_loop(0)

    def add_child(self, child, scale):
        """Register a child scheduler receiving `scale` of our rate."""
        self.children[child] = scale
        child.set_rate(self.delta_tokens.rate * scale)

    def remove_child(self, child):
        del self.children[child]

    def add_data(self, keyable, func):
        """Queue callback `func` on channel `keyable`."""
        self.classifier.add_data(keyable, func)
        # kick off a loop since we have data now
        self.restart_loop(0)

    def restart_loop(self, t):
        """Ensure the drain loop will run within `t` seconds."""
        # check for a pending loop event
        if self.task and not self.task.called:
            # we can special case events which have a delta of 0, since they
            # should occur asap -- no need to check the time
            if self.task.delta == 0:
                return
            # use time.time since twisted does anyway
            s = self.task.getTime() - time.time()
            if s > t:
                # it would occur after the time we want, so pull it earlier
                self.task.reset(t)
                self.task.delta = t
        else:
            if t == 0:
                # don't spin the event loop needlessly
                self.run()
            else:
                self.task = self.add_task(t, self.run)
                self.task.delta = t

    def _write(self, to_write):
        """Dispatch up to `to_write` tokens' worth of callbacks round-robin;
        return the amount actually written."""
        amount = 0
        each = min(self.delta_tokens.rate, self.unitsize)
        if self.children:
            for child, scale in self.children.iteritems():
                child.set_rate(self.delta_tokens.rate * scale, cascade=False)
            # sample one dispatch per child to measure achievable throughput
            i = 0
            while amount < to_write and len(self.classifier) > 0:
                # BUGFIX: rotate_data() returns the bare callable that
                # add_data() stored; it was wrongly unpacked here as a
                # (func, args) tuple (compare the loop below, which has
                # always used the bare form).
                func = self.classifier.rotate_data()
                # ERROR: func can fill buffers, so use the on_flush technique
                try:
                    amount += func(each)
                except Exception:
                    # don't stop the loop if we hit an error (but let
                    # KeyboardInterrupt/SystemExit propagate)
                    traceback.print_exc()
                i += 1
                if i == len(self.children):
                    break
            for child, scale in self.children.iteritems():
                # really max, but we happen to know it can't exceed amount
                child.set_rate(amount, cascade=False)
        while amount < to_write and len(self.classifier) > 0:
            func = self.classifier.rotate_data()
            # ERROR: func can fill buffers, so use the on_flush technique
            try:
                amount += func(each)
            except Exception:
                # don't stop the loop if we hit an error
                traceback.print_exc()
        return amount

    def _run_once(self):
        """One drain iteration: write what the bucket allows, then charge it."""
        f_to_write = self.delta_tokens()
        to_write = int(f_to_write)
        if to_write == 0:
            written = 0
        else:
            written = self._write(to_write)
        # for debugging
        #print "Ideal:", self.delta_tokens.rate, f_to_write
        #global_rate.print_rate(written)
        # charge the bucket for what was actually written; if we wrote less
        # than the accrual, this is a credit (negative tokens)
        self.delta_tokens.remove_tokens(written - f_to_write)
        return written

    def run(self):
        """Drain until the bucket runs dry or data runs out, then reschedule."""
        t = 0
        while t == 0:
            if len(self.classifier) == 0:
                return
            self._run_once()
            t = self.delta_tokens.get_remaining_time()
        self.restart_loop(t)

# made to look like the original
class MultiRateLimiter(Scheduler):
    """Scheduler subclass matching the interface of the original rate limiter."""

    # Since data is sent to peers in a round-robin fashion, max one
    # full request at a time, setting this higher would send more data
    # to peers that use request sizes larger than standard 16 KiB.
    # 17000 instead of 16384 to allow room for metadata messages.
    max_unitsize = 17000

    def __init__(self, sched, parent=None):
        Scheduler.__init__(self, rate = 0, add_task = sched)
        # only a root limiter drives its own loop; children are driven by
        # their parent.  (was `parent == None` -- identity test is the idiom)
        if parent is None:
            self.run()

    def set_parameters(self, rate, unitsize=2**500):
        """Set the rate; unitsize defaults to "unbounded" and is clamped to
        max_unitsize either way."""
        self.set_rate(rate)
        unitsize = min(unitsize, self.max_unitsize)
        self.unitsize = unitsize

    def queue(self, conn):
        # the connection object itself is the round-robin channel key
        self.add_data(conn, conn.send_partial)

    def dequeue(self, keyable):
        """Drop a connection and anything still queued for it."""
        self.classifier.rem_data(keyable)

    def increase_offset(self, bytes):
        # hackity hack hack: credit the bucket so these bytes are not
        # charged against the rate (was `0 - bytes`)
        self.delta_tokens.remove_tokens(-bytes)
class FakeConnection(object):
    """Test stand-in for a connection: "writes" by calling a size-producing
    function and reporting the size to a GlobalRate-like object."""

    def __init__(self, gr):
        # gr: object exposing print_rate(size), e.g. GlobalRate
        self.gr = gr

    def _use_length_(self, length):
        # pretend we wrote exactly `length` bytes
        return self.write(lambda: length)

    def write(self, fn, *args):
        """Invoke fn(*args), report the size it returns, and return that size."""
        size = fn(*args)
        self.gr.print_rate(size)
        return size
if __name__ == '__main__':
    # Stress-test harness: drives a Scheduler with 500 fake connections
    # under a twisted reactor, optionally profiling the run.
    profile = True
    try:
        from BTL.profile import Profiler, Stats
        prof_file_name = 'NewRateLimiter.prof'
    except ImportError, e:
        print "profiling not available:", e
        profile = False
    import os
    import random
    from RawServer_twisted import RawServer
    from twisted.internet import task
    from BTL.defer import DeferredEvent
    rawserver = RawServer()
    s = Scheduler(4096, add_task = rawserver.add_task)
    # Scheduler never sets unitsize itself; it must be assigned before data
    # is queued (see _write)
    s.unitsize = 17000
    a = []
    for i in xrange(500):
        keyable = FakeConnection(global_rate)
        a.append(keyable)
    freq = 0.01
    def push():
        # roughly once per 5 seconds (p = 1/501 per 0.01s tick), pick a new
        # random rate
        if random.randint(0, 5 / freq) == 0:
            rate = random.randint(1, 100) * 1000
            print "new rate", rate
            s.set_rate(rate)
        # requeue every fake connection each tick
        for c in a:
            s.add_data(c, c._use_length_)

    t = task.LoopingCall(push)
    t.start(freq)

    ## m = MultiRateLimiter(sched=rawserver.add_task)
    ## m.set_parameters(120000000)
    ## class C(object):
    ##     def send_partial(self, size):
    ##         global_rate.print_rate(size)
    ##         rawserver.add_task(0, m.queue, self)
    ##         return size
    ##
    ## m.queue(C())
    if profile:
        try:
            os.unlink(prof_file_name)
        except:
            pass
        prof = Profiler()
        prof.enable()
    rawserver.listen_forever()
    if profile:
        prof.disable()
        st = Stats(prof.getstats())
        st.sort()
        f = open(prof_file_name, 'wb')
        st.dump(file=f)
|