Source code for kaneda.queues.celery

from __future__ import absolute_import

import logging

try:
    from celery import Celery
except ImportError:
    Celery = None

from kaneda.exceptions import ImproperlyConfigured

from .base import BaseQueue


class CeleryQueue(BaseQueue):
    """
    Celery queue.

    Hands metric reports over to a Celery app by sending the
    ``kaneda.tasks.celery.report`` task to the configured queue.

    :param app: app instance of Celery class. When omitted (or falsy), a new Celery app is
                created from ``broker``.
    :param broker: broker connection url where Celery will attend the async reporting requests.
                   Only used when ``app`` is not given.
    :param queue_name: name of the queue being used by the Celery worker process.
    :raises ImproperlyConfigured: if the celery library is not installed, or if ``app`` is given
                                  but is not a Celery instance.
    """
    settings_namespace = 'CELERY'

    def __init__(self, app=None, broker=None, queue_name=''):
        # ``Celery`` is set to None at import time when the celery package is missing.
        if not Celery:
            raise ImproperlyConfigured('You need to install the celery library to use Celery queue.')
        if app:
            if not isinstance(app, Celery):
                # The validated argument is ``app``; name it correctly in the error.
                raise ImproperlyConfigured('"app" parameter is not an instance of Celery class.')
            self.app = app
        else:
            self.app = Celery(broker=broker)
        self.queue_name = queue_name

    def report(self, name, metric, value, tags, id_):
        """
        Send the metric to the Celery worker as a ``kaneda.tasks.celery.report`` task.

        :param name: first positional argument forwarded to the task.
        :param metric: second positional argument forwarded to the task.
        :param value: third positional argument forwarded to the task.
        :param tags: fourth positional argument forwarded to the task.
        :param id_: fifth positional argument forwarded to the task.
        :return: whatever ``send_task`` returns, or ``None`` if sending raised an exception.
        """
        try:
            return self.app.send_task(
                'kaneda.tasks.celery.report',
                args=(name, metric, value, tags, id_),
                queue=self.queue_name,
            )
        except Exception as e:
            # Best-effort reporting: a failure to enqueue is logged with its traceback
            # instead of propagating to the caller; the method then returns None.
            logger = logging.getLogger(__name__)
            logger.exception(e)