2014-11-18 5 views
5

Bei der Suche nach, wie ein Kafka Thema über die API zu erstellen, fand ich dieses Beispiel in Scala:Wie erstelle ich Kafka ZKStringSerializer in Java?

import kafka.admin.AdminUtils 
import kafka.utils.ZKStringSerializer 
import org.I0Itec.zkclient.ZkClient 

// Create a ZooKeeper client 
val sessionTimeoutMs = 10000 
val connectionTimeoutMs = 10000 
val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs, 
          connectionTimeoutMs, ZKStringSerializer) 

// Create a topic with 8 partitions and a replication factor of 3 
val topicName = "myTopic" 
val numPartitions = 8 
val replicationFactor = 3 
val topicConfig = new Properties 
AdminUtils.createTopic(zkClient, topicName, 
         numPartitions, replicationFactor, topicConfig) 

Quelle: https://stackoverflow.com/a/23360100/871012

Die letzten arg ZKStringSerializer offenbar ein Scala-Objekt ist. Es ist mir nicht klar, wie dieses Beispiel in Java funktioniert.

Dieser Beitrag How to create a scala object in clojure fragt die gleiche Frage in Clojure und die Antwort war:

ZKStringSerializer$/MODULE$ 

die in Java würde (glaube ich) übersetzen zu:

ZKStringSerializer$.MODULE$ 

Aber wenn ich versuche, dass (oder eine beliebige Anzahl anderer Variationen) kompiliert keiner von ihnen.
Der Übersetzungsfehler ist:

KafkaTopicCreator.java:[16,18] cannot find symbol 
symbol: variable ZKStringSerializer$ 
location: class org.sample.KafkaTopicCreator 

I kafka_2.9.2-0.8.1.1 und Java bin mit 8.

Antwort

17

für Java die folgende versuchen,

Ersteinfuhr unter Anweisung

import kafka.utils.ZKStringSerializer$; 

Erstellen Sie ein Objekt für den ZkClient auf folgende Weise:

String zkHosts = "127.0.0.1:2181"; //If more than one zookeeper then "127.0.0.1:2181,127.0.0.2:2181" 
ZkClient zkClient = new ZkClient(zkHosts, 10000, 10000, ZKStringSerializer$.MODULE$); 
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties()); 

Der obige Code wird für kafka> 0.9 nicht funktionieren, da die api geändert wurde, Verwenden den Code unten für kafka> 0,9

import java.util.Properties; 
import kafka.admin.AdminUtils; 
import kafka.utils.ZKStringSerializer$; 
import kafka.utils.ZkUtils; 
import org.I0Itec.zkclient.ZkClient; 
import org.I0Itec.zkclient.ZkConnection; 

public class KafkaTopicCreationInJava 
{ 
    public static void main(String[] args) throws Exception { 
     ZkClient zkClient = null; 
     ZkUtils zkUtils = null; 
     try { 
      String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181"; 
      int sessionTimeOutInMs = 15 * 1000; // 15 secs 
      int connectionTimeOutInMs = 10 * 1000; // 10 secs 

      zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$); 
      zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false); 

      String topicName = "testTopic"; 
      int noOfPartitions = 2; 
      int noOfReplication = 3; 
      Properties topicConfiguration = new Properties(); 

      AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration); 

     } catch (Exception ex) { 
      ex.printStackTrace(); 
     } finally { 
      if (zkClient != null) { 
       zkClient.close(); 
      } 
     } 
    } 
}