2016-07-29 24 views
12

Ich habe ein Problem mit einer langwierigen Migration, die ich parallel ausführen wollte (sie kann parallel ausgeführt werden). Eigentlich geht es bei der Migration darum, alle Datensätze in der Datenbank zu übernehmen und zeit- und ressourcenintensive Operationen auf jedem von ihnen durchzuführen.Wie man Timeouts in poolboy behandelt?

Manchmal einzelne Datensatz Migration hängen, so gebe ich 10 Minuten zu beenden. Wenn die Migration noch nicht beendet ist, möchte ich sie ohne Ausnahme herunterfahren (siehe unten).

Ich verwende auch poolboy erlang-Paket, um die Implementierung zu parallelisieren, da Migration nicht nur die Zeit, sondern auch Ressourcen verbraucht. Das Problem ist, dass ich nicht weiß, wie man Fehler behandelt, wenn Timeout passiert und Code bricht. Mein Aufsicht Baum ist:

defmodule MyReelty.Repo.Migrations.MoveVideosFromVimeoToB2 do 
    use Ecto.Migration 

    alias MyReelty.Repo 
    alias MyReelty.Repo.Migrations.MoveVideosFromVimeoToB2.Migrator 

    # parallel nature of migration force us to disable transaction 
    @disable_ddl_transaction true 

    @migrator_waiting_time 10 * 60 * 1000 # timeout 
    @poolboy_waiting_time @migrator_waiting_time + 10 * 1000 # give a time for graceful shutdown 

    @pool_name :migrator 
    @pool_size 3 
    @pool_config [ 
    { :name, { :local, @pool_name }}, 
    { :worker_module, Migrator }, 
    { :size, @pool_size }, 
    { :max_overflow, 0 }, 
    { :strategy, :fifo } 
    ] 

    def up do 
    children = [ 
     :poolboy.child_spec(@pool_name, @pool_config) 
    ] 
    opts = [strategy: :one_for_one, name: MyReelty.Supervisor] 
    Supervisor.start_link(children, opts) 

    rows = Review |> Repo.all 

    IO.puts "Total amount of reviews is: #{length(rows)}" 

    parallel_migrations(rows) 
    end 

    def parallel_migrations(rows) do 
    Enum.map(rows, fn(row) -> 
     pooled_migration(@pool_name, row) 
    end) 
    end 

    def pooled_migration(pool, x) do 
    :poolboy.transaction(
     pool, 
     (fn(pid) -> Migrator.move(pid, { x, @migrator_waiting_time }) end), 
     @poolboy_waiting_time 
    ) 
    end 

    defmodule Migrator do 
    alias MyReelty.Repo 
    alias MyReelty.Review 

    use GenServer 

    def start_link(_) do 
     GenServer.start_link(__MODULE__, nil, []) 
    end 

    def move(server, { params, waiting_time }) do 
     GenServer.call(server, { :move, params }, waiting_time) 
    end 

    def handle_call({ :move, result }, _from, state) do 
     big_time_and_resource_consuming_task_here  
     {:reply, %{}, state} 
    end 
    end 
end 

Das Problem, wenn die Migration einiger Datensatz in der Datenbank mehr als 10 Minuten dauert ich diese Art von Ausnahme haben:

