2014-02-13 7 views
9

Wie implementiere ich einen Mechanismus, der eine Nachricht nach einigen konfigurierbaren Wiederholungsversuchen ablehnt?Node-amqp - Zurückweisung der Nachricht nach X-Versuchen

Mit anderen Worten, wenn ich eine Warteschlange abonniere, möchte ich garantieren, dass die gleiche Nachricht nicht mehr als X mal neu geliefert wird.

Mein Codebeispiel:

q.subscribe({ack: true}, function(data,headers,deliveryInfo,message) { 
    try{ 
    doSomething(data); 
    } catch(e) { 
    message.reject(true); 
    } 
} 
+0

Mögliche duplizieren http://stackoverflow.com/q/17654475 – pinepain

+0

Wie werden Sie Nachrichten eindeutig identifiziert? Haben Sie eine eigene ID in der Nachrichtennutzlast? –

+1

In meinem Fall (aber ich bin nicht das OP) - ja, Nachrichten können durch ihre UUID identifiziert werden. Leider ist es nicht genug, einen einfachen Zähler im Abonnenten zu haben, da ich mehrere Abonnenten in derselben Warteschlange habe, um die Arbeit auszugleichen, und die Anzahl der Wiederholungen sollte global sein, nicht lokal für jeden Arbeiter. –

Antwort

1

Meiner Meinung nach ist die beste Lösung, diese Fehler in Ihrer Anwendung zu behandeln und sie abzulehnen, wenn die App entschieden hat, dass sie die Nachricht nicht verarbeiten kann.

Wenn Sie keine Informationen verlieren möchten, sollte die App die Nachricht nur ablehnen, nachdem sie dieselbe Nachricht an eine Fehlerwarteschlange gesendet hat.

Code nicht getestet:

q.subscribe({ack: true}, function() { 
    var numOfRetries = 0; 
    var args = arguments; 
    var self = this; 
    var promise = doWork.apply(self, args); 
    for (var numOfRetries = 0; numOfRetries < MAX_RETRIES; numOfRetries++) { 
    promise = promise.fail(function() { return doWork.apply(self, args); }); 
    } 

    promise.fail(function() { 
    sendMessageToErrorQueue.apply(self, args); 
    rejectMessage.apply(self, args); 
    }) 
}) 
0

Eine mögliche Lösung ist die Nachricht hash irgendeine Art von Hash-Funktion verwenden Sie definieren, überprüfen dann ein Cache-Objekt für diesen Hash. Wenn es da ist, füge eins bis zum konfigurierbaren Maximum zum Cache hinzu, und wenn es nicht dort ist, setze es auf 1. Hier ist ein schneller und dreckiger Prototyp für dich (beachte, dass das mcache Objekt im Bereich für alle Abonnenten sein sollte):

var mcache = {}, maxRetries = 3; 

q.subscribe({ack: true}, function(data,headers,deliveryInfo,message) { 
    var messagehash = hash(message); 
    if(mcache[messagehash] === undefined){ 
    mcache[messagehash] = 0; 
    } 
    if(mcache[messagehash] > maxRetries) { 
    q.shift(true,false); //reject true, requeue false (discard message) 
    delete mcache[messagehash]; //don't leak memory 
    } else { 
    try{ 
     doSomething(data); 
     q.shift(false); //reject false 
     delete mcache[messagehash]; //don't leak memory 
    } catch(e) { 
     mcache[messagehash]++; 
     q.shift(true,true); //reject true, requeue true 
    } 
    } 
} 

Wenn die Nachricht eine GUID hat, können Sie diese in der Hash-Funktion einfach zurückgeben.

+0

Dies führt den Cache als einzigen Fehlerpunkt ein. Ich hätte gerne eine Lösung, die vollständig RabbitMQ-Techniken verwendet. –

+0

In welchem ​​Szenario sehen Sie den Cache fehlgeschlagen? – gcochard

+0

Der Cache (physisch) muss auf jedem Server ausgeführt werden. Was passiert, wenn dieser Server ausfällt? Der Cache muss also hochverfügbar und ausfallsicher sein, was den erforderlichen Aufwand erhöht. Wenn es irgendeine Möglichkeit gäbe, RabbitMQ selbst dafür zu verwenden, würde ich eine solche Lösung sehr bevorzugen. –