| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395 |
- # The contents of this file are subject to the BitTorrent Open Source License
- # Version 1.1 (the License). You may not copy or use this file, in either
- # source code or executable form, except in compliance with the License. You
- # may obtain a copy of the License at http://www.bittorrent.com/license/.
- #
- # Software distributed under the License is distributed on an AS IS basis,
- # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
- # for the specific language governing rights and limitations under the
- # License.
- # Written by Bram Cohen and Greg Hazel
- import os
- import sys
- import Queue
- from bisect import bisect_right
- from BTL.translation import _
- from BTL.obsoletepythonsupport import set
- from BitTorrent import BTFailure
- from BTL.defer import Deferred, ThreadedDeferred, Failure, wrap_task
- from BTL.yielddefer import launch_coroutine
- from BitTorrent.platform import get_allocated_regions
- from BTL.sparse_set import SparseSet
- from BTL.DictWithLists import DictWithLists, DictWithSets
- import BTL.stackthreading as threading
- from BitTorrent.Storage_base import open_sparse_file, make_file_sparse
- from BitTorrent.Storage_base import bad_libc_workaround, is_open_for_write
- from BitTorrent.Storage_base import UnregisteredFileException
class FilePool(object):
    """Process-wide cache of open file handles shared by all torrents.

    Keeps at most ``max_files_open`` handles alive, hands them out to
    callers via acquire_handle/release_handle, and services queued disk
    operations on ``num_disk_threads`` worker threads.  All handle
    bookkeeping is guarded by ``free_handle_condition``.
    """

    def __init__(self, doneflag, add_task, external_add_task,
                 max_files_open, num_disk_threads):
        # doneflag: shutdown signal; a callback added below re-queues a
        # sentinel so the disk threads wake up and exit.
        # NOTE(review): add_task is accepted but never stored/used here —
        # presumably kept for signature compatibility; confirm with callers.
        self.doneflag = doneflag
        self.external_add_task = external_add_task
        # filename -> torrent that registered it (see add_files/remove_files)
        self.file_to_torrent = {}
        # Guards the two handle maps below; notified whenever a handle is
        # closed or returned so blocked acquirers can retry.
        self.free_handle_condition = threading.Condition()
        # filename -> set of handles currently checked out by callers
        self.active_file_to_handles = DictWithSets()
        # filename -> list of idle, still-open handles available for reuse
        self.open_file_to_handles = DictWithLists()
        self.set_max_files_open(max_files_open)
        # Queue of (Deferred, function, args, kwargs) disk operations,
        # consumed by _disk_thread workers.
        self.diskq = Queue.Queue()
        for i in xrange(num_disk_threads):
            t = threading.Thread(target=self._disk_thread,
                                 name="disk_thread-%s" % (i+1))
            t.start()
        self.doneflag.addCallback(self.finalize)

    def finalize(self, r=None):
        # re-queue self so all threads die. we end up with one extra event on
        # the queue, but who cares.
        # (Each worker that runs finalize puts it back, so every blocked
        # diskq.get() eventually returns and the while-loop re-checks doneflag.)
        self._create_op(self.finalize)

    def close_all(self):
        """Close every cached handle, waiting for active ones to be released.

        Exceptions raised while closing are collected per-torrent and
        reported via torrent.got_exception() after the lock is dropped.
        """
        failures = {}
        self.free_handle_condition.acquire()
        while self.get_open_file_count() > 0:
            # Drain the idle handles first; active ones must be released
            # by their users before the outer loop can finish.
            while len(self.open_file_to_handles) > 0:
                filename, handle = self.open_file_to_handles.popitem()
                try:
                    handle.close()
                except Exception, e:
                    failures[self.file_to_torrent[filename]] = e
                self.free_handle_condition.notify()
            if self.get_open_file_count() > 0:
                # Timed wait so we re-poll even if a notify is missed.
                self.free_handle_condition.wait(1)
        self.free_handle_condition.release()
        # Report failures outside the lock to avoid re-entrancy issues.
        for torrent, e in failures.iteritems():
            torrent.got_exception(e)

    def close_files(self, file_set):
        """Close all handles for the given files, blocking until none are active.

        file_set: mapping whose keys are filenames (iterkeys() is called on
        it, so a plain set will not work here).
        Raises one of the collected close() exceptions, if any occurred.
        """
        failures = set()
        self.free_handle_condition.acquire()
        done = False
        while not done:
            filenames = list(self.open_file_to_handles.iterkeys())
            for filename in filenames:
                if filename not in file_set:
                    continue
                handles = self.open_file_to_handles.poprow(filename)
                for handle in handles:
                    try:
                        handle.close()
                    except Exception, e:
                        failures.add(e)
                    self.free_handle_condition.notify()
            done = True
            # Not done until no requested file still has a checked-out handle.
            for filename in file_set.iterkeys():
                if filename in self.active_file_to_handles:
                    done = False
                    break
            if not done:
                self.free_handle_condition.wait(0.5)
        self.free_handle_condition.release()
        if len(failures) > 0:
            raise failures.pop()

    def set_max_files_open(self, max_files_open):
        """Set the open-handle cap; <= 0 means effectively unlimited.

        Closes all currently cached handles so the new cap takes effect
        immediately.
        """
        if max_files_open <= 0:
            max_files_open = 1e100
        self.max_files_open = max_files_open
        self.close_all()

    def add_files(self, files, torrent):
        """Register filenames as belonging to torrent.

        Raises BTFailure if any filename is already claimed by another
        running torrent (checked before any registration happens).
        """
        for filename in files:
            if filename in self.file_to_torrent:
                raise BTFailure(_("File %s belongs to another running torrent")
                                % filename)
        for filename in files:
            self.file_to_torrent[filename] = torrent

    def remove_files(self, files):
        """Unregister filenames previously passed to add_files."""
        for filename in files:
            del self.file_to_torrent[filename]

    def _ensure_exists(self, filename, length=0):
        # Create the file (and any missing parent directories) as a sparse
        # file of the given length if it does not already exist.
        if not os.path.exists(filename):
            f = os.path.split(filename)[0]
            if f != '' and not os.path.exists(f):
                os.makedirs(f)
            f = file(filename, 'wb')
            make_file_sparse(filename, f, length)
            f.close()

    def get_open_file_count(self):
        """Return the total number of open handles, idle plus active."""
        t = self.open_file_to_handles.total_length()
        t += self.active_file_to_handles.total_length()
        return t

    def acquire_handle(self, filename, for_write, length=0):
        """Check out an open handle for filename, opening/creating as needed.

        Reuses an idle cached handle when possible (re-opening read-write
        if for_write requires it); otherwise evicts an idle handle when at
        the cap and opens a new one.  Raises UnregisteredFileException if
        the file was never registered via add_files.
        """
        # this will block until a new file handle can be made
        self.free_handle_condition.acquire()
        if filename not in self.file_to_torrent:
            self.free_handle_condition.release()
            raise UnregisteredFileException()
        # Block while every slot is consumed by *active* handles; idle ones
        # can be evicted below, so only active ones force a wait.
        while self.active_file_to_handles.total_length() == self.max_files_open:
            self.free_handle_condition.wait()
        if filename in self.open_file_to_handles:
            handle = self.open_file_to_handles.pop_from_row(filename)
            if for_write and not is_open_for_write(handle.mode):
                handle.close()
                handle = open_sparse_file(filename, 'rb+', length=length)
            #elif not for_write and is_open_for_write(handle.mode):
            #    handle.close()
            #    handle = open_sparse_file(filename, 'rb', length=length)
        else:
            if self.get_open_file_count() == self.max_files_open:
                # At the cap: evict an arbitrary idle handle to make room.
                oldfname, oldhandle = self.open_file_to_handles.popitem()
                oldhandle.close()
            self._ensure_exists(filename, length)
            if for_write:
                handle = open_sparse_file(filename, 'rb+', length=length)
            else:
                handle = open_sparse_file(filename, 'rb', length=length)
        self.active_file_to_handles.push_to_row(filename, handle)
        self.free_handle_condition.release()
        return handle

    def release_handle(self, filename, handle):
        """Return a handle obtained from acquire_handle to the idle cache."""
        self.free_handle_condition.acquire()
        # NOTE(review): 'remove_fom_row' (sic) appears to be the actual BTL
        # DictWithLists API spelling — confirm before "fixing" the name.
        self.active_file_to_handles.remove_fom_row(filename, handle)
        self.open_file_to_handles.push_to_row(filename, handle)
        self.free_handle_condition.notify()
        self.free_handle_condition.release()

    def _create_op(self, _f, *args, **kwargs):
        # Enqueue a disk operation; the returned Deferred fires (via
        # external_add_task) once a worker thread has run _f.
        df = Deferred()
        self.diskq.put((df, _f, args, kwargs))
        return df

    # read and write are just queued operations; the caller supplies the
    # actual function to run (see Storage._read / Storage._write).
    read = _create_op
    write = _create_op

    def _disk_thread(self):
        # Worker loop: run queued operations until doneflag is set.
        # Results/failures are marshalled back via external_add_task so the
        # Deferred fires on the main event loop, not on this thread.
        while not self.doneflag.isSet():
            df, func, args, kwargs = self.diskq.get(True)
            try:
                v = func(*args, **kwargs)
            except:
                self.external_add_task(0, df.errback, Failure())
            else:
                self.external_add_task(0, df.callback, v)
