Ich habe eine Tabelle, die etwa 40 Millionen Zeilen (ca. 120 Millionen IDs) hat, und ich muss einige der Zeilen mit einem Skript aktualisieren, das ich in Ruby schrieb. Meine DB ist eine PostgreSQL-Datenbank.DB update Skript überspringt Zeilen zu aktualisieren, wenn ich Multithreading
Das Problem, das ich habe, ist, dass, wenn ich ein einzelnes Thread-Skript schreibe, es sehr sehr langsam läuft (da es jede Zeile separat aktualisieren muss und Zeit beim Öffnen und Schließen der Verbindung verloren geht). Auf der positiven Seite aktualisiert es alle Zeilen erfolgreich.
Also, um es zu beschleunigen, dachte ich, ich würde ein Multithread-Skript schreiben, so dass ich mehrere Verbindungen nutzen könnte. Aber das Problem ist jetzt, dass einige der Zeilen einfach nicht aktualisiert werden, ohne mir zu sagen, warum (mein Skript hat eine Ausnahmebehandlung).
Hier ist mein Skript, das Multithreading verwendet.
require 'sequel'
require 'logger'
......
DB = Sequel.connect('DB Credentials') # max connections = 20
starting_id = 1
total_objects = 120061827
queue = Queue.new
Thread.abort_on_exception = true
collector = Thread.new {
(starting_id..total_objects).step(100000) do |id|
lower_limit = id
upper_limit = id + 100000
if upper_limit > total_objects
upper_limit = total_objects + 1
end
objects = DB["
SELECT id, extra_fields
FROM table
WHERE json_typeof(extra_fields->'field') = 'object'
AND id >= #{lower_limit}
AND id < #{upper_limit}
"]
while queue.length >= 100
progress_logger.info("Queue length limit reached. Sleeping Collector")
sleep(5)
end
queue.push(objects)
progress_logger.info("Pushing to Queue. FROM: #{lower_limit} TO: #{upper_limit}")
end
Thread.exit
}
def update_extra_fields(object, extra_fields, updater_logger, error_logger)
begin
update_query = DB["
UPDATE table
SET extra_fields = '#{JSON.generate(extra_fields)}'
WHERE id = #{object[:id]}
"]
update_result = update_query.update
updater_logger.info("Updated Photo ID: #{object[:id]}") if update_result == 1
rescue Sequel::Error => e
error_logger.info("Could not update 'extra_fields' for #{object[:id]} because #{e.message}")
end
end
def construct_new_tags(tags)
#logic
return new_tags
end
def update_objects(objects, updater_logger, error_logger)
objects.each do |object|
extra_fields = JSON.parse(object[:extra_fields])
tags = extra_fields["field"]
new_tags = construct_new_tags(tags)
extra_fields["new_field"] = new_tags
update_extra_fields(object, extra_fields, updater_logger, error_logger)
end
end
num_threads = 0
all_threads = []
consumer = Thread.new {
while queue.length > 0
if num_threads <= 15
objects_block = queue.shift
all_threads << Thread.new {
num_threads += 1
update_objects(objects_block, updater_logger, error_logger)
num_threads -= 1
}
else
progress_logger.info("Threads limit reached. Sleeping Updater")
sleep(1)
end
end
}
collector.join
consumer.join
all_threads.each do |thread|
thread.join
end
puts "Queue END reached: EXIT"
- Nichts ist jemals in dem error_logger angemeldet wird
- Die Anzahl der Zeilen jedes Mal aktualisiert, Ich betreiben das Skript unterscheidet. So bin ich zuversichtlich, dass es kein bestimmtes Muster gibt, dass mir Zeilen zum Aktualisieren etc. fehlen.
- Wenn ich ein einzelnes Thread-Skript schreibe, das der gleichen Logik folgt. Alle Zeilen wurden erfolgreich aktualisiert.
Oh! Großer Fang! "Objekte" sollten "Fotos" sein, denn "Objekte" sind nichts, was ich gerade umbenannt habe, um die echten Namen zu verschleiern. Aber danke, dass Sie darauf hingewiesen haben – satnam
Die zweite Sache ist, dass wenn die Warteschlange größer als 100 ist, Sie nichts mit den 'Objekten' machen. – AlexN
Also habe ich meinen Code mit Ihrem aktualisiert und sehe immer noch, dass Zeilen übersprungen werden. Ich habe auch meinen ursprünglichen Beitrag aktualisiert, um zu zeigen, wie mein Skript jetzt aussieht. Ich stimme zu, dass ich ein entscheidendes Stück in meinem Code vermisse, aber immer noch etwa 33% der Zeilen werden übersprungen – satnam