|
5 | 5 | ----------
|
6 | 6 | 问题
|
7 | 7 | ----------
|
8 |
| -You want to implement simple remote procedure call (RPC) on top of a message passing |
9 |
| -layer, such as sockets, multiprocessing connections, or ZeroMQ. |
| 8 | +你想在一个消息传输层如 ``sockets`` 、``multiprocessing connections`` 或 ``ZeroMQ`` |
| 9 | +的基础之上实现一个简单的远程过程调用(RPC)。 |
10 | 10 |
|
11 | 11 | |
|
12 | 12 |
|
13 | 13 | ----------
|
14 | 14 | 解决方案
|
15 | 15 | ----------
|
16 |
| -RPC is easy to implement by encoding function requests, arguments, and return values |
17 |
| -using pickle, and passing the pickled byte strings between interpreters. Here is an |
18 |
| -example of a simple RPC handler that could be incorporated into a server: |
19 |
| - |
20 |
| -# rpcserver.py |
21 |
| - |
22 |
| -import pickle |
23 |
| -class RPCHandler: |
24 |
| - def __init__(self): |
25 |
| - self._functions = { } |
26 |
| - |
27 |
| - def register_function(self, func): |
28 |
| - self._functions[func.__name__] = func |
29 |
| - |
30 |
| - def handle_connection(self, connection): |
31 |
| - try: |
32 |
| - while True: |
33 |
| - # Receive a message |
34 |
| - func_name, args, kwargs = pickle.loads(connection.recv()) |
35 |
| - # Run the RPC and send a response |
36 |
| - try: |
37 |
| - r = self._functions[func_name](*args,**kwargs) |
38 |
| - connection.send(pickle.dumps(r)) |
39 |
| - except Exception as e: |
40 |
| - connection.send(pickle.dumps(e)) |
41 |
| - except EOFError: |
42 |
| - pass |
43 |
| -
|
44 |
| -To use this handler, you need to add it into a messaging server. There are many possible |
45 |
| -choices, but the multiprocessing library provides a simple option. Here is an example |
46 |
| -RPC server: |
47 |
| - |
48 |
| -from multiprocessing.connection import Listener |
49 |
| -from threading import Thread |
50 |
| - |
51 |
| -def rpc_server(handler, address, authkey): |
52 |
| - sock = Listener(address, authkey=authkey) |
53 |
| - while True: |
54 |
| - client = sock.accept() |
55 |
| - t = Thread(target=handler.handle_connection, args=(client,)) |
56 |
| - t.daemon = True |
57 |
| - t.start() |
58 |
| - |
59 |
| -# Some remote functions |
60 |
| -def add(x, y): |
61 |
| - return x + y |
62 |
| - |
63 |
| -def sub(x, y): |
64 |
| - return x - y |
65 |
| - |
66 |
| -# Register with a handler |
67 |
| -handler = RPCHandler() |
68 |
| -handler.register_function(add) |
69 |
| -handler.register_function(sub) |
70 |
| - |
71 |
| -# Run the server |
72 |
| -rpc_server(handler, ('localhost', 17000), authkey=b'peekaboo') |
73 |
| - |
74 |
| -To access the server from a remote client, you need to create a corresponding RPC proxy |
75 |
| -class that forwards requests. For example: |
76 |
| - |
77 |
| -import pickle |
78 |
| - |
79 |
| -class RPCProxy: |
80 |
| - def __init__(self, connection): |
81 |
| - self._connection = connection |
82 |
| - def __getattr__(self, name): |
83 |
| - def do_rpc(*args, **kwargs): |
84 |
| - self._connection.send(pickle.dumps((name, args, kwargs))) |
85 |
| - result = pickle.loads(self._connection.recv()) |
86 |
| - if isinstance(result, Exception): |
87 |
| - raise result |
88 |
| - return result |
89 |
| - return do_rpc |
90 |
| -
|
91 |
| -To use the proxy, you wrap it around a connection to the server. For example: |
92 |
| - |
93 |
| ->>> from multiprocessing.connection import Client |
94 |
| ->>> c = Client(('localhost', 17000), authkey=b'peekaboo') |
95 |
| ->>> proxy = RPCProxy(c) |
96 |
| ->>> proxy.add(2, 3) |
97 |
| - |
98 |
| -5 |
99 |
| ->>> proxy.sub(2, 3) |
100 |
| --1 |
101 |
| ->>> proxy.sub([1, 2], 4) |
102 |
| -Traceback (most recent call last): |
103 |
| - File "<stdin>", line 1, in <module> |
104 |
| - File "rpcserver.py", line 37, in do_rpc |
105 |
| - raise result |
106 |
| -TypeError: unsupported operand type(s) for -: 'list' and 'int' |
107 |
| ->>> |
108 |
| - |
109 |
| -It should be noted that many messaging layers (such as multiprocessing) already se‐ |
110 |
| -rialize data using pickle. If this is the case, the pickle.dumps() and pickle.loads() |
111 |
| -calls can be eliminated. |
| 16 | + |
| 17 | +将函数请求、参数和返回值使用pickle编码后,在不同的解释器直接传送pickle字节字符串,可以很容易的实现RPC。 |
| 18 | +下面是一个简单的PRC处理器,可以被整合到一个服务器中去: |
| 19 | + |
| 20 | +.. code-block:: python |
| 21 | +
|
| 22 | + # rpcserver.py |
| 23 | +
|
| 24 | + import pickle |
| 25 | + class RPCHandler: |
| 26 | + def __init__(self): |
| 27 | + self._functions = { } |
| 28 | +
|
| 29 | + def register_function(self, func): |
| 30 | + self._functions[func.__name__] = func |
| 31 | +
|
| 32 | + def handle_connection(self, connection): |
| 33 | + try: |
| 34 | + while True: |
| 35 | + # Receive a message |
| 36 | + func_name, args, kwargs = pickle.loads(connection.recv()) |
| 37 | + # Run the RPC and send a response |
| 38 | + try: |
| 39 | + r = self._functions[func_name](*args,**kwargs) |
| 40 | + connection.send(pickle.dumps(r)) |
| 41 | + except Exception as e: |
| 42 | + connection.send(pickle.dumps(e)) |
| 43 | + except EOFError: |
| 44 | + pass |
| 45 | +
|
| 46 | +要使用这个处理器,你需要将它加入到一个消息服务器中。你有很多种选择, |
| 47 | +但是使用 ``multiprocessing`` 库是最简单的。下面是一个RPC服务器例子: |
| 48 | + |
| 49 | +.. code-block:: python |
| 50 | +
|
| 51 | + from multiprocessing.connection import Listener |
| 52 | + from threading import Thread |
| 53 | +
|
| 54 | + def rpc_server(handler, address, authkey): |
| 55 | + sock = Listener(address, authkey=authkey) |
| 56 | + while True: |
| 57 | + client = sock.accept() |
| 58 | + t = Thread(target=handler.handle_connection, args=(client,)) |
| 59 | + t.daemon = True |
| 60 | + t.start() |
| 61 | +
|
| 62 | + # Some remote functions |
| 63 | + def add(x, y): |
| 64 | + return x + y |
| 65 | +
|
| 66 | + def sub(x, y): |
| 67 | + return x - y |
| 68 | +
|
| 69 | + # Register with a handler |
| 70 | + handler = RPCHandler() |
| 71 | + handler.register_function(add) |
| 72 | + handler.register_function(sub) |
| 73 | +
|
| 74 | + # Run the server |
| 75 | + rpc_server(handler, ('localhost', 17000), authkey=b'peekaboo') |
| 76 | +
|
| 77 | +为了从一个远程客户端访问服务器,你需要创建一个对应的用来传送请求的RPC代理类。例如 |
| 78 | + |
| 79 | +.. code-block:: python |
| 80 | +
|
| 81 | + import pickle |
| 82 | +
|
| 83 | + class RPCProxy: |
| 84 | + def __init__(self, connection): |
| 85 | + self._connection = connection |
| 86 | + def __getattr__(self, name): |
| 87 | + def do_rpc(*args, **kwargs): |
| 88 | + self._connection.send(pickle.dumps((name, args, kwargs))) |
| 89 | + result = pickle.loads(self._connection.recv()) |
| 90 | + if isinstance(result, Exception): |
| 91 | + raise result |
| 92 | + return result |
| 93 | + return do_rpc |
| 94 | +
|
| 95 | +要使用这个代理类,你需要将其包装到一个服务器的连接上面,例如: |
| 96 | + |
| 97 | +.. code-block:: python |
| 98 | +
|
| 99 | + >>> from multiprocessing.connection import Client |
| 100 | + >>> c = Client(('localhost', 17000), authkey=b'peekaboo') |
| 101 | + >>> proxy = RPCProxy(c) |
| 102 | + >>> proxy.add(2, 3) |
| 103 | +
|
| 104 | + 5 |
| 105 | + >>> proxy.sub(2, 3) |
| 106 | + -1 |
| 107 | + >>> proxy.sub([1, 2], 4) |
| 108 | + Traceback (most recent call last): |
| 109 | + File "<stdin>", line 1, in <module> |
| 110 | + File "rpcserver.py", line 37, in do_rpc |
| 111 | + raise result |
| 112 | + TypeError: unsupported operand type(s) for -: 'list' and 'int' |
| 113 | + >>> |
| 114 | +
|
| 115 | +要注意的是很多消息层(比如 ``multiprocessing`` )已经使用pickle序列化了数据。 |
| 116 | +如果是这样的话,对 ``pickle.dumps()`` 和 ``pickle.loads()`` 的调用要去掉。 |
112 | 117 |
|
113 | 118 | |
|
114 | 119 |
|
115 | 120 | ----------
|
116 | 121 | 讨论
|
117 | 122 | ----------
|
118 |
| -The general idea of the RPCHandler and RPCProxy classes is relatively simple. If a client |
119 |
| -wants to call a remote function, such as foo(1, 2, z=3), the proxy class creates a tuple |
120 |
| -('foo', (1, 2), {'z': 3}) that contains the function name and arguments. This |
121 |
| -tuple is pickled and sent over the connection. This is performed in the do_rpc() closure |
122 |
| -that’s returned by the __getattr__() method of RPCProxy. The server receives and |
123 |
| -unpickles the message, looks up the function name to see if it’s registered, and executes |
124 |
| -it with the given arguments. The result (or exception) is then pickled and sent back. |
125 |
| -As shown, the example relies on multiprocessing for communication. However, this |
126 |
| -approach could be made to work with just about any other messaging system. For ex‐ |
127 |
| -ample, if you want to implement RPC over ZeroMQ, just replace the connection objects |
128 |
| -with an appropriate ZeroMQ socket object. |
129 |
| -Given the reliance on pickle, security is a major concern (because a clever hacker can |
130 |
| -create messages that make arbitrary functions execute during unpickling). In particular, |
131 |
| -you should never allow RPC from untrusted or unauthenticated clients. In particular, |
132 |
| -you definitely don’t want to allow access from just any machine on the Internet—this |
133 |
| -should really only be used internally, behind a firewall, and not exposed to the rest of |
134 |
| -the world. |
135 |
| -As an alternative to pickle, you might consider the use of JSON, XML, or some other |
136 |
| -data encoding for serialization. For example, this recipe is fairly easy to adapt to JSON |
137 |
| -encoding |
138 |
| -if you simply replace pickle.loads() and pickle.dumps() with |
139 |
| -json.loads() and json.dumps(). For example: |
140 |
| - |
141 |
| -# jsonrpcserver.py |
142 |
| -import json |
143 |
| - |
144 |
| -class RPCHandler: |
145 |
| - def __init__(self): |
146 |
| - self._functions = { } |
147 |
| - |
148 |
| - def register_function(self, func): |
149 |
| - self._functions[func.__name__] = func |
150 |
| - |
151 |
| - def handle_connection(self, connection): |
152 |
| - try: |
153 |
| - while True: |
154 |
| - # Receive a message |
155 |
| - func_name, args, kwargs = json.loads(connection.recv()) |
156 |
| - # Run the RPC and send a response |
157 |
| - try: |
158 |
| - r = self._functions[func_name](*args,**kwargs) |
159 |
| - connection.send(json.dumps(r)) |
160 |
| - except Exception as e: |
161 |
| - connection.send(json.dumps(str(e))) |
162 |
| - except EOFError: |
163 |
| - pass |
164 |
| -
|
165 |
| -# jsonrpcclient.py |
166 |
| -import json |
167 |
| - |
168 |
| -class RPCProxy: |
169 |
| - def __init__(self, connection): |
170 |
| - self._connection = connection |
171 |
| - def __getattr__(self, name): |
172 |
| - def do_rpc(*args, **kwargs): |
173 |
| - self._connection.send(json.dumps((name, args, kwargs))) |
174 |
| - result = json.loads(self._connection.recv()) |
175 |
| - return result |
176 |
| - return do_rpc |
177 |
| -
|
178 |
| -One complicated factor in implementing RPC is how to handle exceptions. At the very |
179 |
| -least, the server shouldn’t crash if an exception is raised by a method. However, the |
180 |
| -means by which the exception gets reported back to the client requires some study. If |
181 |
| -you’re using pickle, exception instances can often be serialized and reraised in the |
182 |
| -client. If you’re using some other protocol, you might have to think of an alternative |
183 |
| -approach. At the very least, you would probably want to return the exception string in |
184 |
| -the response. This is the approach taken in the JSON example. |
185 |
| -For another example of an RPC implementation, it can be useful to look at the imple‐ |
186 |
| -mentation of the SimpleXMLRPCServer and ServerProxy classes used in XML-RPC, as |
187 |
| -described in Recipe 11.6. |
| 123 | +``RPCHandler`` 和 ``RPCProxy`` 的基本思路是很比较简单的。 |
| 124 | +如果一个客户端想要调用一个远程函数,比如 ``foo(1, 2, z=3)`` |
| 125 | +,代理类创建一个包含了函数名和参数的元组 ``('foo', (1, 2), {'z': 3})`` 。 |
| 126 | +这个元组被pickle序列化后通过网络连接发生出去。 |
| 127 | +这一步在 ``RPCProxy`` 的 ``__getattr__()`` 方法返回的 ``do_rpc()`` 闭包中完成。 |
| 128 | +服务器接收后通过pickle反序列化消息,查找函数名看看是否已经注册过,然后执行相应的函数。 |
| 129 | +执行结果(或异常)被pickle序列化后返回发送给客户端。我们的实例需要依赖 ``multiprocessing`` 进行通信。 |
| 130 | +不过,这种方式可以适用于其他任何消息系统。例如,如果你想在ZeroMQ之上实习RPC, |
| 131 | +仅仅只需要将连接对象换成合适的ZeroMQ的socket对象即可。 |
| 132 | +
|
| 133 | +由于底层需要依赖pickle,那么安全问题就需要考虑了 |
| 134 | +(因为一个聪明的黑客可以创建特定的消息,能够让任意函数通过pickle反序列化后被执行)。 |
| 135 | +因此你永远不要允许来自不信任或未认证的客户端的RPC。特别是你绝对不要允许来自Internet的任意机器的访问, |
| 136 | +这种只能在内部被使用,位于防火墙后面并且不要对外暴露。 |
| 137 | +
|
| 138 | +作为pickle的替代,你也许可以考虑使用JSON、XML或一些其他的编码格式来序列化消息。 |
| 139 | +例如,本机实例可以很容易的改写成JSON编码方案。还需要将 ``pickle.loads()`` 和 ``pickle.dumps()`` |
| 140 | +替换成 ``json.loads()`` 和 ``json.dumps()`` 即可: |
| 141 | +
|
| 142 | +.. code-block:: python |
| 143 | +
|
| 144 | + # jsonrpcserver.py |
| 145 | + import json |
| 146 | +
|
| 147 | + class RPCHandler: |
| 148 | + def __init__(self): |
| 149 | + self._functions = { } |
| 150 | +
|
| 151 | + def register_function(self, func): |
| 152 | + self._functions[func.__name__] = func |
| 153 | +
|
| 154 | + def handle_connection(self, connection): |
| 155 | + try: |
| 156 | + while True: |
| 157 | + # Receive a message |
| 158 | + func_name, args, kwargs = json.loads(connection.recv()) |
| 159 | + # Run the RPC and send a response |
| 160 | + try: |
| 161 | + r = self._functions[func_name](*args,**kwargs) |
| 162 | + connection.send(json.dumps(r)) |
| 163 | + except Exception as e: |
| 164 | + connection.send(json.dumps(str(e))) |
| 165 | + except EOFError: |
| 166 | + pass |
| 167 | +
|
| 168 | + # jsonrpcclient.py |
| 169 | + import json |
| 170 | +
|
| 171 | + class RPCProxy: |
| 172 | + def __init__(self, connection): |
| 173 | + self._connection = connection |
| 174 | + def __getattr__(self, name): |
| 175 | + def do_rpc(*args, **kwargs): |
| 176 | + self._connection.send(json.dumps((name, args, kwargs))) |
| 177 | + result = json.loads(self._connection.recv()) |
| 178 | + return result |
| 179 | + return do_rpc |
| 180 | +
|
| 181 | +实现RPC的一个比较复杂的问题是如何去处理异常。至少,当方法产生异常时服务器不应该奔溃。 |
| 182 | +因此,返回给客户端的异常所代表的含义就要好好设计了。 |
| 183 | +如果你使用pickle,异常对象实例在客户端能被反序列化并抛出。如果你使用其他的协议,那得想想另外的方法了。 |
| 184 | +不过至少,你应该在响应中返回异常字符串。我们在JSON的例子中就是使用的这种方式。 |
| 185 | +
|
| 186 | +对于其他的RPC实现例子,我推荐你看看在XML-RPC中使用的 ``SimpleXMLRPCServer`` 和 ``ServerProxy`` 的实现, |
| 187 | +也就是11.6小节中的内容。 |
0 commit comments