2016-07-29 18 views
1

Wir beobachten ein seltsames Verhalten mit unserem Servicetest und Embedded Kafka.Spring Kafka, Testen mit Embedded Kafka

Der Test ist ein Spock-Test, wir die JUnit-Regel KafkaEmbedded verwenden und brokersAsString propagieren wie folgt:

@ClassRule 
@Shared 
KafkaEmbedded embeddedKafka = new KafkaEmbedded(1) 

@Autowired 
KafkaListenerEndpointRegistry endpointRegistry 

def setupSpec() { 
    System.setProperty("kafka.bootstrapServers", embeddedKafka.getBrokersAsString()) 
} 

aus dem Code of KafkaEmbedded Inspektion, eine Instanz mit KafkaEmbedded(int count) führt zu einer Kafka-Server mit zwei Partitionen Konstruktion pro Thema.

Um Probleme bei der Partitionszuweisung und der Server-Client-Synchronisierung im Test zu beheben, folgen wir der Strategie in der ContainerTestUtils-Klasse von spring-kafka.

public static void waitForAssignment(KafkaMessageListenerContainer<String, String> container, int partitions) 
     throws Exception { 

     log.info(
      "Waiting for " + container.getContainerProperties().getTopics() + " to connect to " + partitions + " " + 
       "partitions.") 

     int n = 0; 
     int count = 0; 
     while (n++ < 600 && count < partitions) { 
      count = 0; 
      container.getAssignedPartitions().each { 
       TopicPartition it -> 
        log.info(it.topic() + ":" + it.partition() + "; ") 
      } 

      if (container.getAssignedPartitions() != null) { 
       count = container.getAssignedPartitions().size(); 
      } 
      if (count < partitions) { 
       Thread.sleep(100); 
      } 
     } 
    } 

Wenn wir beobachten die Protokolle wir folgendes Muster feststellen:

