replication.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. from BTL import buffer
  2. from BTL import ebencode
  3. from BTL.reactor_magic import reactor
  4. from twisted.internet import protocol
  5. from BTL.protocol import SmartReconnectingClientFactory
  6. from BTL.decorate import decorate_func
  class RepeaterProtocol(protocol.Protocol):
      """Server-side connection that registers itself with its factory.

      While a connection is open it is a member of ``factory.children``,
      so whoever holds the factory can reach every connected peer.
      """

      def connectionMade(self):
          """Add this connection to the factory's collection of children."""
          peers = self.factory.children
          peers.add(self)

      def connectionLost(self, reason):
          """Remove this connection from the factory's collection of children.

          ``reason`` is accepted to satisfy the callback signature and is
          not used.
          """
          peers = self.factory.children
          peers.remove(self)
  def EBRPC_ReplicationServer(port, ebrpc):
      """Mirror every request body seen by ``ebrpc`` to connected TCP peers.

      Listens for TCP connections on ``port`` (tracked in the factory's
      ``children`` set) and replaces ``ebrpc.render`` with
      ``decorate_func(render, ebrpc.render)``.  Each time the replication
      hook runs, it writes the raw request body to every tracked
      connection, framed as ``ebencode.make_int(len(body))`` followed by
      the body itself.

      NOTE(review): presumably ``decorate_func`` runs the hook before the
      original ``ebrpc.render`` -- confirm in BTL.decorate.
      """
      repeater = protocol.Factory()
      repeater.protocol = RepeaterProtocol
      repeater.children = set()

      def slurp(stream):
          # Drain a file-like object that offers no getvalue() shortcut.
          collected = buffer.Buffer()
          piece = stream.read(100000)
          while len(piece) != 0:
              collected.write(piece)
              piece = stream.read(100000)
          return str(collected)

      def render(request):
          request.content.seek(0, 0)
          if hasattr(request.content, 'getvalue'):
              raw = request.content.getvalue()
          else:
              raw = slurp(request.content)
          for peer in repeater.children:
              # don't worry, write is non-blocking
              peer.transport.write(ebencode.make_int(len(raw)))
              peer.transport.write(raw)
          # Rewind so whoever reads the body next starts at the beginning.
          request.content.seek(0, 0)

      ebrpc.render = decorate_func(render, ebrpc.render)
      reactor.listenTCP(port, repeater)
  class ReplicationListener(protocol.Protocol):
      """Client-side protocol that cuts the incoming stream into payloads.

      Received bytes are accumulated in ``self.buffer``.  A length is
      parsed from the front of the buffer with ``ebencode.read_int``; once
      that many bytes are available they are passed to ``payloadReceived``
      and dropped from the buffer.
      """

      def connectionMade(self):
          """Start with an empty buffer and no pending payload length."""
          self.buffer = buffer.Buffer()
          self.length = None

      def dataReceived(self, data):
          """Buffer ``data`` and deliver every payload that is now complete."""
          self.buffer.write(data)
          while True:
              if self.length is None:
                  try:
                      parsed = ebencode.read_int(self.buffer, 0)
                  except IndexError:
                      # Presumably the length prefix is still incomplete;
                      # wait for the next chunk of data.
                      return
                  self.length, consumed = parsed
                  self.buffer.drop(consumed)
              if self.length is None:
                  continue
              # look for payload
              if len(self.buffer) < self.length:
                  return
              self.payloadReceived(self.buffer[:self.length])
              self.buffer.drop(self.length)
              self.length = None

      def payloadReceived(self, payload):
          """Handle one complete payload; subclasses must override this."""
          raise NotImplementedError
  class RegurgitatingReplicationListener(ReplicationListener):
      """Listener that replays each replicated call on a local ebrpc object."""

      def payloadReceived(self, payload):
          """Decode ``payload`` and invoke the function it names.

          ``ebrpc.loads`` yields an argument pair and a function path.  The
          pair is unpacked into positional and keyword arguments, the
          function is looked up on ``self.factory.ebrpc`` and called with
          them.  The call's return value is discarded.
          """
          from BTL import ebrpc
          call_args, function_path = ebrpc.loads(payload)
          positional, keyword = call_args
          target = self.factory.ebrpc._getFunction(function_path)
          target(*positional, **keyword)
  def EBRPC_ListenerAdaptor(ebrpc):
      """Build a client factory that replays replicated calls on ``ebrpc``.

      Returns a ``SmartReconnectingClientFactory`` whose protocol is
      ``RegurgitatingReplicationListener`` and whose ``ebrpc`` attribute is
      the given object.  Connecting the factory is left to the caller.
      """
      adaptor = SmartReconnectingClientFactory()
      adaptor.protocol = RegurgitatingReplicationListener
      adaptor.ebrpc = ebrpc
      return adaptor
  75. if __name__ == '__main__':
  76. from BTL.reactor_magic import reactor
  77. ## server
  78. from BTL import twisted_ebrpc, replication
  79. from twisted.web import server
  80. class Server(twisted_ebrpc.EBRPC):
  81. def ebrpc_ping(self, *args):
  82. print 'server got: ping(%s)' % repr(args)
  83. assert args == (1002,)
  84. return args
  85. r = Server()
  86. replication.EBRPC_ReplicationServer(9000, r)
  87. reactor.listenTCP(7080, server.Site(r))
  88. ## listener 1 (simple)
  89. from twisted.internet.protocol import ClientFactory
  90. from BTL import replication
  91. from BTL.ebencode import ebdecode
  92. class PrintingReplicationListener(replication.ReplicationListener):
  93. def payloadReceived(self, payload):
  94. payload = ebdecode(payload)
  95. print 'listener got:', payload
  96. assert payload == {'a': [[1002], {}], 'q': 'ping', 'y': 'q'}
  97. factory = ClientFactory()
  98. factory.protocol = PrintingReplicationListener
  99. reactor.connectTCP('127.0.0.1', 9000, factory)
  100. ## listener 2 (ebrpc)
  101. from BTL import twisted_ebrpc, replication
  102. from twisted.web import server
  103. class Server(twisted_ebrpc.EBRPC):
  104. def ebrpc_ping(self, *args):
  105. print 'listener got: ping(%s)' % repr(args)
  106. assert args == (1002,)
  107. return args
  108. r = Server()
  109. r = replication.EBRPC_ListenerAdaptor(r)
  110. reactor.connectTCP('127.0.0.1', 9000, r)
  111. ## client
  112. from BTL.twisted_ebrpc import AsyncServerProxy
  113. df = AsyncServerProxy("http://127.0.0.1:7080").ping(1002)
  114. def done(r):
  115. print 'client got:', r
  116. assert r == [1002]
  117. reactor.callLater(0.5, reactor.stop)
  118. df.addCallback(done)
  119. df.addErrback(done)
  120. reactor.run()