Ein Projekt, an dem ich arbeite, erfordert das Lesen von Nachrichten von SQS, und ich beschloss, Akka zu verwenden, um die Verarbeitung dieser Nachrichten zu verteilen.Consumer Poll Rate mit Akka, SQS und Camel
Da SQS von Camel unterstützt wird und die Funktionalität für Akka in der Consumer-Klasse integriert ist, dachte ich, es wäre am besten, den Endpunkt zu implementieren und Nachrichten auf diese Weise zu lesen, obwohl ich nicht viele Beispiele dafür gesehen habe Leute, die das tun.
Mein Problem ist, dass ich meine Warteschlange nicht schnell genug abfragen kann, um meine Warteschlange leer oder fast leer zu halten. Was ich ursprünglich dachte war, dass ich einen Consumer dazu bringen konnte, Nachrichten über Camel von SQS mit einer Rate von X/s zu empfangen. Von dort konnte ich einfach mehr Consumers erstellen, um die Geschwindigkeit zu erreichen, mit der ich die verarbeiteten Nachrichten benötigte.
Mein Verbraucher:
import akka.camel.{CamelMessage, Consumer}
import akka.actor.{ActorRef, ActorPath}
class MyConsumer() extends Consumer {
def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)"
var count = 0
def receive = {
case msg: CamelMessage => {
count += 1
}
case _ => {
println("Got something else")
}
}
override def postStop(){
println("Count for actor: " + count)
}
}
Wie gezeigt, habe ich delay=1
sowie &maxMessagesPerPoll=10
setze die Rate von Nachrichten zu verbessern, aber ich bin nicht in der Lage mehr Verbraucher mit dem gleichen Endpunkt, um zu laichen.
las ich in der Dokumentation, dass By default endpoints are assumed not to support multiple consumers.
und ich glaube, dass dies auch für die SQS-Endpunkte gilt, als mehrere Verbraucher Laichen wird mir nur einen Verbraucher geben, wo nach dem System für eine Minute ausgeführt wird, ist die Ausgangsnachricht Count for actor: x
statt dem andere, die Count for actor: 0
ausgeben.
Wenn das überhaupt sinnvoll ist; Ich kann ungefähr 33 Nachrichten/Sekunde mit dieser aktuellen Implementierung auf dem einzelnen Verbraucher lesen.
Ist dies der richtige Weg, Nachrichten aus einer SQS-Warteschlange in Akka zu lesen? Wenn ja, kann ich das nach außen skalieren, so dass ich meine Nachrichtenverbrauchsrate näher an die von 900 Nachrichten/Sekunde erhöhen kann?