class Storage(object):
    """Maps a torrent's single contiguous byte space onto its files on disk.

    Translates absolute (pos, amount) ranges into per-file intervals and
    performs the reads/writes through a shared FilePool, batching the
    per-file operations with deferreds/coroutines.
    """

    def __init__(self, config, filepool, save_path,
                 files, add_task,
                 external_add_task, doneflag):
        self.filepool = filepool
        self.config = config
        self.doneflag = doneflag
        self.add_task = add_task
        self.external_add_task = external_add_task
        self.initialize(save_path, files)

    def initialize(self, save_path, files):
        """Reset layout state and start building file structs in a thread.

        files: iterable of (filename, length) pairs in torrent order.
        Returns a ThreadedDeferred (also kept as self.startup_df) that
        fires when _build_file_structs completes.
        """
        # a list of bytes ranges and filenames for window-based IO
        self.ranges = []
        # a dict of filename-to-ranges for piece priorities and filename lookup
        self.range_by_name = {}
        # a sparse set for smart allocation detection
        self.allocated_regions = SparseSet()
        # dict of filename-to-length on disk (for % complete in the file view)
        self.undownloaded = {}
        self.save_path = save_path
        # Rather implement this as an ugly hack here than change all the
        # individual calls. Affects all torrent instances using this module.
        if self.config['bad_libc_workaround']:
            bad_libc_workaround()
        self.initialized = False
        self.startup_df = ThreadedDeferred(wrap_task(self.external_add_task),
                                           self._build_file_structs,
                                           self.filepool, files)
        return self.startup_df

    def _build_file_structs(self, filepool, files):
        """Build ranges/range_by_name/allocated_regions from the file list.

        Runs on a worker thread (via ThreadedDeferred).  Returns False if
        shutdown was requested mid-build, True on success.  Raises
        BTFailure if a target path exists but is not a regular file.
        NOTE(review): the filepool parameter is unused here.
        """
        total = 0
        for filename, length in files:
            # we're shutting down, abort.
            if self.doneflag.isSet():
                return False
            self.undownloaded[filename] = length
            if length > 0:
                self.ranges.append((total, total + length, filename))
            # Zero-length files still get a (possibly empty) byte range so
            # lookups by name succeed.
            self.range_by_name[filename] = (total, total + length)
            if os.path.exists(filename):
                if not os.path.isfile(filename):
                    raise BTFailure(_("File %s already exists, but is not a "
                                      "regular file") % filename)
                l = os.path.getsize(filename)
                if l > length:
                    # This is the truncation Bram was talking about that no one
                    # else thinks is a good idea.
                    #h = file(filename, 'rb+')
                    #make_file_sparse(filename, h, length)
                    #h.truncate(length)
                    #h.close()
                    l = length
                a = get_allocated_regions(filename, begin=0, length=l)
                if a is not None:
                    # Shift the file-local regions into torrent-space.
                    a.offset(total)
                else:
                    # Platform can't report allocation; assume the first l
                    # bytes on disk are allocated.
                    a = SparseSet()
                    if l > 0:
                        a.add(total, total + l)
                self.allocated_regions += a
            total += length
        self.total_length = total
        self.initialized = True
        return True

    def get_byte_range_for_filename(self, filename):
        """Return the (begin, end) torrent-space byte range for filename.

        Falls back to normalizing and joining with save_path if the name
        is not already a registered key.  Raises KeyError if still unknown.
        """
        if filename not in self.range_by_name:
            filename = os.path.normpath(filename)
            filename = os.path.join(self.save_path, filename)
        return self.range_by_name[filename]

    def was_preallocated(self, pos, length):
        """True if the whole range [pos, pos+length) is already allocated."""
        return self.allocated_regions.is_range_in(pos, pos+length)

    def get_total_length(self):
        """Return the total byte length of all files in the torrent."""
        return self.total_length

    def _intervals(self, pos, amount):
        """Split the absolute range [pos, pos+amount) into per-file pieces.

        Returns a list of (filename, begin, end) where begin/end are
        offsets *within* that file.
        """
        r = []
        stop = pos + amount
        # Find the last range starting at or before pos; (pos, 2**500)
        # sorts after any real (pos, end, name) tuple with the same pos.
        p = max(bisect_right(self.ranges, (pos, 2 ** 500)) - 1, 0)
        for begin, end, filename in self.ranges[p:]:
            if begin >= stop:
                break
            r.append((filename, max(pos, begin) - begin, min(end, stop) - begin))
        return r

    def _read(self, filename, pos, amount):
        # Runs on a FilePool disk thread: read `amount` bytes at file-local
        # offset `pos`.  The handle is always returned to the pool.
        begin, end = self.get_byte_range_for_filename(filename)
        length = end - begin
        h = self.filepool.acquire_handle(filename, for_write=False, length=length)
        if h is None:
            return
        try:
            h.seek(pos)
            r = h.read(amount)
        finally:
            self.filepool.release_handle(filename, h)
        return r

    def _batch_read(self, pos, amount):
        # Coroutine (run by launch_coroutine): `yield df` suspends until the
        # deferred fires; the final `yield r` is the coroutine's result.
        dfs = []
        r = []
        # queue all the reads
        for filename, pos, end in self._intervals(pos, amount):
            df = self.filepool.read(self._read, filename, pos, end - pos)
            dfs.append(df)
        # yield on all the reads in order - they complete in any order
        exc = None
        for df in dfs:
            yield df
            try:
                r.append(df.getResult())
            except:
                # Remember only the first failure, but keep draining so no
                # deferred is left unconsumed.
                exc = exc or sys.exc_info()
        if exc:
            raise exc[0], exc[1], exc[2]
        r = ''.join(r)
        if len(r) != amount:
            raise BTFailure(_("Short read (%d of %d) - something truncated files?") %
                            (len(r), amount))
        yield r

    def read(self, pos, amount):
        """Read `amount` bytes at absolute `pos`; returns a Deferred of str."""
        df = launch_coroutine(wrap_task(self.add_task),
                              self._batch_read, pos, amount)
        return df

    def _write(self, filename, pos, s):
        # Runs on a FilePool disk thread: write buffer `s` at file-local
        # offset `pos`.  Returns the number of bytes written.
        begin, end = self.get_byte_range_for_filename(filename)
        length = end - begin
        h = self.filepool.acquire_handle(filename, for_write=True, length=length)
        if h is None:
            return
        try:
            h.seek(pos)
            h.write(s)
        finally:
            self.filepool.release_handle(filename, h)
        return len(s)

    def _batch_write(self, pos, s):
        # Coroutine counterpart of _batch_read; final yield is total bytes
        # queued for writing.
        dfs = []
        total = 0
        amount = len(s)
        # queue all the writes
        for filename, begin, end in self._intervals(pos, amount):
            length = end - begin
            # buffer() gives a zero-copy slice of s for this file's piece.
            d = buffer(s, total, length)
            total += length
            df = self.filepool.write(self._write, filename, begin, d)
            dfs.append(df)
        # yield on all the writes - they complete in any order
        exc = None
        for df in dfs:
            yield df
            try:
                df.getResult()
            except:
                exc = exc or sys.exc_info()
        if exc:
            raise exc[0], exc[1], exc[2]
        yield total

    def write(self, pos, s):
        """Write string `s` at absolute `pos`; returns a Deferred of length."""
        df = launch_coroutine(wrap_task(self.add_task),
                              self._batch_write, pos, s)
        return df

    def close(self):
        """Close this torrent's files in the pool.

        If startup hasn't finished, chains the close onto startup_df and
        returns it; otherwise closes synchronously.
        NOTE(review): the synchronous path returns None while the deferred
        path returns startup_df — callers must tolerate both.
        """
        if not self.initialized:
            self.startup_df.addCallback(lambda *a : self.filepool.close_files(self.range_by_name))
            return self.startup_df
        self.filepool.close_files(self.range_by_name)

    def downloaded(self, pos, length):
        """Decrement per-file remaining counts for a downloaded byte range."""
        for filename, begin, end in self._intervals(pos, length):
            self.undownloaded[filename] -= end - begin
|