1
0

DownloadRateLimiter.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. # Written by David Harrison
  2. from BTL.platform import bttime as time
  3. from BTL.rand_tools import iter_rand_pos
  4. import logging
  5. logger = logging.getLogger( "DownloadRateLimiter" )
  6. log = logger.debug
  7. from twisted.internet import task
  8. class ThrottleListener(object):
  9. def throttle_connections(self):
  10. pass
  11. def unthrottle_connections(self):
  12. pass
  13. class DownloadRateLimiter( object ):
  14. """DownloadRateLimiter implements a leaky bucket.
  15. """
  16. def __init__(self, interval, max_download_rate ):
  17. """@param add_task: called to schedule periodic rate adjustements.
  18. @param interval: time between rate adjustments.
  19. @param max_download_rate: in Bytes per second.
  20. """
  21. assert type(interval) in (int,float,long) and interval > 0.0
  22. assert type(max_download_rate) in (int, float, long) and \
  23. max_download_rate > 0.0
  24. self._interval = interval
  25. self._max_download_rate = max_download_rate
  26. self._throttle_listeners = set()
  27. self._token_bucket = 0 # number of bytes that can be sent.
  28. self._prev_time = None
  29. token_size = max_download_rate * interval
  30. self._max_token_bytes = 2 * token_size # > 1.*token_size ensures enough for
  31. # continuous transmission except for really
  32. # bursty sources.
  33. # start update interval timer.
  34. self._timer = task.LoopingCall(self.end_of_interval)
  35. self._timer.start(interval)
  36. def set_parameters( self, max_download_rate ): # more parameters?
  37. self._max_download_rate = max_download_rate
  38. token_size = max_download_rate * self._interval
  39. self._max_token_bytes = 2 * token_size
  40. def add_throttle_listener( self, listener ):
  41. self._throttle_listeners.add(listener)
  42. def remove_throttle_listener( self, listener ):
  43. if listener in self._throttle_listeners:
  44. self._throttle_listeners.remove(listener)
  45. def throttle(self):
  46. #log( "throttle" )
  47. for l in iter_rand_pos(self._throttle_listeners):
  48. l.throttle_connections()
  49. def unthrottle(self):
  50. #log( "unthrottle" )
  51. for l in iter_rand_pos(self._throttle_listeners):
  52. l.unthrottle_connections()
  53. # arg. resume actually flushes the buffers in iocpreactor, so we
  54. # have to check the state constantly
  55. if self._token_bucket <= 0:
  56. break
  57. def update_rate(self, bytes):
  58. #log( "data_came_in: bytes=%d, token_bucket=%d" %
  59. # (bytes,self._token_bucket))
  60. old = self._token_bucket
  61. self._token_bucket -= bytes
  62. # Here we throttle the connections whenver the token bucket
  63. # becomes less than empty.
  64. if self._token_bucket - bytes <= 0 and old > 0:
  65. self.throttle()
  66. def end_of_interval(self):
  67. # Note: at low bitrates it is possible and correct for the
  68. # token bucket to have significantly negative values. At low
  69. # bit rates, the interval length can be on the same order
  70. # or even larger than the packet burst interarrival times, the token
  71. # size becomes smaller than zero due to a burst of packet
  72. # arrivals. This is okay. The observed rate will sawtooth around
  73. # the correct rate.
  74. # compute token size based on the time that really elapsed.
  75. now = time()
  76. if self._prev_time is None:
  77. token = int(self._max_download_rate*self._interval)
  78. else:
  79. token = int(self._max_download_rate*(now-self._prev_time))
  80. self._prev_time = time()
  81. # update token bucket.
  82. self._token_bucket += token
  83. if self._token_bucket > self._max_token_bytes:
  84. self._token_bucket = self._max_token_bytes
  85. # if token bucket not in deficit then safe to begin sending requests.
  86. if self._token_bucket > 0:
  87. self.unthrottle()
  88. else:
  89. self.throttle()