Wir haben ein node.js-Skript, das einen socket.io-Server ausführt, dessen Clients Nachrichten von einer RabbitMQ-Warteschlange verarbeiten. Wir haben kürzlich zu Amazon AWS migriert und RabbitMQ ist jetzt ein Cluster von zwei Rechnern (redundante Instanzen). Die AMQP-Verbindung geht von Zeit zu Zeit verloren (dies ist eine Einschränkung, die von einer Hochverfügbarkeitsumgebung mit redundanten VMs kommt und wir müssen damit umgehen). Wenn ein Versuch unternommen wird, die Verbindung wiederherzustellen, wählt der DNS, mit welcher Instanz er sich verbinden soll (Es handelt sich um einen Cluster mit Datenreplikation, sodass es nicht darauf ankommt, mit welcher Instanz eine Verbindung hergestellt werden soll.amqp.node erkennt keinen Verbindungsabbruch
Das Problem ist, dass der Versuch, die Verbindung wiederherzustellen, nie gemacht wird; Nach einer Weile, wenn die Verbindung unterbrochen wird, bemerkt amqp.node anscheinend nicht, dass die Verbindung unterbrochen wurde. Außerdem hören die Kunden auf, Nachrichten zu empfangen, und der Server socket.io hört einfach auf, neue Verbindungen zu akzeptieren.
Wir haben eine Heartbeat-Timeout von 55 Sekunden (nicht mit der Heartbeat-Timeout socket.io zu verwechseln) bei der RabbitMQ-URL und die Suche nach 'Fehler' und 'schließen' Ereignisse mit Callback-API von amqp.node, aber sie sind anscheinend nie ausgestellt. Die Warteschlangen erwarten, dass die konsumierten Nachrichten bestätigt werden. Wir möchten, dass das Knotenscript eine verlorene Verbindung erkennt und sich selbst beendet, sodass die Umgebung automatisch einen neuen Prozess startet und eine Verbindung wieder herstellt.
Hier ist der Code, vielleicht machen wir etwas falsch mit der amqp.node Callback API oder etwas anderes.
var express = require('express');
app = express();
var http = require('http');
var serverio = http.createServer(app);
var io = require('socket.io').listen(serverio, { log: false });
var socket;
var allcli = [];
var red, blue, green, magenta, reset;
red = '\033[31m';
blue = '\033[34m';
green = '\033[32m';
magenta = '\033[35m';
orange = '\033[43m';
reset = '\033[0m';
var queue = 'ha.atualizacao_mobile';
var urlRabbit = 'amqp://login:[email protected]?heartbeat=55' // Amazon
var amqp = require('amqplib/callback_api');
var debug = true;
console.log("Original Socket.IO heartbeat interval: " + io.get('heartbeat interval') + " seconds.");
io.set('heartbeat interval', 10 * 60);
console.log("Hearbeat interval changed to " + io.get('heartbeat interval') + " seconds to reduce battery consumption in the mobile clients.");
console.log("Original Socket.IO heartbeat timeout: " + io.get('heartbeat timeout') + " seconds.");
io.set('heartbeat timeout', 11 * 60);
console.log("Heartbeat timeout set to " + io.get('heartbeat timeout') + " seconds.");
io.sockets.on('connection', function(socket){
socket.on('error', function (exc) {
console.log(orange+"Ignoring exception: " + exc + reset);
});
socket.on('send-indice', function (data) {
// Some business logic
});
socket.on('disconnect', function() {
// Some business logic
});
});
function updatecli(data){
// Some business logic
}
amqp.connect(urlRabbit, null, function(err, conn) {
if (err !== null) {
return console.log("Error creating connection: " + err);
}
conn.on('error', function(err) {
console.log("Generated event 'error': " + err);
});
conn.on('close', function() {
console.log("Connection closed.");
process.exit();
});
processRabbitConnection(conn, function() {
conn.close();
});
});
function processRabbitConnection(conn, finalize) {
conn.createChannel(function(err, channel) {
if (err != null) {
console.log("Error creating channel: " + err);
return finalize();
}
channel.assertQueue(queue, null, function(err, ok) {
if (err !== null) {
console.log("Error asserting queue " + queue + ": " + err);
return finalize();
}
channel.consume(queue, function (msg) {
if (msg !== null) {
try {
var dataObj = JSON.parse(msg.content);
if (debug == true) {
//console.log(dataObj);
}
updatecli(dataObj);
} catch(err) {
console.log("Error in JSON: " + err);
}
channel.ack(msg);
}
}, null, function(err, ok) {
if (err !== null) {
console.log("Error consuming message: " + err);
return finalize();
}
});
});
});
}
serverio.listen(9128, function() {
console.log('Server: Socket IO Online - Port: 9128 - ' + new Date());
});