1 ================= |
|
2 pyams_zmq package |
|
3 ================= |
|
4 |
|
5 The PyAMS 'ZMQ' package can be used to build wrappers around the ØMQ (or ZeroMQ) library to exchange |
|
6 messages following all possible ØMQ usage patterns. |
|
7 |
|
8 At least two components are required to build a ØMQ based application: |
|
9 |
|
10 - a ØMQ server |
|
11 |
|
12 - a ØMQ client |
|
13 |
|
14 The way the client and server communicate depends on the ØMQ protocol being used. |
|
15 |
|
16 We will take as an example the media conversion utility provided by the 'pyams_media' package, which allows you |
|
17 to automatically convert media files (videos...) asynchronously as soon as they are uploaded. The conversion |
|
18 process is a background process, so it doesn't return any result. |
|
19 |
|
20 The conversion process is a simple ØMQ process: |
|
21 |
|
22 >>> converter_address = '127.0.0.1:5555' |
|
23 |
|
24 >>> from pyams_zmq.process import ZMQProcess, process_exit_func |
|
25 >>> |
|
26 >>> class MediasConversionProcess(ZMQProcess): |
|
27 ... """Medias conversion process""" |
|
28 |
|
29 >>> from multiprocessing import Process |
|
30 >>> |
|
31 >>> class ConversionProcess(Process): |
|
32 ... """Conversion manager process""" |
|
33 ... |
|
34 ... def __init__(self, settings, group=None, target=None, name=None, *args, **kwargs): |
|
35 ... Process.__init__(self, group=group, target=target, name=name, args=args, kwargs=kwargs) |
|
36 ... self.settings = settings |
|
37 ... |
|
38 ... def run(self): |
|
39 ... settings = self.settings |
|
40 ... path = settings['path'] |
|
41 ... format = settings['format'] |
|
42 ... # just imagine you're doing anything you want with these settings! |
|
43 |
|
44 To make sure it runs asynchronously, this process is managed by a thread: |
|
45 |
|
46 >>> import time |
|
47 >>> from threading import Thread |
|
48 >>> |
|
49 >>> class ConversionThread(Thread): |
|
50 ... """Conversion thread""" |
|
51 ... |
|
52 ... def __init__(self, process): |
|
53 ... Thread.__init__(self) |
|
54 ... self.process = process |
|
55 ... |
|
56 ... def run(self): |
|
57 ... self.process.start() |
|
58 ... self.process.join() |
|
59 |
|
60 The conversion handler is a simple class to which conversion is delegated: |
|
61 |
|
62 >>> class ConversionHandler(object): |
|
63 ... """Conversion handler""" |
|
64 ... |
|
65 ... def convert(self, data): |
|
66 ... ConversionThread(ConversionProcess(data)).start() |
|
67 ... return [200, 'OK'] |
|
68 |
|
69 The message handler receives the message and handles it: |
|
70 |
|
71 >>> from pyams_zmq.handler import ZMQMessageHandler |
|
72 >>> |
|
73 >>> class ConversionMessageHandler(ZMQMessageHandler): |
|
74 ... |
|
75 ... handler = ConversionHandler |
|
76 |
|
77 The ØMQ process is generally started on application startup: |
|
78 |
|
79 >>> import atexit |
|
80 >>> |
|
81 >>> process = MediasConversionProcess(converter_address, ConversionMessageHandler) |
|
82 >>> process.start() |
|
83 >>> time.sleep(2) |
|
84 >>> if process.is_alive(): |
|
85 ... atexit.register(process_exit_func, process=process) |
|
86 <function process_exit_func at 0x...> |
|
87 |
|
88 Once all these elements are in place, you just have to create a ØMQ client context, open a connection and send a |
|
89 message: |
|
90 |
|
91 >>> import zmq |
|
92 >>> settings = {'path': '/this/is/my/path', |
|
93 ... 'format': 'JPEG'} |
|
94 >>> message = ['convert', settings] |
|
95 >>> context = zmq.Context() |
|
96 >>> socket = context.socket(zmq.REQ) |
|
97 >>> socket.connect('tcp://' + converter_address) |
|
98 >>> socket.send_json(message) |
|
99 >>> socket.recv_json() |
|
100 [200, 'OK'] |
|