yarqueue package

Submodules

yarqueue.base_queue module

class yarqueue.base_queue.BaseJoinableQueue(maxsize=0)[source]

Bases: yarqueue.base_queue.BaseQueue

join() → None[source]

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

n_in_progress() → int[source]

How many items have been popped from the queue without task_done() being called for them

abstract n_tasks() → int[source]

How many items have been put onto the list without a respective task_done() call

abstract task_done() → None[source]

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

wait(timeout: float)[source]

Like join, but with a timeout.

If the timeout is reached, a QueueTimeoutError is raised.

Parameters

timeout – timeout in seconds

class yarqueue.base_queue.BaseQueue(maxsize=0)[source]

Bases: abc.ABC

empty() → bool[source]

Return whether the queue is empty

full() → bool[source]

Return whether the queue is full (only if maxsize was set)

abstract get(block=True, timeout: Optional[float] = None) → object[source]

Remove and return an item from the queue.

If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the queue.Empty exception (timeout is ignored in that case).

Parameters
  • block – whether to wait until an item is available

  • timeout – if blocking, how long to wait (None if infinite)

Returns

item returned from the queue

get_nowait() → object[source]

Equivalent to get(False)

abstract put(obj, block=True, timeout: Optional[float] = None) → None[source]

Put obj into the queue.

If the optional argument block is True (the default) and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Full exception if no free slot was available within that time. Otherwise (block is False), put an item on the queue if a free slot is immediately available, else raise the queue.Full exception

(timeout is ignored in that case).

Parameters
  • obj – object to add to the queue

  • block – whether to wait until item can be added

  • timeout – if blocking, how long to wait (None if infinite)

put_nowait(obj) → None[source]

Equivalent to put(obj, False).

Parameters

obj – object to add to the queue

abstract qsize() → int[source]

Return the size of the queue.

exception yarqueue.base_queue.QueueTimeoutError[source]

Bases: Exception

yarqueue.base_queue.add_logging_level(name, num)[source]

yarqueue.compat module

yarqueue.constants module

class yarqueue.constants.Side[source]

Bases: strenum.StrEnum

An enumeration.

LEFT = 'l'
RIGHT = 'r'
opposite()[source]

yarqueue.queue module

class yarqueue.queue.DeQueue(maxsize=0, name: Optional[str] = None, redis: Optional[redis.client.Redis] = None, serializer: Optional[yarqueue.serializer.BaseSerializer] = <yarqueue.serializer.Pickle object>)[source]

Bases: yarqueue.queue.FifoQueue

Redis-backed double-ended queue otherwise compatible with multiprocessing.Queue.

Contains all of the additional methods from yarqueue.Queue.

put(), put_many(), get(), and get_many() behave has they do in the parent (Fifo)Queue; i.e. putting on the right, and getting from the left. Additionally, each has an explicit _left and _right variant.

get_left(block=True, timeout=None) → object[source]

Get from the left (start) of the list.

get_many_left(n_items: int, block=True, timeout=None) → Iterator[source]

Yield elements from the left (start) of the list.

get_many_right(n_items: int, block=True, timeout=None) → Iterator[source]

Yield elements from the right (end) of the list.

get_right(block=True, timeout=None) → object[source]

Get from the right (end) of the list.

put_left(obj, block=True, timeout=None) → None[source]

Put on the left (start) of the list.

put_many_left(objs: Iterable, block=True, timeout=None)[source]

Put many elements on the left (start) of the list

put_many_right(objs: Iterable, block=True, timeout=None)[source]

Put many elements on the right (end) of the list

put_right(obj, block=True, timeout=None) → None[source]

Put on the right (end) of the list.

class yarqueue.queue.FifoQueue(maxsize=0, name: Optional[str] = None, redis: Optional[redis.client.Redis] = None, serializer: Optional[yarqueue.serializer.BaseSerializer] = <yarqueue.serializer.Pickle object>)[source]

Bases: yarqueue.base_queue.BaseQueue

