Es ist derzeit schwierig, den Selleriearbeitern benutzerdefinierte Benutzer hinzuzufügen, aber dies ändert sich in der Entwicklungsversion (3.1), wo ich Unterstützung für Consumer-Bootschritte hinzugefügt habe.
Es gibt keine Dokumentation noch, wie ich es gerade beendet habe Umsetzung, aber hier ist ein Beispiel:
from celery import Celery
from celery.bin import Option
from celery.bootsteps import ConsumerStep
from kombu import Consumer, Exchange, Queue
class CustomConsumer(ConsumerStep):
queue = Queue('custom', Exchange('custom'), routing_key='custom')
def __init__(self, c, enable_custom_consumer=False, **kwargs):
self.enable = self.enable_custom_consumer
def get_consumers(self, connection):
return [
Consumer(connection.channel(),
queues=[self.queue],
callbacks=[self.on_message]),
]
def on_message(self, body, message):
print('GOT MESSAGE: %r' % (body,))
message.ack()
celery = Celery(broker='amqp://localhost//')
celery.steps['consumer'].add(CustomConsumer)
celery.user_options['worker'].add(
Option('--enable-custom-consumer', action='store_true',
help='Enable our custom consumer.'),
)
Beachten Sie, dass der API in der finalen Version ändern kann, ist eine Sache, die ich noch nicht sicher bin ungefähr ist, wie Kanäle nach get_consumer(connection)
gehandhabt werden. Derzeit ist der Kanal des Verbrauchers geschlossen, wenn die Verbindung unterbrochen wird, und beim Herunterfahren , aber die Leute möchten vielleicht Kanäle manuell behandeln. In diesem Fall besteht immer die Möglichkeit ConsumerStep anzupassen oder einen neuen StartStopStep zu schreiben.
Die Dokumentation kann jetzt unter http://cellery.readthedocs.org/en/latest/userguide/extending.html gefunden werden –