Source code for kaneda.queues.rq

from __future__ import absolute_import

import logging

try:
    from redis import Redis
    from rq import Queue
except ImportError:
    Redis = None
    Queue = None

from kaneda.exceptions import ImproperlyConfigured

from .base import BaseQueue


[docs]class RQQueue(BaseQueue): """ RQ queue :param queue: queue instance of RQ class. :param redis_url: Redis connection url where RQ will attend the async reporting requests. :param queue_name: name of the queue being used by the RQ worker process. """ settings_namespace = 'RQ' def __init__(self, queue=None, redis_url=None, queue_name='kaneda'): if not Redis: raise ImproperlyConfigured('You need to install redis to use the RQ queue.') if not Queue: raise ImproperlyConfigured('You need to install rq library to use the RQ queue.') if queue: if not isinstance(queue, Queue): raise ImproperlyConfigured('"queue" parameter is not an instance of RQ queue.') self.queue = queue elif redis_url: self.queue = Queue(queue_name, connection=Redis.from_url(redis_url)) else: self.queue = Queue(queue_name, connection=Redis()) def report(self, name, metric, value, tags, id_): try: return self.queue.enqueue('kaneda.tasks.rq.report', name, metric, value, tags, id_) except Exception as e: logger = logging.getLogger(__name__) logger.exception(e)