Usage

Queues

The basic usage is intentionally much like the standard multiprocessing.Queue, although in the background it spins up a redislite instance. Like a standard multiprocessing.Queue, objects are (by default) serialized with pickle; however, the highest available protocol is used rather than the default (and yarqueue will attempt to use the backported pickle5).

import time
import yarqueue

queue = yarqueue.Queue()

queue.put(1)
queue.put("two")
queue.put({3: "four"})
queue.put(["five", 6, 7.0])

assert queue.get() == 1
assert queue.get() == "two"


def parallel_fn(wait):
    print("waiting for {} seconds".format(wait))
    time.sleep(wait)

    return queue.get()

import multiprocessing as mp

# each worker pulls one of the two remaining items from the shared queue
with mp.Pool() as p:
    third, fourth = p.map(parallel_fn, range(2))

.put() and .get() accept block and timeout arguments like their multiprocessing counterparts; they also have the expected .*_nowait() variants. .qsize(), .full(), and .empty() also work as expected.

Note that redis itself requires get timeouts to be integers. yarqueue will handle this for you, but will raise a warning. put timeouts are handled by yarqueue directly and so may be floats; future versions may do the same for get.
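
For example, a quick sketch of the blocking and non-blocking variants (reusing the standard library's queue.Empty exception here is an assumption):

import queue as stdlib_queue

q = yarqueue.Queue()

q.put("item", block=True, timeout=5)  # wait up to 5 seconds
assert q.get(block=True, timeout=5) == "item"

q.put_nowait("item")  # equivalent to .put("item", block=False)
assert q.get_nowait() == "item"

try:
    q.get_nowait()  # the queue is now empty
except stdlib_queue.Empty:  # assumption: the stdlib exception is reused
    print("queue was empty")

# a float get timeout is accepted, but is coerced to a whole number
# of seconds for redis, with a warning
q.put("item")
assert q.get(timeout=1.5) == "item"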

There are some additional convenience methods:

# a more pythonic queue length interface
assert len(queue) == queue.qsize()

# add several elements in one go
queue.put_many(["one", "one", "two", "two"])

# return a generator of the first n elements
for item in queue.get_many(2):
    assert item == "one"

assert list(queue.get_many(2)) == ["two", "two"]

# empty the queue
queue.clear()

# pythonic iteration: yields with .get_nowait() until the queue is empty
for item in queue:
    print(item)

# clears queue on leaving the context manager
with queue:
    queue.put_many([1, 2, 3])
    assert len(queue) == 3

assert len(queue) == 0

Like multiprocessing, there is a JoinableQueue, which keeps track of how many tasks have been added and processed. With .join(), it will wait until .task_done() has been registered for all of them.

jqueue = yarqueue.JoinableQueue()
jqueue.put_many([1, 2, 3])
one = jqueue.get()
assert one == 1
jqueue.task_done()

def another_parallel_fn(wait):
    item = jqueue.get()
    print("waiting for {} seconds".format(wait))
    time.sleep(wait)
    jqueue.task_done()
    return item

with mp.Pool() as p:
    results = p.map_async(another_parallel_fn, range(2))
    jqueue.join()

yarqueue has some improvements here as well, which allow you to track workers’ progress through the queue. Joinable queues increment a counter whenever an item is added, and decrement it when .task_done() is called. .qsize() counts how many items are currently in the queue. .n_tasks() returns the current counter value. .n_in_progress() returns the number of items which have been removed from the queue, but are not done yet.

jqueue.clear()

jqueue.put_many([1, 2, 3])
assert len(jqueue) == 3
assert jqueue.n_tasks() == 3
assert jqueue.n_in_progress() == 0

item = jqueue.get()
assert len(jqueue) == 2
assert jqueue.n_tasks() == 3
assert jqueue.n_in_progress() == 1

jqueue.task_done()
assert len(jqueue) == 2
assert jqueue.n_tasks() == 2
assert jqueue.n_in_progress() == 0

# .wait() does the same as .join(), but with a timeout in seconds
import pytest
with pytest.raises(yarqueue.QueueTimeoutError):
    jqueue.wait(1.5)

# the context manager calls .join() on exit
with jqueue:
    jqueue.get()
    jqueue.task_done()
    jqueue.get()
    jqueue.task_done()

As well as the default first-in, first-out queue, there is a last-in, first-out queue (stack), like the standard library's queue.LifoQueue: yarqueue.LifoQueue. There is also a double-ended queue (yarqueue.DeQueue) which behaves like the standard Queue, but has additional .*_left() (start) and .*_right() (end) variants of each .put*() and .get*() method.

lifo = yarqueue.LifoQueue()
lifo.put(1)
lifo.put(2)
assert lifo.get() == 2
assert lifo.get() == 1

de = yarqueue.DeQueue()
de.put(1)
de.put_left(2)
de.put_right(3)
assert list(de.get_many_left(3)) == [2, 1, 3]

de.put_left(1)
de.put_left(2)
de.put_many_left([3, 4])  # order as would be expected for repeated calls
assert list(de.get_many_left(4)) == [4, 3, 2, 1]

These come in joinable varieties too: yarqueue.JoinableLifoQueue and yarqueue.JoinableDeQueue.
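
A brief sketch, assuming the joinable variants combine the directional methods with the task counter:

jde = yarqueue.JoinableDeQueue()
jde.put_left(1)
jde.put_right(2)
assert jde.n_tasks() == 2

assert jde.get_right() == 2
jde.task_done()
assert jde.get_left() == 1
jde.task_done()

jde.join()  # returns immediately: every task is done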

The power of redis

The real value of using redis-backed queues is sharing them across different python processes, even on different machines. To do this, you need to give the queues a name and a connection to the same redis server:

from redis import Redis

redis_config = {"host": "localhost", "port": 6379, "db": 0}

redis1 = Redis(**redis_config)
queue1 = yarqueue.Queue(name="my_queue", redis=redis1)

redis2 = Redis(**redis_config)
queue2 = yarqueue.Queue(name="my_queue", redis=redis2)

queue1.put(1)
assert queue2.get() == 1

Be aware that different python environments may have different pickle protocols available: it may be better to explicitly set your serializer on queue instantiation (see below).
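
For example, pinning both ends of the shared queue to pickle protocol 4, using the Pickle serializer described under Serializers below:

from yarqueue import Pickle

queue1 = yarqueue.Queue(name="my_queue", redis=redis1, serializer=Pickle(4))
queue2 = yarqueue.Queue(name="my_queue", redis=redis2, serializer=Pickle(4))

queue1.put({"some": "payload"})
assert queue2.get() == {"some": "payload"}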

These names are not mangled: redis can be used to synchronise programs running in different languages! If you’re using it this way, you should replace the default serializer, as most languages do not use pickle. Note that other languages and queue implementations will not respect the task-counting semantics of yarqueue.JoinableQueue.
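
For example, a named queue paired with the included Json serializer (importing it from yarqueue.serializer is an assumption here) stores plain JSON strings that any language can consume:

from yarqueue.serializer import Json

json_queue = yarqueue.Queue(name="shared_jobs", redis=redis1, serializer=Json())
json_queue.put({"task": "resize", "id": 42})
# a consumer in any language can now pop entries from the redis list
# named "shared_jobs" and JSON-decode them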

Serializers

In yarqueue, a serializer is anything which can turn an object into bytes with a .dumps(obj) method, and turn those bytes back into a python object with a .loads(bytes_object) method.

The default serializer is a wrapper around pickle, and uses the highest available pickle protocol. Explicitly set the protocol version (useful for sharing a redis list between python environments) like this:

from yarqueue import Pickle

pickle3_q = yarqueue.Queue(serializer=Pickle(3))

Feel free to create your own serializers (useful for sharing a redis list between programming languages). Here is the implementation of the included Json serializer:

import json
from copy import deepcopy

from yarqueue.serializer import BaseSerializer

class Json(BaseSerializer):
    def __init__(self, dumps_kwargs=None, loads_kwargs=None):
        self.dumps_kwargs = deepcopy(dumps_kwargs) or dict()
        self.loads_kwargs = deepcopy(loads_kwargs) or dict()

    def dumps(self, obj) -> bytes:
        return json.dumps(obj, **self.dumps_kwargs).encode()

    def loads(self, bytes_object: bytes) -> object:
        return json.loads(bytes_object, **self.loads_kwargs)

If you explicitly set the serializer argument to None, values will not be serialized or deserialized. This might be useful if you're only working with primitives that redis understands (bytes, strings, and numbers).
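
A minimal sketch; note that redis returns raw bytes, so unserialized values come back as bytes:

raw_queue = yarqueue.Queue(name="raw", serializer=None)
raw_queue.put(b"hello")
assert raw_queue.get() == b"hello"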

Watchers

Command line

Requires click and tqdm.

Usage: yarqwatch [OPTIONS]

  Watch the progress of a number of redis-backed queues, on the command
  line.

Options:
  --version             Show the version and exit.
  --help                Show this message and exit.
  -n, --name TEXT       Name of redis lists to watch (accepts multiple)
  -t, --total INTEGER   Total items added to the queue (accepts multiple, same
                        order as --name)
  -i, --interval FLOAT  Polling interval (seconds)  [default: 1]
  --host TEXT           Hostname for the Redis instance  [default: localhost]
  --port INTEGER        Port for the Redis instance  [default: 6379]
  --db INTEGER          DB ID for the Redis instance  [default: 0]
  --password TEXT       Password for the Redis instance

For example, to create a progress bar for the potato queue, which had 10 jobs, and the spade queue, which had 20 jobs, on the redis instance at myserver:1234, polling every 5 seconds:

yarqwatch -n potato -t 10 -n spade -t 20 --host myserver --port 1234 --interval 5

If totals are not explicitly given, the number of enqueued (and for Joinable queues, in-progress) items at calling time is used.
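
So, to rely on those defaults for both queues:

yarqwatch -n potato -n spade --host myserver --port 1234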

HTTP

Requires click and flask.

Usage: yarqserve [OPTIONS]

  Watch the progress of a number of redis-backed queues, over HTTP.

Options:
  --version            Show the version and exit.
  --help               Show this message and exit.
  -n, --name TEXT      Name of redis lists to watch (accepts multiple)
  -t, --total INTEGER  Total items added to the queue (accepts multiple, same
                       order as --name)
  --host TEXT          Hostname at which to run server  [default: localhost]
  --port INTEGER       Port on which to run server  [default: 8080]
  --rhost TEXT         Hostname for the Redis instance  [default: localhost]
  --rport INTEGER      Port for the Redis instance  [default: 6379]
  --db INTEGER         DB ID for the Redis instance  [default: 0]
  --password TEXT      Password for the Redis instance

For example, to serve a webpage and REST endpoint with a progress bar for the potato queue, which started with 10 jobs, and the spade queue, which started with 20 jobs, on the redis instance at myserver:1234, at the host localhost:8888:

yarqserve -n potato -t 10 -n spade -t 20 --host localhost --port 8888 --rhost myserver --rport 1234

If totals are not explicitly given, the number of enqueued (and for Joinable queues, in-progress) items at server startup time is used.

Point your browser to http://localhost:8888 to see the webpage, or curl http://localhost:8888/json to get the progress data in JSON form. The returned object’s keys are the queue names, and the values are objects with queued, inProgress, and total counts.
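
For instance, with the command above, the /json response might look like this (the counts are illustrative):

{
  "potato": {"queued": 4, "inProgress": 2, "total": 10},
  "spade": {"queued": 15, "inProgress": 0, "total": 20}
}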