2015-10-17 11 views
6

Ich baue als akademische Übung eine Job-Warteschlange in Elixir. Derzeit müssen sich meine Worker bei der Erstellung manuell bei der Warteschlange registrieren (siehe MyQuestion.Worker.start_link).

Titel: Elixir/OTP – Spawn und Terminierung von Supervisor-Kindprozessen erkennen

Ich möchte, dass mein Supervisor die verfügbaren Worker beim Erstellen/Neustart in die Warteschlange einträgt, da dies das Testen der Worker erleichtert und die Kopplung minimiert.

Gibt es eine Möglichkeit, das umzusetzen, was ich im Code unten in MyQuestion.Supervisor beschrieben habe?

defmodule MyQuestion.Supervisor do
  # Supervises the job queue followed by two workers, using the
  # :rest_for_one strategy.
  use Supervisor

  # Starts the supervisor and returns the result of Supervisor.start_link/2.
  def start_link do
    Supervisor.start_link(__MODULE__, :ok)
  end

  # Declares the static child list: the queue first, then the two workers.
  def init(:ok) do
    children = [
      worker(MyQuestion.JobQueue, []),
      worker(MyQuestion.Worker, [], id: :worker_0),
      worker(MyQuestion.Worker, [], id: :worker_1)
    ]

    supervise(children, strategy: :rest_for_one)
  end

  # LOOKING FOR SOMETHING LIKE THIS
  # on worker spawn, I want to add the worker to the queue
  # (hypothetical callback: nothing in this module invokes it)
  def child_spawned(pid, {MyQuestion.Worker, _, _}) do
    # add worker to queue
    MyQuestion.JobQueue.add_new_worker(pid)
  end

  # LOOKING FOR SOMETHING LIKE THIS
  # I want some way to do the following (imagine the callback existed)
  def child_terminated(pid, _reason, _state) do
    # with this information I could tell the job queue to mark
    # the job associated with the pid as failed and to retry
    # or maybe extract the job id from the worker state, etc.
    MyQuestion.JobQueue.remove_worker(pid)
    MyQuestion.JobQueue.restart_job_for_failed_worker(pid)
  end
end

defmodule MyQuestion.JobQueue do
    # Sketch of the job queue from the question; parts of the module are
    # elided with "<... snip ...>" placeholders.
    # NOTE(review): other modules in this file call add_new_worker/1,
    # remove_worker/1 and restart_job_for_failed_worker/1 on this module, but
    # only new_worker/1 and add_job/1 are visible here - confirm against the
    # elided parts.

    # Starts an Agent registered under this module's name, with an empty
    # list as its initial state.
    def start_link do
    Agent.start_link(fn -> [] end, name: __MODULE__)
    end

    # Placeholder: the body contains only a comment, so it currently returns nil.
    def new_worker(pid) do
    # register pid with agent state in available worker list, etc.
    end

    # Placeholder for dispatching `job_description`; the implementation is elided.
    def add_job(job_description) do
    # find idle worker and run job
    <... snip ...>
    end

    <... snip ...>
end

defmodule MyQuestion.Worker do
    # Worker sketch from the question; its GenServer callbacks are elided
    # with a "<... snip ...>" placeholder.
    use GenServer

    # Starts the worker GenServer, registers its pid with the job queue and
    # returns {:ok, pid}. A start result other than {:ok, pid} fails the match
    # below and raises.
    def start_link do
    # start worker
    {:ok, worker} = GenServer.start_link(__MODULE__, [])

    # Now we have a worker pid, so we can register that pid with the queue
    # I wish this could be in the supervisor or else where.
    # NOTE(review): MyQuestion.JobQueue only shows new_worker/1 in the
    # visible code - confirm add_new_worker/1 exists in the elided part.
    MyQuestion.JobQueue.add_new_worker(worker)

    # must return gen server's start link
    {:ok, worker}
    end

    <... snip ...>
end

Antwort

1

Der Schlüssel war eine Kombination aus Process.monitor(pid) – danach erhält der aufrufende Prozess die Benachrichtigungen über handle_info – und dem manuellen Aufruf von Supervisor.start_child, der die PIDs zurückgibt.

Ich hatte zuvor versucht, handle_info zu verwenden, aber die Funktion wurde nie aufgerufen. Process.monitor muss von demselben Prozess aufgerufen werden, der die Benachrichtigungen erhalten soll. Sie müssen es also innerhalb einer handle_call-Funktion aufrufen, um den Monitor mit Ihrem Serverprozess zu verknüpfen. Möglicherweise gibt es eine Funktion, um Code im Namen eines anderen Prozesses auszuführen (d. h. run_from_process(job_queue_pid, fn -> Process.monitor(pid_to_monitor) end)), aber ich konnte nichts finden.

