pleroma/lib/pleroma/jobs.ex

153 lines
3.8 KiB
Elixir

# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Jobs do
  @moduledoc """
  A basic job queue.

  One named queue is created for every entry found under the `Pleroma.Jobs`
  configuration key. The per-queue concurrency limit is read from
  `[Pleroma.Jobs, queue_name, :max_jobs]` each time a job may be started.

  The server state is a map with two keys:

    * `:queues` - maps a queue name to `{running_jobs, pending}`, where
      `running_jobs` is a `:sets` set of monitor references of running jobs and
      `pending` is a list of `%{item: {mod, args}, priority: priority}` entries
      sorted by ascending priority.
    * `:refs` - maps the monitor reference of a running job to its queue name.
  """

  use GenServer

  require Logger

  @impl true
  def init(args) do
    {:ok, args}
  end

  @doc """
  Starts the job server registered under the module name.

  The initial queues are built from the entries of the `Pleroma.Jobs`
  configuration.
  """
  def start_link do
    queues =
      Pleroma.Config.get(Pleroma.Jobs)
      |> Enum.map(fn {name, _} -> create_queue(name) end)
      |> Enum.into(%{})

    state = %{
      queues: queues,
      refs: %{}
    }

    GenServer.start_link(__MODULE__, state, name: __MODULE__)
  end

  @doc """
  Builds the initial `{name, {running_jobs, pending}}` entry for a queue:
  an empty set of running jobs and an empty list of pending jobs.
  """
  def create_queue(name) do
    {name, {:sets.new(), []}}
  end

  @doc """
  Enqueues a job.

  Returns `:ok`.

  When the module is compiled with `Mix.env() == :test` the job is not queued:
  `mod.perform(args...)` is called synchronously and its result is returned.

  ## Arguments

  - `queue_name` - a queue name(must be specified in the config).
  - `mod` - a worker module (must have `perform` function).
  - `args` - a list of arguments for the `perform` function of the worker module.
  - `priority` - a job priority (`1` by default). Jobs with a lower value are started first.

  ## Examples

  Enqueue `Module.perform/0` with `priority=1`:

      iex> Pleroma.Jobs.enqueue(:example_queue, Module, [])
      :ok

  Enqueue `Module.perform(:job_name)` with `priority=5`:

      iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:job_name], 5)
      :ok

  Enqueue `Module.perform(:another_job, data)` with `priority=1`:

      iex> data = "foobar"
      iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:another_job, data])
      :ok

  Enqueue `Module.perform(:foobar_job, :foo, :bar, 42)` with `priority=1`:

      iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:foobar_job, :foo, :bar, 42])
      :ok

  """
  def enqueue(queue_name, mod, args, priority \\ 1)

  # The implementation is selected at compile time.
  if Mix.env() == :test do
    def enqueue(_queue_name, mod, args, _priority) do
      apply(mod, :perform, args)
    end
  else
    @spec enqueue(atom(), atom(), [any()], integer()) :: :ok
    def enqueue(queue_name, mod, args, priority) do
      GenServer.cast(__MODULE__, {:enqueue, queue_name, mod, args, priority})
    end
  end

  # Adds the job to its queue and starts it right away when the queue has a
  # free slot. A job for a queue that does not exist is logged and dropped:
  # crashing here would take the server down and lose every pending job of
  # every other queue.
  @impl true
  def handle_cast({:enqueue, queue_name, mod, args, priority}, state) do
    case Map.fetch(state.queues, queue_name) do
      {:ok, {running_jobs, queue}} ->
        queue = enqueue_sorted(queue, {mod, args}, priority)

        state =
          state
          |> update_queue(queue_name, {running_jobs, queue})
          |> maybe_start_job(queue_name, running_jobs, queue)

        {:noreply, state}

      :error ->
        Logger.error(
          "#{inspect(__MODULE__)}: dropping #{inspect(mod)} job for unknown queue " <>
            inspect(queue_name)
        )

        {:noreply, state}
    end
  end

  # A monitored job process has terminated: release its slot and try to start
  # the next pending job of the same queue. A `:DOWN` message carrying a
  # reference that is not tracked is ignored instead of crashing the server.
  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    case Map.fetch(state.refs, ref) do
      {:ok, queue_name} ->
        {running_jobs, queue} = Map.fetch!(state.queues, queue_name)
        running_jobs = :sets.del_element(ref, running_jobs)

        state =
          state
          |> remove_ref(ref)
          |> update_queue(queue_name, {running_jobs, queue})
          |> maybe_start_job(queue_name, running_jobs, queue)

        {:noreply, state}

      :error ->
        {:noreply, state}
    end
  end

  @doc """
  Starts the first pending job of `queue_name` when the number of running jobs
  is below the configured `:max_jobs` and the queue is not empty.

  The job runs as `mod.perform(args...)` in a process started with
  `Task.start/1`, which is then monitored. Returns the updated state, or the
  state unchanged when no job was started. At most one job is started per call.
  """
  def maybe_start_job(state, queue_name, running_jobs, queue) do
    if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, queue_name, :max_jobs]) &&
         queue != [] do
      {{mod, args}, queue} = queue_pop(queue)
      {:ok, pid} = Task.start(fn -> apply(mod, :perform, args) end)
      mref = Process.monitor(pid)

      state
      |> add_ref(queue_name, mref)
      |> update_queue(queue_name, {:sets.add_element(mref, running_jobs), queue})
    else
      state
    end
  end

  @doc """
  Inserts `element` with the given `priority` into `queue` and returns the
  queue sorted by ascending priority.

  The new entry is prepended before a stable sort, so it ends up ahead of the
  existing entries that have the same priority.
  """
  # NOTE(review): newest-first among equal priorities can starve older jobs
  # under sustained load - confirm whether FIFO was intended.
  def enqueue_sorted(queue, element, priority) do
    [%{item: element, priority: priority} | queue]
    |> Enum.sort_by(fn %{priority: priority} -> priority end)
  end

  @doc """
  Removes the first entry of a non-empty queue.

  Returns `{element, rest_of_queue}`.
  """
  def queue_pop([%{item: element} | queue]) do
    {element, queue}
  end

  # Remembers which queue the monitored job behind `ref` belongs to.
  defp add_ref(state, queue_name, ref) do
    refs = Map.put(state[:refs], ref, queue_name)
    Map.put(state, :refs, refs)
  end

  # Forgets the monitor reference of a finished job.
  defp remove_ref(state, ref) do
    refs = Map.delete(state[:refs], ref)
    Map.put(state, :refs, refs)
  end

  # Stores `data` (`{running_jobs, pending}`) as the new content of `queue_name`.
  defp update_queue(state, queue_name, data) do
    queues = Map.put(state[:queues], queue_name, data)
    Map.put(state, :queues, queues)
  end
end