Storage_IOCP.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. # The contents of this file are subject to the BitTorrent Open Source License
  2. # Version 1.1 (the License). You may not copy or use this file, in either
  3. # source code or executable form, except in compliance with the License. You
  4. # may obtain a copy of the License at http://www.bittorrent.com/license/.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10. # Written by Greg Hazel
  11. import os
  12. import sys
  13. import ctypes
  14. import win32file
  15. from bisect import bisect_right
  16. from BTL.translation import _
  17. from BTL import BTFailure
  18. from BTL.defer import Deferred, ThreadedDeferred, Failure, wrap_task
  19. from BTL.yielddefer import launch_coroutine
  20. from BitTorrent.platform import get_allocated_regions
  21. from BTL.sparse_set import SparseSet
  22. from BTL.DictWithLists import DictWithLists, DictWithSets
  23. from BitTorrent.Storage_base import make_file_sparse, bad_libc_workaround, is_open_for_write
  24. from BitTorrent.Storage_base import open_sparse_file as open_sparse_file_base
  25. from BitTorrent.Storage_base import UnregisteredFileException
  26. # not needed, but it raises errors for platforms that don't support iocp
  27. from twisted.internet.iocpreactor import _iocp
  28. from twisted.internet.iocpreactor.proactor import Proactor
  29. from twisted.internet import reactor
  30. assert isinstance(reactor, Proactor), "You imported twisted.internet.reactor before RawServer_twisted!"
class OverlappedOp:
    """One outstanding overlapped (asynchronous) file operation.

    Wraps a single issue*File call on the IOCP reactor and fires a
    Deferred when the completion packet arrives or the call fails.
    Assumes at most one operation is outstanding per instance, since the
    pending deferred is kept in self.df.
    """

    def initiateOp(self, handle, seekpos, buffer):
        # handle: OS-level file handle; seekpos: absolute file offset;
        # buffer: data to write, or a pre-allocated buffer to read into.
        assert len(buffer) > 0
        assert seekpos >= 0
        df = Deferred()
        try:
            self.op(handle, seekpos, buffer,
                    self.ovDone, (handle, buffer))
        except:
            # The issue call itself failed; report synchronously.
            df.errback(Failure())
        else:
            # Remember the deferred only if the op was actually issued;
            # ovDone picks it up when the completion arrives.
            self.df = df
        return df

    def op(self, *a, **kw):
        # Subclasses bind this to reactor.issueReadFile/issueWriteFile.
        raise NotImplementedError

    def ovDone(self, ret, bytes, (handle, buffer)):
        # IOCP completion callback: ret is the Win32 error code (0 on
        # success), bytes is the transfer count.
        df = self.df
        del self.df
        if ret or not bytes:
            # raise-and-catch so Failure() captures a real traceback.
            # NOTE(review): WinError() reads GetLastError(), which may not
            # correspond to ret by the time this runs — confirm.
            try:
                raise ctypes.WinError()
            except:
                df.errback(Failure())
        else:
            self.opComplete(df, bytes, buffer)

    def opComplete(self, df, bytes, buffer):
        # Subclasses decide what value the deferred fires with.
        raise NotImplementedError
  58. class ReadFileOp(OverlappedOp):
  59. op = reactor.issueReadFile
  60. def opComplete(self, df, bytes, buffer):
  61. df.callback(buffer[:bytes])
  62. class WriteFileOp(OverlappedOp):
  63. op = reactor.issueWriteFile
  64. def opComplete(self, df, bytes, buffer):
  65. df.callback(bytes)
  66. class IOCPFile(object):
  67. # standard block size by default
  68. buffer_size = 16384
  69. def __init__(self, handle):
  70. from twisted.internet import reactor
  71. self.reactor = reactor
  72. self.handle = handle
  73. self.osfhandle = win32file._get_osfhandle(self.handle.fileno())
  74. self.mode = self.handle.mode
  75. # CloseHandle automatically calls CancelIo
  76. self.close = self.handle.close
  77. self.fileno = self.handle.fileno
  78. self.read_op = ReadFileOp()
  79. self.write_op = WriteFileOp()
  80. self.readbuf = self.reactor.AllocateReadBuffer(self.buffer_size)
  81. def seek(self, offset):
  82. self.seekpos = offset
  83. def write(self, data):
  84. return self.write_op.initiateOp(self.osfhandle, self.seekpos, data)
  85. def read(self, bytes):
  86. if bytes == self.buffer_size:
  87. readbuf = self.readbuf
  88. else:
  89. # hmmmm, slow. but, readfile tries to fill the buffer,
  90. # so maybe this is better than reading too much all the time.
  91. readbuf = self.reactor.AllocateReadBuffer(bytes)
  92. return self.read_op.initiateOp(self.osfhandle, self.seekpos, readbuf)
  93. def open_sparse_file(path, mode, length=0, overlapped=True):
  94. return IOCPFile(open_sparse_file_base(path, mode, length, overlapped))