Angehängt ist eine sehr naive Implementierung einer Job-Warteschlange. Ich arbeite erst seit einem Tag mit Elixir, daher ist der Code sowohl unordentlich als auch nicht idiomatisch, aber ich füge ihn an, weil es anscheinend an Codebeispielen fehlt.

Betrachten Sie in HeavyIndustry.JobQueue die Funktionen handle_info und create_new_worker. Es gibt ein offensichtliches Problem mit diesem Code: Er kann Worker nach einem Absturz neu starten, ist aber nicht in der Lage, von dort aus den nächsten Job der Warteschlange zu starten (weil dafür GenServer.call innerhalb von handle_info erforderlich wäre, was den Prozess blockiert). Ich denke, Sie könnten dies beheben, indem Sie den Prozess, der die Jobs startet, von dem Prozess trennen, der die Jobs verfolgt. Wenn Sie den Beispielcode ausführen, werden Sie feststellen, dass er schließlich keine Jobs mehr ausführt, obwohl sich noch einer in der Warteschlange befindet (:crash).

defmodule HeavyIndustry.Supervisor do
  use Supervisor

  # Boots the supervisor process; it starts out with an empty child list.
  def start_link, do: Supervisor.start_link(__MODULE__, :ok)

  # Nothing is supervised initially; children are attached afterwards
  # through create_children/2.
  def init(:ok), do: supervise([], strategy: :one_for_one)

  # Attaches the job queue to `supervisor`. The queue receives the list
  # [supervisor, worker_count] as its single start argument and returns
  # whatever Supervisor.start_child/2 returns.
  def create_children(supervisor, worker_count) do
    queue_spec = worker(HeavyIndustry.JobQueue, [[supervisor, worker_count]])
    Supervisor.start_child(supervisor, queue_spec)
  end
end

