Ich habe ein Problem mit einer langwierigen Migration, die ich parallel ausführen wollte (sie kann parallel ausgeführt werden). Eigentlich geht es bei der Migration darum, alle Datensätze in der Datenbank zu übernehmen und zeit- und ressourcenintensive Operationen auf jedem von ihnen durchzuführen.Wie man Timeouts in poolboy behandelt?
Manchmal einzelne Datensatz Migration hängen, so gebe ich 10 Minuten zu beenden. Wenn die Migration noch nicht beendet ist, möchte ich sie ohne Ausnahme herunterfahren (siehe unten).
Ich verwende auch poolboy erlang-Paket, um die Implementierung zu parallelisieren, da Migration nicht nur die Zeit, sondern auch Ressourcen verbraucht. Das Problem ist, dass ich nicht weiß, wie man Fehler behandelt, wenn Timeout passiert und Code bricht. Mein Aufsicht Baum ist:
defmodule MyReelty.Repo.Migrations.MoveVideosFromVimeoToB2 do
use Ecto.Migration
alias MyReelty.Repo
alias MyReelty.Repo.Migrations.MoveVideosFromVimeoToB2.Migrator
# parallel nature of migration force us to disable transaction
@disable_ddl_transaction true
@migrator_waiting_time 10 * 60 * 1000 # timeout
@poolboy_waiting_time @migrator_waiting_time + 10 * 1000 # give a time for graceful shutdown
@pool_name :migrator
@pool_size 3
@pool_config [
{ :name, { :local, @pool_name }},
{ :worker_module, Migrator },
{ :size, @pool_size },
{ :max_overflow, 0 },
{ :strategy, :fifo }
]
def up do
children = [
:poolboy.child_spec(@pool_name, @pool_config)
]
opts = [strategy: :one_for_one, name: MyReelty.Supervisor]
Supervisor.start_link(children, opts)
rows = Review |> Repo.all
IO.puts "Total amount of reviews is: #{length(rows)}"
parallel_migrations(rows)
end
def parallel_migrations(rows) do
Enum.map(rows, fn(row) ->
pooled_migration(@pool_name, row)
end)
end
def pooled_migration(pool, x) do
:poolboy.transaction(
pool,
(fn(pid) -> Migrator.move(pid, { x, @migrator_waiting_time }) end),
@poolboy_waiting_time
)
end
defmodule Migrator do
alias MyReelty.Repo
alias MyReelty.Review
use GenServer
def start_link(_) do
GenServer.start_link(__MODULE__, nil, [])
end
def move(server, { params, waiting_time }) do
GenServer.call(server, { :move, params }, waiting_time)
end
def handle_call({ :move, result }, _from, state) do
big_time_and_resource_consuming_task_here
{:reply, %{}, state}
end
end
end
Das Problem, wenn die Migration einiger Datensatz in der Datenbank mehr als 10 Minuten dauert ich diese Art von Ausnahme haben:
20:18:16.917 [error] Task #PID<0.282.0> started from #PID<0.70.0> terminating
** (stop) exited in: GenServer.call(#PID<0.278.0>, {:move, [2, "/videos/164064419", "w 35th st Springfield United States Illinois 60020"]}, 60000)
** (EXIT) time out
(elixir) lib/gen_server.ex:604: GenServer.call/3
(poolboy) src/poolboy.erl:76: :poolboy.transaction/3
(elixir) lib/task/supervised.ex:94: Task.Supervised.do_apply/2
(elixir) lib/task/supervised.ex:45: Task.Supervised.reply/5
(stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Function: #Function<5.53617785/0 in MyReelty.Repo.Migrations.MoveVideosFromVimeoToB2.parallel_migrations/1>
Args: []
20:18:16.918 [error] GenServer MyReelty.Repo terminating
** (stop) exited in: GenServer.call(#PID<0.278.0>, {:move, [2, "/videos/164064419", "w 35th st Springfield United States Illinois 60020"]}, 60000)
** (EXIT) time out
Last message: {:EXIT, #PID<0.70.0>, {:timeout, {GenServer, :call, [#PID<0.278.0>, {:move, [2, "/videos/164064419", "w 35th st Springfield United States Illinois 60020"]}, 60000]}}}
State: {:state, {:local, MyReelty.Repo}, :one_for_one, [{:child, #PID<0.231.0>, DBConnection.Poolboy, {:poolboy, :start_link, [[name: {:local, MyReelty.Repo.Pool}, strategy: :fifo, size: 1, max_overflow: 0, worker_module: DBConnection.Poolboy.Worker], {Postgrex.Protocol, [types: true, username: "adik", types: true, name: MyReelty.Repo.Pool, otp_app: :my_reelty, repo: MyReelty.Repo, adapter: Ecto.Adapters.Postgres, database: "my_reelty_dev", hostname: "localhost", extensions: [{Geo.PostGIS.Extension, [library: Geo]}, {Ecto.Adapters.Postgres.DateTime, []}, {Postgrex.Extensions.JSON, [library: Poison]}], pool_size: 1, pool_timeout: 5000, timeout: 15000, adapter: Ecto.Adapters.Postgres, database: "my_dev", hostname: "localhost", pool_size: 10, pool: DBConnection.Poolboy, port: 5432]}]}, :permanent, 5000, :worker, [:poolboy]}], :undefined, 3, 5, [], 0, Ecto.Repo.Supervisor, {MyReelty.Repo, :my_reelty, Ecto.Adapters.Postgres, [otp_app: :my_reelty, repo: MyReelty.Repo, adapter: Ecto.Adapters.Postgres, database: "my_reelty_dev", hostname: "localhost", extensions: [{Geo.PostGIS.Extension, [library: Geo]}], pool_size: 1]}}
Ich versuchte terminate/2
oder handle_info/2
zu Migrator
einfügen und spiele damit, aber ich habe diese Funktionen noch nicht erreicht, um aufgerufen zu werden. Wie kann ich Timeouts umgehen und verhindern, dass sie meine Migration unterbrechen?
AKTUALISIERT
I @ johlo der Hinweis verwendet, aber ich habe noch Zeit heraus zu bekommen. Meine Funktion ist:
def init(_) do
Process.flag(:trap_exit, true)
{:ok, %{}}
end
Es neigt dazu zu arbeiten! By the way, wie kann ich 'Migrator' Prozess hier töten? – asiniy
Wenn Sie den ': exit' abfangen, können Sie [Process.exit/2] (http://elixir-lang.org/docs/stable/elixir/Process.html#exit/2) verwenden, um ein': kill' zu senden Signal an den 'Migrator'. Oder Sie können eine gewöhnliche 'GenServer'-Nachricht senden, die besagt, dass sie aufhören soll, aber diese wird nur verarbeitet, wenn sie mit der vorherigen' move'-Aufgabe fertig ist. – johlo
Gib mir ein paar Tage, um es zu testen und ein Kopfgeld zu vergeben, okay? – asiniy