2016-06-18 10 views
1

Ich verwende Datastax Enterprise 4.8 für Testzwecke in meiner Bachelorarbeit. Ich lade Wetterdaten in den Cluster (ca. 33 Mio Zeilen). Die Daten sieht etwa wie folgt ausCassandra-Cluster skaliert nicht. 3 Knoten sind sogar ein bisschen schneller als 6 Knoten (Code und Daten zur Verfügung gestellt)

//id;unix timestamp; validity; station info; temp in °C; humidity in % 
3;1950040101;5;24; 5.7000;83.0000 
3;1950040102;5;24; 5.6000;83.0000 
3;1950040103;5;24; 5.5000;83.0000 

Ich weiß, dass meine Daten Modell ist nicht sehr sauber (I dezimal für den Zeitstempel verwenden, aber ich wollte es nur auf diese Weise versuchen).

CREATE TABLE temp{ 
    id int, 
    timestamp decimal, 
    validity decimal, 
    structure decimal, 
    temperature float, 
    humidity float, 
    PRIMARY KEY((id),timestamp)); 

I basiert es etwa auf einem Artikel auf der DataStax Webseite: https://academy.datastax.com/resources/getting-started-time-series-data-modeling Die Insertion auf dem oft erwähnten Artikel auf lostechies Basis erfolgt

https://lostechies.com/ryansvihla/2016/04/29/cassandra-batch-loading-without-the-batch%E2%80%8A-%E2%80%8Athe-nuanced-edition/

Das ist mein Platzhalter-Code ist:

import java.io.BufferedReader; 
import java.io.File; 
import java.io.FileNotFoundException; 
import java.io.FileReader; 
import java.io.IOException; 
import java.math.BigDecimal; 
import java.util.Iterator; 
import java.util.LinkedList; 
import java.util.List; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 

import com.datastax.driver.core.BoundStatement; 
import com.datastax.driver.core.Cluster; 
import com.datastax.driver.core.ConsistencyLevel; 
import com.datastax.driver.core.PreparedStatement; 
import com.datastax.driver.core.ResultSet; 
import com.datastax.driver.core.ResultSetFuture; 
import com.datastax.driver.core.Session; 
import com.datastax.driver.extras.codecs.jdk8.InstantCodec; 
import com.google.common.base.Stopwatch; 
import com.google.common.util.concurrent.FutureCallback; 
import com.google.common.util.concurrent.Futures; 
import com.google.common.util.concurrent.MoreExecutors; 

public class BulkLoader { 

    private final int threads; 
    private final String[] contactHosts; 
    private final Cluster cluster; 
    private final Session session; 
    private final ExecutorService executor; 

    public BulkLoader(int threads, String... contactHosts) { 
     this.threads = threads; 
     this.contactHosts = contactHosts; 
     this.cluster = Cluster.builder().addContactPoints(contactHosts).build(); 

     cluster.getConfiguration().getCodecRegistry() 
       .register(InstantCodec.instance); 
     session = cluster.newSession(); 
     // fixed thread pool that closes on app exit 
     executor = MoreExecutors 
       .getExitingExecutorService((ThreadPoolExecutor) Executors 
         .newFixedThreadPool(threads)); 
    } 

    public static class IngestCallback implements FutureCallback<ResultSet> { 

     public void onSuccess(ResultSet result) { 
     } 

     public void onFailure(Throwable t) { 
      throw new RuntimeException(t); 
     } 
    } 

    public void ingest(Iterator<Object[]> boundItemsIterator, String insertCQL) 
      throws InterruptedException { 
     final PreparedStatement statement = session.prepare(insertCQL); 
     while (boundItemsIterator.hasNext()) { 
      BoundStatement boundStatement = statement.bind(boundItemsIterator 
        .next()); 
      boundStatement.setConsistencyLevel(ConsistencyLevel.QUORUM); 
      ResultSetFuture future = session.executeAsync(boundStatement); 
      Futures.addCallback(future, new IngestCallback(), executor); 
     } 
    } 

    public void stop() { 
     session.close(); 
     cluster.close(); 
    } 


    public static List<Object[]> readCSV(File csv) { 
     BufferedReader fileReader = null; 
     List<Object[]> result = new LinkedList<Object[]>(); 
     try { 
      fileReader = new BufferedReader(new FileReader(csv)); 
      String line = ""; 
      while ((line = fileReader.readLine()) != null) { 
       String[] tokens = line.split(";"); 
       if (tokens.length < 6) { 
        System.out.println("Unvollständig"); 
        continue; 
       } 
       Object[] tmp = new Object[6]; 
       tmp[0] = (int) Integer.parseInt(tokens[0]); 
       tmp[1] = new BigDecimal(Integer.parseInt(tokens[1])); 
       tmp[2] = new BigDecimal(Integer.parseInt(tokens[2])); 
       tmp[3] = new BigDecimal(Integer.parseInt(tokens[2])); 
       tmp[4] = (float) Float.parseFloat(tokens[4]); 
       tmp[5] = (float) Float.parseFloat(tokens[5]); 
       result.add(tmp); 
      } 
     } catch (FileNotFoundException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } finally { 
      try { 
       fileReader.close(); 
      } catch (IOException e) { 
       e.printStackTrace(); 
      } 
     } 

     return result; 
    } 

