1

ich mit um mehr als 500 Millionen Platten eine cassandra Tabelle habe (In 6 Knoten), jetzt versuche ich zum Einfügen von Daten unter Verwendung von Funken cassandra-Stecker in Amazon EMRCassandra schreiben gibt sehr langsam perfomance mit Funken

Tabelle Struktur

CREATE TABLE dmp.dmp_user_profiles_latest (
     pid text PRIMARY KEY, 
     xnid int, 
     day_count map<text, int>, 
     first_seen map<text, timestamp>, 
     last_seen map<text, timestamp>, 
     usage_count map<text, int>, 
     city text, 
     country text, 
     lid set<text>, 

    )WITH bloom_filter_fp_chance = 0.01 
    AND caching = '{"keys":"NONE", "rows_per_partition":"ALL"}' 
    AND comment = '' 
    AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'max_threshold': '32'} 
    AND compression = {'chunk_length_kb': '256', 'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'} 
    AND dclocal_read_repair_chance = 0.1 
    AND default_time_to_live = 0 
    AND gc_grace_seconds = 172800 
    AND max_index_interval = 2048 
    AND memtable_flush_period_in_ms = 0 
    AND min_index_interval = 128 
    AND read_repair_chance = 0.1 
    AND speculative_retry = '99.0PERCENTILE'; 
CREATE INDEX dmp_user_profiles_latest_app_day_count_idx ON dmp.dmp_user_profiles_latest (day_count); 
CREATE INDEX dmp_user_profiles_latest_country_idx ON dmp.dmp_user_profiles_latest (country); 

die folgenden sind meine Optionen

--class com.mobi.vserv.driver.Query5kPids1 
--conf spark.dynamicAllocation.enabled=true 
--conf spark.yarn.executor.memoryOverhead=1024  
--conf spark.yarn.driver.memoryOverhead=1024 
--executor-memory 1g 
--executor-cores 2 
--driver-memory 4g 

-Funke einreichen Aber in den Protokollen ich Cassandra gesehen nimmt zum Laden von 2 lakh ca. 4-5 Minuten schriftlich (200000) Aufzeichnungen (während der Gesamtausführungszeit 6+ Minuten)

Ich habe auch

conf.set("spark.cassandra.output.batch.size.rows", "auto"); 
conf.set("spark.cassandra.output.concurrent.writes", "500"); 
conf.set("spark.cassandra.output.batch.size.bytes", "100000"); 
conf.set("spark.cassandra.output.throughput_mb_per_sec","1"); 

die folgend in Spark-conf hinzugefügt aber noch keine Performance-Verbesserung gibt es auch doesn die Anzahl der Kerne in Amazon EMR zu erhöhen‘ t helfen.

Bitte beachten Sie, dass ich in meiner Cassandra-Tabelle keine Partitionierung/Clustering-Spalte verwendet haben, so könnte dies der Grund für solch eine langsame Leistung sein.

Bitte beachten Sie Netzwerkgeschwindigkeit 30 MB PS ein Primärschlüssel ist eine alphanumerische Werte, zB - a9be3eb4-751f-48ee-b593-b3f89e18622d

Cassandra.yaml

cluster_name: 'dmp Cluster' 
num_tokens: 100 
hinted_handoff_enabled: true 
max_hint_window_in_ms: 10800000 # 3 hours 
hinted_handoff_throttle_in_kb: 1024 
max_hints_delivery_threads: 2 
batchlog_replay_throttle_in_kb: 1024 
authenticator: AllowAllAuthenticator 
authorizer: AllowAllAuthorizer 
permissions_validity_in_ms: 2000 
partitioner: org.apache.cassandra.dht.Murmur3Partitioner 
data_file_directories: 
    - /data/cassandra/data 
disk_failure_policy: stop 
commit_failure_policy: stop 

key_cache_size_in_mb: 

key_cache_save_period: 14400 
row_cache_size_in_mb: 0 
row_cache_save_period: 0 
counter_cache_size_in_mb: 
counter_cache_save_period: 7200 
saved_caches_directory: /data/cassandra/saved_caches 
commitlog_sync: periodic 
commitlog_sync_period_in_ms: 10000 
seed_provider: 
- class_name: org.apache.cassandra.locator.SimpleSeedProvider 
    parameters: 
