Usage¶
Queues¶
The basic usage is intentionally much like the standard multiprocessing.Queue. However, in the background this basic usage is spinning up a redislite instance. Like a standard multiprocessing.Queue, objects are (by default) serialized with pickle. However, the highest, rather than default, protocol is used (and it will attempt to use the backported pickle5).
import time
import yarqueue
queue = yarqueue.Queue()
queue.put(1)
queue.put("two")
queue.put({3: "four"})
queue.put(["five", 6, 7.0])
assert queue.get() == 1
assert queue.get() == "two"
def parallel_fn(wait):
print("waiting for {} seconds".format(wait))
time.sleep(wait)
return queue.get()
import multiprocessing as mp
with mp.Pool() as p:
third, fourth = p.map(parallel_fn, range(2))
.put()
and .get()
methods accept block
and timeout
arguments like their counterparts;
they also have the expected .*_nowait()
variants.
.qsize()
, .full()
, and .empty()
also work as expected.
Note that redis has a limitation in that its get
timeouts must be integers.
yarqueue
will handle this for you, but will raise a warning.
put
timeouts are handled by yarqueue
and so may be floats; future versions may do the same with get
.
There are some additional convenience methods:
# a more pythonic queue length interface
assert len(queue) == queue.qsize()
# add several elements in one go
queue.put_many(["one", "one", "two", "two"])
# return a generator of the first n elements
for item in queue.get_many(2):
assert item == "one"
assert list(queue.get_many(2)) == ["two", "two"]
# empty the queue
queue.clear()
# pythonic iteration: yields with .get_nowait() until the queue is empty
for item in queue:
print(item)
# clears queue on leaving the context manager
with queue:
queue.put_many([1, 2, 3])
assert len(queue) == 3
assert len(queue) == 0
Like multiprocessing
, there is a JoinableQueue
, which keeps track of how many tasks have been added and processed.
With .join()
, it will wait until all of them have registered .task_done()
jqueue = yarqueue.JoinableQueue()
jqueue.put_many([1, 2, 3])
one = jqueue.get()
assert one == 1
jqueue.task_done()
def another_parallel_fn(wait):
item = jqueue.get()
print("waiting for {} seconds".format(wait))
time.sleep(wait)
jqueue.task_done()
return item
with mp.Pool() as p:
results = p.map_async(another_parallel_fn, range(2))
jqueue.join()
yarqueue
has some improvements here as well, which allow you to track workers’ progress through the queue.
Joinable queues increment a counter whenever an item is added, and decrement it when .task_done()
is called.
.qsize()
counts how many items are currently in the queue.
.n_tasks()
returns the current counter value.
.n_in_progress()
returns the number of items which have been removed from the queue, but are not done yet.
jqueue.clear()
jqueue.put_many([1, 2, 3])
assert len(jqueue) == 3
assert jqueue.n_tasks() == 3
assert jqueue.n_in_progress() == 0
item = jqueue.get()
assert len(jqueue) == 2
assert jqueue.n_tasks() == 3
assert jqueue.n_in_progress() == 1
jqueue.task_done()
assert len(jqueue) == 2
assert jqueue.n_tasks() == 2
assert jqueue.n_in_progress() == 0
# .wait() does the same as .join(), but with a timeout in seconds
import pytest
with pytest.raises(yarqueue.QueueTimeoutError):
jqueue.wait(1.5)
# the context manager calls .join() on exit
with jqueue:
jqueue.get()
jqueue.task_done()
jqueue.get()
jqueue.task_done()
As well as the default first-in, first-out queue, there is a last-in, first-out queue (stack), like the threading.LifoQueue: yarqueue.LifoQueue
.
There is also a double-ended queue (yarqueue.DeQueue
) which behaves like the standard Queue,
but has additional .*_left()
(start) and .*_right()
(end) variants for each .put*()
and .get*()
method.
lifo = yarqueue.LifoQueue()
lifo.put(1)
lifo.put(2)
assert lifo.get() == 2
assert lifo.get() == 1
de = yarqueue.DeQueue()
de.put(1)
de.put_left(2)
de.put_right(3)
assert list(de.get_many_left(3)) == [2, 1, 3]
de.put_left(1)
de.put_left(2)
de.put_many_left(3, 4) # order as would be expected for repeated calls
assert list(de.get_many_left(4)) == [4, 3, 2, 1]
These come in joinable varieties too: yarqueue.JoinableLifoQueue
and yarqueue.JoinableDeQueue
.
The power of redis¶
The real value of using redis-backed queues is sharing them across different python instances. To do this, you need to give the queues a name and a connection to the same redis server:
from redis import Redis
redis_config = {"host": "localhost", "port": 6379, "db": 0}
redis1 = Redis(**redis_config)
queue1 = yarqueue.Queue(name="my_queue", redis=redis1)
redis2 = Redis(**redis_config)
queue2 = yarqueue.Queue(name="my_queue", redis=redis2)
queue1.put(1)
assert queue2.get() == 1
Be aware that different python environments may have different pickle
protocols available:
it may be better to explicitly set your serializer on queue instantiation (see below).
These names are not mangled: redis can be used to synchronise programs running in different languages!
If you’re using it this way, you should replace the default serializer, as most languages do not use pickle
.
Note that other languages and queue implementations will not respect the task-counting semantics of yarqueue.JoinableQueue
.
Serializers¶
In yarqueue
, a serializer is anything which can turn an object into bytes
with a .dumps(obj)
method,
and then bytes
back into a python object with .loads(bytes_object)
method.
The default serializer is a wrapper around pickle
, and uses the highest available pickle protocol.
Explicitly set the protocol version (useful for sharing a redis list between python environments) like this:
from yarqueue import Pickle
pickle3_q = yarqueue.Queue(serializer=Pickle(3))
Feel free to create your own serializers (useful for sharing a redis list between programming languages).
here is the implementation of the included json
serializer:
from copy import deepcopy
from yarqueue.serializer import BaseSerializer
class Json(BaseSerializer):
def __init__(self, dumps_kwargs=None, loads_kwargs=None):
self.dumps_kwargs = deepcopy(dumps_kwargs) or dict()
self.loads_kwargs = deepcopy(loads_kwargs) or dict()
def dumps(self, obj) -> bytes:
return json.dumps(obj, **self.dumps_kwargs).encode()
def loads(self, bytes_object: bytes) -> object:
return json.loads(bytes_object, **self.loads_kwargs)
If you explicitly set the serializer
argument to None
, values will not be serialized and deserialized.
This might be useful if you’re only working with primitives redis
understands.
Watchers¶
Command line¶
Usage: yarqwatch [OPTIONS]
Watch the progress of a number of redis-backed queues, on the command
line.
Options:
--version Show the version and exit.
--help Show this message and exit.
-n, --name TEXT Name of redis lists to watch (accepts multiple)
-t, --total INTEGER Total items added to the queue (accepts multiple, same
order as --name)
-i, --interval FLOAT Polling interval (seconds) [default: 1]
--host TEXT Hostname for the Redis instance [default: localhost]
--port INTEGER Port for the Redis instance [default: 6379]
--db INTEGER DB ID for the Redis instance [default: 0]
--password TEXT Password for the Redis instance
For example, to create a progress bar for
the potato
queue, which had 10 jobs, and
the spade
queue, which had 20 jobs,
on the redis instance at myserver:1234
,
polling every 5 seconds:
yarqwatch -n potato -t 10 -n spade -t 20 --host myserver --port 1234 --interval 5
If totals are not explicitly given, the number of enqueued (and for Joinable queues, in-progress) items at calling time is used.
HTTP¶
Usage: yarqserve [OPTIONS]
Watch the progress of a number of redis-backed queues, over HTTP.
Options:
--version Show the version and exit.
--help Show this message and exit.
-n, --name TEXT Name of redis lists to watch (accepts multiple)
-t, --total INTEGER Total items added to the queue (accepts multiple, same
order as --name
--host TEXT Hostname at which to run server [default: localhost]
--port INTEGER Port on which to run server [default: 8080]
--rhost TEXT Hostname for the Redis instance [default: localhost]
--rport INTEGER Port for the Redis instance [default: 6379]
--db INTEGER DB ID for the Redis instance [default: 0]
--password INTEGER Password for the Redis instance
For example, to serve a webpage and REST endpoint with a progress bar for
the potato
queue, which started with 10 jobs, and
the spade
queue, which started with 20 jobs,
on the redis instance at myserver:1234
,
at the host localhost:8888
:
yarqserve -n potato -t 10 -n spade -t 20 --host localhost --port 8888 --rhost myserver --rport 1234
If totals are not explicitly given, the number of enqueued (and for Joinable queues, in-progress) items at server startup time is used.
Point your browser to http://localhost:8888 to see the webpage,
or curl http://localhost:8888/json
to get the progress data in JSON form.
The returned object’s keys are the queue names, and the values are objects with queued
, inProgress
, and total
counts.