2016-08-03 18 views
0

Ich habe eine Tabelle, die etwa 40 Millionen Zeilen (ca. 120 Millionen IDs) hat, und ich muss einige der Zeilen mit einem Skript aktualisieren, das ich in Ruby schrieb. Meine DB ist eine PostgreSQL-Datenbank.DB update Skript überspringt Zeilen zu aktualisieren, wenn ich Multithreading

Das Problem, das ich habe, ist, dass, wenn ich ein einzelnes Thread-Skript schreibe, es sehr sehr langsam läuft (da es jede Zeile separat aktualisieren muss und Zeit beim Öffnen und Schließen der Verbindung verloren geht). Auf der positiven Seite aktualisiert es alle Zeilen erfolgreich.

Also, um es zu beschleunigen, dachte ich, ich würde ein Multithread-Skript schreiben, so dass ich mehrere Verbindungen nutzen könnte. Aber das Problem ist jetzt, dass einige der Zeilen einfach nicht aktualisiert werden, ohne mir zu sagen, warum (mein Skript hat eine Ausnahmebehandlung).

Hier ist mein Skript, das Multithreading verwendet.

require 'sequel' 
require 'logger' 
...... 

DB = Sequel.connect('DB Credentials') # max connections = 20 


starting_id = 1 
total_objects = 120061827 

queue = Queue.new 
Thread.abort_on_exception = true 

collector = Thread.new { 
    (starting_id..total_objects).step(100000) do |id| 
     lower_limit = id 
     upper_limit = id + 100000 
     if upper_limit > total_objects 
      upper_limit = total_objects + 1 
     end 

     objects = DB[" 
      SELECT id, extra_fields 
      FROM table 
      WHERE json_typeof(extra_fields->'field') = 'object' 
      AND id >= #{lower_limit} 
      AND id < #{upper_limit} 
     "] 

     while queue.length >= 100 
      progress_logger.info("Queue length limit reached. Sleeping Collector") 
      sleep(5) 
     end 

     queue.push(objects) 
     progress_logger.info("Pushing to Queue. FROM: #{lower_limit} TO: #{upper_limit}") 
    end 
    Thread.exit 
} 

def update_extra_fields(object, extra_fields, updater_logger, error_logger) 
    begin 
     update_query = DB[" 
      UPDATE table 
      SET extra_fields = '#{JSON.generate(extra_fields)}' 
      WHERE id = #{object[:id]} 
     "] 

     update_result = update_query.update 
     updater_logger.info("Updated Photo ID: #{object[:id]}") if update_result == 1 
    rescue Sequel::Error => e 
     error_logger.info("Could not update 'extra_fields' for #{object[:id]} because #{e.message}") 
    end 
end 

def construct_new_tags(tags) 
    #logic 
    return new_tags 
end 

def update_objects(objects, updater_logger, error_logger) 
    objects.each do |object| 
     extra_fields = JSON.parse(object[:extra_fields]) 
     tags = extra_fields["field"] 
     new_tags = construct_new_tags(tags) 
     extra_fields["new_field"] = new_tags 
     update_extra_fields(object, extra_fields, updater_logger, error_logger) 
    end 
end 

num_threads = 0 
all_threads = [] 
consumer = Thread.new { 
    while queue.length > 0 
     if num_threads <= 15 
      objects_block = queue.shift 

      all_threads << Thread.new { 
       num_threads += 1 
       update_objects(objects_block, updater_logger, error_logger) 
       num_threads -= 1 
      } 
     else 
      progress_logger.info("Threads limit reached. Sleeping Updater") 
      sleep(1) 
     end 
    end 
} 

collector.join 
consumer.join  
all_threads.each do |thread| 
    thread.join 
end 


puts "Queue END reached: EXIT" 
  1. Nichts ist jemals in dem error_logger angemeldet wird
  2. Die Anzahl der Zeilen jedes Mal aktualisiert, Ich betreiben das Skript unterscheidet. So bin ich zuversichtlich, dass es kein bestimmtes Muster gibt, dass mir Zeilen zum Aktualisieren etc. fehlen.
  3. Wenn ich ein einzelnes Thread-Skript schreibe, das der gleichen Logik folgt. Alle Zeilen wurden erfolgreich aktualisiert.

Antwort

1

Ich denke, Sie haben einen Fehler in Ihrem Kollektor. Wenn die Warteschlangenlänge größer als 100 ist, werfen Sie einfach Ihre Daten weg und queue.push(photos) sollte queue.push(objects) sein.

 if queue.length >= 100 
      progress_logger.info("Queue length limit reached. Sleeping Collector") 
      sleep(5) 
     else 
      queue.push(photos) 
      progress_logger.info("Pushing to Queue. FROM: #{lower_limit} TO: #{upper_limit}") 
     end 

Sie wollen wahrscheinlich etwas wie folgt aus:

queue = Queue.new 
... 
     while queue.length >= 100 
      progress_logger.info("Queue length limit reached. Sleeping Collector") 
      sleep(5) 
     end 

     queue.push(objects) 
     progress_logger.info("Pushing to Queue. FROM: #{lower_limit} TO: #{upper_limit}") 

... 

all_threads = [] 

consumer = Thread.new { 
    loop { 
    if num_threads <= 15 
    objects_block = queue.shift 

    all_threads << Thread.new { 
     num_threads += 1 
     update_objects(objects_block, updater_logger, error_logger) 
     num_threads -= 1 
    } 
    else 
    progress_logger.info("Threads limit reached. Sleeping Updater") 
    sleep(1) 
    end 
    } 
} 

collector.join 

while !queue.empty? 
    sleep(10) 
end 

all_threads.each do |thread| 
    thread.join 
end 
+0

Oh! Großer Fang! "Objekte" sollten "Fotos" sein, denn "Objekte" sind nichts, was ich gerade umbenannt habe, um die echten Namen zu verschleiern. Aber danke, dass Sie darauf hingewiesen haben – satnam

+0

Die zweite Sache ist, dass wenn die Warteschlange größer als 100 ist, Sie nichts mit den 'Objekten' machen. – AlexN

+0

Also habe ich meinen Code mit Ihrem aktualisiert und sehe immer noch, dass Zeilen übersprungen werden. Ich habe auch meinen ursprünglichen Beitrag aktualisiert, um zu zeigen, wie mein Skript jetzt aussieht. Ich stimme zu, dass ich ein entscheidendes Stück in meinem Code vermisse, aber immer noch etwa 33% der Zeilen werden übersprungen – satnam