# Storage_threadpool.py
  1. # The contents of this file are subject to the BitTorrent Open Source License
  2. # Version 1.1 (the License). You may not copy or use this file, in either
  3. # source code or executable form, except in compliance with the License. You
  4. # may obtain a copy of the License at http://www.bittorrent.com/license/.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10. # Written by Bram Cohen and Greg Hazel
  11. import os
  12. import sys
  13. import Queue
  14. from bisect import bisect_right
  15. from BTL.translation import _
  16. from BTL.obsoletepythonsupport import set
  17. from BitTorrent import BTFailure
  18. from BTL.defer import Deferred, ThreadedDeferred, Failure, wrap_task
  19. from BTL.yielddefer import launch_coroutine
  20. from BitTorrent.platform import get_allocated_regions
  21. from BTL.sparse_set import SparseSet
  22. from BTL.DictWithLists import DictWithLists, DictWithSets
  23. import BTL.stackthreading as threading
  24. from BitTorrent.Storage_base import open_sparse_file, make_file_sparse
  25. from BitTorrent.Storage_base import bad_libc_workaround, is_open_for_write
  26. from BitTorrent.Storage_base import UnregisteredFileException
class FilePool(object):
    """Shared pool of open file handles plus a pool of disk-I/O threads.

    Caps the number of files open at once across all torrents, reuses idle
    handles, and runs queued read/write operations on dedicated disk
    threads, delivering results back to the event loop via
    external_add_task.
    """

    def __init__(self, doneflag, add_task, external_add_task,
                 max_files_open, num_disk_threads):
        # doneflag: shutdown signal; used here via both isSet() and
        #   addCallback() -- presumably a DeferredEvent-style object, TODO
        #   confirm against the caller.
        # add_task: accepted for interface symmetry but not stored/used here.
        self.doneflag = doneflag
        self.external_add_task = external_add_task
        # filename -> owning torrent; used to report close errors in close_all()
        self.file_to_torrent = {}
        # guards the two handle maps below and the open-file count
        self.free_handle_condition = threading.Condition()
        # handles currently checked out through acquire_handle()
        self.active_file_to_handles = DictWithSets()
        # idle, still-open handles available for reuse
        self.open_file_to_handles = DictWithLists()
        self.set_max_files_open(max_files_open)
        # queue of (Deferred, callable, args, kwargs) consumed by disk threads
        self.diskq = Queue.Queue()
        for i in xrange(num_disk_threads):
            t = threading.Thread(target=self._disk_thread,
                                 name="disk_thread-%s" % (i+1))
            t.start()
        self.doneflag.addCallback(self.finalize)

    def finalize(self, r=None):
        # re-queue self so all threads die. we end up with one extra event on
        # the queue, but who cares.
        # (each disk thread pops one finalize op, re-queues it for the next
        # thread, then exits its loop because doneflag is now set)
        self._create_op(self.finalize)

    def close_all(self):
        """Close every idle handle, waiting until active ones are released.

        Close errors are collected per owning torrent (last error wins) and
        reported via torrent.got_exception() after the lock is dropped.
        """
        failures = {}
        self.free_handle_condition.acquire()
        while self.get_open_file_count() > 0:
            while len(self.open_file_to_handles) > 0:
                filename, handle = self.open_file_to_handles.popitem()
                try:
                    handle.close()
                except Exception, e:
                    # keep only the most recent failure for this torrent
                    failures[self.file_to_torrent[filename]] = e
                self.free_handle_condition.notify()
            if self.get_open_file_count() > 0:
                # handles are still checked out; wait for release_handle()
                self.free_handle_condition.wait(1)
        self.free_handle_condition.release()
        for torrent, e in failures.iteritems():
            torrent.got_exception(e)

    def close_files(self, file_set):
        """Close all pooled handles for the filenames in file_set.

        file_set is a mapping of filenames (iterated via iterkeys()).
        Blocks until no handle for those files remains checked out, then
        re-raises one arbitrary close error if any occurred.
        """
        failures = set()
        self.free_handle_condition.acquire()
        done = False
        while not done:
            filenames = list(self.open_file_to_handles.iterkeys())
            for filename in filenames:
                if filename not in file_set:
                    continue
                handles = self.open_file_to_handles.poprow(filename)
                for handle in handles:
                    try:
                        handle.close()
                    except Exception, e:
                        failures.add(e)
                    self.free_handle_condition.notify()
            done = True
            for filename in file_set.iterkeys():
                if filename in self.active_file_to_handles:
                    # a handle is still checked out; wait and re-scan
                    done = False
                    break
            if not done:
                self.free_handle_condition.wait(0.5)
        self.free_handle_condition.release()
        if len(failures) > 0:
            raise failures.pop()

    def set_max_files_open(self, max_files_open):
        """Set the open-handle cap; a value <= 0 disables the limit."""
        if max_files_open <= 0:
            # 1e100 is never reached in practice, so the cap is effectively off
            max_files_open = 1e100
        self.max_files_open = max_files_open
        # flush everything so the new cap takes effect from a clean slate
        self.close_all()

    def add_files(self, files, torrent):
        """Register each filename in files as belonging to torrent.

        Raises BTFailure if any filename is already claimed by another
        running torrent; in that case nothing is registered.
        """
        for filename in files:
            if filename in self.file_to_torrent:
                raise BTFailure(_("File %s belongs to another running torrent")
                                % filename)
        for filename in files:
            self.file_to_torrent[filename] = torrent

    def remove_files(self, files):
        # Unregister filenames; raises KeyError for a file that was never added.
        for filename in files:
            del self.file_to_torrent[filename]

    def _ensure_exists(self, filename, length=0):
        """Create filename (and its parent directories) as a sparse file if
        it does not already exist."""
        if not os.path.exists(filename):
            f = os.path.split(filename)[0]
            if f != '' and not os.path.exists(f):
                os.makedirs(f)
            f = file(filename, 'wb')
            make_file_sparse(filename, f, length)
            f.close()

    def get_open_file_count(self):
        # idle handles + checked-out handles; callers hold the condition lock
        t = self.open_file_to_handles.total_length()
        t += self.active_file_to_handles.total_length()
        return t

    def acquire_handle(self, filename, for_write, length=0):
        """Check out an open handle for filename, reusing idle handles.

        Blocks while the number of active handles sits at the cap.  Evicts
        an arbitrary idle handle if opening a new file would exceed the cap.
        Raises UnregisteredFileException for files not added via add_files().
        """
        # this will block until a new file handle can be made
        self.free_handle_condition.acquire()
        if filename not in self.file_to_torrent:
            self.free_handle_condition.release()
            raise UnregisteredFileException()
        while self.active_file_to_handles.total_length() == self.max_files_open:
            self.free_handle_condition.wait()
        if filename in self.open_file_to_handles:
            # reuse an idle handle, reopening read-write if needed
            handle = self.open_file_to_handles.pop_from_row(filename)
            if for_write and not is_open_for_write(handle.mode):
                handle.close()
                handle = open_sparse_file(filename, 'rb+', length=length)
            #elif not for_write and is_open_for_write(handle.mode):
            #    handle.close()
            #    handle = open_sparse_file(filename, 'rb', length=length)
        else:
            if self.get_open_file_count() == self.max_files_open:
                # at the cap: close an arbitrary idle handle to make room
                oldfname, oldhandle = self.open_file_to_handles.popitem()
                oldhandle.close()
            self._ensure_exists(filename, length)
            if for_write:
                handle = open_sparse_file(filename, 'rb+', length=length)
            else:
                handle = open_sparse_file(filename, 'rb', length=length)
        self.active_file_to_handles.push_to_row(filename, handle)
        self.free_handle_condition.release()
        return handle

    def release_handle(self, filename, handle):
        """Return a handle obtained from acquire_handle() to the idle pool
        and wake one waiter."""
        self.free_handle_condition.acquire()
        self.active_file_to_handles.remove_fom_row(filename, handle)
        self.open_file_to_handles.push_to_row(filename, handle)
        self.free_handle_condition.notify()
        self.free_handle_condition.release()

    def _create_op(self, _f, *args, **kwargs):
        """Queue _f(*args, **kwargs) for a disk thread; returns a Deferred
        that fires (on the event loop) with the result or a Failure."""
        df = Deferred()
        self.diskq.put((df, _f, args, kwargs))
        return df

    # reads and writes are just queued disk operations
    read = _create_op
    write = _create_op

    def _disk_thread(self):
        # Worker loop: pop an operation, run it, and hand the result (or the
        # captured Failure) back to the event-loop thread.
        while not self.doneflag.isSet():
            df, func, args, kwargs = self.diskq.get(True)
            try:
                v = func(*args, **kwargs)
            except:
                self.external_add_task(0, df.errback, Failure())
            else:
                self.external_add_task(0, df.callback, v)
