IPC.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. # The contents of this file are subject to the BitTorrent Open Source License
  2. # Version 1.1 (the License). You may not copy or use this file, in either
  3. # source code or executable form, except in compliance with the License. You
  4. # may obtain a copy of the License at http://www.bittorrent.com/license/.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10. # Written by Greg Hazel
  11. # based on code by Uoti Urpala
  12. from __future__ import generators
  13. import os
  14. import time
  15. import Queue
  16. import socket
  17. import logging
  18. import traceback
  19. if os.name == 'nt':
  20. from BitTorrent import pykill
  21. #import BTL.likewin32api as win32api
  22. import win32api
  23. import win32event
  24. import winerror
  25. import win32ui # needed for dde
  26. import dde
  27. import pywin.mfc.object
  28. from binascii import b2a_hex
  29. from BTL.translation import _
  30. from BitTorrent.RawServer_twisted import Handler
  31. from BitTorrent.platform import get_dot_dir
  32. from BitTorrent import BTFailure
  33. from BTL.platform import app_name, encode_for_filesystem
  34. from BTL.exceptions import str_exc
  35. ipc_logger = logging.getLogger('IPC')
  36. ipc_logger.setLevel(logging.DEBUG)
  37. def toint(s):
  38. return int(b2a_hex(s), 16)
  39. def tobinary(i):
  40. return (chr(i >> 24) + chr((i >> 16) & 0xFF) +
  41. chr((i >> 8) & 0xFF) + chr(i & 0xFF))
  42. CONTROL_SOCKET_PORT = 46881
  43. class ControlsocketListener(Handler):
  44. def __init__(self, callback):
  45. self.callback = callback
  46. def connection_made(self, connection):
  47. connection.handler = MessageReceiver(self.callback)
  48. class MessageReceiver(Handler):
  49. def __init__(self, callback):
  50. self.callback = callback
  51. self._buffer = []
  52. self._buffer_len = 0
  53. self._reader = self._read_messages()
  54. self._next_len = self._reader.next()
  55. def _read_messages(self):
  56. while True:
  57. yield 4
  58. l = toint(self._message)
  59. yield l
  60. action = self._message
  61. if action in ('no-op',):
  62. self.callback(action, None)
  63. else:
  64. yield 4
  65. l = toint(self._message)
  66. yield l
  67. data = self._message
  68. if action in ('show_error','start_torrent'):
  69. self.callback(action, data)
  70. else:
  71. yield 4
  72. l = toint(self._message)
  73. yield l
  74. path = self._message
  75. if action in ('publish_torrent'):
  76. self.callback(action, data, path)
  77. # copied from Connecter.py
  78. def data_came_in(self, conn, s):
  79. while True:
  80. i = self._next_len - self._buffer_len
  81. if i > len(s):
  82. self._buffer.append(s)
  83. self._buffer_len += len(s)
  84. return
  85. m = s[:i]
  86. if self._buffer_len > 0:
  87. self._buffer.append(m)
  88. m = ''.join(self._buffer)
  89. self._buffer = []
  90. self._buffer_len = 0
  91. s = s[i:]
  92. self._message = m
  93. try:
  94. self._next_len = self._reader.next()
  95. except StopIteration:
  96. self._reader = None
  97. conn.close()
  98. return
  99. def connection_lost(self, conn):
  100. self._reader = None
  101. pass
  102. def connection_flushed(self, conn):
  103. pass
  104. class IPC(object):
  105. """Used for communication between raw server thread and other threads."""
  106. def __init__(self, rawserver, config, name="controlsocket"):
  107. self.rawserver = rawserver
  108. self.name = name
  109. self.config = config
  110. self.callback = None
  111. self._command_q = Queue.Queue()
  112. def create(self):
  113. pass
  114. def start(self, callback):
  115. self.callback = callback
  116. while not self._command_q.empty():
  117. self.callback(*self._command_q.get())
  118. def send_command(self, command, *args):
  119. pass
  120. def handle_command(self, *args):
  121. if callable(self.callback):
  122. return self.callback(*args)
  123. self._command_q.put(args)
  124. def stop(self):
  125. pass
  126. class IPCSocketBase(IPC):
  127. def __init__(self, *args):
  128. IPC.__init__(self, *args)
  129. self.port = CONTROL_SOCKET_PORT
  130. self.controlsocket = None
  131. def start(self, callback):
  132. IPC.start(self, callback)
  133. self.rawserver.start_listening(self.controlsocket,
  134. ControlsocketListener(self.handle_command))
  135. def stop(self):
  136. # safe double-stop, since MultiTorrent seems to be prone to do so
  137. if self.controlsocket:
  138. # it's possible we're told to stop after controlsocket creation but
  139. # before rawserver registration
  140. if self.rawserver:
  141. self.rawserver.stop_listening(self.controlsocket)
  142. self.controlsocket.close()
  143. self.controlsocket = None
  144. class IPCUnixSocket(IPCSocketBase):
  145. def __init__(self, *args):
  146. IPCSocketBase.__init__(self, *args)
  147. data_dir,bad = encode_for_filesystem(self.config['data_dir'])
  148. if bad:
  149. raise BTFailure(_("Invalid path encoding."))
  150. self.socket_filename = os.path.join(data_dir, self.name)
  151. def create(self):
  152. filename = self.socket_filename
  153. if os.path.exists(filename):
  154. try:
  155. self.send_command('no-op')
  156. except BTFailure:
  157. pass
  158. else:
  159. raise BTFailure(_("Could not create control socket: already in use"))
  160. try:
  161. os.unlink(filename)
  162. except OSError, e:
  163. raise BTFailure(_("Could not remove old control socket filename:")
  164. + str_exc(e))
  165. try:
  166. controlsocket = self.rawserver.create_unixserversocket(filename)
  167. except socket.error, e:
  168. raise BTFailure(_("Could not create control socket: ") + str_exc(e))
  169. self.controlsocket = controlsocket
  170. # blocking version without rawserver
  171. def send_command(self, command, *args):
  172. s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  173. filename = self.socket_filename
  174. try:
  175. s.connect(filename)
  176. s.send(tobinary(len(command)))
  177. s.send(command)
  178. for arg in args:
  179. s.send(tobinary(len(arg)))
  180. s.send(arg)
  181. s.close()
  182. except socket.error, e:
  183. s.close()
  184. raise BTFailure(_("Could not send command: ") + str_exc(e))
  185. class IPCWin32Socket(IPCSocketBase):
  186. def __init__(self, *args):
  187. IPCSocketBase.__init__(self, *args)
  188. self.socket_filename = os.path.join(self.config['data_dir'], self.name)
  189. self.mutex = None
  190. self.master = 0
  191. def _get_sic_path(self):
  192. configdir = get_dot_dir()
  193. filename = os.path.join(configdir, ".btcontrol")
  194. return filename
  195. def create(self):
  196. obtain_mutex = 1
  197. mutex = win32event.CreateMutex(None, obtain_mutex, app_name)
  198. # prevent the PyHANDLE from going out of scope, ints are fine
  199. self.mutex = int(mutex)
  200. mutex.Detach()
  201. lasterror = win32api.GetLastError()
  202. if lasterror == winerror.ERROR_ALREADY_EXISTS:
  203. takeover = 0
  204. try:
  205. # if the mutex already exists, discover which port to connect to.
  206. # if something goes wrong with that, tell us to take over the
  207. # role of master
  208. takeover = self.discover_sic_socket()
  209. except:
  210. pass
  211. if not takeover:
  212. raise BTFailure(_("Global mutex already created."))
  213. self.master = 1
  214. # lazy free port code
  215. port_limit = 50000
  216. while self.port < port_limit:
  217. try:
  218. controlsocket = self.rawserver.create_serversocket(self.port,
  219. '127.0.0.1')
  220. self.controlsocket = controlsocket
  221. break
  222. except socket.error, e:
  223. self.port += 1
  224. if self.port >= port_limit:
  225. raise BTFailure(_("Could not find an open port!"))
  226. filename = self._get_sic_path()
  227. (path, name) = os.path.split(filename)
  228. try:
  229. os.makedirs(path)
  230. except OSError, e:
  231. # 17 is dir exists
  232. if e.errno != 17:
  233. BTFailure(_("Could not create application data directory!"))
  234. f = open(filename, "w")
  235. f.write(str(self.port))
  236. f.close()
  237. # we're done writing the control file, release the mutex so other instances can lock it and read the file
  238. # but don't destroy the handle until the application closes, so that the named mutex is still around
  239. win32event.ReleaseMutex(self.mutex)
  240. def discover_sic_socket(self):
  241. takeover = 0
  242. # mutex exists and has been opened (not created, not locked).
  243. # wait for it so we can read the file
  244. r = win32event.WaitForSingleObject(self.mutex, win32event.INFINITE)
  245. # WAIT_OBJECT_0 means the mutex was obtained
  246. # WAIT_ABANDONED means the mutex was obtained, and it had previously been abandoned
  247. if (r != win32event.WAIT_OBJECT_0) and (r != win32event.WAIT_ABANDONED):
  248. raise BTFailure(_("Could not acquire global mutex lock for controlsocket file!"))
  249. filename = self._get_sic_path()
  250. try:
  251. f = open(filename, "r")
  252. self.port = int(f.read())
  253. f.close()
  254. except:
  255. if r == win32event.WAIT_ABANDONED:
  256. ipc_logger.warning(_("A previous instance of BT was not cleaned up properly. Continuing."))
  257. # take over the role of master
  258. takeover = 1
  259. else:
  260. ipc_logger.warning((_("Another instance of BT is running, but \"%s\" does not exist.\n") % filename)+
  261. _("I'll guess at the port."))
  262. try:
  263. self.port = CONTROL_SOCKET_PORT
  264. self.send_command('no-op')
  265. ipc_logger.warning(_("Port found: %d") % self.port)
  266. try:
  267. f = open(filename, "w")
  268. f.write(str(self.port))
  269. f.close()
  270. except:
  271. traceback.print_exc()
  272. except:
  273. # this is where this system falls down.
  274. # There's another copy of BitTorrent running, or something locking the mutex,
  275. # but I can't communicate with it.
  276. ipc_logger.warning(_("Could not find port."))
  277. # we're done reading the control file, release the mutex so other instances can lock it and read the file
  278. win32event.ReleaseMutex(self.mutex)
  279. return takeover
  280. #blocking version without rawserver
  281. def send_command(self, command, *datas):
  282. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  283. try:
  284. s.connect(('127.0.0.1', self.port))
  285. s.send(tobinary(len(command)))
  286. s.send(command)
  287. for data in datas:
  288. data = data.encode('utf-8')
  289. s.send(tobinary(len(data)))
  290. s.send(data)
  291. s.close()
  292. except socket.error, e:
  293. try:
  294. s.close()
  295. except:
  296. pass
  297. raise BTFailure(_("Could not send command: ") + str_exc(e))
  298. def stop(self):
  299. if self.master:
  300. r = win32event.WaitForSingleObject(self.mutex, win32event.INFINITE)
  301. filename = self._get_sic_path()
  302. try:
  303. os.remove(filename)
  304. except OSError, e:
  305. # print, but continue
  306. traceback.print_exc()
  307. self.master = 0
  308. win32event.ReleaseMutex(self.mutex)
  309. # close it so the named mutex goes away
  310. win32api.CloseHandle(self.mutex)
  311. self.mutex = None
  312. if os.name == 'nt':
  313. class HandlerObject(pywin.mfc.object.Object):
  314. def __init__(self, handler, target):
  315. self.handler = handler
  316. pywin.mfc.object.Object.__init__(self, target)
  317. class Topic(HandlerObject):
  318. def __init__(self, handler, target):
  319. target.AddItem(dde.CreateStringItem(""))
  320. HandlerObject.__init__(self, handler, target)
  321. def Request(self, x):
  322. # null byte hack
  323. x = x.replace("\\**0", "\0")
  324. items = x.split("|")
  325. self.handler(items[0], *items[1:])
  326. return ("OK")
  327. # remote procedure call
  328. #def Exec(self, x):
  329. # exec x
  330. class Server(HandlerObject):
  331. def CreateSystemTopic(self):
  332. return Topic(self.handler, dde.CreateServerSystemTopic())
  333. def Status(self, s):
  334. ipc_logger.debug(_("IPC Status: %s") % s)
  335. def stop(self):
  336. self.Shutdown()
  337. self.Destroy()
  338. class SingleInstanceMutex(object):
  339. def __init__(self):
  340. obtain_mutex = False
  341. self.mutex = win32event.CreateMutex(None, obtain_mutex, app_name)
  342. self.lasterror = win32api.GetLastError()
  343. def close(self):
  344. del self.mutex
  345. def IsAnotherInstanceRunning(self):
  346. return winerror.ERROR_ALREADY_EXISTS == self.lasterror
  347. if os.name == 'nt':
  348. g_mutex = SingleInstanceMutex()
  349. class IPCWin32DDE(IPC):
  350. def create(self):
  351. self.server = None
  352. if g_mutex.IsAnotherInstanceRunning():
  353. # test whether there is a program actually running that holds
  354. # the mutex.
  355. for i in xrange(10):
  356. # try to connect first
  357. self.client = Server(None, dde.CreateServer())
  358. self.client.Create(app_name, dde.CBF_FAIL_SELFCONNECTIONS|dde.APPCMD_CLIENTONLY)
  359. self.conversation = dde.CreateConversation(self.client)
  360. try:
  361. self.conversation.ConnectTo(app_name, self.name)
  362. raise BTFailure("DDE Conversation connected.")
  363. except dde.error, e:
  364. # no one is listening
  365. pass
  366. # clean up
  367. self.client.stop()
  368. del self.client
  369. del self.conversation
  370. ipc_logger.warning("No DDE Server is listening, but the global mutex exists. Retry %d!" % i)
  371. time.sleep(1.0)
  372. # oh no you didn't!
  373. if i == 5:
  374. pykill.kill_process(app_name)
  375. # continuing might be dangerous (two instances)
  376. raise Exception("No DDE Server is listening, but the global mutex exists!")
  377. # start server
  378. self.server = Server(self.handle_command, dde.CreateServer())
  379. self.server.Create(app_name, dde.CBF_FAIL_SELFCONNECTIONS|dde.APPCLASS_STANDARD)
  380. self.server.AddTopic(Topic(self.handle_command, dde.CreateTopic(self.name)))
  381. def send_command(self, command, *args):
  382. s = '|'.join([command, ] + list(args))
  383. # null byte hack
  384. if s.count("\0") > 0:
  385. ipc_logger.warinig("IPC: String with null byte(s):" + s)
  386. s = s.replace("\0", "\\**0")
  387. s = s.encode('utf-8')
  388. result = self.conversation.Request(s)
  389. def stop(self):
  390. if self.server:
  391. server = self.server
  392. self.server = None
  393. server.stop()
  394. if os.name == 'nt':
  395. #ipc_interface = IPCWin32Socket
  396. ipc_interface = IPCWin32DDE
  397. else:
  398. ipc_interface = IPCUnixSocket