Queues

Kaneda provides built-in queues that store metrics and events for asynchronous reporting. To use your own asynchronous queue system, subclass BaseQueue and implement a custom report method, which is responsible for passing metrics data to a job queue.
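The subclassing pattern described above can be sketched as follows. This is a minimal illustration, not Kaneda's own code: the BaseQueue class below is a stand-in defined locally so the snippet runs without Kaneda installed, the report signature (name, metric, value, tags, id_) is an assumption to check against your installed version, and InMemoryQueue is a hypothetical name that uses the standard library's queue.Queue as the job queue.

```python
import queue


class BaseQueue:
    """Local stand-in for kaneda.queues.BaseQueue (assumed interface)."""

    def report(self, name, metric, value, tags, id_):
        raise NotImplementedError


class InMemoryQueue(BaseQueue):
    """Hypothetical custom queue that hands metrics to a local job queue."""

    def __init__(self, jobs=None):
        # Use the supplied job queue, or create a fresh one.
        self.jobs = jobs if jobs is not None else queue.Queue()

    def report(self, name, metric, value, tags, id_):
        # Package the metric data and pass it to the job queue.
        # A worker would later pop the job and write it to a backend.
        self.jobs.put(
            {"name": name, "metric": metric, "value": value, "tags": tags, "id": id_}
        )


q = InMemoryQueue()
q.report("users.signup", "gauge", 5, {"env": "test"}, None)
job = q.jobs.get_nowait()
```

In a real deployment the body of report would enqueue a task in your own job system instead of a local queue.Queue.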

Celery

Celery is a simple, flexible and reliable distributed system to process vast amounts of messages. It can be configured to use various broker systems, such as Redis or RabbitMQ.

Note

Before using Celery as the async queue, you need to install the Celery library:

pip install celery

class kaneda.queues.CeleryQueue(app=None, broker=None, queue_name='')

Celery queue.

Parameters:
  • app – app instance of the Celery class.
  • broker – broker connection URL where Celery will handle the async reporting requests.
  • queue_name – name of the queue used by the Celery worker process.
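As a minimal usage sketch, the queue can be built from a broker URL and handed to a Metrics object. The Redis URL, the queue name and the helper name build_celery_metrics are illustrative, and passing the queue through Metrics(queue=...) is an assumption based on Kaneda's general usage rather than something stated in this section. The imports sit inside the function so the snippet only needs kaneda and celery when it is actually called.

```python
def build_celery_metrics(broker_url="redis://localhost:6379/0", queue_name="kaneda"):
    """Return a Metrics object that reports through a Celery queue (sketch)."""
    # Imported lazily: requires `pip install kaneda celery`.
    from kaneda import Metrics
    from kaneda.queues import CeleryQueue

    # broker: connection URL of the Celery broker (a local Redis is assumed).
    # queue_name: should match the queue the Celery worker consumes from.
    celery_queue = CeleryQueue(broker=broker_url, queue_name=queue_name)

    # Assumption: Metrics takes the queue via its `queue` keyword argument.
    return Metrics(queue=celery_queue)
```

With a reachable broker and a running worker, a call such as build_celery_metrics().gauge('users.online', 42) would then be reported asynchronously.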

To run the worker execute this command:

celery -A kaneda.tasks.celery worker

RQ

RQ (Redis Queue) is a simple Python library for queueing jobs and processing them in the background with workers. It uses Redis as its main broker system.

Note

Before using RQ as the async queue, you need to install the RQ and Redis libraries:

pip install redis
pip install rq

To run the worker execute this command:

rqworker [queue]

The default queue is “kaneda”.

class kaneda.queues.RQQueue(queue=None, redis_url=None, queue_name='kaneda')

RQ queue.

Parameters:
  • queue – queue instance of the RQ class.
  • redis_url – Redis connection URL where RQ will handle the async reporting requests.
  • queue_name – name of the queue used by the RQ worker process.
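The queue parameter suggests that an existing RQ queue can be reused instead of letting RQQueue connect by URL. The sketch below illustrates that reading under stated assumptions: the Redis URL and the helper name build_rq_metrics are hypothetical, and Metrics(queue=...) is assumed from Kaneda's general usage, not from this section. Imports are lazy so that kaneda, rq and redis are only needed when the function is called.

```python
def build_rq_metrics(redis_url="redis://localhost:6379/0", queue_name="kaneda"):
    """Return a Metrics object that reports through an existing RQ queue (sketch)."""
    # Imported lazily: requires `pip install kaneda rq redis`.
    from redis import Redis
    from rq import Queue
    from kaneda import Metrics
    from kaneda.queues import RQQueue

    # Build the RQ queue ourselves so the connection can be shared with
    # other parts of the application.
    existing_queue = Queue(queue_name, connection=Redis.from_url(redis_url))

    # Pass the ready-made queue instance instead of a redis_url.
    rq_queue = RQQueue(queue=existing_queue)

    # Assumption: Metrics takes the queue via its `queue` keyword argument.
    return Metrics(queue=rq_queue)
```

If no queue instance is at hand, RQQueue(redis_url=..., queue_name=...) is the alternative suggested by the constructor signature.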

ZMQ

ZMQ (or ZeroMQ) is a library which extends the standard socket interfaces with features traditionally provided by specialised messaging middleware products. ZeroMQ sockets provide an abstraction of asynchronous message queues and much more.

Note

Before using ZMQ as the async queue, you need to install the ZMQ library:

pip install pyzmq

To run the worker execute this command:

zmqworker --connection_url=<zmq_connection_url>

Alternatively, define the ZMQ settings in kanedasettings.py and run the worker without arguments:

zmqworker

class kaneda.queues.ZMQQueue(connection_url, timeout=300)

ZeroMQ queue.

Parameters:
  • connection_url – ZMQ connection URL (e.g. tcp://127.0.0.1:5555).
  • timeout – ZMQ socket timeout (milliseconds).
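A minimal sketch of constructing the queue with the two parameters above. The helper name build_zmq_metrics is hypothetical, the URL is the example from the parameter list, and Metrics(queue=...) is assumed from Kaneda's general usage rather than documented in this section. Imports are lazy so that kaneda and pyzmq are only needed when the function is called.

```python
def build_zmq_metrics(connection_url="tcp://127.0.0.1:5555", timeout_ms=300):
    """Return a Metrics object that reports through a ZeroMQ queue (sketch)."""
    # Imported lazily: requires `pip install kaneda pyzmq`.
    from kaneda import Metrics
    from kaneda.queues import ZMQQueue

    # connection_url should point at the address the zmqworker uses;
    # timeout is the socket timeout, expressed in milliseconds.
    zmq_queue = ZMQQueue(connection_url, timeout=timeout_ms)

    # Assumption: Metrics takes the queue via its `queue` keyword argument.
    return Metrics(queue=zmq_queue)
```

The same connection URL would be passed to the worker via --connection_url so that both ends agree on the endpoint.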