    public static void main(String[] args) { 
     Stopwatch watch = Stopwatch.createStarted(); 
     File folder = new File(
       "C:/VirtualMachines/Kiosk/BachelorarbeitStraubinger/workspace/bulk/src/main/resources"); 
     List<Object[]> data = new LinkedList<Object[]>(); 
     BulkLoader loader = new BulkLoader(16, "10.2.57.38", "10.2.57.37", 
       "10.2.57.36", "10.2.57.35", "10.2.57.34", "10.2.57.33"); 
     int cnt = 0; 
     File[] listOfFiles = folder.listFiles(); 
     for (File file : listOfFiles) { 
      if (file.isFile() && file.getName().contains(".th")) { 
       data = readCSV(file); 
       cnt += data.size(); 
       try { 

        loader.ingest(
          data.iterator(), 
          "INSERT INTO wheather.temp (id, timestamp, validity, structure, temperature, humidity) VALUES(?,?,?,?,?,?)"); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } finally { 
        System.out.println(file.getName() 
          + " -> Datasets importet: " + cnt); 
       } 
      } 
     } 
     System.out.println("total time seconds = " 
       + watch.elapsed(TimeUnit.SECONDS)); 
     watch.stop(); 
     loader.stop(); 
    } 
} 

Der Replikationsfaktor ist 3 und ich lasse Test auf 6 laufen r 3 Knoten. Mit vNodes enabled und num_tokens = 256. bekomme ich ungefähr die gleichen Einfügezeiten, wenn ich es auf einem der Cluster ausführe. Irgendwelche Ideen, warum das ist?

+0

Ich habe versucht, verschiedene Placement-Strategien und Konsistenzebenen auch. Es hatte auch keine große Wirkung. Ich bekomme immer ca. 14 000 - 17 000 Einsätze pro Sekunde! – Minalcar

Antwort

1

Physik.

Sie maximieren wahrscheinlich den Durchsatz Ihrer App. Normalerweise wäre die Antwort, mehrere Clients/App-Server zu haben, aber es sieht so aus, als ob Sie von einer CSV lesen. Ich schlage vor, entweder die CSV in Teile zu zerlegen und mehrere Instanzen Ihrer App auszuführen oder gefälschte Daten und mehrere Instanzen davon zu generieren.

Edit: Ich denke auch, es ist erwähnenswert, dass mit solch einem Datenmodell, einer kleinen Nutzlast und richtiger Hardware ich mir vorstellen könnte, dass jeder Knoten 15-20K Einfügungen/Sekunde (nicht für Knotendichte/Verdichtung).

+0

Siehe meinen Kommentar zu der anderen Antwort. Wenn die verwendete Methode nicht effizient ist, warum wird hier besonders auf SO viel empfohlen? – Minalcar

2

Es ist wahrscheinlich, dass Sie die Clientanwendung/den Clientserver ausschöpfen. Wenn Sie aus einer statischen Datei lesen, können Sie davon profitieren, es in ein paar Stücke zu zerlegen und parallel laufen zu lassen, oder sogar Brian Hess 'Loader (https://github.com/brianmhess/cassandra-loader) oder den echten Cassandra Bulkloader (http://www.datastax.com/dev/blog/using-the-cassandra-bulk-loader-updated) zu betrachten, der sich dreht die Daten in eine Reihe von stables und streamt diese direkt ein. Beide sind wahrscheinlich schneller als Ihr bestehender Code.

+0

Das war auch mein erster Gedanke. Das Gute ist, dass es etwa 80 verschiedene CSV-Dateien gibt, die ich in den Cluster lade. Also habe ich sie in zwei Teile aufgeteilt und den Import auf zwei Maschinen durchgeführt. Wenn ich das versuchte, bekam ich mehrere Schreib-Timeouts. Ich weiß nicht wirklich, was ich davon halten soll. Auch wenn die Last auf meinem Cluster nicht so hoch ist, scheint die Client-Anwendung es ziemlich stark zu betonen. Beim Einfügen mit zwei Maschinen bekam ich auch 24k Einfügeoperationen. Was ist besser, aber wie gesagt, ich habe auch gelesen. – Minalcar

+0

Es sieht so aus, als ob Sie unbegrenzte gleichzeitige Futures zulassen. Versuchen Sie es mit einem Semaphor oder verwenden Sie Brians Programm, wie Jeff es vorschlägt. Es hat bereits eins. – phact

+0

Ich werde Brian Hess Loader versuchen. Ich habe nur eine Frage. Dieser Loader verbindet nur mit einem Knoten im Cluster. Ist das nicht problematisch in Bezug auf die Leistung? – Minalcar