-
Notifications
You must be signed in to change notification settings - Fork 222
/
item_39_use_queue.py
367 lines (284 loc) · 11.9 KB
/
item_39_use_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
# Item 39: Use queue to coordinate work between threads
from collections import deque
from threading import Thread
from threading import Lock
from time import sleep
from queue import Queue
import time
# Python programs that do many things concurrently often need to coordinate
# their work. One of the most useful arrangements for concurrent work is a
# pipeline of functions.
# A pipeline works like an assembly line used in manufacturing. Pipelines have
# many phases in serial with a specific function for each phase. Each function
# can operate concurrently on the piece of work in its phase. The work moves
# forward as each function completes until there are no phases remaining. This
# approach is especially good for work that includes blocking I/O or
# sub-processes -- activities that can easily be parallelized using Python (see
# Item 37: "Use threads for blocking I/O, avoid for parallelism").
# For example, say you want to build a system that will take a constant
# stream of images from your digital camera, resize them, and then add them to
# a photo gallery online. New images are retrieved in the first phase. The
# downloaded images are passed through the resize function in the second
# phase. The resized images are consumed by the upload function in the final
# phase.
# Imagine you had already written Python functions that execute the phases:
# download, resize, and upload. How do you assemble a pipeline to do the work
# concurrently?
# The first thing you need is a way to hand off work between the pipeline
# phases. This can be modeled as a thread-safe producer-consumer queue (see
# Item 38: "Use lock to prevent data races in threads" to understand the
# importance of thread safety in Python; see Item 46: "Use built-in algorithms
# and data structures" for the deque class).
class MyQueue(object):
    """A minimal thread-safe FIFO: a deque guarded by a single mutex."""

    def __init__(self):
        self.items = deque()
        self.lock = Lock()

    # The producer (the digital camera) appends new images to the tail of
    # the pending-work list.
    def put(self, item):
        with self.lock:
            self.items.append(item)

    # The consumer (the first pipeline phase) pops images off the head of
    # the pending-work list; raises IndexError when nothing is pending.
    def get(self):
        with self.lock:
            return self.items.popleft()
# Here, I represent each phase of the pipeline as a Python thread that takes
# work from one queue like this, runs a function on it, and puts the results
# on another queue. I also track how many times the worker has checked for new
# input and how much work it's completed.
class Worker(Thread):
    """One pipeline phase: repeatedly pull from in_queue, apply func, and
    push the result onto out_queue.

    Also counts how often the thread polled for input (polled_count) and
    how many items it completed (work_done), for instrumentation.
    """

    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0  # times we checked the input queue
        self.work_done = 0     # items fully processed

    # The tricky part: the input queue may be empty simply because the
    # previous phase hasn't produced anything yet -- a hold-up on the
    # assembly line.  MyQueue.get raises IndexError in that case, so we
    # nap briefly and poll again.
    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                sleep(0.01)  # No work to do yet
                continue
            self.out_queue.put(self.func(item))
            self.work_done += 1
# Now I can connect the three phases together by creating the sequence for
# their coordination point and the corresponding worker threads.
def download(item):
    """Stub for phase 1 (fetch a new image); no-op placeholder."""
    return None
def resize(item):
    """Stub for phase 2 (resize a downloaded image); no-op placeholder."""
    return None
def upload(item):
    """Stub for phase 3 (upload a resized image); no-op placeholder."""
    return None
# The four coordination points between the three phases.
download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()
done_queue = MyQueue()

# One worker thread per phase, chained queue-to-queue.
threads = [
    Worker(func, src, dst)
    for func, src, dst in (
        (download, download_queue, resize_queue),
        (resize, resize_queue, upload_queue),
        (upload, upload_queue, done_queue),
    )
]

# Start the workers, then inject a bunch of work into the first phase.
# A plain object() stands in as a proxy for the real data the download
# function would receive.
for worker in threads:
    worker.start()
for _ in range(1000):
    download_queue.put(object())

# Busy-wait until every item has flowed through to done_queue.
while len(done_queue.items) < 1000:
    pass  # Could do something useful while waiting

# Side effect of the polling design: the IndexError path in Worker.run
# fires far more often than there are items of real work.
processed = len(done_queue.items)
polled = sum(t.polled_count for t in threads)
print('Processed', processed, 'items after polling', polled, 'items')
# Processed 1000 items after polling 3008 items
# When the worker functions vary in speeds, an earlier phase can prevent
# progress in later phase, backing up the pipeline. This causes later phases
# to starve and constantly check their input queues for new work in a tight
# loop. The outcome is that worker threads waste CPU time doing nothing useful
# (they're constantly raising and catching IndexError exceptions).
# But that's just the beginning of what's wrong with this implementation.
# There are three more problems that you should also avoid. First, determining
# that all of the input work is complete requires yet another busy wait on the
# done_queue. Second, in Worker the run method will execute forever in its busy
# loop. There's no way to signal to a worker thread that it's time to exit.
# Third, and worst of all, a backup in the pipeline can cause the program to
# crash arbitrarily. If the first phase makes rapid progress but the second
# phase makes slow progress, then the queue connecting them will constantly
# increase in size and the second phase won't be able to keep up. Given enough
# time and input data, the program will eventually run out of memory and die.
# Queue to the Rescue
# The Queue class from the queue built-in module provides all of the
# functionality you need to solve these problems.
# Queue eliminates the busy waiting in the worker by making the get method
# block until new data is available. For example, here I start a thread that
# waits for some input data on a queue:
queue = Queue()