defmodule HeavyIndustry.JobQueue do
  @moduledoc """
  A deliberately naive job queue.

  The queue is a GenServer registered under its module name. Its state holds
  the supervisor used to start workers, the requested worker count, the list
  of jobs, and the worker pids split into `idle` and `busy`.

  Jobs are executed by handing them to an idle worker from a separate `Task`,
  so the queue process itself never blocks on a worker.

  Known limitation: when a worker goes down, its job is put back to
  `:pending` and a replacement worker is started, but the next job is not
  dispatched from that callback. A later `add_job/2` or `start_next_job/0`
  call is needed to pick it up.
  """
  use GenServer

  @job_queue_name __MODULE__

  ##
  # Public API
  ##

  @doc """
  Starts the queue. `args` must be `[supervisor, worker_count]`.

  The second argument is ignored; it defaults to `[]` so the function can be
  started from a child spec that passes only `args`.
  """
  def start_link(args, _opts \\ []) do
    GenServer.start_link(__MODULE__, args, name: @job_queue_name)
  end

  @doc """
  Starts the workers and makes the queue process monitor them. Returns `:ok`.
  """
  def setup() do
    # Process.monitor/1 delivers its notifications to the process that calls
    # it, so both the worker creation and the monitoring run inside the
    # server process via GenServer.call.
    state = GenServer.call(@job_queue_name, :setup)

    # The monitoring is a second call; the whole state comes back from the
    # first one only so the freshly started pids can be passed along.
    GenServer.call(@job_queue_name, {:monitor_pids, state.workers.idle})
  end

  @doc """
  Enqueues `job` and tries to start the next pending job.

  `from` is the pid that receives `%{answer: result, job: job}` once the job
  has run. Returns `{:ok, :started}` if this very job was started,
  `{:ok, :pending}` if it stays queued, or `{:error, reason}`.
  """
  def add_job(from, job) do
    {:ok, our_job_id} =
      GenServer.call(@job_queue_name, {:create_job, %{job: job, reply_to: from}})

    case GenServer.call(@job_queue_name, :start_next_job) do
      # started our job
      {:ok, ^our_job_id} -> {:ok, :started}
      # started *a* job
      {:ok, _} -> {:ok, :pending}
      # couldn't start any job, but that is fine
      {:error, :no_idle_workers} -> {:ok, :pending}
      # something fell over
      {:error, e} -> {:error, e}
      # kept for backward compatibility; the server never sends such a reply
      _ -> {:ok}
    end
  end

  @doc """
  Asks the queue to hand the next pending job to an idle worker.

  Returns `{:ok, job_id}`, `{:error, :no_idle_workers}` or
  `{:error, :no_more_jobs}`.
  """
  def start_next_job do
    GenServer.call(@job_queue_name, :start_next_job)
  end

  ##
  # GenServer callbacks
  ##

  def init([supervisor, n]) do
    state = %{
      supervisor: supervisor,
      max_workers: n,
      jobs: [],
      workers: %{idle: [], busy: []}
    }

    {:ok, state}
  end

  def handle_call(:setup, _from, state) do
    workers = start_workers(state.supervisor, state.max_workers)
    state = %{state | workers: %{state.workers | idle: workers}}
    {:reply, state, state}
  end

  def handle_call({:monitor_pids, list}, _from, state) do
    Enum.each(list, &Process.monitor/1)
    {:reply, :ok, state}
  end

  def handle_call({:create_job, job}, _from, state) do
    job = %{
      job: job.job,
      reply_to: job.reply_to,
      # id for task
      id: :os.system_time(),
      # start pending, go active, then remove
      status: :pending,
      pid: nil
    }

    {:reply, {:ok, job.id}, %{state | jobs: state.jobs ++ [job]}}
  end

  def handle_call(:start_next_job, _from, state) do
    IO.puts("==> Start Next Job")
    IO.inspect(state)
    IO.puts("==================")

    # The new state is returned explicitly: a rebinding made inside a `case`
    # clause is not visible after the `case`.
    {reply, state} = dispatch_next_job(state)
    {:reply, reply, state}
  end

  def handle_call({:free_worker, worker}, _from, state) do
    idle = state.workers.idle ++ [worker]
    busy = state.workers.busy -- [worker]
    {:reply, :ok, %{state | workers: %{idle: idle, busy: busy}}}
  end

  def handle_call({:remove_job, job}, _from, state) do
    {:reply, :ok, %{state | jobs: state.jobs -- [job]}}
  end

  # A monitored worker went down: requeue its job (if it had one), drop the
  # dead pid from both worker lists and start a monitored replacement.
  def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
    IO.puts("Worker collapsed: DOWN #{inspect(pid)}, clear and restart job")

    jobs = requeue_job_of(state.jobs, pid)

    idle = state.workers.idle -- [pid]
    busy = state.workers.busy -- [pid]

    {:ok, new_worker} = start_new_worker(state.supervisor)

    # We are inside the server process here, so we monitor directly instead
    # of going through GenServer.call (which would call ourselves).
    Process.monitor(new_worker)

    workers = %{idle: idle ++ [new_worker], busy: busy}
    {:noreply, %{state | jobs: jobs, workers: workers}}
  end

  ##
  # Internal helpers
  ##

  # Starts `count` workers and returns their pids; starts none for count <= 0
  # (a range such as 0..-1 would otherwise count downwards and yield two).
  defp start_workers(_supervisor, count) when count <= 0, do: []

  defp start_workers(supervisor, count) do
    Enum.map(1..count, fn _ ->
      {:ok, pid} = start_new_worker(supervisor)
      pid
    end)
  end

  # Starts one temporary worker under `supervisor`. The child id is a
  # reference-based term, so no atoms are created at runtime.
  defp start_new_worker(supervisor) do
    spec =
      Supervisor.Spec.worker(HeavyIndustry.Worker, [],
        id: {HeavyIndustry.Worker, make_ref()},
        restart: :temporary
      )

    Supervisor.start_child(supervisor, spec)
  end

  # Pairs the first idle worker with the first pending job and runs the job
  # in a Task. Returns {reply, new_state}.
  defp dispatch_next_job(state) do
    case {find_idle_worker(state.workers), find_next_job(state.jobs)} do
      {{:error, :no_idle_workers}, _} ->
        # no workers for job, doesn't matter if we have a job
        {{:error, :no_idle_workers}, state}

      {_, nil} ->
        # no job, doesn't matter if we have a worker
        {{:error, :no_more_jobs}, state}

      {{:ok, worker}, job} ->
        # mark the job active and the worker busy
        active_job = %{job | status: :active, pid: worker}
        jobs = (state.jobs -- [job]) ++ [active_job]
        idle = state.workers.idle -- [worker]
        busy = state.workers.busy ++ [worker]

        {:ok, _task_pid} = Task.start(fn -> run_job(worker, active_job) end)

        {{:ok, active_job.id}, %{state | jobs: jobs, workers: %{idle: idle, busy: busy}}}
    end
  end

  # Runs inside the Task: executes the job on the worker, reports back to the
  # queue and the requester, then triggers the next dispatch.
  defp run_job(worker, job) do
    result = GenServer.call(worker, job.job)

    remove_job(job)
    free_worker(worker)

    send(job.reply_to, %{answer: result, job: job.job})

    start_next_job()
  end

  # Puts the job that was running on `pid` back to :pending at the end of the
  # list; returns `jobs` unchanged when that worker had no job.
  defp requeue_job_of(jobs, pid) do
    case Enum.find(jobs, &(&1.pid == pid)) do
      nil -> jobs
      job -> (jobs -- [job]) ++ [%{job | status: :pending, pid: nil}]
    end
  end

  defp find_idle_worker(%{idle: []}), do: {:error, :no_idle_workers}
  defp find_idle_worker(%{idle: [worker | _]}), do: {:ok, worker}

  defp find_next_job(jobs) do
    Enum.find(jobs, &(&1.status == :pending))
  end

  defp free_worker(worker) do
    GenServer.call(@job_queue_name, {:free_worker, worker})
  end

  defp remove_job(job) do
    GenServer.call(@job_queue_name, {:remove_job, job})
  end