class Storage(object):
    """Maps a torrent's single linear byte space onto its files on disk.

    All file I/O is delegated to the shared FilePool; reads and writes
    that span several files are split per file, issued in parallel on the
    disk threads, and reassembled by coroutines.
    """

    def __init__(self, config, filepool, save_path,
                 files, add_task,
                 external_add_task, doneflag):
        self.filepool = filepool
        self.config = config
        self.doneflag = doneflag
        self.add_task = add_task
        self.external_add_task = external_add_task
        self.initialize(save_path, files)

    def initialize(self, save_path, files):
        """(Re)build the byte-range bookkeeping for files.

        Kicks off _build_file_structs on a thread and returns the
        ThreadedDeferred that fires when the scan completes.
        """
        # a list of bytes ranges and filenames for window-based IO
        self.ranges = []
        # a dict of filename-to-ranges for piece priorities and filename lookup
        self.range_by_name = {}
        # a sparse set for smart allocation detection
        self.allocated_regions = SparseSet()
        # dict of filename-to-length on disk (for % complete in the file view)
        self.undownloaded = {}
        self.save_path = save_path
        # Rather implement this as an ugly hack here than change all the
        # individual calls. Affects all torrent instances using this module.
        if self.config['bad_libc_workaround']:
            bad_libc_workaround()
        self.initialized = False
        self.startup_df = ThreadedDeferred(wrap_task(self.external_add_task),
                                           self._build_file_structs,
                                           self.filepool, files)
        return self.startup_df

    def _build_file_structs(self, filepool, files):
        """Scan files (sequence of (filename, length)) and fill in the maps.

        Returns True on success, False if shutdown was requested mid-scan.
        Raises BTFailure when a target path exists but is not a regular file.
        """
        total = 0
        for filename, length in files:
            # we're shutting down, abort.
            if self.doneflag.isSet():
                return False
            self.undownloaded[filename] = length
            if length > 0:
                self.ranges.append((total, total + length, filename))
            self.range_by_name[filename] = (total, total + length)
            if os.path.exists(filename):
                if not os.path.isfile(filename):
                    raise BTFailure(_("File %s already exists, but is not a "
                                      "regular file") % filename)
                l = os.path.getsize(filename)
                if l > length:
                    # This is the truncation Bram was talking about that no one
                    # else thinks is a good idea.
                    #h = file(filename, 'rb+')
                    #make_file_sparse(filename, h, length)
                    #h.truncate(length)
                    #h.close()
                    l = length
                a = get_allocated_regions(filename, begin=0, length=l)
                if a is not None:
                    # platform reported exactly which regions are allocated;
                    # shift them into torrent-space coordinates
                    a.offset(total)
                else:
                    # fall back to assuming the existing prefix is allocated
                    a = SparseSet()
                    if l > 0:
                        a.add(total, total + l)
                self.allocated_regions += a
            total += length
        self.total_length = total
        self.initialized = True
        return True

    def get_byte_range_for_filename(self, filename):
        """Return the (begin, end) torrent-space byte range of filename.

        Unknown names are retried relative to save_path; raises KeyError if
        still unknown.
        """
        if filename not in self.range_by_name:
            filename = os.path.normpath(filename)
            filename = os.path.join(self.save_path, filename)
        return self.range_by_name[filename]

    def was_preallocated(self, pos, length):
        # True if all of [pos, pos+length) was already allocated on disk
        # when the file structs were built
        return self.allocated_regions.is_range_in(pos, pos+length)

    def get_total_length(self):
        return self.total_length

    def _intervals(self, pos, amount):
        """Split torrent-space [pos, pos+amount) into (filename, begin, end)
        triples, where begin/end are offsets within that file."""
        r = []
        stop = pos + amount
        # (pos, 2**500) sorts after any real (begin, end, name) tuple with the
        # same begin, so p indexes the last range starting at or before pos
        p = max(bisect_right(self.ranges, (pos, 2 ** 500)) - 1, 0)
        for begin, end, filename in self.ranges[p:]:
            if begin >= stop:
                break
            r.append((filename, max(pos, begin) - begin, min(end, stop) - begin))
        return r

    def _read(self, filename, pos, amount):
        """Read amount bytes at file-relative pos; runs on a disk thread.

        Returns the bytes read, or None when no handle could be acquired.
        """
        begin, end = self.get_byte_range_for_filename(filename)
        length = end - begin
        h = self.filepool.acquire_handle(filename, for_write=False, length=length)
        if h is None:
            return
        try:
            h.seek(pos)
            r = h.read(amount)
        finally:
            self.filepool.release_handle(filename, h)
        return r

    def _batch_read(self, pos, amount):
        """Coroutine: read amount bytes at torrent-space pos across files.

        Yields the pending deferreds (for launch_coroutine to wait on) and
        finally yields the assembled string; raises BTFailure on short read.
        """
        dfs = []
        r = []
        # queue all the reads
        for filename, pos, end in self._intervals(pos, amount):
            df = self.filepool.read(self._read, filename, pos, end - pos)
            dfs.append(df)
        # yield on all the reads in order - they complete in any order
        exc = None
        for df in dfs:
            yield df
            try:
                r.append(df.getResult())
            except:
                # remember the first failure but still drain remaining reads
                exc = exc or sys.exc_info()
        if exc:
            raise exc[0], exc[1], exc[2]
        r = ''.join(r)
        if len(r) != amount:
            raise BTFailure(_("Short read (%d of %d) - something truncated files?") %
                            (len(r), amount))
        yield r

    def read(self, pos, amount):
        """Asynchronously read amount bytes at pos; returns a Deferred."""
        df = launch_coroutine(wrap_task(self.add_task),
                              self._batch_read, pos, amount)
        return df

    def _write(self, filename, pos, s):
        """Write s at file-relative pos; runs on a disk thread.

        Returns len(s), or None when no handle could be acquired.
        """
        begin, end = self.get_byte_range_for_filename(filename)
        length = end - begin
        h = self.filepool.acquire_handle(filename, for_write=True, length=length)
        if h is None:
            return
        try:
            h.seek(pos)
            h.write(s)
        finally:
            self.filepool.release_handle(filename, h)
        return len(s)

    def _batch_write(self, pos, s):
        """Coroutine: write string s at torrent-space pos across files.

        Yields the pending deferreds, then the total byte count queued.
        """
        dfs = []
        total = 0
        amount = len(s)
        # queue all the writes
        for filename, begin, end in self._intervals(pos, amount):
            length = end - begin
            # buffer() slices s without copying
            d = buffer(s, total, length)
            total += length
            df = self.filepool.write(self._write, filename, begin, d)
            dfs.append(df)
        # yield on all the writes - they complete in any order
        exc = None
        for df in dfs:
            yield df
            try:
                df.getResult()
            except:
                exc = exc or sys.exc_info()
        if exc:
            raise exc[0], exc[1], exc[2]
        yield total

    def write(self, pos, s):
        """Asynchronously write string s at pos; returns a Deferred."""
        df = launch_coroutine(wrap_task(self.add_task),
                              self._batch_write, pos, s)
        return df

    def close(self):
        """Close this torrent's file handles in the pool.

        If startup is still in flight, the close is chained onto the startup
        deferred, which is returned; otherwise closes now and returns None.
        """
        if not self.initialized:
            self.startup_df.addCallback(lambda *a : self.filepool.close_files(self.range_by_name))
            return self.startup_df
        self.filepool.close_files(self.range_by_name)

    def downloaded(self, pos, length):
        # Credit a downloaded [pos, pos+length) span against the per-file
        # remaining-bytes counters
        for filename, begin, end in self._intervals(pos, length):
            self.undownloaded[filename] -= end - begin