diff --git a/lib/pleroma/gun/connection_pool/worker.ex b/lib/pleroma/gun/connection_pool/worker.ex index 0a94f16a2..8467325f3 100644 --- a/lib/pleroma/gun/connection_pool/worker.ex +++ b/lib/pleroma/gun/connection_pool/worker.ex @@ -20,7 +20,10 @@ def init([key, uri, opts, client_pid]) do end) send(client_pid, {:conn_pid, conn_pid}) - {:ok, %{key: key, timer: nil}, :hibernate} + + {:ok, + %{key: key, timer: nil, client_monitors: %{client_pid => Process.monitor(client_pid)}}, + :hibernate} else err -> {:stop, err} end @@ -45,6 +48,9 @@ def handle_cast({:add_client, client_pid, send_pid_back}, %{key: key} = state) d state end + ref = Process.monitor(client_pid) + + state = put_in(state.client_monitors[client_pid], ref) {:noreply, state, :hibernate} end @@ -55,6 +61,9 @@ def handle_cast({:remove_client, client_pid}, %{key: key} = state) do {conn_pid, List.delete(used_by, client_pid), crf, last_reference} end) + {ref, state} = pop_in(state.client_monitors[client_pid]) + Process.demonitor(ref) + timer = if used_by == [] do max_idle = Pleroma.Config.get([:connections_pool, :max_idle_time], 30_000) @@ -85,6 +94,26 @@ def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams} = down_me {:stop, {:error, down_message}, state} end + @impl true + def handle_info({:DOWN, _ref, :process, pid, reason}, state) do + # Sometimes the client is dead before we demonitor it in :remove_client, so the message + # arrives anyway + + case state.client_monitors[pid] do + nil -> + {:noreply, state, :hibernate} + + _ref -> + :telemetry.execute( + [:pleroma, :connection_pool, :client_death], + %{client_pid: pid, reason: reason}, + %{key: state.key} + ) + + handle_cast({:remove_client, pid}, state) + end + end + # LRFU policy: https://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.55.1478 defp crf(time_delta, prev_crf) do 1 + :math.pow(0.5, time_delta / 100) * prev_crf diff --git a/lib/pleroma/telemetry/logger.ex b/lib/pleroma/telemetry/logger.ex index d76dd37b5..4cacae02f 100644 --- a/lib/pleroma/telemetry/logger.ex +++ b/lib/pleroma/telemetry/logger.ex @@ -6,7 +6,8 @@ defmodule Pleroma.Telemetry.Logger do @events [ [:pleroma, :connection_pool, :reclaim, :start], [:pleroma, :connection_pool, :reclaim, :stop], - [:pleroma, :connection_pool, :provision_failure] + [:pleroma, :connection_pool, :provision_failure], + [:pleroma, :connection_pool, :client_death] ] def attach do :telemetry.attach_many("pleroma-logger", @events, &handle_event/4, []) @@ -59,4 +60,17 @@ def handle_event( "Connection pool had to refuse opening a connection to #{key} due to connection limit exhaustion" end) end + + def handle_event( + [:pleroma, :connection_pool, :client_death], + %{client_pid: client_pid, reason: reason}, + %{key: key}, + _ + ) do + Logger.warn(fn -> + "Pool worker for #{key}: Client #{inspect(client_pid)} died before releasing the connection with #{ + inspect(reason) + }" + end) + end end