Skip to content

Commit 1e1333b

Browse files
committed
add task master, worker sample
1 parent 3d77f04 commit 1e1333b

File tree

2 files changed

+74
-0
lines changed

2 files changed

+74
-0
lines changed

py3/multitask/task_master.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#!/usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
4+
import random, time, queue
5+
from multiprocessing.managers import BaseManager
6+
7+
# 发送任务的队列:
8+
task_queue = queue.Queue()
9+
# 接收结果的队列:
10+
result_queue = queue.Queue()
11+
12+
# 从BaseManager继承的QueueManager:
13+
class QueueManager(BaseManager):
14+
pass
15+
16+
# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
17+
QueueManager.register('get_task_queue', callable=lambda: task_queue)
18+
QueueManager.register('get_result_queue', callable=lambda: result_queue)
19+
# 绑定端口5000, 设置验证码'abc':
20+
manager = QueueManager(address=('', 5000), authkey=b'abc')
21+
# 启动Queue:
22+
manager.start()
23+
# 获得通过网络访问的Queue对象:
24+
task = manager.get_task_queue()
25+
result = manager.get_result_queue()
26+
# 放几个任务进去:
27+
for i in range(10):
28+
n = random.randint(0, 10000)
29+
print('Put task %d...' % n)
30+
task.put(n)
31+
# 从result队列读取结果:
32+
print('Try get results...')
33+
for i in range(10):
34+
r = result.get(timeout=10)
35+
print('Result: %s' % r)
36+
# 关闭:
37+
manager.shutdown()
38+
print('master exit.')

py3/multitask/task_worker.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#!/usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
4+
import time, sys, queue
5+
from multiprocessing.managers import BaseManager
6+
7+
# 创建类似的QueueManager:
8+
class QueueManager(BaseManager):
9+
pass
10+
11+
# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
12+
QueueManager.register('get_task_queue')
13+
QueueManager.register('get_result_queue')
14+
15+
# 连接到服务器,也就是运行taskmanager.py的机器:
16+
server_addr = '127.0.0.1'
17+
print('Connect to server %s...' % server_addr)
18+
# 端口和验证码注意保持与taskmanager.py设置的完全一致:
19+
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
20+
# 从网络连接:
21+
m.connect()
22+
# 获取Queue的对象:
23+
task = m.get_task_queue()
24+
result = m.get_result_queue()
25+
# 从task队列取任务,并把结果写入result队列:
26+
for i in range(10):
27+
try:
28+
n = task.get(timeout=1)
29+
print('run task %d * %d...' % (n, n))
30+
r = '%d * %d = %d' % (n, n, n*n)
31+
time.sleep(1)
32+
result.put(r)
33+
except Queue.Empty:
34+
print('task queue is empty.')
35+
# 处理结束:
36+
print('worker exit.')

0 commit comments

Comments
 (0)