From 0ffde499b8a8f31c82183253bdd692c75733ca2f Mon Sep 17 00:00:00 2001 From: rinpatch Date: Wed, 15 Jul 2020 15:24:47 +0300 Subject: [PATCH] Connection Pool: register workers using :via --- lib/pleroma/gun/connection_pool.ex | 8 +++++--- lib/pleroma/gun/connection_pool/worker.ex | 17 ++++++++--------- .../gun/connection_pool/worker_supervisor.ex | 3 +-- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/lib/pleroma/gun/connection_pool.ex b/lib/pleroma/gun/connection_pool.ex index 545bfaf7f..e951872fe 100644 --- a/lib/pleroma/gun/connection_pool.ex +++ b/lib/pleroma/gun/connection_pool.ex @@ -15,7 +15,7 @@ def get_conn(uri, opts) do case Registry.lookup(@registry, key) do # The key has already been registered, but connection is not up yet - [{worker_pid, {nil, _used_by, _crf, _last_reference}}] -> + [{worker_pid, nil}] -> get_gun_pid_from_worker(worker_pid) [{worker_pid, {gun_pid, _used_by, _crf, _last_reference}}] -> @@ -26,13 +26,13 @@ def get_conn(uri, opts) do # :gun.set_owner fails in :connected state for whatevever reason, # so we open the connection in the process directly and send it's pid back # We trust gun to handle timeouts by itself - case WorkerSupervisor.start_worker([uri, key, opts, self()]) do + case WorkerSupervisor.start_worker([key, uri, opts, self()]) do {:ok, _worker_pid} -> receive do {:conn_pid, pid} -> {:ok, pid} end - {:error, {:error, {:already_registered, worker_pid}}} -> + {:error, {:already_started, worker_pid}} -> get_gun_pid_from_worker(worker_pid) err -> @@ -56,6 +56,8 @@ defp get_gun_pid_from_worker(worker_pid) do end def release_conn(conn_pid) do + # :ets.fun2ms(fn {_, {worker_pid, {gun_pid, _, _, _}}} when gun_pid == conn_pid -> + # worker_pid end) query_result = Registry.select(@registry, [ {{:_, :"$1", {:"$2", :_, :_, :_}}, [{:==, :"$2", conn_pid}], [:"$1"]} diff --git a/lib/pleroma/gun/connection_pool/worker.ex b/lib/pleroma/gun/connection_pool/worker.ex index 25fafc64c..0a94f16a2 100644 --- a/lib/pleroma/gun/connection_pool/worker.ex +++ b/lib/pleroma/gun/connection_pool/worker.ex @@ -4,20 +4,19 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do @registry Pleroma.Gun.ConnectionPool - def start_link(opts) do - GenServer.start_link(__MODULE__, opts) + def start_link([key | _] = opts) do + GenServer.start_link(__MODULE__, opts, name: {:via, Registry, {@registry, key}}) end @impl true - def init([uri, key, opts, client_pid]) do - time = :os.system_time(:second) - # Register before opening connection to prevent race conditions - with {:ok, _owner} <- Registry.register(@registry, key, {nil, [client_pid], 1, time}), - {:ok, conn_pid} <- Gun.Conn.open(uri, opts), + def init([key, uri, opts, client_pid]) do + with {:ok, conn_pid} <- Gun.Conn.open(uri, opts), Process.link(conn_pid) do + time = :os.system_time(:second) + {_, _} = - Registry.update_value(@registry, key, fn {_, used_by, crf, last_reference} -> - {conn_pid, used_by, crf, last_reference} + Registry.update_value(@registry, key, fn _ -> + {conn_pid, [client_pid], 1, time} end) send(client_pid, {:conn_pid, conn_pid}) diff --git a/lib/pleroma/gun/connection_pool/worker_supervisor.ex b/lib/pleroma/gun/connection_pool/worker_supervisor.ex index 5b546bd87..d090c034e 100644 --- a/lib/pleroma/gun/connection_pool/worker_supervisor.ex +++ b/lib/pleroma/gun/connection_pool/worker_supervisor.ex @@ -1,5 +1,5 @@ defmodule Pleroma.Gun.ConnectionPool.WorkerSupervisor do - @doc "Supervisor for pool workers. Does not do anything except enforce max connection limit" + @moduledoc "Supervisor for pool workers. Does not do anything except enforce max connection limit" use DynamicSupervisor @@ -35,7 +35,6 @@ defp free_pool do pid = spawn(fn -> {:ok, _pid} = Registry.register(@registry, @enforcer_key, nil) - max_connections = Pleroma.Config.get([:connections_pool, :max_connections]) reclaim_max =