20:18:16.917 [error] Task #PID<0.282.0> started from #PID<0.70.0> terminating 
** (stop) exited in: GenServer.call(#PID<0.278.0>, {:move, [2, "/videos/164064419", "w 35th st Springfield United States Illinois 60020"]}, 60000) 
    ** (EXIT) time out 
    (elixir) lib/gen_server.ex:604: GenServer.call/3 
    (poolboy) src/poolboy.erl:76: :poolboy.transaction/3 
    (elixir) lib/task/supervised.ex:94: Task.Supervised.do_apply/2 
    (elixir) lib/task/supervised.ex:45: Task.Supervised.reply/5 
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3 
Function: #Function<5.53617785/0 in MyReelty.Repo.Migrations.MoveVideosFromVimeoToB2.parallel_migrations/1> 
    Args: [] 

20:18:16.918 [error] GenServer MyReelty.Repo terminating 
** (stop) exited in: GenServer.call(#PID<0.278.0>, {:move, [2, "/videos/164064419", "w 35th st Springfield United States Illinois 60020"]}, 60000) 
    ** (EXIT) time out 
Last message: {:EXIT, #PID<0.70.0>, {:timeout, {GenServer, :call, [#PID<0.278.0>, {:move, [2, "/videos/164064419", "w 35th st Springfield United States Illinois 60020"]}, 60000]}}} 
State: {:state, {:local, MyReelty.Repo}, :one_for_one, [{:child, #PID<0.231.0>, DBConnection.Poolboy, {:poolboy, :start_link, [[name: {:local, MyReelty.Repo.Pool}, strategy: :fifo, size: 1, max_overflow: 0, worker_module: DBConnection.Poolboy.Worker], {Postgrex.Protocol, [types: true, username: "adik", types: true, name: MyReelty.Repo.Pool, otp_app: :my_reelty, repo: MyReelty.Repo, adapter: Ecto.Adapters.Postgres, database: "my_reelty_dev", hostname: "localhost", extensions: [{Geo.PostGIS.Extension, [library: Geo]}, {Ecto.Adapters.Postgres.DateTime, []}, {Postgrex.Extensions.JSON, [library: Poison]}], pool_size: 1, pool_timeout: 5000, timeout: 15000, adapter: Ecto.Adapters.Postgres, database: "my_dev", hostname: "localhost", pool_size: 10, pool: DBConnection.Poolboy, port: 5432]}]}, :permanent, 5000, :worker, [:poolboy]}], :undefined, 3, 5, [], 0, Ecto.Repo.Supervisor, {MyReelty.Repo, :my_reelty, Ecto.Adapters.Postgres, [otp_app: :my_reelty, repo: MyReelty.Repo, adapter: Ecto.Adapters.Postgres, database: "my_reelty_dev", hostname: "localhost", extensions: [{Geo.PostGIS.Extension, [library: Geo]}], pool_size: 1]}} 

Ich versuchte terminate/2 oder handle_info/2 zu Migrator einfügen und spiele damit, aber ich habe diese Funktionen noch nicht erreicht, um aufgerufen zu werden. Wie kann ich Timeouts umgehen und verhindern, dass sie meine Migration unterbrechen?

AKTUALISIERT

I @ johlo der Hinweis verwendet, aber ich habe noch Zeit heraus zu bekommen. Meine Funktion ist:

def init(_) do 
Process.flag(:trap_exit, true) 
{:ok, %{}} 
end 

Antwort

6

Wenn die Migrator.move/2 (das heißt die GenServer.call) Funktion Zeiten heraus, es wird die gesamte MoveVideosFromVimeoToB2 Prozess zum Absturz bringen, da das ist der eigentliche Prozess, der die GenServer Anruf tätigt.

Die Lösung hier ist das Timeout in der anonymen Funktion in pooled_migration zu fangen, so etwas wie (ich bin nicht sehr vertraut mit Elixir Syntax, so dass es möglicherweise nicht kompiliert werden, aber man sollte auf die Idee kommen):

def pooled_migration(pool, x) do 
:poolboy.transaction(
    pool, 
    (fn(pid) -> 
     try do 
      Migrator.move(pid, { x, @migrator_waiting_time }) 
     catch 
      :exit, reason -> 
      # Ignore error, log it or something else 
      :ok 
     end 
    end), 
    @poolboy_waiting_time 
) 
end 

Es ist nicht die Migrator Prozess, die Zeitüberschreitung, es ist die GenServer Aufruf an die Migrator, die tut, und wir müssen try-catch das.

Beachten Sie auch, dass der Migrator Prozess nicht abgebrochen wird, es läuft noch, siehe timeouts Abschnitt in der .

UPDATE: Wie @asiniy in den Kommentaren erwähnt die @poolboy_waiting_time sollte :infinity so die poolboy.transaction Funktion nicht werfen einen Timeout-Fehler eingestellt werden, wenn für einen freien Migrator Arbeitsprozess warten. Da der Migrator schließlich austritt, ist dies sicher.

+0

Es neigt dazu zu arbeiten! By the way, wie kann ich 'Migrator' Prozess hier töten? – asiniy

+0

Wenn Sie den ': exit' abfangen, können Sie [Process.exit/2] (http://elixir-lang.org/docs/stable/elixir/Process.html#exit/2) verwenden, um ein': kill' zu senden Signal an den 'Migrator'. Oder Sie können eine gewöhnliche 'GenServer'-Nachricht senden, die besagt, dass sie aufhören soll, aber diese wird nur verarbeitet, wenn sie mit der vorherigen' move'-Aufgabe fertig ist. – johlo

+0

Gib mir ein paar Tage, um es zu testen und ein Kopfgeld zu vergeben, okay? – asiniy