2016-06-15 9 views
0

Wir verwenden Cassandra 3.5 mit Spark 1.6.1 in 2-Knoten-Cluster (8 Kerne und 16 G Speicher pro Knoten).Wie kann man entscheiden, ob die Leistung der Spark-Anwendung nahe am Maximum liegt (für gegebene Kerne und Speicher)?

Es gibt die folgende Cassandra Tabelle

CREATE TABLE schema.trade (
symbol text, 
date int, 
trade_time timestamp, 
reporting_venue text, 
trade_id bigint, 
ref_trade_id bigint, 
action_type text, 
price double, 
quantity int, 
condition_code text, 
PRIMARY KEY ((symbol, date), trade_time, trade_id) 
) WITH compaction = {'class' : 'DateTieredCompactionStrategy', 'base_time_seconds' : '3600', 'max_sstable_age_days' : '93'}; 

Und ich will Anteil an Volumen berechnen: Summe aller Volumen von Geschäften in der entsprechenden Sicherheit während des Zeitraums groupped durch Austausch und Zeitleiste (1 oder 5 Protokoll). ich ein Beispiel erstellt haben:

void getPercentageOfVolume(String symbol, Integer date, Timestamp timeFrom, Timestamp timeTill, Integer barWidth) { 
    char MILLISECOND_TO_MINUTE_MULTIPLIER = 60_000; 
    LOG.info("start"); 
    JavaPairRDD<Tuple2, Integer> counts = javaFunctions(sparkContext).cassandraTable("schema", "trade") 
      .filter(row -> 
         row.getString("symbol").equals(symbol) && row.getInt("date").equals(date) && 
         row.getDateTime("trade_time").getMillis() >= timeFrom.getTime() && 
         row.getDateTime("trade_time").getMillis() < timeTill.getTime()) 
      .mapToPair(row -> 
       new Tuple2<>(
        new Tuple2(
          new Timestamp(
            (row.getDateTime("trade_time").getMillis()/(barWidth * MILLISECOND_TO_MINUTE_MULTIPLIER)) * barWidth * MILLISECOND_TO_MINUTE_MULTIPLIER 
          ), 
          row.getString("reporting_venue")), 
        row.getInt("quantity") 
       ) 
      ).reduceByKey((a, b) -> a + b); 
    LOG.info(counts.collect().toString()); 
    LOG.info("finish"); 
} 