Redis-backed first-in, first-out queue compatible with multiprocessing.Queue.

Additionally, contains a put_many() method for adding several items to the redis list at once; a get_many() method which yields a given number of items as they become available, and a clear() method to empty and delete the underlying list.

clear()[source]

Empty and delete the underlying Redis list

get(block=True, timeout=None) → object[source]

Remove and return an item from the queue.

If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the queue.Empty exception (timeout is ignored in that case).

Parameters
  • block – whether to wait until an item is available

  • timeout – if blocking, how long to wait (None if infinite)

Returns

item returned from the queue

get_many(n_items: int, block=True, timeout=None) → Iterator[source]

Yield items from the queue.

Parameters
  • n_items – how many items to get (can be float("inf"))

  • block – as used in get

  • timeout – timeout as used in get, per item fetched

put(obj, block=True, timeout=None) → None[source]

Put obj into the queue.

If the optional argument block is True (the default) and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Full exception if no free slot was available within that time. Otherwise (block is False), put an item on the queue if a free slot is immediately available, else raise the queue.Full exception

(timeout is ignored in that case).

Parameters
  • obj – object to add to the queue

  • block – whether to wait until item can be added

  • timeout – if blocking, how long to wait (None if infinite)

put_many(objs: Iterable, block=True, timeout=None)[source]

Put multiple items on the queue at once.

May be faster than individual calls. However, if the queue has a maxsize, no items will be put on until all of them can be.

Parameters
  • objs – iterable of objects to add. Must be finite.

  • block – as used in put

  • timeout – how long to wait, in seconds, for all items to be added together

qsize() → int[source]

Return the size of the queue.

class yarqueue.queue.JoinableDeQueue(maxsize=0, name: Optional[str] = None, redis: Optional[redis.client.Redis] = None, serializer: Optional[yarqueue.serializer.BaseSerializer] = <yarqueue.serializer.Pickle object>)[source]

Bases: yarqueue.queue.JoinableFifoQueue, yarqueue.queue.DeQueue

Redis-backed double-ended queue otherwise compatible with multiprocessing.JoinableFifoQueue.

class yarqueue.queue.JoinableFifoQueue(maxsize=0, name: Optional[str] = None, redis: Optional[redis.client.Redis] = None, serializer: Optional[yarqueue.serializer.BaseSerializer] = <yarqueue.serializer.Pickle object>)[source]

Bases: yarqueue.queue.FifoQueue, yarqueue.base_queue.BaseJoinableQueue

Redis-backed first-in, first-out queue compatible with multiprocessing.JoinableQueue.

Additionally, contains an n_tasks() method exposing the number of items put onto the queue without task_done() being called for them, and an n_in_progress() method to count how many have been fetched from the queue with task_done() being called.

clear()[source]

Empty and delete the underlying Redis list

n_tasks() → int[source]

How many items have been put onto the list without a respective task_done() call

task_done() → None[source]

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

class yarqueue.queue.JoinableLifoQueue(maxsize=0, name: Optional[str] = None, redis: Optional[redis.client.Redis] = None, serializer: Optional[yarqueue.serializer.BaseSerializer] = <yarqueue.serializer.Pickle object>)[source]

Bases: yarqueue.queue.JoinableFifoQueue

Redis-backed last-in, first-out queue otherwise compatible with multiprocessing.JoinableQueue.

Contains all of the additional methods from yarqueue.JoinableLifoQueue.

yarqueue.queue.JoinableQueue

alias of yarqueue.queue.JoinableFifoQueue

class yarqueue.queue.LifoQueue(maxsize=0, name: Optional[str] = None, redis: Optional[redis.client.Redis] = None, serializer: Optional[yarqueue.serializer.BaseSerializer] = <yarqueue.serializer.Pickle object>)[source]

Bases: yarqueue.queue.FifoQueue

Redis-backed last-in, first-out queue otherwise compatible with multiprocessing.Queue.

