RateLimiter.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. # The contents of this file are subject to the BitTorrent Open Source License
  2. # Version 1.1 (the License). You may not copy or use this file, in either
  3. # source code or executable form, except in compliance with the License. You
  4. # may obtain a copy of the License at http://www.bittorrent.com/license/.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10. # Written by Uoti Urpala and Andrew Loewenstern
  11. from BTL.platform import bttime
  12. def minctx(a,b):
  13. A = B = 0
  14. if a.rate > 0:
  15. A = a.offset_amount / a.rate
  16. if b.rate > 0:
  17. B = b.offset_amount / b.rate
  18. if A <= B:
  19. return a
  20. return b
  21. class Dummy(object):
  22. def __init__(self, next):
  23. self.next_upload = next
  24. def send_partial(self, size):
  25. return 0
  26. closed = False
  27. class RateLimitedGroup(object):
  28. def __init__(self, rate, got_exception):
  29. self.got_exception = got_exception
  30. # limiting
  31. self.check_time = 0
  32. self.lasttime = bttime()
  33. self.offset_amount = 0
  34. self.set_rate(rate)
  35. # accounting
  36. self.count = 0
  37. self.counts = []
  38. def set_rate(self, new_rate):
  39. self.rate = new_rate
  40. self.check_time = 0
  41. self.offset_amount = 0
  42. class MultiRateLimiter(object):
  43. def __init__(self, sched):
  44. self.sched = sched
  45. self.last = None
  46. self.upload_rate = 0
  47. self.unitsize = 17000
  48. self.offset_amount = 0
  49. self.ctxs = [] # list of contexts with connections in the queue
  50. self.ctx_counts = {} # dict conn -> how many connections each context has
  51. def set_parameters(self, rate, unitsize):
  52. if unitsize > 17000:
  53. # Since data is sent to peers in a round-robin fashion, max one
  54. # full request at a time, setting this higher would send more data
  55. # to peers that use request sizes larger than standard 16 KiB.
  56. # 17000 instead of 16384 to allow room for metadata messages.
  57. unitsize = 17000
  58. self.upload_rate = rate
  59. self.unitsize = unitsize
  60. self.lasttime = bttime()
  61. self.offset_amount = 0
  62. def queue(self, conn, ctx):
  63. assert conn.next_upload is None
  64. if ctx not in self.ctxs:
  65. ctx.check_time = 1
  66. self.ctxs.append(ctx)
  67. self.ctx_counts[ctx] = 1
  68. else:
  69. self.ctx_counts[ctx] += 1
  70. if self.last is None:
  71. self.last = conn
  72. conn.next_upload = conn
  73. self.try_send(True)
  74. else:
  75. conn.next_upload = self.last.next_upload
  76. self.last.next_upload = conn
  77. self.last = conn
  78. def increase_offset(self, bytes):
  79. self.offset_amount += bytes
  80. def try_send(self, check_time = False):
  81. t = bttime()
  82. cur = self.last.next_upload
  83. if self.upload_rate > 0:
  84. self.offset_amount -= (t - self.lasttime) * self.upload_rate
  85. if check_time:
  86. self.offset_amount = max(self.offset_amount, -1 * self.unitsize)
  87. else:
  88. self.offset_amount = 0
  89. self.lasttime = t
  90. for ctx in self.ctxs:
  91. if ctx.rate == 0:
  92. ctx.offset_amount = 0
  93. ctx.lasttime = t
  94. elif ctx.lasttime != t:
  95. ctx.offset_amount -=(t - ctx.lasttime) * ctx.rate
  96. ctx.lasttime = t
  97. if ctx.check_time:
  98. ctx.offset_amount = max(ctx.offset_amount, -1 * self.unitsize)
  99. min_offset = reduce(minctx, self.ctxs)
  100. ctx = cur.connection_manager.context.rlgroup
  101. while self.offset_amount <= 0 and min_offset.offset_amount <= 0:
  102. if ctx.offset_amount <= 0:
  103. try:
  104. bytes = cur.send_partial(self.unitsize)
  105. except KeyboardInterrupt:
  106. raise
  107. except Exception, e:
  108. cur.connection_manager.context.rlgroup.got_exception(e)
  109. cur = self.last.next_upload
  110. bytes = 0
  111. if self.upload_rate > 0:
  112. self.offset_amount += bytes
  113. if ctx.rate > 0:
  114. ctx.offset_amount += bytes
  115. ctx.count += bytes
  116. if bytes == 0 or not cur.connection.is_flushed():
  117. if self.last is cur:
  118. self.last = None
  119. cur.next_upload = None
  120. self.ctx_counts = {}
  121. self.ctxs = []
  122. break
  123. else:
  124. self.last.next_upload = cur.next_upload
  125. cur.next_upload = None
  126. old = ctx
  127. cur = self.last.next_upload
  128. ctx = cur.connection_manager.context.rlgroup
  129. self.ctx_counts[old] -= 1
  130. if self.ctx_counts[old] == 0:
  131. del(self.ctx_counts[old])
  132. self.ctxs.remove(old)
  133. if min_offset == old:
  134. min_offset = reduce(minctx, self.ctxs)
  135. else:
  136. if ctx == min_offset:
  137. min_offset = reduce(minctx, self.ctxs)
  138. self.last = cur
  139. cur = cur.next_upload
  140. ctx = cur.connection_manager.context.rlgroup
  141. else:
  142. self.last = cur
  143. cur = self.last.next_upload
  144. ctx = cur.connection_manager.context.rlgroup
  145. else:
  146. myDelay = minCtxDelay = 0
  147. if self.upload_rate > 0:
  148. myDelay = 1.0 * self.offset_amount / self.upload_rate
  149. if min_offset.rate > 0:
  150. minCtxDelay = 1.0 * min_offset.offset_amount / min_offset.rate
  151. delay = max(myDelay, minCtxDelay)
  152. self.sched(self.try_send, delay)
  153. def clean_closed(self):
  154. if self.last is None:
  155. return
  156. orig = self.last
  157. if self.last.closed:
  158. self.last = Dummy(self.last.next_upload)
  159. self.last.connection_manager = orig.connection_manager
  160. c = self.last
  161. while True:
  162. if c.next_upload is orig:
  163. c.next_upload = self.last
  164. break
  165. if c.next_upload.closed:
  166. o = c.next_upload
  167. c.next_upload = Dummy(c.next_upload.next_upload)
  168. c.next_upload.connection_manager = o.connection_manager
  169. c = c.next_upload