# -*- Mode: Python; tab-width: 4 -*- # Copyright 1999, 2000 by eGroups, Inc. # # All Rights Reserved # # Permission to use, copy, modify, and distribute this software and # its documentation for any purpose and without fee is hereby # granted, provided that the above copyright notice appear in all # copies and that both that copyright notice and this permission # notice appear in supporting documentation, and that the name of # eGroups not be used in advertising or publicity pertaining to # distribution of the software without specific, written prior # permission. # # EGROUPS DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN # NO EVENT SHALL EGROUPS BE LIABLE FOR ANY SPECIAL, INDIRECT OR # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # There are two RPC implementations here. # The first ('rpc') attempts to be as transparent as possible, and # passes along 'internal' methods like __getattr__, __getitem__, and # __del__. It is rather 'chatty', and may not be suitable for a # high-performance system. # The second ('fastrpc') is less flexible, but has much less overhead, # and is easier to use from an asynchronous client. import marshal import socket import string import sys import types import asyncore import asynchat from producers import scanning_producer from counter import counter MY_NAME = string.split (socket.gethostname(), '.')[0] # =========================================================================== # RPC server # =========================================================================== # marshal is good for low-level data structures. # but when passing an 'object' (any non-marshallable object) # we really want to pass a 'reference', which will act on # the other side as a proxy. How transparent can we make this? class rpc_channel (asynchat.async_chat): 'Simple RPC server.' # a 'packet': NNNNNNNNmmmmmmmmmmmmmmmm # (hex length in 8 bytes, followed by marshal'd packet data) # same protocol used in both directions. STATE_LENGTH = 'length state' STATE_PACKET = 'packet state' ac_out_buffer_size = 65536 request_counter = counter() exception_counter = counter() client_counter = counter() def __init__ (self, root, conn, addr): self.root = root self.addr = addr asynchat.async_chat.__init__ (self, conn) self.pstate = self.STATE_LENGTH self.set_terminator (8) self.buffer = [] self.proxies = {} rid = id(root) self.new_reference (root) p = marshal.dumps ((rid,)) # send root oid to the other side self.push ('%08x%s' % (len(p), p)) self.client_counter.increment() def new_reference (self, object): oid = id(object) ignore, refcnt = self.proxies.get (oid, (None, 0)) self.proxies[oid] = (object, refcnt + 1) def forget_reference (self, oid): object, refcnt = self.proxies.get (oid, (None, 0)) if refcnt > 1: self.proxies[oid] = (object, refcnt - 1) else: del self.proxies[oid] def log (self, *ignore): pass def collect_incoming_data (self, data): self.buffer.append (data) def found_terminator (self): self.buffer, data = [], string.join (self.buffer, '') if self.pstate is self.STATE_LENGTH: packet_length = string.atoi (data, 16) self.set_terminator (packet_length) self.pstate = self.STATE_PACKET else: self.set_terminator (8) self.pstate = self.STATE_LENGTH oid, kind, arg = marshal.loads (data) obj, refcnt = self.proxies[oid] e = None reply_kind = 2 try: if kind == 0: # __call__ result = apply (obj, arg) elif kind == 1: # __getattr__ result = getattr (obj, arg) elif kind == 2: # __setattr__ key, value = arg result = setattr (obj, key, value) elif kind == 3: # __repr__ result = repr(obj) elif kind == 4: # __del__ self.forget_reference (oid) result = None elif kind == 5: # __getitem__ result = obj[arg] elif kind == 6: # __setitem__ (key, value) = arg obj[key] = value result = None elif kind == 7: # __len__ result = len(obj) except: reply_kind = 1 (file,fun,line), t, v, tbinfo = asyncore.compact_traceback() result = '%s:%s:%s:%s (%s:%s)' % (MY_NAME, file, fun, line, t, str(v)) self.log_info (result, 'error') self.exception_counter.increment() self.request_counter.increment() # optimize a common case if type(result) is types.InstanceType: can_marshal = 0 else: can_marshal = 1 try: rb = marshal.dumps ((reply_kind, result)) except ValueError: can_marshal = 0 if not can_marshal: # unmarshallable object, return a reference rid = id(result) self.new_reference (result) rb = marshal.dumps ((0, rid)) self.push_with_producer ( scanning_producer ( ('%08x' % len(rb)) + rb, buffer_size = 65536 ) ) class rpc_server_root: pass class rpc_server (asyncore.dispatcher): def __init__ (self, root, address = ('', 8746)): self.create_socket (socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() self.bind (address) self.listen (128) self.root = root def handle_accept (self): conn, addr = self.accept() rpc_channel (self.root, conn, addr) # =========================================================================== # Fast RPC server # =========================================================================== # no proxies, request consists # of a 'chain' of getattrs terminated by a __call__. # Protocol: # .. ( , , ... ) # => ( , , ... ) # # # (, ) # path: tuple of strings # params: tuple of objects class fastrpc_channel (asynchat.async_chat): 'Simple RPC server' # a 'packet': NNNNNNNNmmmmmmmmmmmmmmmm # (hex length in 8 bytes, followed by marshal'd packet data) # same protocol used in both directions. # A request consists of (, ) # where is a list of strings (eqv to string.split ('a.b.c', '.')) STATE_LENGTH = 'length state' STATE_PACKET = 'packet state' def __init__ (self, root, conn, addr): self.root = root self.addr = addr asynchat.async_chat.__init__ (self, conn) self.pstate = self.STATE_LENGTH self.set_terminator (8) self.buffer = [] def log (*ignore): pass def collect_incoming_data (self, data): self.buffer.append (data) def found_terminator (self): self.buffer, data = [], string.join (self.buffer, '') if self.pstate is self.STATE_LENGTH: packet_length = string.atoi (data, 16) self.set_terminator (packet_length) self.pstate = self.STATE_PACKET else: self.set_terminator (8) self.pstate = self.STATE_LENGTH (path, params) = marshal.loads (data) o = self.root e = None try: for p in path: o = getattr (o, p) result = apply (o, params) except: e = repr (asyncore.compact_traceback()) result = None rb = marshal.dumps ((e,result)) self.push (('%08x' % len(rb)) + rb) class fastrpc_server (asyncore.dispatcher): def __init__ (self, root, address = ('', 8748)): self.create_socket (socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() self.bind (address) self.listen (128) self.root = root def handle_accept (self): conn, addr = self.accept() fastrpc_channel (self.root, conn, addr) # =========================================================================== if __name__ == '__main__': class thing: def __del__ (self): print 'a thing has gone away %08x' % id(self) class sample_calc: def product (self, *values): return reduce (lambda a,b: a*b, values, 1) def sum (self, *values): return reduce (lambda a,b: a+b, values, 0) def eval (self, string): return eval (string) def make_a_thing (self): return thing() import sys if '-f' in sys.argv: server_class = fastrpc_server address = ('', 8748) else: server_class = rpc_server address = ('', 8746) root = rpc_server_root() root.calc = sample_calc() root.sys = sys rs = server_class (root, address) asyncore.loop()