Contains all of the additional methods from yarqueue.Queue.

yarqueue.queue.Queue

alias of yarqueue.queue.FifoQueue

yarqueue.serializer module

class yarqueue.serializer.BaseSerializer[source]

Bases: abc.ABC

abstract dumps(obj) → bytes[source]

Return the serialized representation of the object as a bytes object.

abstract loads(bytes_object: bytes) → object[source]

Return the deserialized object from its bytes representation.

class yarqueue.serializer.Json(dumps_kwargs=None, loads_kwargs=None)[source]

Bases: yarqueue.serializer.BaseSerializer

dumps(obj) → bytes[source]

Return the serialized representation of the object as a bytes object.

loads(bytes_object: bytes) → object[source]

Return the deserialized object from its bytes representation.

class yarqueue.serializer.Null[source]

Bases: yarqueue.serializer.BaseSerializer

dumps(obj)[source]

Return the serialized representation of the object as a bytes object.

loads(obj)[source]

Return the deserialized object from its bytes representation.

class yarqueue.serializer.Pickle(protocol=None, dumps_kwargs=None, loads_kwargs=None)[source]

Bases: yarqueue.serializer.BaseSerializer

dumps(obj) → bytes[source]

Return the serialized representation of the object as a bytes object.

loads(bytes_object: bytes) → object[source]

Return the deserialized object from its bytes representation.

property protocol

Module contents

Top-level package for Yet Another Redis Queue.

yarqueue.Queue

alias of yarqueue.queue.FifoQueue

class yarqueue.FifoQueue(maxsize=0, name: Optional[str] = None, redis: Optional[redis.client.Redis] = None, serializer: Optional[yarqueue.serializer.BaseSerializer] = <yarqueue.serializer.Pickle object>)[source]

Bases: yarqueue.base_queue.BaseQueue

Redis-backed first-in, first-out queue compatible with multiprocessing.Queue.

Additionally, contains a put_many() method for adding several items to the redis list at once; a get_many() method which yields a given number of items as they become available, and a clear() method to empty and delete the underlying list.

clear()[source]

Empty and delete the underlying Redis list

get(block=True, timeout=None) → object[source]

Remove and return an item from the queue.

If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the queue.Empty exception (timeout is ignored in that case).

Parameters
  • block – whether to wait until an item is available

  • timeout – if blocking, how long to wait (None if infinite)

Returns

item returned from the queue

get_many(n_items: int, block=True, timeout=None) → Iterator[source]

Yield items from the queue.

Parameters
  • n_items – how many items to get (can be float("inf"))

  • block – as used in get

  • timeout – timeout as used in get, per item fetched

put(obj, block=True, timeout=None) → None[source]

Put obj into the queue.

If the optional argument block is True (the default) and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Full exception if no free slot was available within that time. Otherwise (block is False), put an item on the queue if a free slot is immediately available, else raise the queue.Full exception

(timeout is ignored in that case).

Parameters
  • obj – object to add to the queue

  • block – whether to wait until item can be added

  • timeout – if blocking, how long to wait (None if infinite)

put_many(objs: Iterable, block=True, timeout=None)[source]

Put multiple items on the queue at once.

May be faster than individual calls. However, if the queue has a maxsize, no items will be put on until all of them can be.

Parameters
  • objs – iterable of objects to add. Must be finite.

  • block – as used in put

  • timeout – how long to wait, in seconds, for all items to be added together

qsize() → int[source]

Return the size of the queue.

class yarqueue.LifoQueue(maxsize=0, name: Optional[str] = None, redis: Optional[redis.client.Redis] = None, serializer: Optional[yarqueue.serializer.BaseSerializer] = <yarqueue.serializer.Pickle object>)[source]

Bases: yarqueue.queue.FifoQueue

Redis-backed last-in, first-out queue otherwise compatible with multiprocessing.Queue.

Contains all of the additional methods from yarqueue.Queue.

