Ich nehme Nachrichten von Amazon SQS Warteschlange. Ich habe Tausende von Nachrichten in der Warteschlange. Wenn ich die Anwendung starte (geschrieben in Java mit Springframework), fängt es an, Nachrichten von der Warteschlange abzufragen und nach dem Empfang von 500 Nachrichten stoppt es. Wenn ich die Anwendung erneut starte, verbraucht sie weitere 500 Nachrichten.Amazon SQS Java SDK stoppt nach dem Konsumieren von 500 Nachrichten
Mein Code ist wie ...
Anschluss Fabrik
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactoryActiveMQ() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrency("3-15");
factory.setReceiveTimeout(3000L);
return factory;
}
@Bean(name = "sqsJmsListenerContainerFactory")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(CustomDestinationResolver resolver) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(sqsConnectionFactory());
factory.setConcurrency("3-15");
factory.setReceiveTimeout(3000L);
return factory;
}
Listener
@JmsListener(containerFactory = "sqsJmsListenerContainerFactory", destination = "sqs.queue")
public void onMessage(Message message) {
//Processing message
}
Ist alles, was ich brauche in amazon Warteschlange oder in Verbindung Fabrik Bohne konfigurieren ?
Danke :-)
Aktualisiert: Hinzugefügt Thread-Dump
Während Anwendung verbraucht Nachrichten
DefaultMessageListenerContainer in Thread-Dump wird wie
"[email protected]" prio=5 tid=0x18 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(SocketInputStream.java:-1)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961)
- locked <0x2230> (a java.lang.Object)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
- locked <0x2231> (a sun.security.ssl.AppInputStream)
at org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160)
at org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84)
at org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273)
at org.apache.http.impl.conn.LoggingSessionInputBuffer.readLine(LoggingSessionInputBuffer.java:116)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:260)
at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283)
at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251)
at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:197)
at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:271)
at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66)
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123)
at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:682)
at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:486)
at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:685)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:460)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:295)
at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2291)
at com.amazonaws.services.sqs.AmazonSQSClient.deleteMessage(AmazonSQSClient.java:1340)
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.deleteMessage(AmazonSQSMessagingClientWrapper.java:127)
at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.acknowledge(AutoAcknowledger.java:33)
at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.notifyMessageReceived(AutoAcknowledger.java:42)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.messageHandler(SQSMessageConsumerPrefetch.java:477)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.receive(SQSMessageConsumerPrefetch.java:410)
at com.amazon.sqs.javamessaging.SQSMessageConsumer.receive(SQSMessageConsumer.java:157)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:413)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:293)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:246)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1144)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1136)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1033)
at java.lang.Thread.run(Thread.java:745)
ConsumerPrefetchThread in Thread-Dump wie
"[email protected]" daemon prio=5 tid=0x1b nid=NA runnable
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(SocketInputStream.java:-1)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961)
- locked <0x23a7> (a java.lang.Object)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
- locked <0x23a8> (a sun.security.ssl.AppInputStream)
at org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160)
at org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84)
at org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273)
at org.apache.http.impl.conn.LoggingSessionInputBuffer.readLine(LoggingSessionInputBuffer.java:116)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:260)
at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283)
at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251)
at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:197)
at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:271)
at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66)
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123)
at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:682)
at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:486)
at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:685)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:460)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:295)
at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2291)
at com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage(AmazonSQSClient.java:1021)
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.receiveMessage(AmazonSQSMessagingClientWrapper.java:319)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessages(SQSMessageConsumerPrefetch.java:216)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:180)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
wenn Anwendung stoppt Nachrichten
ConsumerPrefetchThread in Thread-Dump raubend ist wie
"[email protected]" prio=5 tid=0x18 nid=NA waiting
java.lang.Thread.State: WAITING
at java.lang.Object.wait(Object.java:-1)
at java.lang.Object.wait(Object.java:502)
at org.apache.commons.pool.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:1151)
at org.apache.activemq.jms.pool.ConnectionPool.createSession(ConnectionPool.java:133)
at org.apache.activemq.jms.pool.PooledConnection.createSession(PooledConnection.java:167)
at com.ac.jms.senders.AbstractNoResponseSender.request(AbstractNoResponseSender.java:40)
at com.ac.mic.listener.AbstractMicQueueListener.onMessage(AbstractMicQueueListener.java:117)
at com.ac.mic.listener.MicQueueListener.onMessage(MicQueueListener.java:40)
at sun.reflect.GeneratedMethodAccessor240.invoke(Unknown Source:-1)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:185)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:104)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:90)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:66)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:674)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:634)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:605)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:308)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:246)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1144)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1136)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1033)
at java.lang.Thread.run(Thread.java:745)
ConsumerPrefetchThread in Thread-Dump ist wie
"[email protected]" daemon prio=5 tid=0x1b nid=NA waiting
java.lang.Thread.State: WAITING
at java.lang.Object.wait(Object.java:-1)
at java.lang.Object.wait(Object.java:502)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.waitForPrefetch(SQSMessageConsumerPrefetch.java:273)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:174)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Versuchen Sie, einen Thread Dump zu nehmen, wenn Ihr Verbraucher stoppt, um zu sehen, was die Container-Threads tun. Vielleicht ist es in Ihrem Code stecken geblieben? –
@GaryRussell Ich habe Thread Dump hinzugefügt. In dieser Anwendung verwende ich auch ActiveMQ. Ich dränge Nachrichten, die ich von SQS zu den ActiveMQ-Warteschlangen bekommen werde. – Piyush