2014-04-30 5 views
5

Ich benutze pika.BlockingConnection in einem Verbraucher, der einige Aufgaben für jede Nachricht ausführt. Ich habe auch die Signalverarbeitung hinzugefügt, so dass der Verbraucher nach vollständiger Ausführung aller Aufgaben ordnungsgemäß stirbt.Signalverarbeitung pika/python

Während Nachricht verarbeitet wird und Signal empfangen wird, bekomme ich nur "signal received" von der Funktion, aber der Code wird nicht beendet. Also entschied ich mich, nach dem Signal zu suchen, das am Ende der Callback-Funktion empfangen wurde. Die Frage ist, wie oft ich nach dem Signal suche, da es in diesem Code viel mehr Funktionen geben wird. Gibt es einen besseren Weg, mit Signalen umzugehen, ohne die Dinge zu übertreiben?

import signal 
import sys 
import pika 
from time import sleep 

received_signal = False 
all_over = False 

def signal_handler(signal, frame): 
    global received_signal 
    print "signal received" 
    received_signal = True 

signal.signal(signal.SIGINT, signal_handler) 
signal.signal(signal.SIGTERM, signal_handler) 

mq_connection = pika.BlockingConnection(pika.ConnectionParameters(my_mq_server, virtual_host='test')) 
mq_channel = mq_connection.channel() 

def callback(ch, method, properties, body): 
    if received_signal: 
     print "Exiting, as a kill signal is already received" 
     exit(0) 
    print body 
    sleep(50) 
    mq_channel.basic_ack(delivery_tag=method.delivery_tag) 
    print "Message consumption complete" 

    if received_signal: 
     print "Exiting, as a kill signal is already received" 
     exit(0) 

try: 
    print ' [*] Waiting for messages. To exit press CTRL+C' 
    mq_channel.basic_consume(callback, queue='test') 
    mq_channel.start_consuming() 
except Exception: 
    mq_channel.close() 
    exit() 

Dies ist meine erste Frage hier, also lassen Sie mich wissen, wenn weitere Details erforderlich sind.

+0

Ihr aktueller Code wird SIGTERM oder SIGINT schlucken, bis die nächste Nachricht über die Warteschlange empfangen wird. An diesem Punkt sollte sie beendet werden. Willst du das wirklich? Warum sollte die 'signal_handler' Methode nicht direkt' sys.exit (0) 'aufrufen? – dano

+1

Ich möchte die Signalbehandlung auf zwei Arten erfolgen: 1) Während auf Nachrichten warten, sollte es nur sterben 2) Während des Verbrauchs einer Nachricht sollte es aktuelle Arbeit zu beenden, und dann sterben. Mein aktueller Code enthält die zweite Bedingung, aber nicht die erste. Das ist das Problem. Wäre das überhaupt möglich? – user3295878

+0

Ja, das ist möglich. Ich werde eine Antwort hinzufügen. – dano

Antwort

3

Ich denke, das tut, was Sie suchen:

#!/usr/bin/python 

import signal 
import sys 
import pika 
from contextlib import contextmanager 

received_signal = False 
processing_callback = False 

def signal_handler(signal, frame): 
    global received_signal 
    print "signal received" 
    received_signal = True 
    if not processing_callback: 
     sys.exit() 

signal.signal(signal.SIGINT, signal_handler) 
signal.signal(signal.SIGTERM, signal_handler) 

@contextmanager 
def block_signals(): 
    global processing_callback 
    processing_callback = True 
    try: 
     yield 
    finally: 
     processing_callback = False 
     if received_signal: 
      sys.exit() 

def callback(ch, method, properties, body): 
    with block_signals: 
     print body 
     sum(xrange(0, 200050000)) # sleep gets interrupted by signals, this doesn't. 
     mq_channel.basic_ack(delivery_tag=method.delivery_tag) 
     print "Message consumption complete" 

if __name__ == "__main__":  
    try: 
     mq_connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
     mq_channel = mq_connection.channel() 
     print ' [*] Waiting for messages. To exit press CTRL+C' 
     mq_channel.basic_consume(callback, queue='test') 
     mq_channel.start_consuming() 
    except Exception as e: 
     mq_channel.close() 
     sys.exit() 

ich contextmanager die Signale zu verarbeiten verwendet zu blockieren, so dass die gesamte Logik außerhalb des Callback entfernt selbst verborgen ist. Dies sollte auch die Wiederverwendung des Codes erleichtern. Nur um zu verdeutlichen, wie es funktioniert, ist es gleichbedeutend mit:

def callback(ch, method, properties, body): 
    global processing_callback 
    processing_callback = True 
    try: 
     print body 
     sum(xrange(0, 200050000)) 
     mq_channel.basic_ack(delivery_tag=method.delivery_tag) 
     print "Message consumption complete" 
    finally: 
     processing_callback = False 
     if received_signal: 
      sys.exit() 
+0

Verwenden Sie Xrange statt Reichweite, oder Ihr Speicher wird in die Luft gehen und es beginnt zu schlagen, wie es auf die Festplatte ausgibt. – Dunk

+0

@Dunk, danke, behoben. – dano