end

defmodule HeavyIndustry.Worker do
  @moduledoc """
  Stateless demo worker. Each job is a tuple handled by a `handle_call/3`
  clause: `{:sum, enumerable}`, `{:fib, n}`, `{:stop}`, `{:crash}` and
  `{:timeout}`.
  """
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    # workers have no persistent state
    IO.puts("==> Worker up! #{inspect(self())}")
    {:ok, nil}
  end

  # Sums the elements of `list`; an empty enumerable sums to 0.
  def handle_call({:sum, list}, _from, _state) do
    {:reply, Enum.sum(list), nil}
  end

  # Computes the n-th Fibonacci number with the slow recursive definition.
  def handle_call({:fib, n}, _from, _state) do
    {:reply, fib_calc(n), nil}
  end

  # Replies, then stops the worker with a non-normal reason.
  def handle_call({:stop}, _from, state) do
    {:stop, "my-stop-reason", "my-stop-reply", state}
  end

  # Crashes on purpose: `++` with these operands raises at runtime.
  def handle_call({:crash}, _from, _state) do
    {:reply, "this will crash" ++ 1234, nil}
  end

  # Sleeps for ten seconds before replying.
  def handle_call({:timeout}, _from, _state) do
    :timer.sleep(10000)
    {:reply, "this will timeout", nil}
  end

  # Slow fib. The guard rejects negative and non-integer input with a
  # FunctionClauseError instead of recursing without end.
  defp fib_calc(0), do: 0
  defp fib_calc(1), do: 1
  defp fib_calc(n) when is_integer(n) and n > 1, do: fib_calc(n - 1) + fib_calc(n - 2)
end

defmodule Looper do
  @moduledoc """
  Demo driver: starts the supervisor and the job queue with two workers,
  submits a fixed list of jobs and then prints every message it receives.
  """

  # Boots the system, enqueues the demo jobs and enters the receive loop.
  # Does not return, because loop/0 recurses forever.
  def start do
    {:ok, pid} = HeavyIndustry.Supervisor.start_link()
    {:ok, _job_queue} = HeavyIndustry.Supervisor.create_children(pid, 2)
    HeavyIndustry.JobQueue.setup()
    add_jobs()
    loop()
  end

  # Submits each demo job with the calling process as reply target and
  # prints how the queue answered.
  def add_jobs do
    jobs = [
      {:sum, [100, 200, 300]},
      {:crash},
      {:fib, 35},
      {:fib, 35},
      {:sum, [88, 88, 99]},
      {:fib, 35},
      {:fib, 35},
      {:fib, 35},
      {:sum, 0..100},
      # {:stop}, # stop not really a failure

      {:sum, [88, 88, 99]},
      # {:timeout},
      {:sum, [-1]}
    ]

    Enum.each(jobs, fn job ->
      IO.puts("~~~~> Add job: #{inspect(job)}")

      case HeavyIndustry.JobQueue.add_job(self(), job) do
        {:ok, :started} -> IO.puts("~~~~> Started job immediately")
        {:ok, :pending} -> IO.puts("~~~~> Job in queue")
        val -> IO.puts("~~~~> ... val: #{inspect(val)}")
      end
    end)
  end

  # Prints every incoming message, forever.
  def loop do
    receive do
      value ->
        IO.puts("~~~~> Received: #{inspect(value)}")
        loop()
    end
  end
end

# Script entry point: run the demo.
Looper.start()