# twisted_ebrpc.py
  1. """A generic resource for publishing objects via EBRPC.
  2. Requires EBRPC
  3. API Stability: semi-stable
  4. """
  5. from __future__ import nested_scopes
  6. __version__ = "$Revision: 3249 $"[11:-2]
  7. # System Imports
  8. import ebrpc
  9. import urlparse
  10. from cStringIO import StringIO
  11. from gzip import GzipFile
# When true, the client-side classes in this module print a trace of
# connection and query-pipelining events to stdout.
pipeline_debug = False
# Version string reported in the 'server' response header by EBRPC.render().
version = "1.0"
  14. from BTL.platform import app_name
  15. from BTL.reactor_magic import reactor
  16. from BTL.exceptions import str_exc
  17. from BTL.protocol import SmartReconnectingClientFactory
  18. from BTL.ebrpclib import ServerProxy
  19. import twisted.web
# Refuse to import against a twisted.web that is too old.
# NOTE(review): this is a plain string comparison, so versions are ordered
# lexicographically rather than numerically -- confirm that is acceptable for
# the twisted.web version numbers this guard needs to tell apart.
if twisted.web.__version__ < '0.6.0':
    raise ImportError("BTL.twisted_ebrpc requires twisted.web 0.6.0 or greater,"
                      " from Twisted 2.4.0.\nYou appear to have twisted.web "
                      "version %s installed at:\n%s" % (twisted.web.__version__,
                                                        twisted.web.__file__))
  25. from twisted.web import resource, server
  26. from twisted.internet import protocol
  27. from twisted.python import log, reflect, failure
  28. from twisted.web import http
  29. from twisted.internet import defer