class FilePool(object):
    """Pool of open file handles shared across running torrents.

    Caps the number of simultaneously open files and hands handles out
    asynchronously via Deferreds.  Handles live in one of two maps:
    open_file_to_handles (idle, reusable) and active_file_to_handles
    (currently performing I/O).  Requests that arrive while the pool is
    saturated queue in waiting_ops until a handle is released.
    """

    def __init__(self, doneflag, add_task, external_add_task, max_files_open, num_disk_threads):
        # doneflag, external_add_task and num_disk_threads are accepted
        # for interface compatibility with other Storage backends but are
        # unused in the IOCP implementation.
        self.add_task = add_task
        self.file_to_torrent = {}
        self.waiting_ops = []
        self.active_file_to_handles = DictWithSets()
        self.open_file_to_handles = DictWithLists()
        self.set_max_files_open(max_files_open)

    def close_all(self):
        """Close every idle handle; returns a Deferred that fires when
        no handles (idle or active) remain open."""
        df = Deferred()
        self._close_all(df)
        return df

    def _close_all(self, df):
        # Close all idle handles now; reschedule until active ones drain.
        failures = {}
        while len(self.open_file_to_handles) > 0:
            filename, handle = self.open_file_to_handles.popitem()
            try:
                handle.close()
            except:
                # Report the failure to the torrent that owns the file.
                failures[self.file_to_torrent[filename]] = Failure()
        for torrent, failure in failures.iteritems():
            torrent.got_exception(failure)
        if self.get_open_file_count() > 0:
            # it would be nice to wait on the deferred for the outstanding ops
            self.add_task(0.5, self._close_all, df)
        else:
            df.callback(True)

    def close_files(self, file_set):
        """Close idle handles for the given filenames (file_set is a
        mapping keyed by filename); returns a Deferred that fires once
        none of them have active handles either."""
        df = Deferred()
        self._close_files(df, file_set)
        return df

    def _close_files(self, df, file_set):
        failure = None
        done = False
        filenames = self.open_file_to_handles.keys()
        for filename in filenames:
            if filename not in file_set:
                continue
            handles = self.open_file_to_handles.poprow(filename)
            for handle in handles:
                try:
                    handle.close()
                except:
                    failure = Failure()
        done = True
        # Any file still doing I/O forces another pass later.
        for filename in file_set.iterkeys():
            if filename in self.active_file_to_handles:
                done = False
                break
        if failure is not None:
            df.errback(failure)
        # NOTE(review): if a close failed while handles are still active,
        # df is errbacked above and may be callbacked again after the
        # reschedule below — looks like a latent AlreadyCalledError; confirm.
        if not done:
            # it would be nice to wait on the deferred for the outstanding ops
            self.add_task(0.5, self._close_files, df, file_set)
        else:
            df.callback(True)

    def set_max_files_open(self, max_files_open):
        # <= 0 means unlimited; a huge sentinel keeps the comparisons
        # elsewhere working unchanged.
        if max_files_open <= 0:
            max_files_open = 1e100
        self.max_files_open = max_files_open
        self.close_all()

    def add_files(self, files, torrent):
        """Register filenames as owned by torrent.

        Raises BTFailure if any of them already belongs to another
        running torrent; registration is all-or-nothing (checked first).
        """
        for filename in files:
            if filename in self.file_to_torrent:
                raise BTFailure(_("File %s belongs to another running torrent")
                                % filename)
        for filename in files:
            self.file_to_torrent[filename] = torrent

    def remove_files(self, files):
        # Unregister; raises KeyError if a filename was never added.
        for filename in files:
            del self.file_to_torrent[filename]

    def _ensure_exists(self, filename, length=0):
        # Create the file (and its directory) as a sparse file if missing.
        if not os.path.exists(filename):
            f = os.path.split(filename)[0]
            if f != '' and not os.path.exists(f):
                os.makedirs(f)
            f = file(filename, 'wb')
            make_file_sparse(filename, f, length)
            f.close()

    def get_open_file_count(self):
        # Idle plus active handles.
        t = self.open_file_to_handles.total_length()
        t += self.active_file_to_handles.total_length()
        return t

    def free_handle_notify(self):
        # A handle was released: service the oldest queued acquire, if any.
        if self.waiting_ops:
            args = self.waiting_ops.pop(0)
            self._produce_handle(*args)

    def acquire_handle(self, filename, for_write, length=0):
        """Return a Deferred that fires with an open handle for filename.

        Raises UnregisteredFileException synchronously if the file was
        never registered via add_files.  Queues the request when the
        number of active handles has hit max_files_open.
        """
        df = Deferred()
        if filename not in self.file_to_torrent:
            raise UnregisteredFileException()
        if self.active_file_to_handles.total_length() == self.max_files_open:
            self.waiting_ops.append((df, filename, for_write, length))
        else:
            self._produce_handle(df, filename, for_write, length)
        return df

    def _produce_handle(self, df, filename, for_write, length):
        # Reuse an idle handle when possible, reopening read-write if a
        # write was requested on a read-only handle.
        if filename in self.open_file_to_handles:
            handle = self.open_file_to_handles.pop_from_row(filename)
            if for_write and not is_open_for_write(handle.mode):
                handle.close()
                handle = open_sparse_file(filename, 'rb+', length=length)
            #elif not for_write and is_open_for_write(handle.mode):
            #    handle.close()
            #    handle = file(filename, 'rb', 0)
        else:
            # At the cap: evict an arbitrary idle handle to make room.
            if self.get_open_file_count() == self.max_files_open:
                oldfname, oldhandle = self.open_file_to_handles.popitem()
                oldhandle.close()
            self._ensure_exists(filename, length)
            if for_write:
                handle = open_sparse_file(filename, 'rb+', length=length)
            else:
                handle = open_sparse_file(filename, 'rb', length=length)
        self.active_file_to_handles.push_to_row(filename, handle)
        df.callback(handle)

    def release_handle(self, filename, handle):
        # Move the handle back to the idle map and wake a queued acquire.
        # ("remove_fom_row" [sic] matches the DictWithSets API spelling.)
        self.active_file_to_handles.remove_fom_row(filename, handle)
        self.open_file_to_handles.push_to_row(filename, handle)
        self.free_handle_notify()
