6

Ich habe in meinem Projekt angefangen, Spring-Integration-Kafka zu verwenden, und ich kann Nachrichten von Kafka erzeugen und konsumieren. Aber jetzt möchte ich eine Nachricht an eine bestimmte Partition senden und auch eine Nachricht von einer bestimmten Partition konsumieren.spring-integration-kafka config consumer zum Empfangen der Nachricht von der Partition angeben

Beispiel möchte ich Nachricht an Partition 3 erzeugen, und die verbrauchen nur Nachricht von Partition erhalten 3.

Bis jetzt mein Thema 8 Partitionen hat und ich kann Nachricht an bestimmte Partition erzeugen, aber ich habe nicht gefunden den Weg zu konfigurieren, den Verbraucher nur erhalten Nachricht von bestimmten Partition noch.

Also jeden Vorschlag, wie ich den Verbraucher mit Spring-Integration-Kafka, oder irgendetwas anderes mit der KafkaConsumer.java-Klasse zu tun, damit es Nachricht von bestimmten Partition erhalten kann.

Danke.

Hier ist mein Code:

der kafka-Produzent-context.xml

<int:publish-subscribe-channel id="inputToKafka" /> 

<int-kafka:outbound-channel-adapter 
    id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="kafkaProducerContext" 
    auto-startup="true" order="1" channel="inputToKafka" /> 
<int-kafka:producer-context id="kafkaProducerContext" 
    producer-properties="producerProps"> 
    <int-kafka:producer-configurations> 
     <int-kafka:producer-configuration 
      broker-list="127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094" 
      async="true" topic="testTopic" 
      key-class-type="java.lang.String" 
      key-encoder="encoder" 
      value-class-type="java.lang.String" 
      value-encoder="encoder" 
      partitioner="partitioner" 
      compression-codec="default" /> 
    </int-kafka:producer-configurations> 
</int-kafka:producer-context> 

<util:properties id="producerProps"> 
    <prop key="queue.buffering.max.ms">500</prop> 
    <prop key="topic.metadata.refresh.interval.ms">3600000</prop> 
    <prop key="queue.buffering.max.messages">10000</prop> 
    <prop key="retry.backoff.ms">100</prop> 
    <prop key="message.send.max.retries">2</prop> 
    <prop key="send.buffer.bytes">5242880</prop> 
    <prop key="socket.request.max.bytes">104857600</prop> 
    <prop key="socket.receive.buffer.bytes">1048576</prop> 
    <prop key="socket.send.buffer.bytes">1048576</prop> 
    <prop key="request.required.acks">1</prop> 
</util:properties> 

<bean id="encoder" 
    class="org.springframework.integration.kafka.serializer.common.StringEncoder" /> 

<bean id="partitioner" class="org.springframework.integration.kafka.support.DefaultPartitioner"/> 

<task:executor id="taskExecutor" pool-size="5" 
    keep-alive="120" queue-capacity="500" /> 

die KafkaProducer.java

public class KafkaProducer { 

private static final Logger logger = LoggerFactory 
     .getLogger(KafkaProducer.class); 

@Autowired 
private MessageChannel inputToKafka; 

public void sendMessage(String message) { 

    try { 
     inputToKafka.send(MessageBuilder.withPayload(message) 
        .setHeader(KafkaHeaders.TOPIC, "testTopic") 
        .setHeader(KafkaHeaders.PARTITION_ID, 3).build()); 
    } catch (Exception e) { 
     logger.error(String.format(
       "Failed to send [ %s ] to topic %s ", message, topic), 
       e); 
    } 
} 

}

die kafka-Consumer-Kontext .xml

<int:channel id="inputFromKafka"> 
    <int:dispatcher task-executor="kafkaMessageExecutor" /> 
</int:channel> 

<int-kafka:zookeeper-connect id="zookeeperConnect" 
    zk-connect="127.0.0.1:2181" zk-connection-timeout="6000" 
    zk-session-timeout="6000" zk-sync-time="2000" /> 

<int-kafka:inbound-channel-adapter 
    id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext" 
    auto-startup="true" channel="inputFromKafka"> 
    <int:poller fixed-delay="10" time-unit="MILLISECONDS" 
     max-messages-per-poll="5" /> 
</int-kafka:inbound-channel-adapter> 