# Re-export ebrpc.Fault under this module's namespace so that users of this
# module don't need to import ebrpc directly.
Fault = ebrpc.Fault
  32. class NoSuchFunction(Fault):
  33. """There is no function by the given name."""
  34. pass
  35. class Handler:
  36. """Handle a EBRPC request and store the state for a request in progress.
  37. Override the run() method and return result using self.result,
  38. a Deferred.
  39. We require this class since we're not using threads, so we can't
  40. encapsulate state in a running function if we're going to have
  41. to wait for results.
  42. For example, lets say we want to authenticate against twisted.cred,
  43. run a LDAP query and then pass its result to a database query, all
  44. as a result of a single EBRPC command. We'd use a Handler instance
  45. to store the state of the running command.
  46. """
  47. def __init__(self, resource, *args):
  48. self.resource = resource # the EBRPC resource we are connected to
  49. self.result = defer.Deferred()
  50. self.run(*args)
  51. def run(self, *args):
  52. # event driven equivalent of 'raise UnimplementedError'
  53. try:
  54. raise NotImplementedError("Implement run() in subclasses")
  55. except:
  56. self.result.errback(failure.Failure())
  57. def parse_accept_encoding(header):
  58. a = header.split(',')
  59. l = []
  60. for i in a:
  61. i = i.strip()
  62. if ';' not in i:
  63. type = i
  64. # hmmm
  65. l.append(('1', type))
  66. else:
  67. type, q = i.split(';')
  68. type = type.strip()
  69. q = q.strip()
  70. junk, q = q.split('=')
  71. q = q.strip()
  72. if q != '0':
  73. l.append((q, type))
  74. l.sort()
  75. l.reverse()
  76. l = [ t for q, t in l ]
  77. return l
  78. class EBRPC(resource.Resource):
  79. """A resource that implements EBRPC.
  80. You probably want to connect this to '/RPC2'.
  81. Methods published can return EBRPC serializable results, Faults,
  82. Binary, Boolean, DateTime, Deferreds, or Handler instances.
  83. By default methods beginning with 'ebrpc_' are published.
  84. Sub-handlers for prefixed methods (e.g., system.listMethods)
  85. can be added with putSubHandler. By default, prefixes are
  86. separated with a '.'. Override self.separator to change this.
  87. """
  88. # Error codes for Twisted, if they conflict with yours then
  89. # modify them at runtime.
  90. NOT_FOUND = 8001
  91. FAILURE = 8002
  92. isLeaf = 1
  93. separator = '.'
  94. def __init__(self):
  95. resource.Resource.__init__(self)
  96. self.subHandlers = {}
  97. def putSubHandler(self, prefix, handler):
  98. self.subHandlers[prefix] = handler
  99. def getSubHandler(self, prefix):
  100. return self.subHandlers.get(prefix, None)
  101. def getSubHandlerPrefixes(self):
  102. return self.subHandlers.keys()
  103. def _err(self, *a, **kw):
  104. log.err(*a, **kw)
  105. def render(self, request):
  106. request.setHeader('server', "%s/%s" % (app_name, version))
  107. request.content.seek(0, 0)
  108. args, functionPath = ebrpc.loads(request.content.read())
  109. args, kwargs = args
  110. request.functionPath = functionPath
  111. try:
  112. function = self._getFunction(functionPath)
  113. except Fault, f:
  114. self._cbRender(f, request)
  115. else:
  116. request.setHeader("content-type", "application/octet-stream")
  117. defer.maybeDeferred(function, *args, **kwargs).addErrback(
  118. self._ebRender
  119. ).addCallback(
  120. self._cbRender, request
  121. )
  122. return server.NOT_DONE_YET
  123. def _cbRender(self, result, request):
  124. if isinstance(result, Handler):
  125. result = result.result
  126. if not isinstance(result, Fault):
  127. result = (result,)
  128. try:
  129. s = ebrpc.dumps(result, methodresponse=1)
  130. except Exception, e:
  131. f = Fault(self.FAILURE,
  132. "function:%s can't serialize output: %s" %
  133. (request.functionPath, str_exc(e)))
  134. self._err(f)
  135. s = ebrpc.dumps(f, methodresponse=1)
  136. encoding = request.getHeader("accept-encoding")
  137. if encoding:
  138. encodings = parse_accept_encoding(encoding)
  139. if 'gzip' in encodings or '*' in encodings:
  140. sio = StringIO()
  141. g = GzipFile(fileobj=sio, mode='wb', compresslevel=9)
  142. g.write(s)
  143. g.close()
  144. s = sio.getvalue()
  145. request.setHeader("Content-Encoding", "gzip")
  146. request.setHeader("content-length", str(len(s)))
  147. request.write(s)
  148. request.finish()
  149. def _ebRender(self, failure):
  150. self._err(failure)
  151. if isinstance(failure.value, Fault):
  152. return failure.value
  153. return Fault(self.FAILURE, "An unhandled exception occurred: %s" %
  154. failure.getErrorMessage())
  155. def _getFunction(self, functionPath):
  156. """Given a string, return a function, or raise NoSuchFunction.
  157. This returned function will be called, and should return the result
  158. of the call, a Deferred, or a Fault instance.
  159. Override in subclasses if you want your own policy. The default
  160. policy is that given functionPath 'foo', return the method at
  161. self.ebrpc_foo, i.e. getattr(self, "ebrpc_" + functionPath).
  162. If functionPath contains self.separator, the sub-handler for
  163. the initial prefix is used to search for the remaining path.
  164. """
  165. if functionPath.find(self.separator) != -1:
  166. prefix, functionPath = functionPath.split(self.separator, 1)
  167. handler = self.getSubHandler(prefix)
  168. if handler is None: raise NoSuchFunction(self.NOT_FOUND, "no such subHandler %s" % prefix)
  169. return handler._getFunction(functionPath)
  170. f = getattr(self, "ebrpc_%s" % functionPath, None)
  171. if not f:
  172. raise NoSuchFunction(self.NOT_FOUND, "function %s not found" % functionPath)
  173. elif not callable(f):
  174. raise NoSuchFunction(self.NOT_FOUND, "function %s not callable" % functionPath)
  175. else:
  176. return f
  177. def _listFunctions(self):
  178. """Return a list of the names of all ebrpc methods."""
  179. return reflect.prefixedMethodNames(self.__class__, 'ebrpc_')
  180. class EBRPCIntrospection(EBRPC):
  181. """Implement the EBRPC Introspection API.
  182. By default, the methodHelp method returns the 'help' method attribute,
  183. if it exists, otherwise the __doc__ method attribute, if it exists,
  184. otherwise the empty string.
  185. To enable the methodSignature method, add a 'signature' method attribute
  186. containing a list of lists. See methodSignature's documentation for the
  187. format. Note the type strings should be EBRPC types, not Python types.
  188. """
  189. def __init__(self, parent):
  190. """Implement Introspection support for an EBRPC server.
  191. @param parent: the EBRPC server to add Introspection support to.
  192. """
  193. EBRPC.__init__(self)
  194. self._ebrpc_parent = parent
  195. def ebrpc_listMethods(self):
  196. """Return a list of the method names implemented by this server."""
  197. functions = []
  198. todo = [(self._ebrpc_parent, '')]
  199. while todo:
  200. obj, prefix = todo.pop(0)
  201. functions.extend([ prefix + name for name in obj._listFunctions() ])
  202. todo.extend([ (obj.getSubHandler(name),
  203. prefix + name + obj.separator)
  204. for name in obj.getSubHandlerPrefixes() ])
  205. return functions
  206. ebrpc_listMethods.signature = [['array']]
  207. def ebrpc_methodHelp(self, method):
  208. """Return a documentation string describing the use of the given method.
  209. """
  210. method = self._ebrpc_parent._getFunction(method)
  211. return (getattr(method, 'help', None)
  212. or getattr(method, '__doc__', None) or '')
  213. ebrpc_methodHelp.signature = [['string', 'string']]
  214. def ebrpc_methodSignature(self, method):
  215. """Return a list of type signatures.
  216. Each type signature is a list of the form [rtype, type1, type2, ...]
  217. where rtype is the return type and typeN is the type of the Nth
  218. argument. If no signature information is available, the empty
  219. string is returned.
  220. """
  221. method = self._ebrpc_parent._getFunction(method)
  222. return getattr(method, 'signature', None) or ''
  223. ebrpc_methodSignature.signature = [['array', 'string'],
  224. ['string', 'string']]
  225. def addIntrospection(ebrpc):
  226. """Add Introspection support to an EBRPC server.
  227. @param ebrpc: The ebrpc server to add Introspection support to.
  228. """
  229. ebrpc.putSubHandler('system', EBRPCIntrospection(ebrpc))
  230. class Query(object):
  231. def __init__(self, path, host, method, user=None, password=None, *args):
  232. self.path = path
  233. self.host = host
  234. self.user = user
  235. self.password = password
  236. self.method = method
  237. self.payload = ebrpc.dumps(args, method)
  238. self.deferred = defer.Deferred()
  239. self.decode = False
  240. class QueryProtocol(http.HTTPClient):
  241. # All current queries are pipelined over the connection at
  242. # once. When the connection is made, or as queries are made
  243. # while a connection exists, queries are all sent to the
  244. # server. Pipelining limits can be controlled by the caller.
  245. # When a query completes (see parseResponse), if there are no
  246. # more queries then an idle timeout gets sets.
  247. # The QueryFactory reopens the connection if another query occurs.
  248. #
  249. # twisted_ebrpc does currently provide a mechanism for
  250. # per-query timeouts. This could be added with another
  251. # timeout_call mechanism that calls loseConnection and pops the
  252. # current query with an errback.
  253. timeout = 300 # idle timeout.
  254. def log(self, msg, *a):
  255. print "%s: %s: %r" % (self.peer, msg, a)
  256. def connectionMade(self):
  257. http.HTTPClient.connectionMade(self)
  258. self.current_queries = []
  259. self.timeout_call = None
  260. if pipeline_debug:
  261. p = self.transport.getPeer()
  262. p = "%s:%d" % (p.host, p.port)
  263. self.peer = (id(self.transport), p)
  264. self.factory.connectionMade(self)
  265. def _cancelTimeout(self):
  266. if self.timeout_call and self.timeout_call.active():
  267. self.timeout_call.cancel()
  268. self.timeout_call = None
  269. def connectionLost(self, reason):
  270. http.HTTPClient.connectionLost(self, reason)
  271. if pipeline_debug: self.log('connectionLost', reason.getErrorMessage())
  272. self._cancelTimeout()
  273. if self.current_queries:
  274. # queries failed, put them back
  275. if pipeline_debug: self.log('putting back', [q.method for q in self.current_queries])
  276. self.factory.prependQueries(self.current_queries)
  277. self.factory.connectionLost(self)
  278. def sendCommand(self, command, path):
  279. self.transport.write('%s %s HTTP/1.1\r\n' % (command, path))
  280. def setLineMode(self, rest):
  281. # twisted is stupid.
  282. self.firstLine = 1
  283. return http.HTTPClient.setLineMode(self, rest)
  284. def sendQuery(self):
  285. self._cancelTimeout()
  286. query = self.factory.popQuery()
  287. if pipeline_debug: self.log('sending', query.method)
  288. self.current_queries.append(query)
  289. self.sendCommand('POST', query.path)
  290. self.sendHeader('User-Agent', 'BTL/EBRPC 1.0')
  291. self.sendHeader('Host', query.host)
  292. self.sendHeader('Accept-encoding', 'gzip')
  293. self.sendHeader('Connection', 'Keep-Alive')
  294. self.sendHeader('Content-type', 'application/octet-stream')
  295. self.sendHeader('Content-length', str(len(query.payload)))
  296. #if query.user:
  297. # auth = '%s:%s' % (query.user, query.password)
  298. # auth = auth.encode('base64').strip()
  299. # self.sendHeader('Authorization', 'Basic %s' % (auth,))
  300. self.endHeaders()
  301. self.transport.write(query.payload)
  302. def parseResponse(self, contents):
  303. query = self.current_queries.pop(0)
  304. if pipeline_debug: self.log('responded', query.method)
  305. if not self.current_queries:
  306. assert not self.factory.anyQueries()
  307. assert not self.timeout_call
  308. self.timeout_call = reactor.callLater(self.timeout,
  309. self.transport.loseConnection)
  310. try:
  311. response = ebrpc.loads(contents)
  312. except Exception, e:
  313. query.deferred.errback(failure.Failure())
  314. del query.deferred
  315. else:
  316. query.deferred.callback(response[0][0])
  317. del query.deferred
  318. def badStatus(self, status, message):
  319. query = self.current_queries.pop(0)
  320. if pipeline_debug: self.log('failed', query.method)
  321. try:
  322. raise ValueError(status, message)
  323. except:
  324. query.deferred.errback(failure.Failure())
  325. del query.deferred
  326. self.transport.loseConnection()
  327. def handleStatus(self, version, status, message):
  328. if status != '200':
  329. self.badStatus(status, message)
  330. def handleHeader(self, key, val):
  331. if not self.current_queries[0].decode:
  332. if key.lower() == 'content-encoding' and val.lower() == 'gzip':
  333. self.current_queries[0].decode = True
  334. def handleResponse(self, contents):
  335. if self.current_queries[0].decode:
  336. s = StringIO()
  337. s.write(contents)
  338. s.seek(-1)
  339. g = GzipFile(fileobj=s, mode='rb')
  340. contents = g.read()
  341. g.close()
  342. self.parseResponse(contents)
  343. class QueryFactory(object):
  344. def __init__(self):
  345. self.queries = []
  346. self.instance = None
  347. def connectionMade(self, instance):
  348. self.instance = instance
  349. if pipeline_debug: print 'connection made %s' % str(instance.peer)
  350. while self.anyQueries():
  351. self.instance.sendQuery()
  352. def connectionLost(self, instance):
  353. assert self.instance == instance
  354. if pipeline_debug: print 'connection lost %s' % str(instance.peer)
  355. self.instance = None
  356. def prependQueries(self, queries):
  357. self.queries = queries + self.queries
  358. def popQuery(self):
  359. return self.queries.pop(0)
  360. def anyQueries(self):
  361. return bool(self.queries)
  362. def addQuery(self, query):
  363. self.queries.append(query)
  364. if pipeline_debug: print 'addQuery: %s %s' % (self.instance, self.queries)
  365. if self.instance:
  366. self.instance.sendQuery()
  367. def disconnect(self):
  368. if not self.instance:
  369. return
  370. if not hasattr(self.instance, 'transport'):
  371. return
  372. self.instance.transport.loseConnection()
  373. class PersistantSingletonFactory(QueryFactory, SmartReconnectingClientFactory):
  374. def clientConnectionFailed(self, connector, reason):
  375. if pipeline_debug: print 'clientConnectionFailed %s' % str(connector)
  376. return SmartReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
  377. def clientConnectionLost(self, connector, unused_reason):
  378. self.started = False
  379. if not self.anyQueries():
  380. self.continueTrying = False
  381. return SmartReconnectingClientFactory.clientConnectionLost(self, connector, unused_reason)
  382. class SingletonFactory(QueryFactory, protocol.ClientFactory):
  383. def clientConnectionFailed(self, connector, reason):
  384. if pipeline_debug: print 'clientConnectionFailed %s' % str(connector)
  385. queries = list(self.queries)
  386. del self.queries[:]
  387. for query in queries:
  388. query.deferred.errback(reason)
  389. self.started = False
  390. class Proxy:
  391. """A Proxy for making remote EBRPC calls.
  392. Pass the URL of the remote EBRPC server to the constructor.
  393. Use proxy.callRemote('foobar', *args) to call remote method
  394. 'foobar' with *args.
  395. """
  396. def __init__(self, url, user=None, password=None, retry_forever = True):
  397. """
  398. @type url: C{str}
  399. @param url: The URL to which to post method calls. Calls will be made
  400. over SSL if the scheme is HTTPS. If netloc contains username or
  401. password information, these will be used to authenticate, as long as
  402. the C{user} and C{password} arguments are not specified.
  403. @type user: C{str} or None
  404. @param user: The username with which to authenticate with the server
  405. when making calls. If specified, overrides any username information
  406. embedded in C{url}. If not specified, a value may be taken from C{url}
  407. if present.
  408. @type password: C{str} or None
  409. @param password: The password with which to authenticate with the
  410. server when making calls. If specified, overrides any password
  411. information embedded in C{url}. If not specified, a value may be taken
  412. from C{url} if present.
  413. """
  414. scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
  415. netlocParts = netloc.split('@')
  416. if len(netlocParts) == 2:
  417. userpass = netlocParts.pop(0).split(':')
  418. self.user = userpass.pop(0)
  419. try:
  420. self.password = userpass.pop(0)
  421. except:
  422. self.password = None
  423. else:
  424. self.user = self.password = None
  425. hostport = netlocParts[0].split(':')
  426. self.host = hostport.pop(0)
  427. try:
  428. self.port = int(hostport.pop(0))
  429. except:
  430. self.port = None
  431. self.path = path
  432. if self.path in ['', None]:
  433. self.path = '/'
  434. self.secure = (scheme == 'https')
  435. if user is not None:
  436. self.user = user
  437. if password is not None:
  438. self.password = password
  439. if not retry_forever:
  440. _Factory = SingletonFactory
  441. else:
  442. _Factory = PersistantSingletonFactory
  443. self.factory = _Factory()
  444. self.factory.started = False
  445. self.factory.protocol = QueryProtocol
  446. def callRemote(self, method, *args, **kwargs):
  447. if pipeline_debug: print 'callRemote to %s : %s' % (self.host, method)
  448. args = (args, kwargs)
  449. query = Query(self.path, self.host, method, self.user,
  450. self.password, *args)
  451. self.factory.addQuery(query)
  452. if pipeline_debug: print 'factory started: %s' % self.factory.started
  453. if not self.factory.started:
  454. self.factory.started = True
  455. def connect(host):
  456. if self.secure:
  457. if pipeline_debug: print 'connecting to %s' % str((host, self.port or 443))
  458. from twisted.internet import ssl
  459. reactor.connectSSL(host, self.port or 443,
  460. self.factory, ssl.ClientContextFactory(),
  461. timeout=60)
  462. else:
  463. if pipeline_debug: print 'connecting to %s' % str((host, self.port or 80))
  464. reactor.connectTCP(host, self.port or 80, self.factory,
  465. timeout=60)
  466. df = reactor.resolve(self.host)
  467. df.addCallback(connect)
  468. df.addErrback(query.deferred.errback)
  469. return query.deferred
  470. class AsyncServerProxy(object):
  471. def __init__(self, base_url, username=None, password=None, debug=False,
  472. retry_forever = True):
  473. self.base_url = base_url
  474. self.username = username
  475. self.password = password
  476. self.proxy = Proxy(self.base_url, self.username, self.password, retry_forever)
  477. self.debug = debug
  478. def __getattr__(self, attr):
  479. return self._make_call(attr)
  480. def _make_call(self, methodname):
  481. return lambda *a, **kw : self._method(methodname, *a, **kw)
  482. def _method(self, methodname, *a, **kw):
  483. # in case they have changed
  484. self.proxy.user = self.username
  485. self.proxy.password = self.password
  486. if self.debug:
  487. print ('callRemote:', self.__class__.__name__,
  488. self.base_url, methodname, a, kw)
  489. df = self.proxy.callRemote(methodname, *a, **kw)
  490. return df
  491. class EitherServerProxy(object):
  492. SYNC = 0
  493. ASYNC = 1
  494. SYNC_DEFERRED = 2 # BE CAREFUL to call getResult() on the returned Deferred!
  495. """Server Proxy that supports both asynchronous and synchronous calls."""
  496. def __init__(self, base_url, username = None, password = None, debug = False,
  497. async = ASYNC, retry_forever = True ):
  498. """
  499. The EitherServerProxy can make either synchronous or asynchronous calls.
  500. The default is specified by the async parameter to __init__, but each
  501. individual call can override the default behavior by passing 'async' as
  502. a boolean keyword argument to any method call. The async keyword
  503. argument can also be set to None. However, passing async as
  504. None means simply 'use default behavior'. When calling with async=SYNC,
  505. you should not be in the same thread as the reactor or you risk
  506. blocking the reactor.
  507. @param async: determines whether the default is asynchronous or blocking calls."""
  508. assert async in [SYNC, ASYNC, SYNC_DEFERRED]
  509. self.async = async
  510. self.async_proxy = AsyncServerProxy( base_url, username, password, debug,
  511. retry_forever = retry_forever )
  512. # HERE HACK. retry_forever is not supported by ServerProxy.
  513. self.sync_proxy = ServerProxy( base_url )
  514. def __getattr__(self, attr):
  515. return self._make_call(attr)
  516. def _make_call(self, methodname):
  517. return lambda *a, **kw : self._method(methodname, *a, **kw)
  518. def _method(self, methodname, *a, **kw ):
  519. async = kw.pop('async', self.async)
  520. if async is None:
  521. async = self.async
  522. if async == ASYNC:
  523. df = self.async_proxy._method(methodname, *a, **kw)
  524. elif async == SYNC_DEFERRED:
  525. df = defer.execute(getattr(self.sync_proxy, methodname), *a, **kw)
  526. else:
  527. return self.sync_proxy.__getattr__(methodname)(*a, **kw)
  528. return df
# Module-level aliases for the EitherServerProxy call modes.  The methods of
# EitherServerProxy refer to these names, so they must stay defined here.
SYNC = EitherServerProxy.SYNC
ASYNC = EitherServerProxy.ASYNC
SYNC_DEFERRED = EitherServerProxy.SYNC_DEFERRED

# Names exported by 'from twisted_ebrpc import *'.
__all__ = ["EBRPC", "Handler", "NoSuchFunction", "Fault", "Proxy", "AsyncServerProxy", "EitherServerProxy"]