Skip to content

Commit b5d9d93

Browse files
author
yidao620c
committed
12.10小节完成
1 parent ccd960b commit b5d9d93

File tree

1 file changed

+179
-176
lines changed

1 file changed

+179
-176
lines changed

source/c12/p10_defining_an_actor_task.rst

+179-176
Original file line numberDiff line numberDiff line change
@@ -5,193 +5,196 @@
55
----------
66
问题
77
----------
8-
You’d like to define tasks with behavior similar to “actors” in the so-called “actor model.”
8+
你想定义跟actor模式中类似“actors”角色的任务
99

1010
|
1111
1212
----------
1313
解决方案
1414
----------
15-
The “actor model” is one of the oldest and most simple approaches to concurrency and
16-
distributed computing. In fact, its underlying simplicity is part of its appeal. In a nutshell,
17-
an actor is a concurrently executing task that simply acts upon messages sent to it. In
18-
response to these messages, it may decide to send further messages to other actors.
19-
Communication with actors is one way and asynchronous. Thus, the sender of a message
20-
does not know when a message actually gets delivered, nor does it receive a response
21-
or acknowledgment that the message has been processed.
22-
Actors are straightforward to define using a combination of a thread and a queue. For
23-
example:
24-
25-
from queue import Queue
26-
from threading import Thread, Event
27-
28-
# Sentinel used for shutdown
29-
class ActorExit(Exception):
30-
pass
31-
32-
class Actor:
33-
def __init__(self):
34-
self._mailbox = Queue()
35-
36-
def send(self, msg):
37-
'''
38-
Send a message to the actor
39-
'''
40-
self._mailbox.put(msg)
41-
42-
def recv(self):
43-
'''
44-
Receive an incoming message
45-
'''
46-
msg = self._mailbox.get()
47-
if msg is ActorExit:
48-
raise ActorExit()
49-
return msg
50-
51-
def close(self):
52-
'''
53-
Close the actor, thus shutting it down
54-
'''
55-
self.send(ActorExit)
56-
57-
def start(self):
58-
'''
59-
Start concurrent execution
60-
'''
61-
self._terminated = Event()
62-
t = Thread(target=self._bootstrap)
63-
64-
t.daemon = True
65-
t.start()
66-
67-
def _bootstrap(self):
68-
try:
69-
self.run()
70-
except ActorExit:
71-
pass
72-
finally:
73-
self._terminated.set()
74-
75-
def join(self):
76-
self._terminated.wait()
77-
78-
def run(self):
79-
'''
80-
Run method to be implemented by the user
81-
'''
15+
actore模式是一种最古老的也是最简单的并行和分布式计算解决方案。
16+
事实上,它天生的简单性是它如此受欢迎的重要原因之一。
17+
简单来讲,一个actor就是一个并发执行的任务,只是简单的执行发送给它的消息任务。
18+
响应这些消息时,它可能还会给其他actor发送更进一步的消息。
19+
actor之间的通信是单向和异步的。因此,消息发送者不知道消息是什么时候被发送,
20+
也不会接收到一个消息已被处理的回应或通知。
21+
22+
结合使用一个线程和一个队列可以很容易的定义actor,例如:
23+
24+
.. code-block:: python
25+
26+
from queue import Queue
27+
from threading import Thread, Event
28+
29+
# Sentinel used for shutdown
30+
class ActorExit(Exception):
31+
pass
32+
33+
class Actor:
34+
def __init__(self):
35+
self._mailbox = Queue()
36+
37+
def send(self, msg):
38+
'''
39+
Send a message to the actor
40+
'''
41+
self._mailbox.put(msg)
42+
43+
def recv(self):
44+
'''
45+
Receive an incoming message
46+
'''
47+
msg = self._mailbox.get()
48+
if msg is ActorExit:
49+
raise ActorExit()
50+
return msg
51+
52+
def close(self):
53+
'''
54+
Close the actor, thus shutting it down
55+
'''
56+
self.send(ActorExit)
57+
58+
def start(self):
59+
'''
60+
Start concurrent execution
61+
'''
62+
self._terminated = Event()
63+
t = Thread(target=self._bootstrap)
64+
65+
t.daemon = True
66+
t.start()
67+
68+
def _bootstrap(self):
69+
try:
70+
self.run()
71+
except ActorExit:
72+
pass
73+
finally:
74+
self._terminated.set()
75+
76+
def join(self):
77+
self._terminated.wait()
78+
79+
def run(self):
80+
'''
81+
Run method to be implemented by the user
82+
'''
83+
while True:
84+
msg = self.recv()
85+
86+
# Sample ActorTask
87+
class PrintActor(Actor):
88+
def run(self):
89+
while True:
90+
msg = self.recv()
91+
print('Got:', msg)
92+
93+
# Sample use
94+
p = PrintActor()
95+
p.start()
96+
p.send('Hello')
97+
p.send('World')
98+
p.close()
99+
p.join()
100+
101+
这个例子中,你使用actor实例的 ``send()`` 方法发送消息给它们。
102+
其机制是,这个方法会将消息放入一个队里中,
103+
然后将其转交给处理被接受消息的一个内部线程。
104+
``close()`` 方法通过在队列中放入一个特殊的哨兵值(ActorExit)来关闭这个actor。
105+
用户可以通过继承Actor并定义实现自己处理逻辑run()方法来定义新的actor。
106+
``ActorExit`` 异常的使用就是用户自定义代码可以在需要的时候来捕获终止请求
107+
(异常被get()方法抛出并传播出去)。
108+
109+
如果你放宽对于同步和异步消息发送的要求,
110+
类actor对象还可以通过生成器来简化定义。例如:
111+
112+
.. code-block:: python
113+
114+
def print_actor():
82115
while True:
83-
msg = self.recv()
84116
85-
# Sample ActorTask
86-
class PrintActor(Actor):
87-
def run(self):
88-
while True:
89-
msg = self.recv()
90-
print('Got:', msg)
91-
92-
# Sample use
93-
p = PrintActor()
94-
p.start()
95-
p.send('Hello')
96-
p.send('World')
97-
p.close()
98-
p.join()
99-
100-
In this example, Actor instances are things that you simply send a message to using
101-
their send() method. Under the covers, this places the message on a queue and hands
102-
it off to an internal thread that runs to process the received messages. The close()
103-
method is programmed to shut down the actor by placing a special sentinel value
104-
(ActorExit) on the queue. Users define new actors by inheriting from Actor and re‐
105-
defining the run() method to implement their custom processing. The usage of the
106-
ActorExit exception is such that user-defined code can be programmed to catch the
107-
termination request and handle it if appropriate (the exception is raised by the get()
108-
method and propagated).
109-
If you relax the requirement of concurrent and asynchronous message delivery, actor-
110-
like objects can also be minimally defined by generators. For example:
111-
112-
def print_actor():
113-
while True:
114-
115-
try:
116-
msg = yield # Get a message
117-
print('Got:', msg)
118-
except GeneratorExit:
119-
print('Actor terminating')
120-
121-
# Sample use
122-
p = print_actor()
123-
next(p) # Advance to the yield (ready to receive)
124-
p.send('Hello')
125-
p.send('World')
126-
p.close()
117+
try:
118+
msg = yield # Get a message
119+
print('Got:', msg)
120+
except GeneratorExit:
121+
print('Actor terminating')
122+
123+
# Sample use
124+
p = print_actor()
125+
next(p) # Advance to the yield (ready to receive)
126+
p.send('Hello')
127+
p.send('World')
128+
p.close()
127129
128130
|
129131
130132
----------
131133
讨论
132134
----------
133-
Part of the appeal of actors is their underlying simplicity. In practice, there is just one
134-
core operation, send(). Plus, the general concept of a “message” in actor-based systems
135-
is something that can be expanded in many different directions. For example, you could
136-
pass tagged messages in the form of tuples and have actors take different courses of
137-
action like this:
138-
139-
class TaggedActor(Actor):
140-
def run(self):
141-
while True:
142-
tag, *payload = self.recv()
143-
getattr(self,'do_'+tag)(*payload)
144-
145-
# Methods correponding to different message tags
146-
def do_A(self, x):
147-
print('Running A', x)
148-
149-
def do_B(self, x, y):
150-
print('Running B', x, y)
151-
152-
# Example
153-
a = TaggedActor()
154-
a.start()
155-
a.send(('A', 1)) # Invokes do_A(1)
156-
a.send(('B', 2, 3)) # Invokes do_B(2,3)
157-
158-
As another example, here is a variation of an actor that allows arbitrary functions to be
159-
executed in a worker and results to be communicated back using a special Result object:
160-
161-
from threading import Event
162-
class Result:
163-
def __init__(self):
164-
self._evt = Event()
165-
self._result = None
166-
167-
def set_result(self, value):
168-
self._result = value
169-
170-
self._evt.set()
171-
172-
def result(self):
173-
self._evt.wait()
174-
return self._result
175-
176-
class Worker(Actor):
177-
def submit(self, func, *args, **kwargs):
178-
r = Result()
179-
self.send((func, args, kwargs, r))
180-
return r
181-
182-
def run(self):
183-
while True:
184-
func, args, kwargs, r = self.recv()
185-
r.set_result(func(*args, **kwargs))
186-
187-
# Example use
188-
worker = Worker()
189-
worker.start()
190-
r = worker.submit(pow, 2, 3)
191-
print(r.result())
192-
193-
Last, but not least, the concept of “sending” a task a message is something that can be
194-
scaled up into systems involving multiple processes or even large distributed systems.
195-
For example, the send() method of an actor-like object could be programmed to trans‐
196-
mit data on a socket connection or deliver it via some kind of messaging infrastructure
197-
(e.g., AMQP, ZMQ, etc.).
135+
actor模式的魅力就在于它的简单性。
136+
实际上,这里仅仅只有一个核心操作 ``send()`` .
137+
甚至,对于在基于actor系统中的“消息”的泛化概念可以已多种方式被扩展。
138+
例如,你可以以元组形式传递标签消息,让actor执行不同的操作,如下:
139+
140+
.. code-block:: python
141+
142+
class TaggedActor(Actor):
143+
def run(self):
144+
while True:
145+
tag, *payload = self.recv()
146+
getattr(self,'do_'+tag)(*payload)
147+
148+
# Methods correponding to different message tags
149+
def do_A(self, x):
150+
print('Running A', x)
151+
152+
def do_B(self, x, y):
153+
print('Running B', x, y)
154+
155+
# Example
156+
a = TaggedActor()
157+
a.start()
158+
a.send(('A', 1)) # Invokes do_A(1)
159+
a.send(('B', 2, 3)) # Invokes do_B(2,3)
160+
161+
作为另外一个例子,下面的actor允许在一个工作者中运行任意的函数,
162+
并且通过一个特殊的Result对象返回结果:
163+
164+
.. code-block:: python
165+
166+
from threading import Event
167+
class Result:
168+
def __init__(self):
169+
self._evt = Event()
170+
self._result = None
171+
172+
def set_result(self, value):
173+
self._result = value
174+
175+
self._evt.set()
176+
177+
def result(self):
178+
self._evt.wait()
179+
return self._result
180+
181+
class Worker(Actor):
182+
def submit(self, func, *args, **kwargs):
183+
r = Result()
184+
self.send((func, args, kwargs, r))
185+
return r
186+
187+
def run(self):
188+
while True:
189+
func, args, kwargs, r = self.recv()
190+
r.set_result(func(*args, **kwargs))
191+
192+
# Example use
193+
worker = Worker()
194+
worker.start()
195+
r = worker.submit(pow, 2, 3)
196+
print(r.result())
197+
198+
最后,“发送”一个任务消息的概念可以被扩展到多进程甚至是大型分布式系统中去。
199+
例如,一个类actor对象的 ``send()`` 方法可以被编程让它能在一个套接字连接上传输数据
200+
或通过某些消息中间件(比如AMQP、ZMQ等)来发送。

0 commit comments

Comments
 (0)