- seeds: "10.142.76.97,10.182.19.301" 

concurrent_reads: 256 
concurrent_writes: 128 
concurrent_counter_writes: 32 

memtable_allocation_type: heap_buffers 
memtable_flush_writers: 8 
index_summary_capacity_in_mb: 
index_summary_resize_interval_in_minutes: 60 
trickle_fsync: false 
trickle_fsync_interval_in_kb: 10240 
storage_port: 7000 
ssl_storage_port: 7001 
listen_address: 10.142.76.97 
start_rpc: true 
rpc_address: 10.23.244.172 
rpc_port: 9160 
rpc_keepalive: true 
rpc_server_type: sync 
thrift_framed_transport_size_in_mb: 15 
incremental_backups: false 
snapshot_before_compaction: false 
auto_snapshot: true 
tombstone_warn_threshold: 1000 
tombstone_failure_threshold: 100000 
column_index_size_in_kb: 64 
batch_size_warn_threshold_in_kb: 5 
concurrent_compactors: 4 
compaction_throughput_mb_per_sec: 64 
sstable_preemptive_open_interval_in_mb: 50 
read_request_timeout_in_ms: 500000 

range_request_timeout_in_ms: 1000000 

write_request_timeout_in_ms: 200000 

counter_write_request_timeout_in_ms: 500000 

cas_contention_timeout_in_ms: 100000 

endpoint_snitch: Ec2Snitch 

dynamic_snitch_update_interval_in_ms: 100 

dynamic_snitch_reset_interval_in_ms: 600000 

dynamic_snitch_badness_threshold: 0.1 

request_scheduler: org.apache.cassandra.scheduler.NoScheduler 

server_encryption_options: 
    internode_encryption: none 
    keystore: conf/.keystore 
    keystore_password: cassandra 
    truststore: conf/.truststore 
    truststore_password: cassandra 

client_encryption_options: 
    enabled: false 
    keystore: conf/.keystore 
    keystore_password: cassandra 

internode_compression: all 

inter_dc_tcp_nodelay: false 
+0

können wir Ihre Datenbankstruktur haben? – Whitefret

+0

Bitte überprüfen Sie jetzt. –

+0

Haben Sie Zugriff auf den Knoten? Um zu sehen, wie Ihre Datenbank über den Cluster verteilt ist? Vielleicht gehen alle Ihre Datensätze zum selben Knoten (so dass die Anzahl der Knoten unnötig ist) – Whitefret

Antwort

1

Wie im Kommentar gesprochen, es scheint, dass dein Problem von deinem Index auf day_count kommt.

Wie Sie in diesem page sehen, Index wird nicht effizient sein, wenn Sie sie ständig aktualisieren müssen, und es tut, wenn Sie einen anderen Wert in day_count einfügen (was möglicherweise jedes Mal ist).

Sie benötigen eine Datenbank zu überarbeiten, aber da dies die Produktionsumgebung ist, können Sie nicht nur DROP INDEX IF EXISTS keyspace.index_name wenn dieser Index notwendig ist, aber man konnte eine sekundäre Datenbank erstellen day_count als Primärschlüssel verwenden oder day_count als verwenden Bestellindex.

+1

Übrigens, ** IF ** das Problem kommt aus sekundären Index (wir haben keine absolute Gewissheit, es sei denn, wir messen die Insert-Rate ohne diese Indizes, ** measure guess **). Eine Lösung kann darin bestehen, den Index zu löschen, um alle Daten in Cassandra einzufügen und dann den Index neu zu erstellen. Es würde bedeuten, dass die Anwendung nicht nach Index abfragen kann, bis die Einfügung beendet ist. – doanduyhai

+0

Hi als Update, wir verwenden diesen Index nicht zur Abfrage, also ist es in Ordnung, wenn ich es ablege?Sie haben auch gesagt, dass es nicht effizient ist, wenn es jedes Mal aktualisiert wird, und wenn wir einen anderen Wert einfügen, aktualisieren wir in unserem Fall die Kartenwerte (Index) jedes Mal und in manchen Fällen fügen wir einen neuen Wert in die Karte ein. –

+0

Bitte vorschlagen. Wie muss ich den Index entfernen. –