def consumer():
    """Block on queue.get until the producer supplies an item."""
    print('Consumer waiting')
    queue.get()  # Runs after put() below
    print('Consumer done')

thread = Thread(target=consumer)
thread.start()

# The consumer thread starts first, but it cannot finish until the
# producer puts an item on the Queue and get() has something to return.
print('Producer putting')
queue.put(object())  # Runs before get() above
thread.join()
print('Producer done')
# Consumer waiting
# Producer putting
# Consumer done
# Producer done
# To solve the pipeline backup issue, the Queue class lets you specify the
# maximum amount of pending work you'll allow between two phases. This buffer
# size causes calls to put to block when the queue is already full. For
# example, here I define a thread that waits for a while before consuming a
# queue:
queue = Queue(1)  # Buffer size of 1

def consumer():
    """Sleep first, then drain two items from the bounded queue."""
    time.sleep(0.1)  # Waits
    queue.get()      # Runs second
    print('Consumer got 1')
    queue.get()      # Runs fourth
    print('Consumer got 2')

thread = Thread(target=consumer)
thread.start()

# The sleep would let the producer put both objects before the consumer
# ever calls get -- but the Queue holds only one item, so the second put
# blocks until the consumer has taken the first item off the queue.
queue.put(object())
print('Producer put 1')  # Runs first
queue.put(object())
print('Producer put 2')
thread.join()
print('Producer done')
# Producer put 1
# Consumer got 1
# Producer put 2
# Consumer got 2
# Producer done
# The Queue class can also track the progress of work using the task_done
# method. This lets you wait for a phase's input queue to drain and eliminates
# the need for polling the done_queue at the end of your pipeline. For
# example, here I define a consumer thread that calls task_done when it
# finishes working on an item.
in_queue = Queue()

# BUG FIX: the consumer thread was started only under the __main__ guard,
# while in_queue.put() and in_queue.join() ran unconditionally at import
# time -- importing this module would then block forever on join() with
# no consumer to call task_done.  The producer side now lives under the
# same guard; behavior when run as a script is unchanged.
if __name__ == '__main__':
    def consumer():
        """Take one work item from in_queue and mark it complete."""
        print("Consumer waiting")
        work = in_queue.get()  # Done second
        print('Consumer working')
        # Doing work
        # ...
        print('Consumer done')
        in_queue.task_done()   # Done third

    Thread(target=consumer).start()

    # The producer doesn't have to join the consumer thread or poll: it
    # just calls join on the Queue instance.  Even once the queue is
    # empty, join won't return until task_done has been called for every
    # item that was ever enqueued.
    in_queue.put(object())     # Done first
    print('Producer waiting')
    in_queue.join()
    print('Producer done')
# Consumer waiting
# Producer waiting
# Consumer working
# Consumer done
# Producer done
# I can put all of these behaviors together into a Queue subclass that also
# tells the worker thread when it should stop processing. Here, I define a
# close method that adds a special item to the queue indicating there will
# be no more input items after it:
class ClosableQueue(Queue):
    """A Queue whose producer can signal "no more items" by closing it.

    close() enqueues a sentinel object; iterating the queue yields items
    until the sentinel is seen.  The iterator calls task_done for every
    dequeued item (sentinel included), so Queue.join still tracks the
    progress of work correctly.
    """

    SENTINEL = object()

    def close(self):
        # Tell consumers that nothing will follow this marker.
        self.put(self.SENTINEL)

    def __iter__(self):
        stop = self.SENTINEL
        while True:
            item = self.get()
            try:
                if item is not stop:
                    yield item
                else:
                    return  # Cause the consuming thread to exit
            finally:
                # Count sentinel and real items alike toward join().
                self.task_done()
# Now, I can redefine my worker thread to rely on the behavior of the
# ClosableQueue class. The thread will exit once the for loop is exhausted.
class StoppableWorker(Thread):
    """Pipeline worker that exits cleanly when its input is exhausted.

    Relies on the input queue's iterator (ClosableQueue.__iter__): the
    for loop ends -- and run returns -- once the close sentinel is seen,
    so no busy waiting is needed.
    """

    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        # Kept for parity with Worker's instrumentation fields.
        self.polled_count = 0
        self.work_done = 0

    def run(self):
        for item in self.in_queue:
            self.out_queue.put(self.func(item))
# Here, I re-create the set of worker threads using the new worker class:
# Re-create the pipeline's coordination points with closable queues and
# workers that know how to stop.
download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]

# Run the workers as before, then signal "no more input" by closing the
# first phase's queue once all the work has been injected.
for worker in threads:
    worker.start()
for _ in range(1000):
    download_queue.put(object())
download_queue.close()

# Wait for each connecting queue to drain; as each phase finishes, close
# the next phase's input so its worker exits too.  Everything ends up in
# done_queue as expected.
download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print(done_queue.qsize(), 'item finished')
# 1000 item finished
# 1000 item finished
# Things to remember
# 1. Pipelines are a great way to organize sequences of work that run
# concurrently using multiple Python threads.
# 2. Be aware of the many problems in building concurrent pipelines: busy
# waiting, stopping workers, and memory explosion.
# 3. The Queue class has all of the facilities you need to build robust
# pipelines: blocking operations, buffer sizes, and joining.