2015-01-12 9 views
7

Ich muss ungefähr 250 numerische Werte pro Sekunde speichern, pro Kunde, was ungefähr 900k Zahlen pro Stunde ist. Es wird wahrscheinlich keine ganztägige Aufzeichnung sein (wahrscheinlich zwischen 5-10 Stunden pro Tag), aber ich werde meine Daten basierend auf der Client-ID und dem Tag, an dem das Lesen gemacht wird, partitionieren. Die maximale Zeilenlänge liegt bei ca. 22-23M, was noch überschaubar ist. Neverteless, mein Schema sieht wie folgt aus:Cassandra-Cluster mit schlechter Einfügeleistung und Stabilität der Einfügung

CREATE TABLE measurement (
    clientid text, 
    date text, 
    event_time timestamp, 
    value int, 
    PRIMARY KEY ((clientid,date), event_time) 
); 

Der Schlüsselraum einen Replikationsfaktor von 2, nur zum Testen, der Schnatz ist GossipingPropertyFileSnitch und NetworkTopologyStrategy. Ich weiß, dass Replikationsfaktor 3 mehr Produktionsstandard ist.

Als nächstes habe ich einen kleinen Cluster auf den Unternehmen Servern, drei Bare-Metal-virtualisierten Maschinen mit 2 CPUs x 2 Kerne und 16 GB RAM und viel Platz geschaffen. Ich bin in Gigabit LAN mit ihnen. Der Cluster ist funktionsfähig und basiert auf dem nodetool.

Hier ist der Code, den ich mit meinem Setup testen:

 Cluster cluster = Cluster.builder() 
       .addContactPoint("192.168.1.100") 
       .addContactPoint("192.168.1.102") 
       .build(); 
     Session session = cluster.connect(); 
     DateTime time = DateTime.now(); 
     BlockingQueue<BatchStatement> queryQueue = new ArrayBlockingQueue(50, true); 

    try { 

     ExecutorService pool = Executors.newFixedThreadPool(15); //changed the pool size also to throttle inserts 

     String insertQuery = "insert into keyspace.measurement (clientid,date,event_time,value) values (?, ?, ?, ?)"; 
     PreparedStatement preparedStatement = session.prepare(insertQuery); 
     BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED); //tried with unlogged also 

     //generating the entries 
     for (int i = 0; i < 900000; i++) { //900000 entries is an hour worth of measurements 
      time = time.plus(4); //4ms between each entry 
      BoundStatement bound = preparedStatement.bind("1", "2014-01-01", time.toDate(), 1); //value not important 
      batch.add(bound); 

      //The batch statement must have 65535 statements at most 
      if (batch.size() >= 65534) { 
       queryQueue.put(batch); 
       batch = new BatchStatement(); 
      } 
     } 
     queryQueue.put(batch); //the last batch, perhaps shorter than 65535 

     //storing the data 
     System.out.println("Starting storing"); 
     while (!queryQueue.isEmpty()) { 
      pool.execute(() -> { 
       try { 

        long threadId = Thread.currentThread().getId(); 
        System.out.println("Started: " + threadId); 
        BatchStatement statement = queryQueue.take(); 
        long start2 = System.currentTimeMillis(); 
        session.execute(statement); 
        System.out.println("Finished " + threadId + ": " + (System.currentTimeMillis() - start2)); 
       } catch (Exception ex) { 
        System.out.println(ex.toString()); 
       } 
      }); 

     } 
     pool.shutdown(); 
     pool.awaitTermination(120,TimeUnit.SECONDS); 


    } catch (Exception ex) { 
     System.out.println(ex.toString()); 
    } finally { 
     session.close(); 
     cluster.close(); 
    } 

ich mit dem Code kam durch Beiträge hier und auf anderen Blogs und Websites zu lesen. Wie ich verstanden habe, ist es wichtig, dass der Client mehrere Threads verwendet, deshalb habe ich das getan. Ich habe auch versucht, asynchrone Operationen zu verwenden.

Das Endergebnis ist dies, egal, welchen Ansatz ich verwende, ein Batch führt in 5-6 Sekunden, obwohl es bis zu 10 dauern kann. Es nimmt das gleiche, wenn ich nur eine Charge eingeben (also nur ~ 65k Spalten) oder wenn ich eine dumme Single-Thread-Anwendung verwende. Ehrlich gesagt, habe ich ein bisschen mehr erwartet. Zumal ich mit einer lokalen Instanz mehr oder weniger ähnliche Leistung auf meinem Laptop bekomme.

Das zweite, vielleicht wichtigere Problem, sind die Ausnahmen, denen ich in einer unvorhersehbaren Weise gegenüberstehe. Diese beiden:

com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra Timeout bei Schreibabfrage bei Konsistenz ONE (1 Replik wurden erforderlich, aber nur 0 quittiert den Schreib)

und