2016-07-29 11:24:02.600 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 1 : {deliveryZipCode_v1=LEADER_NOT_AVAILABLE} 
2016-07-29 11:24:02.600 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 1 : {staggering=LEADER_NOT_AVAILABLE} 
2016-07-29 11:24:02.600 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 1 : {moa=LEADER_NOT_AVAILABLE} 
2016-07-29 11:24:02.696 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 3 : {staggering=LEADER_NOT_AVAILABLE} 
2016-07-29 11:24:02.699 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 3 : {moa=LEADER_NOT_AVAILABLE} 
2016-07-29 11:24:02.699 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 3 : {deliveryZipCode_v1=LEADER_NOT_AVAILABLE} 
2016-07-29 11:24:02.807 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 5 : {deliveryZipCode_v1=LEADER_NOT_AVAILABLE} 
2016-07-29 11:24:02.811 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 5 : {staggering=LEADER_NOT_AVAILABLE} 
2016-07-29 11:24:02.812 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 5 : {moa=LEADER_NOT_AVAILABLE} 
2016-07-29 11:24:03.544 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[] 
2016-07-29 11:24:03.544 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[] 
2016-07-29 11:24:03.544 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[] 
2016-07-29 11:24:03.602 INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : SyncGroup for group timeslot-service-group-06x failed due to coordinator rebalance, rejoining the group 
2016-07-29 11:24:03.637 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[] 
2016-07-29 11:24:03.637 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[] 
2016-07-29 11:24:04.065 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[staggering-0] 
2016-07-29 11:24:04.066 INFO 1160 --- [   main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 50810 (http) 
2016-07-29 11:24:04.073 INFO 1160 --- [   main] .t.s.AllocationsDeliveryZonesServiceSpec : Started AllocationsDeliveryZonesServiceSpec in 20.616 seconds (JVM running for 25.456) 
2016-07-29 11:24:04.237 INFO 1160 --- [   main] org.eclipse.jetty.server.Server   : jetty-9.2.17.v20160517 
2016-07-29 11:24:04.265 INFO 1160 --- [   main] o.e.jetty.server.handler.ContextHandler : Started [email protected]{/__admin,null,AVAILABLE} 
2016-07-29 11:24:04.270 INFO 1160 --- [   main] o.e.jetty.server.handler.ContextHandler : Started [email protected]{/,null,AVAILABLE} 
2016-07-29 11:24:04.279 INFO 1160 --- [   main] o.eclipse.jetty.server.ServerConnector : Started [email protected]{HTTP/1.1}{0.0.0.0:50811} 
2016-07-29 11:24:04.430 INFO 1160 --- [   main] o.eclipse.jetty.server.ServerConnector : Started [email protected]{SSL-http/1.1}{0.0.0.0:50812} 
2016-07-29 11:24:04.430 INFO 1160 --- [   main] org.eclipse.jetty.server.Server   : Started @25813ms 
2016-07-29 11:24:04.632 INFO 1160 --- [   main] .t.s.AllocationsDeliveryZonesServiceSpec : waiting... 
2016-07-29 11:24:04.662 INFO 1160 --- [   main] .t.s.AllocationsDeliveryZonesServiceSpec : Waiting for [moa] to connect to 2 partitions.^ 
2016-07-29 11:24:13.644 INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Attempt to heart beat failed since the group is rebalancing, try to re-join group. 
2016-07-29 11:24:13.644 INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Attempt to heart beat failed since the group is rebalancing, try to re-join group. 
2016-07-29 11:24:13.644 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[] 
2016-07-29 11:24:13.644 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[] 
2016-07-29 11:24:13.655 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[staggering-0] 
2016-07-29 11:24:13.655 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[moa-0] 
2016-07-29 11:24:13.655 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[deliveryZipCode_v1-0] 
2016-07-29 11:24:13.740 INFO 1160 --- [   main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0; 
[...] 
2016-07-29 11:24:16.644 INFO 1160 --- [   main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0; 
2016-07-29 11:24:16.666 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[staggering-0] 
2016-07-29 11:24:16.750 INFO 1160 --- [   main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0; 
[...] 
2016-07-29 11:24:23.559 INFO 1160 --- [   main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0; 
2016-07-29 11:24:23.660 INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Attempt to heart beat failed since the group is rebalancing, try to re-join group. 
2016-07-29 11:24:23.660 INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Attempt to heart beat failed since the group is rebalancing, try to re-join group. 
2016-07-29 11:24:23.662 INFO 1160 --- [   main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0; 
2016-07-29 11:24:23.686 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[moa-0] 
2016-07-29 11:24:23.686 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[deliveryZipCode_v1-0] 
2016-07-29 11:24:23.695 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[moa-0] 
2016-07-29 11:24:23.695 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[staggering-0] 
2016-07-29 11:24:23.695 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[deliveryZipCode_v1-0] 

Bitte beachten Sie auch die [..] Hinweislinien weggelassen

Wir setzen metadata.max.age.ms-3.000 ms Als Ergebnis es versucht, die Metadaten häufig zu aktualisieren.

Was uns jetzt verwirrt ist, dass, wenn wir warten, bis zwei Partitionen verbinden, die Wartezeit ausläuft. Nur wenn wir auf die Verbindung einer Partition warten, läuft nach einiger Zeit alles erfolgreich.

Haben wir den Code falsch verstanden, dass es im eingebetteten Kafka zwei Partitionen pro Thema gibt? Ist es normal, dass nur einer unseren Zuhörern zugewiesen wird?

Antwort

0

Ich kann nicht die Flockigkeit erklären, die Sie sehen; Ja, jedes Thema erhält standardmäßig 2 Partitionen. Ich habe gerade einen der Rahmen-Container-Tests durchgeführt und sehe dies ...

09:24:06.139 INFO [testSlow3-kafka-consumer-1][org.springframework.kafka.listener.KafkaMessageListenerContainer] partitions revoked:[] 
09:24:06.611 INFO [testSlow3-kafka-consumer-1][org.springframework.kafka.listener.KafkaMessageListenerContainer] partitions assigned:[testTopic3-1, testTopic3-0] 
+0

Die Flockigkeit, die wir ursprünglich sahen, war über Timing und Zuordnung von Themen zu den Verbrauchern. Grundsätzlich was die (https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/ContainerTestUtils.java) Adressen. Über die Partitionen habe ich herausgefunden, was mich verwirrt. Die konfigurierte Anzahl von Partitionen aus dem Konstruktor wird in der before() -Methode verwendet, wenn Themen vordefiniert sind. Wenn die Themen implizit erstellt werden, wird 1 als Standard für die Anzahl der Partitionen verwendet. –