kazoo.recipe.queue

Zookeeper based queue implementations.

Maintainer:

None

Status:

Possibly Buggy

Note

This queue was reported to cause memory leaks over long running periods. See: https://github.com/python-zk/kazoo/issues/175

New in version 0.6: The Queue class.

New in version 1.0: The LockingQueue class.

Public API

class kazoo.recipe.queue.Queue(client, path)[source]

A distributed queue with optional priority support.

This queue does not offer reliable consumption. An entry is removed from the queue prior to being processed. So if an error occurs, the consumer has to re-queue the item or it will be lost.

__init__(client, path)[source]
Parameters:
  • client – A KazooClient instance.

  • path – The queue path to use in ZooKeeper.

__len__()[source]

Return queue size.

get()[source]

Get item data and remove an item from the queue.

Returns:

Item data or None.

Return type:

bytes

put(value, priority=100)[source]

Put an item into the queue.

Parameters:
  • value – Byte string to put into the queue.

  • priority – An optional priority as an integer with at most 3 digits. Lower values signify higher priority.

class kazoo.recipe.queue.LockingQueue(client, path)[source]

A distributed queue with priority and locking support.

Upon retrieving an entry from the queue, the entry gets locked with an ephemeral node (instead of deleted). If an error occurs, this lock gets released so that others could retake the entry. This adds a little penalty as compared to Queue implementation.

The user should call the LockingQueue.get() method first to lock and retrieve the next entry. When finished processing the entry, a user should call the LockingQueue.consume() method that will remove the entry from the queue.

This queue will not track connection status with ZooKeeper. If a node locks an element, then loses connection with ZooKeeper and later reconnects, the lock will probably be removed by Zookeeper in the meantime, but a node would still think that it holds a lock. The user should check the connection status with Zookeeper or call LockingQueue.holds_lock() method that will check if a node still holds the lock.

Note

LockingQueue requires ZooKeeper 3.4 or above, since it is using transactions.

__init__(client, path)[source]
Parameters:
  • client – A KazooClient instance.

  • path – The queue path to use in ZooKeeper.

__len__()[source]

Returns the current length of the queue.

Returns:

queue size (includes locked entries count).

consume()[source]

Removes a currently processing entry from the queue.

Returns:

True if element was removed successfully, False otherwise.

Return type:

bool

get(timeout=None)[source]

Locks and gets an entry from the queue. If a previously got entry was not consumed, this method will return that entry.

Parameters:

timeout – Maximum waiting time in seconds. If None then it will wait until an entry appears in the queue.

Returns:

A locked entry value or None if the timeout was reached.

Return type:

bytes

holds_lock()[source]

Checks if a node still holds the lock.

Returns:

True if a node still holds the lock, False otherwise.

Return type:

bool

put(value, priority=100)[source]

Put an entry into the queue.

Parameters:
  • value – Byte string to put into the queue.

  • priority – An optional priority as an integer with at most 3 digits. Lower values signify higher priority.

put_all(values, priority=100)[source]

Put several entries into the queue. The action only succeeds if all entries where put into the queue.

Parameters:
  • values – A list of values to put into the queue.

  • priority – An optional priority as an integer with at most 3 digits. Lower values signify higher priority.

release()[source]

Removes the lock from currently processed item without consuming it.

Returns:

True if the lock was removed successfully, False otherwise.

Return type:

bool