2015-11-17 6 views
9

Ich habe einige JUnit-Tests für Code, der ein Kafka-Thema verwendet. Die gespielten Kafka-Themen, die ich ausprobiert habe, funktionieren nicht und die online gefundenen Beispiele sind sehr alt, daher funktionieren sie auch nicht mit 0.8.2.1. Wie erstelle ich ein Mock-Kafka-Thema mit 0.8.2.1?Wie kann ich ein Mock Kafka Topic für Junit-Tests instanziieren?

Um zu verdeutlichen: Ich entscheide mich dafür, eine tatsächlich eingebettete Instanz des Themas zu verwenden, um mit einer realen Instanz zu testen, anstatt die Übergabe in mockito zu verspotten. Dies ist der Fall, damit ich testen kann, ob meine benutzerdefinierten Encoder und Decoder tatsächlich funktionieren und nicht fehlschlagen, wenn ich eine echte Kafka-Instanz verwende.

Antwort

6

https://gist.github.com/asmaier/6465468#file-kafkaproducertest-java

Dieses Beispiel wurde aktualisiert in der neuen Version 0.8.2.2 zu arbeiten. Hier ist der Code snippit mit Maven Abhängigkeiten:

pom.xml:

<dependencies> 
<dependency> 
    <groupId>junit</groupId> 
    <artifactId>junit</artifactId> 
    <version>4.12</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.2</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.2</version> 
    <classifier>test</classifier> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.8.2.2</version> 
</dependency> 
</dependencies> 

KafkaProducerTest.java:

import java.nio.charset.StandardCharsets; 
import java.util.ArrayList; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
import org.I0Itec.zkclient.ZkClient; 
import org.junit.Test; 
import kafka.admin.TopicCommand; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 
import kafka.producer.KeyedMessage; 
import kafka.producer.Producer; 
import kafka.producer.ProducerConfig; 
import kafka.server.KafkaConfig; 
import kafka.server.KafkaServer; 
import kafka.utils.MockTime; 
import kafka.utils.TestUtils; 
import kafka.utils.TestZKUtils; 
import kafka.utils.Time; 
import kafka.utils.ZKStringSerializer$; 
import kafka.zk.EmbeddedZookeeper; 
import static org.junit.Assert.*; 

/** 
* For online documentation 
* see 
* https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
* https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/admin/TopicCommand.scala 
* https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala 
*/ 
public class KafkaProducerTest { 

    private int brokerId = 0; 
    private String topic = "test"; 

    @Test 
    public void producerTest() throws InterruptedException { 

     // setup Zookeeper 
     String zkConnect = TestZKUtils.zookeeperConnect(); 
     EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect); 
     ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$); 

     // setup Broker 
     int port = TestUtils.choosePort(); 
     Properties props = TestUtils.createBrokerConfig(brokerId, port, true); 

     KafkaConfig config = new KafkaConfig(props); 
     Time mock = new MockTime(); 
     KafkaServer kafkaServer = TestUtils.createServer(config, mock); 

     String [] arguments = new String[]{"--topic", topic, "--partitions", "1","--replication-factor", "1"}; 
     // create topic 
     TopicCommand.createTopic(zkClient, new TopicCommand.TopicCommandOptions(arguments)); 

     List<KafkaServer> servers = new ArrayList<KafkaServer>(); 
     servers.add(kafkaServer); 
     TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 5000); 

     // setup producer 
     Properties properties = TestUtils.getProducerConfig("localhost:" + port); 
     ProducerConfig producerConfig = new ProducerConfig(properties); 
     Producer producer = new Producer(producerConfig); 

     // setup simple consumer 
     Properties consumerProperties = TestUtils.createConsumerProperties(zkServer.connectString(), "group0", "consumer0", -1); 
     ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties)); 

     // send message 
     KeyedMessage<Integer, byte[]> data = new KeyedMessage(topic, "test-message".getBytes(StandardCharsets.UTF_8)); 

     List<KeyedMessage> messages = new ArrayList<KeyedMessage>(); 
     messages.add(data); 

     producer.send(scala.collection.JavaConversions.asScalaBuffer(messages)); 
     producer.close(); 

     // deleting zookeeper information to make sure the consumer starts from the beginning 
     // see https://stackoverflow.com/questions/14935755/how-to-get-data-from-old-offset-point-in-kafka 
     zkClient.delete("/consumers/group0"); 

     // starting consumer 
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
     topicCountMap.put(topic, 1); 
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
     KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); 
     ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); 

     if(iterator.hasNext()) { 
      String msg = new String(iterator.next().message(), StandardCharsets.UTF_8); 
      System.out.println(msg); 
      assertEquals("test-message", msg); 
     } else { 
      fail(); 
     } 

     // cleanup 
     consumer.shutdown(); 
     kafkaServer.shutdown(); 
     zkClient.close(); 
     zkServer.shutdown(); 
    } 
} 

Seien Sie sicher, Ihre mvn Abhängigkeit zu überprüfen: Baum für alle widerstreitenden Bibliotheken. Ich hatte Ausschlüsse für slf und log4j hinzuzufügen:

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.2</version> 
    <exclusions> 
     <exclusion> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-log4j12</artifactId> 
     </exclusion> 
     <exclusion> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
     </exclusion> 
    </exclusions> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.2</version> 
    <classifier>test</classifier> 
    <exclusions> 
     <exclusion> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-log4j12</artifactId> 
     </exclusion> 
     <exclusion> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
     </exclusion> 
    </exclusions> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.8.2.2</version> 
    <exclusions> 
     <exclusion> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-log4j12</artifactId> 
     </exclusion> 
     <exclusion> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
     </exclusion> 
    </exclusions> 
</dependency> 

Eine weitere Option, die ich bin auf der Suche in wird mit Apache Kurator: Is it possible to start a zookeeper server instance in process, say for unit tests?

<dependency> 
    <groupId>org.apache.curator</groupId> 
    <artifactId>curator-test</artifactId> 
    <version>2.2.0-incubating</version> 
    <scope>test</scope> 
</dependency> 

TestingServer zkTestServer; 

@Before 
public void startZookeeper() throws Exception { 
    zkTestServer = new TestingServer(2181); 
    cli = CuratorFrameworkFactory.newClient(zkTestServer.getConnectString(), new RetryOneTime(2000)); 
} 

@After 
public void stopZookeeper() throws IOException { 
    cli.close(); 
    zkTestServer.stop(); 
} 
+0

können Sie Code für die Version 0.11.0.2 bereitstellen. Der obige Code funktioniert nicht – dhroove

2

Haben Sie versucht, Kafka-Konsumobjekte mit einem spöttischen Framework wie Mockito zu verspotten?

+0

ich eher eine Mock-Version von kafka haben würde, damit ich weiß, die Produzenten und Konsumenten arbeiten damit. Es gibt einige Beispiele hier und da online (zB: https://transilberman.wordpress.com/2013/07/19/how-to-unit-test-kafka). Sie sind jedoch für ältere Versionen, so dass es mit 0.8.2.1 nicht mehr funktioniert. – Chip