com.datastax.driver.core.exceptions.NoHostAvailableException: Alle Host (s) versucht, für die Suche nach fehlgeschlagen (versucht: /192.168.1.102:9042 (com.datastax.dri ver.core.TransportException: [/192.168.1.102:9042] Verbindung wurde geschlossen), /192.168.1.100:9042 (com.datastax.driver.core.TransportException: [/192.168.1.100:9042] Verbindung hat wurde geschlossen), /192.168.1.101:9042 (com.datastax.driver.core.TransportException:

In der unteren Zeile [/192.168.1.101:9042] Verbindung wurde)) geschlossen ist, bin ich etwas falsch machen? Sollte ich die Art, wie ich Daten lade, neu organisieren oder das Schema ändern? Ich habe versucht, die Zeilenlänge zu reduzieren (also habe ich 12-Stunden-Reihen), aber das machte keinen großen Unterschied.

============================== Update:

Ich war unfreundlich und vergaß ein Beispiel einfügen des Codes, den ich verwendet habe, nachdem die Frage beantwortet wurde. Es funktioniert einigermaßen gut, jedoch setze ich meine Forschung mit KairosDB und Binärübertragung mit Astyanax fort.Es sieht so aus, als ob ich mit CQL bessere Ergebnisse erzielen kann, obwohl KairosDB einige Probleme haben kann, wenn es in Überlastung ist (aber ich arbeite daran) und Astyanax ist ein bisschen ausschweifend für meinen Geschmack. Trotzdem, hier ist der Code, ich irre mich irgendwo.

Die Semaphor-Slot-Nummer hat keine Auswirkungen auf die Leistung, wenn sie über 5000 hinausgeht, und sie ist fast konstant.

String insertQuery = "insert into keyspace.measurement  (userid,time_by_hour,time,value) values (?, ?, ?, ?)"; 
     PreparedStatement preparedStatement =  session.prepare(insertQuery); 
     Semaphore semaphore = new Semaphore(15000); 

    System.out.println("Starting " + Thread.currentThread().getId()); 
    DateTime time = DateTime.parse("2015-01-05T12:00:00"); 
    //generating the entries 
    long start = System.currentTimeMillis(); 

    for (int i = 0; i < 900000; i++) { 

     BoundStatement statement = preparedStatement.bind("User1", "2015-01-05:" + time.hourOfDay().get(), time.toDate(), 500); //value not important 
     semaphore.acquire(); 
     ResultSetFuture resultSetFuture = session.executeAsync(statement); 
     Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() { 
      @Override 
      public void onSuccess(@Nullable com.datastax.driver.core.ResultSet resultSet) { 

       semaphore.release(); 
      } 

      @Override 
      public void onFailure(Throwable throwable) { 
       System.out.println("Error: " + throwable.toString()); 
       semaphore.release(); 
      } 
     }); 
     time = time.plus(4); //4ms between each entry 
    } 

Antwort

4

Was sind Ihre Ergebnisse bei der Verwendung von nicht protokolliertem Batching? Möchten Sie wirklich Stapelanweisungen verwenden? https://medium.com/@foundev/cassandra-batch-loading-without-the-batch-keyword-40f00e35e23e

+0

Nicht drastisch anders. Ich bin mir ziemlich sicher, dass ich Batch verwenden möchte, da ich bereits an ähnlichen Projekten in anderen Projekten gearbeitet habe und die Anweisungen normalerweise langsamer waren. Es macht sowieso keinen Sinn, schneller zu sein. –

+1

Spod hat Recht. Chargen in Cassandra sind keine Leistungsoptimierung. Protokollierte Batches sollten nur verwendet werden, wenn Atomizität erforderlich ist und es zu Leistungseinbußen kommt, um atomare Schreibvorgänge zu erreichen. Selbst nicht protokollierte Batches sind oft langsamer als direkte asynchrone Abfragen, sie erzwingen im Wesentlichen unnötige Koordination (es sei denn, Sie werden nach Schlüssel sortiert und Token verwendet - vielleicht sind Sie hier). Ich neige dazu, gerade asynchrone Schreibvorgänge zu empfehlen. Hier ist ein weiterer Artikel, um diese Ansicht zu unterstützen: http://losetechies.com/ryansvihla/2014/08/28/cassandra-batch-loading-without-the-batch-keyword/ – phact

+1

In Bezug auf Ihre Timeouts wird dies passieren, sobald Sie beginnen überwältigen Sie Ihre c * Knoten mit zu vielen Schreibvorgängen. Es ist einfach, dies mit asynchronen Abfragen zu tun, während Ihr Programm schreibt, so schnell wie es nonstop möglich ist. Nach dem Entfernen Ihrer Chargen (insbesondere der Protokollierung) sollten Sie eine Verbesserung sehen, aber Sie müssen möglicherweise Ihre Schreibvorgänge drosseln oder Ihre Timeouts sogar erhöhen, wenn Ihre SLA dies zulässt. – phact