<bean id="consumerProperties" 
    class="org.springframework.beans.factory.config.PropertiesFactoryBean"> 
    <property name="properties"> 
     <props> 
      <prop key="auto.offset.reset">smallest</prop> 
      <prop key="socket.receive.buffer.bytes">1048576</prop> 
      <prop key="fetch.message.max.bytes">5242880</prop> 
      <prop key="auto.commit.interval.ms">1000</prop> 
     </props> 
    </property> 
</bean> 

<int-kafka:consumer-context id="consumerContext" 
    consumer-timeout="1000" zookeeper-connect="zookeeperConnect" 
    consumer-properties="consumerProperties"> 
    <int-kafka:consumer-configurations> 
     <int-kafka:consumer-configuration 
      group-id="defaultGrp" max-messages="20000"> 
      <int-kafka:topic id="testTopic" streams="3" /> 
     </int-kafka:consumer-configuration> 
    </int-kafka:consumer-configurations> 
</int-kafka:consumer-context> 

<task:executor id="kafkaMessageExecutor" pool-size="0-10" 
    keep-alive="120" queue-capacity="500" /> 

<int:outbound-channel-adapter channel="inputFromKafka" 
    ref="kafkaConsumer" method="processMessage" /> 

die KafkaConsumer.java

public class KafkaConsumer { 

private static final Logger log = LoggerFactory 
     .getLogger(KafkaConsumer.class); 

@Autowired 
KafkaService kafkaService; 

public void processMessage(Map<String, Map<Integer, List<byte[]>>> msgs) { 
    for (Map.Entry<String, Map<Integer, List<byte[]>>> entry : msgs 
      .entrySet()) { 
     log.debug("Topic:" + entry.getKey()); 
     ConcurrentHashMap<Integer, List<byte[]>> messages = (ConcurrentHashMap<Integer, List<byte[]>>) entry 
       .getValue(); 
     log.debug("\n**** Partition: \n"); 
     Set<Integer> keys = messages.keySet(); 
     for (Integer i : keys) 
      log.debug("p:"+i); 
     log.debug("\n**************\n"); 
     Collection<List<byte[]>> values = messages.values(); 
     for (Iterator<List<byte[]>> iterator = values.iterator(); iterator 
       .hasNext();) { 
      List<byte[]> list = iterator.next(); 
      for (byte[] object : list) { 
       String message = new String(object); 
       log.debug("Message: " + message); 
       try { 
        kafkaService.receiveMessage(message); 
       } catch (Exception e) { 
        log.error(String.format("Failed to process message %s", 
          message)); 
       } 
      } 
     } 

    } 
} 
} 

So ist mein Problem hier. Wenn ich eine Nachricht auf Partition 3 oder einer Partition erstelle, erhält der Kafka Consumer immer die Nachricht. Alles was ich will ist: Der KafkaConsumer wird nur Nachrichten von Partition 3 empfangen, nicht von anderen Partitionen.

Nochmals vielen Dank.

+0

Ich bin auf ein Problem wie Sie gestoßen. Hast du irgendeine Lösung gefunden? – user3359139

+0

Ich möchte auch Kafka mit Spring integrieren, können Sie bitte alle Repo teilen, wo ich Arbeitscodes herunterladen kann – Sankalp

Antwort

1

Sie müssen die message-driven-channel-adapter verwenden.

Als Variante kann die KafkaMessageListenerContainer org.springframework.integration.kafka.core.Partition Array-Argument akzeptieren Themen und die Partitionen Paar angeben.

Sie benötigen einen Hörer Behälter verkabeln, this constructor verwenden und es mit dem Adapter bieten die listener-container Attribut.

Wir aktualisieren die Readme mit einem Beispiel.

+0

Danke Gary, ich versuche es zu tun und warte zum Beispiel. –

+0

Hallo Gary, es scheint der Message-Driven-Channel-Adapter verwendet den Kafka SimpleConsumer. Meine Frage ist also: Kann ich den Consumer so konfigurieren, dass er eine Nachricht von einer bestimmten Partition in Kafka High-Level Consumer empfängt? Weil ich mehrere Verbraucher habe. Vielen Dank. –

+0

Nein; Um bestimmte Themen auszuwählen, benötigen Sie den Message-Driven Adapter. –