|
1 ================= |
|
2 pyams_zmq package |
|
3 ================= |
|
4 |
|
5 PyAMS 'ZMQ' package can be used to build wrapper around ØMQ (or ZeroMQ) library to exchange |
|
6 messages following all ØMQ possible usages. |
|
7 |
|
8 At least two components are required to build a ØMQ based application: |
|
9 |
|
10 - a ØMQ server |
|
11 |
|
12 - a ØMQ client |
|
13 |
|
14 The way client and server communicate depends on used ØMQ protocol. |
|
15 |
|
16 We will take example on the medias conversion utility provided by 'pyams_media' package, which |
|
17 allows you to automatically convert medias files (videos...) asynchronously as soon as they are |
|
18 uploaded. The conversion process is a background process so doesn't return any result. |
|
19 |
|
20 The conversion process is a simple ØMQ process: |
|
21 |
|
22 >>> from pyramid.testing import setUp, tearDown |
|
23 >>> config = setUp() |
|
24 |
|
25 >>> from multiprocessing import Process |
|
26 >>> from pyams_zmq.process import ZMQProcess, process_exit_func |
|
27 |
|
28 >>> converter_address = '127.0.0.1:25556' |
|
29 |
|
30 >>> class MyConversionProcess(Process): |
|
31 ... """Conversion manager process""" |
|
32 ... |
|
33 ... def __init__(self, settings, group=None, target=None, name=None, *args, **kwargs): |
|
34 ... Process.__init__(self, group, target, name, args, kwargs) |
|
35 ... self.settings = settings |
|
36 ... |
|
37 ... def run(self): |
|
38 ... settings = self.settings |
|
39 ... path = settings['path'] |
|
40 ... format = settings['format'] |
|
41 ... # you can virtually do anything you want with these settings |
|
42 |
|
43 To be sure to run asynchronously, this process is managed by a thread: |
|
44 |
|
45 >>> import time |
|
46 >>> from threading import Thread |
|
47 |
|
48 >>> class ConversionThread(Thread): |
|
49 ... """Conversion thread""" |
|
50 ... |
|
51 ... def __init__(self, process): |
|
52 ... Thread.__init__(self) |
|
53 ... self.process = process |
|
54 ... |
|
55 ... def run(self): |
|
56 ... self.process.start() |
|
57 ... self.process.join() |
|
58 |
|
59 The conversion handler is the simple class to which conversion is delegated: |
|
60 |
|
61 >>> class ConversionHandler: |
|
62 ... """Conversion handler""" |
|
63 ... def convert(self, data): |
|
64 ... ConversionThread(MyConversionProcess(data)).start() |
|
65 ... return [200, 'OK'] |
|
66 |
|
67 The message handler receives the message and handle it: |
|
68 |
|
69 >>> from pyams_zmq.handler import ZMQMessageHandler |
|
70 >>> class ConversionMessageHandler(ZMQMessageHandler): |
|
71 ... handler = ConversionHandler |
|
72 |
|
73 >>> class ConversionProcess(ZMQProcess): |
|
74 ... """Medias conversion process""" |
|
75 |
|
76 The ØMQ process is generally started on application startup; following tests are commented |
|
77 because tests are stucked when running in test mode: |
|
78 |
|
79 # >>> import atexit |
|
80 # >>> process = ConversionProcess(converter_address, ConversionMessageHandler) |
|
81 # >>> process.start() |
|
82 # >>> time.sleep(2) |
|
83 # >>> if process.is_alive(): |
|
84 # ... atexit.register(process_exit_func, process=process) |
|
85 # <function process_exit_func at 0x...> |
|
86 |
|
87 Once all these elements are in place, you just have to create a ØMQ client context, open a |
|
88 connection and send a message. |
|
89 |
|
90 Messages are lists of two objects; the first one is the type os the message, which should match |
|
91 a method name of the message handler; the second object is the method arguments: |
|
92 |
|
93 >>> import zmq |
|
94 >>> from pyams_zmq.socket import zmq_socket, zmq_response |
|
95 |
|
96 >>> settings = {'path': '/this/is/my/path', |
|
97 ... 'format': 'JPEG'} |
|
98 >>> message = ['convert', settings] |
|
99 |
|
100 # >>> socket = zmq_socket('tcp://' + converter_address) |
|
101 # >>> socket.send_json(message) |
|
102 # >>> response = zmq_response(socket) |
|
103 # >>> response |
|
104 # [200, 'OK'] |
|
105 |
|
106 >>> tearDown() |