[2016.06.15 09: 25: 27,014] [INFO] [main] [EquityTCAAnalytics] starten
[2016.06.15 09.25 : 28.000] [INFO] [main] [NettyUtil] Fand Nettys nativen epoll-Transport im Klassenpfad und benutzte ihn
[2016-06-15 09: 25: 28.518] [INFO] [main] [Cluster] Neuer Cassandra-Host/node1: 9042 hinzugefügt
[2016-06-15 09: 25: 28.519] [INFO] [main] [LocalNodeFirstLoadBalancingPolicy] Host hinzugefügt node1 (datacenter1)
[2016-06-15 09: 25: 28.519] [INFO] [Haupt] [Clu ster] Neuer Cassandra host/node2: 9042 hinzugefügt
[2016-06-15 09: 25: 28.520] [INFO] [main] [CassandraConnector] Verbunden mit Cassandra cluster: Cassandra
[2016-06-15 09:25 : 29.115] [INFO] [main] [SparkContext] Start job: sammeln at EquityTCAAnalytics.java:88
[2016-06-15 09: 25: 29.385] [INFO] [dag-scheduler-event-loop] [DAGScheduler ] RDD 2 registrieren (mapToPair at EquityTCAANalytics.java:78)
[2016-06-15 09: 25: 29.388] [INFO] [dag-scheduler-ereignis-schleife] [DAGScheduler] Hat job 0 (vereinnahmen at EquityTCAAnalytics. Java: 88) mit 5 Ausgabe-Partitionen
[2016-06-15 09: 25: 29.388] [INFO] [dag-scheduler-ereignis-schleife] [DAGScheduler] Letzte stufe: ResultStage 1 (collect at EquityTCAAnalytics.java:88)
[2016-06-15 09: 25: 29.389] [INFO] [dag-scheduler-ereignis-schleife] [DAGScheduler] Eltern der letzten stufe: Liste (ShuffleMapStage 0)
[2016-06-15 09:25 : 29.391] [INFO] [dag-scheduler-event-loop] [DAGScheduler] Fehlende Eltern: Liste (ShuffleMapStage 0)
[2016-06-15 09: 25: 29.400] [INFO] [dag-scheduler-event- Schleife] [DAGScheduler] Einreichen von ShuffleMapStage 0 (MapPartitionsRDD [2] at mapToPair at EquityTCAANalytics.java:78), die keine fehlenden Eltern hat
[2016-06-15 09: 25: 29.594] [INFO] [dag-scheduler- Ereignisschleife] [MemoryStore] Block Broadcast_0 gespeichert als Werte im Speicher (geschätzte Größe 10,8 KB, frei 10,8 KB)
[2016-06-15 09: 25: 29.642] [INFO] [dag-scheduler-event-loop] [MemoryStore] Block broadcast_0_piece0 speichern d als Bytes im Speicher (geschätzte Größe 5,4 KB, kostenlos 16,3 KB)
[2016-06-15 09: 25: 29.647] [INFO] [Dispatcher-Ereignis-Schleife-7] [BlockManagerInfo] Broadcast_0_Piece0 im Speicher auf Node2 hinzugefügt : 44871 (Größe: 5.4 KB, frei: 2.4 GB)
[2016-06-15 09: 25: 29.650] [INFO] [dag-scheduler-ereignis-loop] [SparkContext] erstellte Sendung 0 von Sendung bei DAGScheduler. scala: 1006
[2016-06-15 09: 25: 29.658] [INFO] [dag-scheduler-event-loop] [DAGScheduler] Einreichen von 5 fehlenden Aufgaben aus ShuffleMapStage 0 (MapPartitionsRDD [2] at mapToPair at EquityTCAANalytics.java : 78)
[2016-06-15 09: 25: 29.661] [INFO] [dag-scheduler-ereignis-loop] [TaskSchedulerImpl] Hinzufügen von Aufgabensatz 0.0 mit 5 Aufgaben
[2016-06-15 09:25 : 30.006] [INFO] [Dispatcher-Event-Schleife-7] [SparkDeploySchedulerBackend] Registrierter Executor NettyRpcEndpointRef (null) (Knoten1: 41122) mit ID 0 [2016-06-15 09: 25: 30.040] [INFO] [Dispatcher -event-loop-7] [TaskSetManager] Aufgabe 0.0 in Stufe 0.0 starten (TID 0, node1, Partition 0, NODE_LOCAL, 11725 bytes)
[2016-06-15 09: 25: 30.051] [INFO] [Dispatcher- event-loop-7] [TaskSetManager] Aufgabe 1.0 in Stufe 0.0 starten (TID 1, Knoten1, Partition 1, NODE_LOCAL, 11317 Bytes)
[2016-06-15 09: 25: 30.054] [INFO] [Dispatcher-Ereignis -loop-7] [TaskSetManager] Task 2.0 in Stufe 0.0 starten (TID 2, Knoten1, Partition 2, NODE_LOCAL, 11929 Byte)
[2016-06-15 09: 25: 30.057] [INFO] [Dispatcher-Event- loop-7] [TaskSetManager] Task 3.0 in der Stufe 0.0 starten (TID 3, node1, partition 3, NO DE_LOCAL, 11249 Byte)
[2016-06-15 09: 25: 30.059] [INFO] [Dispatcher-Ereignis-Schleife-7] [TaskSetManager] Aufgabe 4.0 in Stufe 0.0 (TID 4, Knoten1, Partition 4, NODE_LOCAL , 11560 Byte)
[2016-06-15 09: 25: 30.077] [INFO] [Dispatcher-Ereignis-Schleife-7] [SparkDeploySchedulerBackend] Registered Ausführender NettyRpcEndpointRef (null) (CassandraCH4.ehubprod.local: 33668) mit ID 1
[2016-06-15 09: 25: 30.111] [INFO] [Dispatcher-Ereignis-Schleife-4] [BlockManagerMasterEndpoint] Registrieren Block-Manager Knoten1: 36512 mit 511.1 MB RAM, BlockManagerId (0, Knoten1, 36512)
[2016-06-15 09: 25: 30.168] [INFO] [Dispatcher-Ereignis-Schleife-3] [BlockManagerMasterEndpoint] Registrierung Block-Manager CassandraCH4.ehubprod.local: 33610 mit 511.1 MB RAM, BlockManagerId (1, CassandraCH4.ehubprod .lokal , 33610)
[2016-06-15 09: 25: 30.818] [INFO] [Dispatcher-Ereignis-Schleife-2] [BlockManagerInfo] Broadcast_0_Piece0 im Speicher auf Knoten1 hinzugefügt: 36512 (Größe: 5.4 KB, frei: 511.1 MB)
[2016-06-15 09: 25: 36.764] [INFO] [pool-21-thread-1] [CassandraConnector] Vom Cassandra-Cluster getrennt: Cassandra
[2016-06-15 09: 25: 48.914] [INFO] [Aufgabe-Ergebnis-Getter-0] [TaskSetManager] Beendete Aufgabe 4.0 in Stufe 0.0 (TID 4) in 18854 ms auf Knoten1 (1/5)
[2016-06-15 09: 25: 55.541] [ INFO] [Aufgabe-Ergebnis-Getter-1] [TaskSetManager] Beendete Aufgabe 2.0 in Stufe 0.0 (TID 2) in 25489 ms auf Node1 (2/5) [2016-06-15 09: 25: 57.837] [INFO ] [task-result-getter-2] [TaskSetManager] Aufgabe 1.0 in Stufe 0.0 (TID 1) in 27795 ms beendet on node1 (3/5)
[2016-06-15 09: 25: 57.931] [INFO] [Aufgabe-Ergebnis-Getter-3] [TaskSetManager] Beendete Aufgabe 0.0 in Stufe 0.0 (TID 0) in 27919 ms an node1 (4/5)
[2016-06-15 09: 26: 01.357] [INFO] [Aufgabe-Ergebnis-Getter-0] [TaskSetManager] Beendete Aufgabe 3.0 in Stufe 0.0 (TID 3) in 31302 ms auf Knoten1 (5/5)
[2016-06-15 09: 26: 01.358] [INFO] [dag-scheduler-ereignis-schleife] [DAGScheduler] ShuffleMapStage 0 (mapToPair at EquityTCAANalytics.java:78) beendet in 31.602 s
[2016-06-15 09: 26: 01.360] [INFO] [dag-scheduler-event-loop] [DAGScheduler] sucht nach neuen Laufstufen
[2016-06-15 09: 26: 01.360] [INFO] [dag-scheduler-ereignisschleife] [DAGScheduler] running: Set()
[2016-06-15 09: 26: 01.360] [INFO] [Aufgabe-Ergebnis-Getter-0] [TaskSchedulerImpl] Entfernte TaskSet 0.0, deren Aufgaben alle abgeschlossen haben, aus Pool
[2016-06-15 09: 26: 01.362] [INFO] [dag-scheduler-ereignis-schleife] [DAGScheduler] waiting: Setzen (ResultStage 1)
[2016-06-15 09: 26: 01.362] [INFO] [dag-scheduler-event- Schleife] [DAGScheduler] fehlgeschlagen: Set()
[2016-06-15 09: 26: 01.365] [INFO] [DAG-Scheduler-Event-Schleife] [DAGScheduler] Senden ResultStage 1 (ShuffledRDD [3] bei ReduceByKey um EquityTCACAnalytics.java:87), die keine fehlenden Eltern hat
[2016-06-15 09: 26: 01.373] [INFO] [dag-scheduler-ereignis-schleife] [MemoryStore] Block broadcast_1 gespeichert als Werte im Speicher (geschätzt Größe 3,6 KB, frei 19.9 KB)
[2016.06.15 09: 26: 01,382] [INFO] [dag-Scheduler-event-loop] [Memorys] Satz broadcast_1_piece0 als Bytes im Speicher (geschätzte Größe 2.1 KB, frei 21,9 KB gespeichert)
[2016-06-15 09: 26: 01.383] [INFO] [Dispatcher-Ereignis-Schleife-1] [BlockManagerInfo] hinzugefügt Broadcast_1_piece0 im Speicher auf Knoten2: 44871 (Größe: 2,1 KB, frei: 2,4 GB)
[ 2016-06-15 09: 26: 01.384] [INFO] [dag-scheduler-ereignis-loop] [SparkContext] Erstellte broadcast 1 aus broadcast bei DAGScheduler.scala: 1006
[2016-06-15 09: 26: 01,385 ] [INFO] [dag-Scheduler-event-loop] [DAGScheduler] Übermittlung 5 fehlende Aufgaben aus ResultStag 1 (ShuffledRDD [3] bei reduceByKey bei EquityTCAAnalytics.java:87)
[2016.06.15 09: 26: 01,386 ] [INFO] [dag-schedule ler-event-loop] [TaskSchedulerImpl] Aufgabe 1.0 mit 5 Aufgaben hinzufügen
[2016-06-15 09: 26: 01.390] [INFO] [dispatcher-event-loop-4] [TaskSetManager] Aufgabe 0.0 in der Phase starten 1.0 (TID 5, Knoten1, Partition 0, NODE_LOCAL, 2786 Byte)
[2016-06-15 09: 26: 01.390] [INFO] [Dispatcher-Ereignis-Schleife-4] [TaskSetManager] Aufgabe 1.0 in Phase 1.0 gestartet (TID 6, Knoten1, Partition 1, NODE_LOCAL, 2786 Byte)
[2016-06-15 09: 26: 01.397] [INFO] [Dispatcher-Event-Schleife-4] [TaskSetManager] Task 2.0 in Stufe 1.0 starten (TID 7, node1, partition 2, NODE_LOCAL, 2786 Bytes)
[2016.06.15 09: 26: 01,398] [INFO] [Dispatcher-event-loop-4] [TaskSetManager] Ausgehend Aufgabe 3.0 in der Stufe 1.0 (TID 8, Knoten1, Partition 3, NODE_LOCAL, 2786 Bytes)
[2016.06.15 09: 26: 01,406] [INFO] [Dispatcher-event-loop-4] [TaskSetManager] Ausgehend Aufgabe 4.0 in der Stufe 1.0 (TID 9, node1, Trennwand 4, NODE_LOCAL, 2786 Bytes)
[2016-06-15 09: 26: 01.429] [INFO] [Dispatcher-Ereignis-Schleife-4] [BlockManagerInfo] hinzugefügt Broadcast_1_piece0 im Speicher auf Knoten1: 36512 (Größe: 2,1 KB, frei: 511,1 MB)
[2016 -06-15 09: 26: 01,452] [INFO] [Dispatcher-Event-Loop-6] [MapOutputTrackerMasterEndpoint] gestellte auf Karte Ausgabestellen für Shuffle 0 bis node1 senden: 41122
[2016.06.15 09.26: 01.456] [INFO] [Dispatcher-Event-Schleife-6] [MapOutputTrackerMaster] Größe der Ausgabestatus für Shuffle 0 ist 161 Byte
[2016-06-15 09: 26: 01.526] [INFO] [Task-Ergebnis-Getter -1] [TaskSetManager] Beendete Aufgabe 4.0 in Stufe 1.0 (TID 9) in 128 ms auf Knoten1 (1/5)
[2016-06-15 09: 26: 01.575] [INFO] [Aufgabe-Ergebnis-Getter-3] [TaskSetManager] Beendete Aufgabe 2.0 in Stufe 1.0 (TID 7) in 184 ms auf Knoten1 (2/5)
[2016-06-15 09: 26: 01.580] [INFO] [Aufgabe-Ergebnis-Getter-2] [TaskSetManager] Beendete Aufgabe 0.0 in Stufe 1.0 (TID 5) in 193 ms auf Knoten1 (3/5)
[2016-06-15 09: 26: 01.589] [INFO] [Aufgabe-Ergebnis-Getter-3] [TaskSetManager] Beendete Aufgabe 1.0 in Stufe 1.0 (TID 6) in 199 ms auf Knoten1 (4/5)
[2016.06.15 09: 26: 01,599] [INFO] [task-Ergebnis-Getter-2] [TaskSetManager] Finished Aufgabe 3.0 in der Stufe 1.0 (TID 8) in 200 ms auf node1 (5/5)
[2016-06-15 09: 26: 01.599] [INFO] [Aufgabe-Ergebnis-Getter-2] [TaskSchedulerImpl] Entfernte TaskSet 1.0, deren Aufgaben alle aus Pool abgeschlossen haben
[2016-06-15 09: 26: 01.599] [INFO] [dag-scheduler-ereignis-schleife] [DAGScheduler] ResultStage 1 (vereinnahmen at EquityTCAANalytics.java:88) beendet in 0,202 s
[2016-06 -15 09: 26: 01.612] [INFO] [main] [DAGScheduler] Stelle 0 beendet: sammeln at EquityTCAAnalytics.java:88, hat 32.496470 s
[2016-06-15 09: 26: 01.634] [INFO ] [main] [EquityTCAANalytics] [((2016-06-10 13: 45: 00.0, DA), 6944), ((2016-06-10 14: 25: 00.0, B), 5241), ..., ((2016-06-10 10: 55: 00.0, QD), 109080), ((2016-06-10 14: 55: 00.0, A), 1300)]
[2016-06-15 09:26: 01.641] [INFO] [main] [EquityTCAAnalytics] finish

Ist 32,5 s normal?

+1

Normal zu sein ist relativ zu der Aufgabe, die Sie erreichen möchten, und der Datenmenge, mit der Sie es zu tun haben. Auch mögliche Engpässe Netzwerk IO. Ihre Frage ist also mit den gegebenen Informationen nicht zu beantworten. – eliasah

Antwort

0

Ich würde sagen%% der CPU und/oder Speicherverbrauch wäre ein Ausgangspunkt. Wenn Ihre Kerne nicht ausgelastet sind, kann dies bedeuten, dass Ihr Prozess nicht parallel genug ist. Speichergroße Chargen hängen von Ihren Daten ab, aber in der Regel wird mehr Arbeitsspeicher verwendet, als zu IO zurückzukehren.