RequestManager.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. # by Greg Hazel
  2. from BTL.sparse_set import SparseSet
  3. class RequestManager(object):
  4. def __init__(self, request_size, piece_size, numpieces, total_length):
  5. self.request_size = request_size
  6. self.piece_size = piece_size
  7. self.numpieces = numpieces
  8. # hmm
  9. self.total_length = total_length
  10. self.amount_inactive = total_length
  11. self.endgame = False
  12. # If chunks have been requested then _inactive_requests has a list
  13. # of the unrequested chunks. Otherwise the piece is not in the dict.
  14. self.inactive_requests = {}
  15. # If chunks have been requested then active_requests has a list
  16. # of the unrequested chunks which are pending on the network.
  17. # Otherwise the piece is not in the dict.
  18. self.active_requests = {}
  19. # If all chunks have been requested and the piece is not written
  20. # to the disk, it is in this set. Equivalent to an empty list in
  21. # _inactive_requests.
  22. self.fully_active = set()
  23. def set_storage(self, storage):
  24. self.storage = storage
  25. def want_requests(self, index):
  26. # blah, this is dumb.
  27. if self.storage.have[index]:
  28. return False
  29. assert ((index in self.inactive_requests and
  30. len(self.inactive_requests[index]) == 0) ==
  31. (index in self.fully_active))
  32. if index in self.fully_active:
  33. return False
  34. return True
  35. def iter_want(self):
  36. for index in self.storage.have_set.iterneg(0, self.numpieces):
  37. assert ((index in self.inactive_requests and
  38. len(self.inactive_requests[index]) == 0) ==
  39. (index in self.fully_active))
  40. if index in self.fully_active:
  41. continue
  42. yield index
  43. def _break_up(self, begin, length):
  44. l = []
  45. x = 0
  46. request_size = self.request_size
  47. while x + request_size < length:
  48. l.append((begin + x, request_size))
  49. x += request_size
  50. l.append((begin + x, length - x))
  51. return l
  52. def _piecelen(self, piece):
  53. if piece < self.numpieces - 1:
  54. return self.piece_size
  55. else:
  56. return self.total_length - piece * self.piece_size
  57. def _make_inactive(self, index):
  58. self.inactive_requests[index] = self._break_up(0, self._piecelen(index))
  59. self.active_requests[index] = []
  60. def new_request(self, index, full=False):
  61. # returns (begin, length)
  62. if index not in self.inactive_requests:
  63. self._make_inactive(index)
  64. rs = self.inactive_requests[index]
  65. if full:
  66. s = SparseSet()
  67. while rs:
  68. r = rs.pop()
  69. s.add(r[0], r[0] + r[1])
  70. b, e = s.largest_range()
  71. s.remove(b, e)
  72. reqs = self._break_up(b, e - b)
  73. for r in reqs:
  74. assert r[1] <= self.request_size
  75. self.active_requests[index].append(r)
  76. # I don't like this. the function should return reqs
  77. r = (b, e - b)
  78. for b, e in s.iterrange():
  79. rs.extend(self._break_up(b, e - b))
  80. else:
  81. # why min? do we want all the blocks in order?
  82. r = min(rs)
  83. rs.remove(r)
  84. assert r[1] <= self.request_size
  85. self.active_requests[index].append(r)
  86. self.amount_inactive -= r[1]
  87. assert self.amount_inactive >= 0, ('Amount inactive: %d' %
  88. self.amount_inactive)
  89. if len(rs) == 0:
  90. self.fully_active.add(index)
  91. if self.amount_inactive == 0:
  92. self.endgame = True
  93. assert (r[0] + r[1]) <= self._piecelen(index)
  94. return r
  95. def request_lost(self, index, begin, length):
  96. if len(self.inactive_requests[index]) == 0:
  97. self.fully_active.remove(index)
  98. self.amount_inactive += length
  99. r = (begin, length)
  100. self.active_requests[index].remove(r)
  101. self.inactive_requests[index].extend(self._break_up(*r))
  102. def request_received(self, index, begin, length):
  103. self.active_requests[index].remove((begin, length))
  104. def add_inactive(self, index, request_list):
  105. assert index not in self.inactive_requests
  106. assert index not in self.active_requests
  107. a = []
  108. for r in request_list:
  109. a.extend(self._break_up(*r))
  110. # amount_inactive does not include partials we've written to disk
  111. t = self._piecelen(index)
  112. for b, l in a:
  113. t -= l
  114. self.amount_inactive -= t
  115. self.inactive_requests[index] = a
  116. self.active_requests[index] = []
  117. def get_unwritten_requests(self):
  118. # collapse inactive and active requests into one set
  119. unwritten = {}
  120. for k, v in self.inactive_requests.iteritems():
  121. if v:
  122. unwritten.setdefault(k, []).extend(v)
  123. for k, v in self.active_requests.iteritems():
  124. if v:
  125. unwritten.setdefault(k, []).extend(v)
  126. return unwritten
  127. def is_piece_received(self, index):
  128. # hm.
  129. return (not self.want_requests(index) and
  130. len(self.active_requests[index]) == 0)
  131. def piece_finished(self, index):
  132. del self.inactive_requests[index]
  133. del self.active_requests[index]
  134. self.fully_active.remove(index)