class Storage(object):
    """Maps a torrent's flat byte space onto its files and performs
    asynchronous, handle-pooled reads and writes via the FilePool.

    Offsets used by read/write/downloaded are absolute positions in the
    concatenation of all files; _intervals translates them to per-file
    (filename, begin, end) spans.
    """

    def __init__(self, config, filepool, save_path, files, add_task,
                 external_add_task, doneflag):
        self.filepool = filepool
        self.config = config
        self.doneflag = doneflag
        self.add_task = add_task
        self.external_add_task = external_add_task
        self.initialize(save_path, files)

    def initialize(self, save_path, files):
        """(Re)build file-range structures; returns a ThreadedDeferred
        that fires when _build_file_structs finishes off-thread."""
        # a list of bytes ranges and filenames for window-based IO
        self.ranges = []
        # a dict of filename-to-ranges for piece priorities and filename lookup
        self.range_by_name = {}
        # a sparse set for smart allocation detection
        self.allocated_regions = SparseSet()
        # dict of filename-to-length on disk (for % complete in the file view)
        self.undownloaded = {}
        self.save_path = save_path
        # Rather implement this as an ugly hack here than change all the
        # individual calls. Affects all torrent instances using this module.
        if self.config['bad_libc_workaround']:
            bad_libc_workaround()
        self.initialized = False
        self.startup_df = ThreadedDeferred(wrap_task(self.external_add_task),
                                           self._build_file_structs,
                                           self.filepool, files)
        return self.startup_df

    def _build_file_structs(self, filepool, files):
        # files is an iterable of (filename, length); builds ranges,
        # range_by_name, undownloaded and allocated_regions.  Returns
        # False if shutdown was requested mid-build, True on success.
        total = 0
        for filename, length in files:
            # we're shutting down, abort.
            if self.doneflag.isSet():
                return False
            self.undownloaded[filename] = length
            if length > 0:
                self.ranges.append((total, total + length, filename))
            # Zero-length files still get an (empty) byte range recorded.
            self.range_by_name[filename] = (total, total + length)
            if os.path.exists(filename):
                if not os.path.isfile(filename):
                    raise BTFailure(_("File %s already exists, but is not a "
                                      "regular file") % filename)
                l = os.path.getsize(filename)
                if l > length:
                    # This is the truncation Bram was talking about that no one
                    # else thinks is a good idea.
                    #h = file(filename, 'rb+')
                    #make_file_sparse(filename, h, length)
                    #h.truncate(length)
                    #h.close()
                    l = length
                a = get_allocated_regions(filename, begin=0, length=l)
                if a is not None:
                    # Shift the per-file regions into torrent coordinates.
                    a.offset(total)
                else:
                    # Platform can't report allocation; assume the whole
                    # on-disk extent is allocated.
                    a = SparseSet()
                    if l > 0:
                        a.add(total, total + l)
                self.allocated_regions += a
            total += length
        self.total_length = total
        self.initialized = True
        return True

    def get_byte_range_for_filename(self, filename):
        # Accepts either the registered absolute name or a path relative
        # to save_path; raises KeyError if neither is known.
        if filename not in self.range_by_name:
            filename = os.path.normpath(filename)
            filename = os.path.join(self.save_path, filename)
        return self.range_by_name[filename]

    def was_preallocated(self, pos, length):
        # True if [pos, pos+length) is fully inside allocated regions.
        return self.allocated_regions.is_range_in(pos, pos+length)

    def get_total_length(self):
        return self.total_length

    def _intervals(self, pos, amount):
        """Return [(filename, file_begin, file_end), ...] covering the
        absolute range [pos, pos+amount), offsets relative to each file."""
        r = []
        stop = pos + amount
        # 2**500 sorts after any filename, so bisect lands just past the
        # last range starting at or before pos.
        p = max(bisect_right(self.ranges, (pos, 2 ** 500)) - 1, 0)
        for begin, end, filename in self.ranges[p:]:
            if begin >= stop:
                break
            r.append((filename,
                      max(pos, begin) - begin, min(end, stop) - begin))
        return r

    def _file_op(self, filename, pos, param, write):
        # Acquire a pooled handle, perform one read/write at pos, and
        # release the handle whether the op succeeds or fails.
        # param is the data for a write, or the byte count for a read.
        begin, end = self.get_byte_range_for_filename(filename)
        length = end - begin
        hdf = self.filepool.acquire_handle(filename, for_write=write,
                                           length=length)
        def op(h):
            h.seek(pos)
            if write:
                odf = h.write(param)
            else:
                odf = h.read(param)
            def like_finally(r):
                # Runs on success or failure, then passes the result on.
                self.filepool.release_handle(filename, h)
                return r
            odf.addBoth(like_finally)
            return odf
        hdf.addCallback(op)
        return hdf

    def _batch_read(self, pos, amount):
        # Coroutine (driven by launch_coroutine): issues every per-file
        # read up front, then collects results; final yield is the data.
        dfs = []
        r = []
        # queue all the reads
        for filename, pos, end in self._intervals(pos, amount):
            df = self._file_op(filename, pos, end - pos, write=False)
            dfs.append(df)
        # yield on all the reads in order - they complete in any order
        exc = None
        for df in dfs:
            yield df
            try:
                r.append(df.getResult())
            except:
                # Keep draining remaining reads; re-raise the first error.
                exc = exc or sys.exc_info()
        if exc:
            raise exc[0], exc[1], exc[2]
        r = ''.join(r)
        if len(r) != amount:
            raise BTFailure(_("Short read (%d of %d) - "
                              "something truncated files?") %
                            (len(r), amount))
        yield r

    def read(self, pos, amount):
        """Asynchronously read amount bytes at absolute pos; returns a
        Deferred firing with the data."""
        df = launch_coroutine(wrap_task(self.add_task),
                              self._batch_read, pos, amount)
        return df

    def _batch_write(self, pos, s):
        # Coroutine: slice s across the covered files (zero-copy via
        # buffer()), issue all writes, then verify every byte landed.
        dfs = []
        total = 0
        amount = len(s)
        # queue all the writes
        for filename, begin, end in self._intervals(pos, amount):
            length = end - begin
            assert length > 0, '%s %s' % (pos, amount)
            d = buffer(s, total, length)
            total += length
            df = self._file_op(filename, begin, d, write=True)
            dfs.append(df)
        assert total == amount, '%s and %s' % (total, amount)
        written = 0
        # yield on all the writes - they complete in any order
        exc = None
        for df in dfs:
            yield df
            try:
                written += df.getResult()
            except:
                exc = exc or sys.exc_info()
        if exc:
            raise exc[0], exc[1], exc[2]
        assert total == written, '%s and %s' % (total, written)
        yield total

    def write(self, pos, s):
        """Asynchronously write string s at absolute pos; returns a
        Deferred firing with the byte count written."""
        df = launch_coroutine(wrap_task(self.add_task),
                              self._batch_write, pos, s)
        return df

    def close(self):
        """Close this torrent's files; if startup is still running,
        chain the close onto the startup deferred instead."""
        if not self.initialized:
            def post_init(r):
                return self.filepool.close_files(self.range_by_name)
            self.startup_df.addCallback(post_init)
            return self.startup_df
        df = self.filepool.close_files(self.range_by_name)
        return df

    def downloaded(self, pos, length):
        # Credit newly downloaded bytes against each file's remaining count.
        for filename, begin, end in self._intervals(pos, length):
            self.undownloaded[filename] -= end - begin