RateLimiter.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. # Written by Bram Cohen
  2. # see LICENSE.txt for license information
  3. from traceback import print_exc
  4. from binascii import b2a_hex
  5. from clock import clock
  6. from CurrentRateMeasure import Measure
  7. from cStringIO import StringIO
  8. from math import sqrt
  9. try:
  10. True
  11. except:
  12. True = 1
  13. False = 0
  14. try:
  15. sum([1])
  16. except:
  17. sum = lambda a: reduce(lambda x,y: x+y, a, 0)
  18. DEBUG = False
  19. MAX_RATE_PERIOD = 20.0
  20. MAX_RATE = 10e10
  21. PING_BOUNDARY = 1.2
  22. PING_SAMPLES = 7
  23. PING_DISCARDS = 1
  24. PING_THRESHHOLD = 5
  25. PING_DELAY = 5 # cycles 'til first upward adjustment
  26. PING_DELAY_NEXT = 2 # 'til next
  27. ADJUST_UP = 1.05
  28. ADJUST_DOWN = 0.95
  29. UP_DELAY_FIRST = 5
  30. UP_DELAY_NEXT = 2
  31. SLOTS_STARTING = 6
  32. SLOTS_FACTOR = 1.66/1000
  33. class RateLimiter:
  34. def __init__(self, sched, unitsize, slotsfunc = lambda x: None):
  35. self.sched = sched
  36. self.last = None
  37. self.unitsize = unitsize
  38. self.slotsfunc = slotsfunc
  39. self.measure = Measure(MAX_RATE_PERIOD)
  40. self.autoadjust = False
  41. self.upload_rate = MAX_RATE * 1000
  42. self.slots = SLOTS_STARTING # garbage if not automatic
  43. def set_upload_rate(self, rate):
  44. # rate = -1 # test automatic
  45. if rate < 0:
  46. if self.autoadjust:
  47. return
  48. self.autoadjust = True
  49. self.autoadjustup = 0
  50. self.pings = []
  51. rate = MAX_RATE
  52. self.slots = SLOTS_STARTING
  53. self.slotsfunc(self.slots)
  54. else:
  55. self.autoadjust = False
  56. if not rate:
  57. rate = MAX_RATE
  58. self.upload_rate = rate * 1000
  59. self.lasttime = clock()
  60. self.bytes_sent = 0
  61. def queue(self, conn):
  62. assert conn.next_upload is None
  63. if self.last is None:
  64. self.last = conn
  65. conn.next_upload = conn
  66. self.try_send(True)
  67. else:
  68. conn.next_upload = self.last.next_upload
  69. self.last.next_upload = conn
  70. self.last = conn
  71. def try_send(self, check_time = False):
  72. t = clock()
  73. self.bytes_sent -= (t - self.lasttime) * self.upload_rate
  74. self.lasttime = t
  75. if check_time:
  76. self.bytes_sent = max(self.bytes_sent, 0)
  77. cur = self.last.next_upload
  78. while self.bytes_sent <= 0:
  79. bytes = cur.send_partial(self.unitsize)
  80. self.bytes_sent += bytes
  81. self.measure.update_rate(bytes)
  82. if bytes == 0 or cur.backlogged():
  83. if self.last is cur:
  84. self.last = None
  85. cur.next_upload = None
  86. break
  87. else:
  88. self.last.next_upload = cur.next_upload
  89. cur.next_upload = None
  90. cur = self.last.next_upload
  91. else:
  92. self.last = cur
  93. cur = cur.next_upload
  94. else:
  95. self.sched(self.try_send, self.bytes_sent / self.upload_rate)
  96. def adjust_sent(self, bytes):
  97. self.bytes_sent = min(self.bytes_sent+bytes, self.upload_rate*3)
  98. self.measure.update_rate(bytes)
  99. def ping(self, delay):
  100. if DEBUG:
  101. print delay
  102. if not self.autoadjust:
  103. return
  104. self.pings.append(delay > PING_BOUNDARY)
  105. if len(self.pings) < PING_SAMPLES+PING_DISCARDS:
  106. return
  107. if DEBUG:
  108. print 'cycle'
  109. pings = sum(self.pings[PING_DISCARDS:])
  110. del self.pings[:]
  111. if pings >= PING_THRESHHOLD: # assume flooded
  112. if self.upload_rate == MAX_RATE:
  113. self.upload_rate = self.measure.get_rate()*ADJUST_DOWN
  114. else:
  115. self.upload_rate = min(self.upload_rate,
  116. self.measure.get_rate()*1.1)
  117. self.upload_rate = max(int(self.upload_rate*ADJUST_DOWN),2)
  118. self.slots = int(sqrt(self.upload_rate*SLOTS_FACTOR))
  119. self.slotsfunc(self.slots)
  120. if DEBUG:
  121. print 'adjust down to '+str(self.upload_rate)
  122. self.lasttime = clock()
  123. self.bytes_sent = 0
  124. self.autoadjustup = UP_DELAY_FIRST
  125. else: # not flooded
  126. if self.upload_rate == MAX_RATE:
  127. return
  128. self.autoadjustup -= 1
  129. if self.autoadjustup:
  130. return
  131. self.upload_rate = int(self.upload_rate*ADJUST_UP)
  132. self.slots = int(sqrt(self.upload_rate*SLOTS_FACTOR))
  133. self.slotsfunc(self.slots)
  134. if DEBUG:
  135. print 'adjust up to '+str(self.upload_rate)
  136. self.lasttime = clock()
  137. self.bytes_sent = 0
  138. self.autoadjustup = UP_DELAY_NEXT