class yarqueue.DeQueue(maxsize=0, name: Optional[str] = None, redis: Optional[redis.client.Redis] = None, serializer: Optional[yarqueue.serializer.BaseSerializer] = <yarqueue.serializer.Pickle object>)[source]

Bases: yarqueue.queue.FifoQueue

Redis-backed double-ended queue otherwise compatible with multiprocessing.Queue.

Contains all of the additional methods from yarqueue.Queue.

put(), put_many(), get(), and get_many() behave has they do in the parent (Fifo)Queue; i.e. putting on the right, and getting from the left. Additionally, each has an explicit _left and _right variant.

get_left(block=True, timeout=None) → object[source]

Get from the left (start) of the list.

get_many_left(n_items: int, block=True, timeout=None) → Iterator[source]

Yield elements from the left (start) of the list.

get_many_right(n_items: int, block=True, timeout=None) → Iterator[source]

Yield elements from the right (end) of the list.

get_right(block=True, timeout=None) → object[source]

Get from the right (end) of the list.

put_left(obj, block=True, timeout=None) → None[source]

Put on the left (start) of the list.

put_many_left(objs: Iterable, block=True, timeout=None)[source]

Put many elements on the left (start) of the list

put_many_right(objs: Iterable, block=True, timeout=None)[source]

Put many elements on the right (end) of the list

put_right(obj, block=True, timeout=None) → None[source]

Put on the right (end) of the list.

yarqueue.JoinableQueue

alias of yarqueue.queue.JoinableFifoQueue

class yarqueue.JoinableFifoQueue(maxsize=0, name: Optional[str] = None, redis: Optional[redis.client.Redis] = None, serializer: Optional[yarqueue.serializer.BaseSerializer] = <yarqueue.serializer.Pickle object>)[source]

Bases: yarqueue.queue.FifoQueue, yarqueue.base_queue.BaseJoinableQueue

Redis-backed first-in, first-out queue compatible with multiprocessing.JoinableQueue.

Additionally, contains an n_tasks() method exposing the number of items put onto the queue without task_done() being called for them, and an n_in_progress() method to count how many have been fetched from the queue with task_done() being called.

clear()[source]

Empty and delete the underlying Redis list

n_tasks() → int[source]

How many items have been put onto the list without a respective task_done() call

task_done() → None[source]

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

class yarqueue.JoinableLifoQueue(maxsize=0, name: Optional[str] = None, redis: Optional[redis.client.Redis] = None, serializer: Optional[yarqueue.serializer.BaseSerializer] = <yarqueue.serializer.Pickle object>)[source]

Bases: yarqueue.queue.JoinableFifoQueue

Redis-backed last-in, first-out queue otherwise compatible with multiprocessing.JoinableQueue.

Contains all of the additional methods from yarqueue.JoinableLifoQueue.

class yarqueue.JoinableDeQueue(maxsize=0, name: Optional[str] = None, redis: Optional[redis.client.Redis] = None, serializer: Optional[yarqueue.serializer.BaseSerializer] = <yarqueue.serializer.Pickle object>)[source]

Bases: yarqueue.queue.JoinableFifoQueue, yarqueue.queue.DeQueue

Redis-backed double-ended queue otherwise compatible with multiprocessing.JoinableFifoQueue.

exception yarqueue.QueueTimeoutError[source]

Bases: Exception

class yarqueue.Pickle(protocol=None, dumps_kwargs=None, loads_kwargs=None)[source]

Bases: yarqueue.serializer.BaseSerializer

dumps(obj) → bytes[source]

Return the serialized representation of the object as a bytes object.

loads(bytes_object: bytes) → object[source]

Return the deserialized object from its bytes representation.

property protocol
class yarqueue.Json(dumps_kwargs=None, loads_kwargs=None)[source]

Bases: yarqueue.serializer.BaseSerializer

dumps(obj) → bytes[source]

Return the serialized representation of the object as a bytes object.

loads(bytes_object: bytes) → object[source]

Return the deserialized object from its bytes representation.