# Source code for kaneda.queues.zmq

from __future__ import absolute_import

import logging

try:
    import zmq
except ImportError:
    zmq = None

from kaneda.exceptions import ImproperlyConfigured

from .base import BaseQueue


class ZMQQueue(BaseQueue):
    """
    ZeroMQ queue.

    Sends every reported metric as a JSON message through a ZMQ ``PUSH``
    socket that is bound to ``connection_url``.

    :param connection_url: ZMQ connection url (tcp://127.0.0.1:5555).
    :param timeout: ZMQ socket timeout (milliseconds).
    :raises ImproperlyConfigured: if pyzmq is not installed.
    """

    # NOTE(review): presumably the settings prefix used to configure this
    # queue -- confirm against BaseQueue / the settings loader.
    settings_namespace = 'ZMQ'

    def __init__(self, connection_url, timeout=300):
        # The guarded module-level import leaves ``zmq`` set to None when
        # pyzmq is missing, so fail early with an actionable message.
        if zmq is None:
            raise ImproperlyConfigured('You need to install pyzmq to use the ZMQ queue.')
        context = zmq.Context()
        self.socket = context.socket(zmq.PUSH)
        # Send timeout for the socket, as documented by the ``timeout`` param.
        self.socket.SNDTIMEO = timeout
        self.socket.bind(connection_url)

    def report(self, name, metric, value, tags, id_):
        """
        Send one metric over the socket as a JSON object.

        The message carries the keys ``name``, ``metric``, ``value``,
        ``tags`` and ``id_``. Any exception raised while sending is logged
        and swallowed.

        :return: the result of ``socket.send_json``; ``None`` if sending failed.
        """
        # Build the payload explicitly instead of mutating ``locals()``:
        # modifying that mapping is unsupported before Python 3.13, and a
        # tracer/debugger can re-sync it so that ``self`` and ``payload``
        # itself leak back into the message.
        payload = {
            'name': name,
            'metric': metric,
            'value': value,
            'tags': tags,
            'id_': id_,
        }
        try:
            return self.socket.send_json(payload)
        except Exception as e:
            # Deliberately best-effort: log the failure and return None
            # rather than propagate it to the caller.
            logger = logging.getLogger(__name__)
            logger.exception(e)