From 581123f8bb703023cb652267a1fc34292f862852 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Fri, 23 Aug 2019 18:28:23 +0300 Subject: [PATCH] [#1149] Introduced `quantum` job scheduler. Documentation & config changes. --- CHANGELOG.md | 2 ++ config/config.exs | 40 +++++++++++++++++--------- config/test.exs | 2 -- docs/config.md | 15 ++++++---- lib/pleroma/application.ex | 19 ++---------- lib/pleroma/scheduler.ex | 7 +++++ lib/pleroma/web/federator/federator.ex | 8 +----- mix.exs | 2 +- mix.lock | 6 +++- 9 files changed, 54 insertions(+), 47 deletions(-) create mode 100644 lib/pleroma/scheduler.ex diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b0f4f40e..6dc19e79f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - Mastodon API: Unsubscribe followers when they unfollow a user - AdminAPI: Add "godmode" while fetching user statuses (i.e. admin can see private statuses) - Improve digest email template +- Replaced [pleroma_job_queue](https://git.pleroma.social/pleroma/pleroma_job_queue) with [Oban](https://github.com/sorentwo/oban) +- Introduced [quantum](https://github.com/quantum-elixir/quantum-core) job scheduler ### Fixed - Not being able to pin unlisted posts diff --git a/config/config.exs b/config/config.exs index 1a6348bcd..43d114d70 100644 --- a/config/config.exs +++ b/config/config.exs @@ -51,6 +51,24 @@ telemetry_event: [Pleroma.Repo.Instrumenter], migration_lock: nil +scheduled_jobs = + with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest], + true <- digest_config[:active] do + [{digest_config[:schedule], {Pleroma.DigestEmailWorker, :perform, []}}] + else + _ -> [] + end + +scheduled_jobs = + scheduled_jobs ++ + [{"0 */6 * * * *", {Pleroma.Web.Websub, :refresh_subscriptions, []}}] + +config :pleroma, Pleroma.Scheduler, + global: true, + overlap: true, + timezone: :utc, + jobs: scheduled_jobs + config :pleroma, Pleroma.Captcha, enabled: false, seconds_valid: 60, @@ -449,23 +467,19 @@ "web" ] -job_queues = [ - federator_incoming: 50, - federator_outgoing: 50, - web_push: 50, - mailer: 10, - transmogrifier: 20, - scheduled_activities: 10, - background: 5 -] - -config :pleroma_job_queue, :queues, job_queues - config :pleroma, Oban, repo: Pleroma.Repo, verbose: false, prune: {:maxage, 60 * 60 * 24 * 7}, - queues: job_queues + queues: [ + federator_incoming: 50, + federator_outgoing: 50, + web_push: 50, + mailer: 10, + transmogrifier: 20, + scheduled_activities: 10, + background: 5 + ] config :pleroma, :workers, retries: [ diff --git a/config/test.exs b/config/test.exs index a0fa67516..62f2a04d2 100644 --- a/config/test.exs +++ b/config/test.exs @@ -61,8 +61,6 @@ config :web_push_encryption, :http_client, Pleroma.Web.WebPushHttpClientMock -config :pleroma_job_queue, disabled: true - config :pleroma, Oban, queues: false, prune: :disabled diff --git a/docs/config.md b/docs/config.md index ae8afad89..81923c640 100644 --- a/docs/config.md +++ b/docs/config.md @@ -400,9 +400,9 @@ You can then do curl "http://localhost:4000/api/pleroma/admin/invite_token?admin_token=somerandomtoken" ``` -## :pleroma_job_queue +## Oban -[Pleroma Job Queue](https://git.pleroma.social/pleroma/pleroma_job_queue) configuration: a list of queues with maximum concurrent jobs. +[Oban](https://github.com/sorentwo/oban) asynchronous job processor configuration. Pleroma has the following queues: @@ -416,12 +416,15 @@ Pleroma has the following queues: Example: ```elixir -config :pleroma_job_queue, :queues, - federator_incoming: 50, - federator_outgoing: 50 +config :pleroma, Oban, + repo: Pleroma.Repo, + queues: [ + federator_incoming: 50, + federator_outgoing: 50 + ] ``` -This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`. +This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the number of max concurrent jobs set to `50`. ## Pleroma.Web.Metadata * `providers`: a list of metadata providers to enable. Providers available: diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 384b03aa9..ce2d3ab59 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -31,6 +31,7 @@ def start(_type, _args) do children = [ Pleroma.Repo, + Pleroma.Scheduler, Pleroma.Config.TransferTask, Pleroma.Emoji, Pleroma.Captcha, @@ -69,9 +70,7 @@ def start(_type, _args) do # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html # for other strategies and supported options opts = [strategy: :one_for_one, name: Pleroma.Supervisor] - result = Supervisor.start_link(children, opts) - :ok = after_supervisor_start() - result + Supervisor.start_link(children, opts) end defp setup_instrumenters do @@ -162,18 +161,4 @@ defp hackney_pool_children do :hackney_pool.child_spec(pool, options) end end - - defp after_supervisor_start do - with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest], - true <- digest_config[:active] do - # TODO: consider replacing with `quantum` scheduler - PleromaJobQueue.schedule( - digest_config[:schedule], - :digest_emails, - Pleroma.DigestEmailWorker - ) - end - - :ok - end end diff --git a/lib/pleroma/scheduler.ex b/lib/pleroma/scheduler.ex new file mode 100644 index 000000000..d84cd99ad --- /dev/null +++ b/lib/pleroma/scheduler.ex @@ -0,0 +1,7 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Scheduler do + use Quantum.Scheduler, otp_app: :pleroma +end diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index d85fe824f..cf7e50fee 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -21,7 +21,7 @@ defmodule Pleroma.Web.Federator do defdelegate worker_args(queue), to: Pleroma.Workers.Helper def init do - # 1 minute + # To do: consider removing this call in favor of scheduled execution (`quantum`-based) refresh_subscriptions(schedule_in: 60) end @@ -146,12 +146,6 @@ def perform(:verify_websub, websub) do def perform(:refresh_subscriptions) do Logger.debug("Federator running refresh subscriptions") Websub.refresh_subscriptions() - - spawn(fn -> - # 6 hours - Process.sleep(1000 * 60 * 60 * 6) - refresh_subscriptions() - end) end def ap_enabled_actor(id) do diff --git a/mix.exs b/mix.exs index eb023313d..9d8ded1ff 100644 --- a/mix.exs +++ b/mix.exs @@ -102,6 +102,7 @@ defp deps do {:ecto_sql, "~> 3.1"}, {:postgrex, ">= 0.13.5"}, {:oban, "~> 0.7"}, + {:quantum, "~> 2.3"}, {:gettext, "~> 0.15"}, {:comeonin, "~> 4.1.1"}, {:pbkdf2_elixir, "~> 0.12.3"}, @@ -142,7 +143,6 @@ defp deps do {:http_signatures, git: "https://git.pleroma.social/pleroma/http_signatures.git", ref: "293d77bb6f4a67ac8bde1428735c3b42f22cbb30"}, - {:pleroma_job_queue, "~> 0.3"}, {:telemetry, "~> 0.3"}, {:prometheus_ex, "~> 3.0"}, {:prometheus_plugs, "~> 1.1"}, diff --git a/mix.lock b/mix.lock index 8b8596375..6ebc66271 100644 --- a/mix.lock +++ b/mix.lock @@ -36,6 +36,8 @@ "excoveralls": {:hex, :excoveralls, "0.11.1", "dd677fbdd49114fdbdbf445540ec735808250d56b011077798316505064edb2c", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"}, "floki": {:hex, :floki, "0.20.4", "be42ac911fece24b4c72f3b5846774b6e61b83fe685c2fc9d62093277fb3bc86", [:mix], [{:html_entities, "~> 0.4.0", [hex: :html_entities, repo: "hexpm", optional: false]}, {:mochiweb, "~> 2.15", [hex: :mochiweb, repo: "hexpm", optional: false]}], "hexpm"}, "gen_smtp": {:hex, :gen_smtp, "0.14.0", "39846a03522456077c6429b4badfd1d55e5e7d0fdfb65e935b7c5e38549d9202", [:rebar3], [], "hexpm"}, + "gen_stage": {:hex, :gen_stage, "0.14.2", "6a2a578a510c5bfca8a45e6b27552f613b41cf584b58210f017088d3d17d0b14", [:mix], [], "hexpm"}, + "gen_state_machine": {:hex, :gen_state_machine, "2.0.5", "9ac15ec6e66acac994cc442dcc2c6f9796cf380ec4b08267223014be1c728a95", [:mix], [], "hexpm"}, "gettext": {:hex, :gettext, "0.17.0", "abe21542c831887a2b16f4c94556db9c421ab301aee417b7c4fbde7fbdbe01ec", [:mix], [], "hexpm"}, "hackney": {:hex, :hackney, "1.15.1", "9f8f471c844b8ce395f7b6d8398139e26ddca9ebc171a8b91342ee15a19963f4", [:rebar3], [{:certifi, "2.5.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.4", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"}, "html_entities": {:hex, :html_entities, "0.4.0", "f2fee876858cf6aaa9db608820a3209e45a087c5177332799592142b50e89a6b", [:mix], [], "hexpm"}, @@ -46,6 +48,7 @@ "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"}, "joken": {:hex, :joken, "2.0.1", "ec9ab31bf660f343380da033b3316855197c8d4c6ef597fa3fcb451b326beb14", [:mix], [{:jose, "~> 1.9", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm"}, "jose": {:hex, :jose, "1.9.0", "4167c5f6d06ffaebffd15cdb8da61a108445ef5e85ab8f5a7ad926fdf3ada154", [:mix, :rebar3], [{:base64url, "~> 0.0.1", [hex: :base64url, repo: "hexpm", optional: false]}], "hexpm"}, + "libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm"}, "makeup": {:hex, :makeup, "0.8.0", "9cf32aea71c7fe0a4b2e9246c2c4978f9070257e5c9ce6d4a28ec450a839b55f", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"}, "makeup_elixir": {:hex, :makeup_elixir, "0.13.0", "be7a477997dcac2e48a9d695ec730b2d22418292675c75aa2d34ba0909dcdeda", [:mix], [{:makeup, "~> 0.8", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"}, "meck": {:hex, :meck, "0.8.13", "ffedb39f99b0b99703b8601c6f17c7f76313ee12de6b646e671e3188401f7866", [:rebar3], [], "hexpm"}, @@ -65,7 +68,6 @@ "phoenix_html": {:hex, :phoenix_html, "2.13.1", "fa8f034b5328e2dfa0e4131b5569379003f34bc1fafdaa84985b0b9d2f12e68b", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"}, "phoenix_pubsub": {:hex, :phoenix_pubsub, "1.1.2", "496c303bdf1b2e98a9d26e89af5bba3ab487ba3a3735f74bf1f4064d2a845a3e", [:mix], [], "hexpm"}, "phoenix_swoosh": {:hex, :phoenix_swoosh, "0.2.0", "a7e0b32077cd6d2323ae15198839b05d9caddfa20663fd85787479e81f89520e", [:mix], [{:phoenix, "~> 1.0", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.2", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:swoosh, "~> 0.1", [hex: :swoosh, repo: "hexpm", optional: false]}], "hexpm"}, - "pleroma_job_queue": {:hex, :pleroma_job_queue, "0.3.0", "b84538d621f0c3d6fcc1cff9d5648d3faaf873b8b21b94e6503428a07a48ec47", [:mix], [{:crontab, "~> 1.1", [hex: :crontab, repo: "hexpm", optional: false]}], "hexpm"}, "plug": {:hex, :plug, "1.8.2", "0bcce1daa420f189a6491f3940cc77ea7fb1919761175c9c3b59800d897440fc", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm"}, "plug_cowboy": {:hex, :plug_cowboy, "2.1.0", "b75768153c3a8a9e8039d4b25bb9b14efbc58e9c4a6e6a270abff1cd30cbe320", [:mix], [{:cowboy, "~> 2.5", [hex: :cowboy, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"}, "plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"}, @@ -78,9 +80,11 @@ "prometheus_phoenix": {:hex, :prometheus_phoenix, "1.3.0", "c4b527e0b3a9ef1af26bdcfbfad3998f37795b9185d475ca610fe4388fdd3bb5", [:mix], [{:phoenix, "~> 1.4", [hex: :phoenix, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.3 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}], "hexpm"}, "prometheus_plugs": {:hex, :prometheus_plugs, "1.1.5", "25933d48f8af3a5941dd7b621c889749894d8a1082a6ff7c67cc99dec26377c5", [:mix], [{:accept, "~> 0.1", [hex: :accept, repo: "hexpm", optional: false]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.1 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}, {:prometheus_process_collector, "~> 1.1", [hex: :prometheus_process_collector, repo: "hexpm", optional: true]}], "hexpm"}, "quack": {:hex, :quack, "0.1.1", "cca7b4da1a233757fdb44b3334fce80c94785b3ad5a602053b7a002b5a8967bf", [:mix], [{:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: false]}, {:tesla, "~> 1.2.0", [hex: :tesla, repo: "hexpm", optional: false]}], "hexpm"}, + "quantum": {:hex, :quantum, "2.3.4", "72a0e8855e2adc101459eac8454787cb74ab4169de6ca50f670e72142d4960e9", [:mix], [{:calendar, "~> 0.17", [hex: :calendar, repo: "hexpm", optional: true]}, {:crontab, "~> 1.1", [hex: :crontab, repo: "hexpm", optional: false]}, {:gen_stage, "~> 0.12", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:swarm, "~> 3.3", [hex: :swarm, repo: "hexpm", optional: false]}, {:timex, "~> 3.1", [hex: :timex, repo: "hexpm", optional: true]}], "hexpm"}, "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm"}, "recon": {:git, "https://github.com/ferd/recon.git", "75d70c7c08926d2f24f1ee6de14ee50fe8a52763", [tag: "2.4.0"]}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.4", "f0eafff810d2041e93f915ef59899c923f4568f4585904d010387ed74988e77b", [:make, :mix, :rebar3], [], "hexpm"}, + "swarm": {:hex, :swarm, "3.4.0", "64f8b30055d74640d2186c66354b33b999438692a91be275bb89cdc7e401f448", [:mix], [{:gen_state_machine, "~> 2.0", [hex: :gen_state_machine, repo: "hexpm", optional: false]}, {:libring, "~> 1.0", [hex: :libring, repo: "hexpm", optional: false]}], "hexpm"}, "sweet_xml": {:hex, :sweet_xml, "0.6.6", "fc3e91ec5dd7c787b6195757fbcf0abc670cee1e4172687b45183032221b66b8", [:mix], [], "hexpm"}, "swoosh": {:hex, :swoosh, "0.23.2", "7dda95ff0bf54a2298328d6899c74dae1223777b43563ccebebb4b5d2b61df38", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm"}, "syslog": {:git, "https://github.com/Vagabond/erlang-syslog.git", "4a6c6f2c996483e86c1320e9553f91d337bcb6aa", [tag: "1.0.5"]},