2015-05-03 8 views

Antwort

6

Verpflichtet commitOffsets auf der High-Level-Consumer-Block, bis Offsets erfolgreich festgeschrieben werden?

Es sieht aus wie commitOffsets() Schleifen durch jeden Verbraucher und ruft updatePersistentPath wenn seine Offset geändert hat, und wenn ja, schreibt Daten über zkClient.writeData(path, getBytes(data)). Es scheint jedoch, dass commitOffsets()Block blockiert, bis alle Offsets festgeschrieben sind.

Hier ist der Quellcode für commitOffsets() (ref):

public void commitOffsets() { 
    if (zkClient == null) { 
     logger.error("zk client is null. Cannot commit offsets"); 
     return; 
    } 
    for (Entry<String, Pool<Partition, PartitionTopicInfo>> e : topicRegistry.entrySet()) { 
     ZkGroupTopicDirs topicDirs = new ZkGroupTopicDirs(config.getGroupId(), e.getKey()); 
     for (PartitionTopicInfo info : e.getValue().values()) { 
      final long lastChanged = info.getConsumedOffsetChanged().get(); 
      if (lastChanged == 0) { 
       logger.trace("consume offset not changed"); 
       continue; 
      } 
      final long newOffset = info.getConsumedOffset(); 
      //path: /consumers/<group>/offsets/<topic>/<brokerid-partition> 
      final String path = topicDirs.consumerOffsetDir + "/" + info.partition.getName(); 
      try { 
       ZkUtils.updatePersistentPath(zkClient, path, "" + newOffset); 
      } catch (Throwable t) { 
       logger.warn("exception during commitOffsets, path=" + path + ",offset=" + newOffset, t); 
      } finally { 
       info.resetComsumedOffsetChanged(lastChanged); 
       if (logger.isDebugEnabled()) { 
        logger.debug("Committed [" + path + "] for topic " + info); 
       } 
      } 
     } 
    } 
} 

und für updatePersistentPath(...) (ref):

public static void updatePersistentPath(ZkClient zkClient, String path, String data) { 
    try { 
     zkClient.writeData(path, getBytes(data)); 
    } catch (ZkNoNodeException e) { 
     createParentPath(zkClient, path); 
     try { 
      zkClient.createPersistent(path, getBytes(data)); 
     } catch (ZkNodeExistsException e2) { 
      zkClient.writeData(path, getBytes(data)); 
     } 
    } 
}