diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index c8a0617a5..55b5be488 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -42,7 +42,9 @@ def start(_type, _args) do setup_instrumenters() load_custom_modules() - if adapter() == Tesla.Adapter.Gun do + adapter = Application.get_env(:tesla, :adapter) + + if adapter == Tesla.Adapter.Gun do if version = Pleroma.OTPVersion.version() do [major, minor] = version @@ -74,7 +76,7 @@ def start(_type, _args) do Pleroma.Plugs.RateLimiter.Supervisor ] ++ cachex_children() ++ - http_pools_children(Config.get(:env)) ++ + http_children(adapter, @env) ++ [ Pleroma.Stats, Pleroma.JobQueueMonitor, @@ -206,15 +208,13 @@ defp task_children(_) do end # start hackney and gun pools in tests - defp http_pools_children(:test) do + defp http_children(_, :test) do hackney_options = Config.get([:hackney_pools, :federation]) hackney_pool = :hackney_pool.child_spec(:federation, hackney_options) [hackney_pool, Pleroma.Pool.Supervisor] end - defp http_pools_children(_), do: http_pools(adapter()) - - defp http_pools(Tesla.Adapter.Hackney) do + defp http_children(Tesla.Adapter.Hackney, _) do pools = [:federation, :media] pools = @@ -230,9 +230,7 @@ defp http_pools(Tesla.Adapter.Hackney) do end end - defp http_pools(Tesla.Adapter.Gun), do: [Pleroma.Pool.Supervisor] + defp http_children(Tesla.Adapter.Gun, _), do: [Pleroma.Pool.Supervisor] - defp http_pools(_), do: [] - - defp adapter, do: Application.get_env(:tesla, :adapter) + defp http_children(_, _), do: [] end diff --git a/lib/pleroma/config/transfer_task.ex b/lib/pleroma/config/transfer_task.ex index 4a4c022f0..b6d80adb7 100644 --- a/lib/pleroma/config/transfer_task.ex +++ b/lib/pleroma/config/transfer_task.ex @@ -5,6 +5,7 @@ defmodule Pleroma.Config.TransferTask do use Task + alias Pleroma.Config alias Pleroma.ConfigDB alias Pleroma.Repo @@ -36,36 +37,31 @@ defmodule Pleroma.Config.TransferTask do def start_link(_) do load_and_update_env() - if Pleroma.Config.get(:env) == :test, do: Ecto.Adapters.SQL.Sandbox.checkin(Repo) + if Config.get(:env) == :test, do: Ecto.Adapters.SQL.Sandbox.checkin(Repo) :ignore end - @spec load_and_update_env([ConfigDB.t()]) :: :ok | false - def load_and_update_env(deleted \\ [], restart_pleroma? \\ true) do - with {_, true} <- {:configurable, Pleroma.Config.get(:configurable_from_database)} do + @spec load_and_update_env([ConfigDB.t()], boolean()) :: :ok + def load_and_update_env(deleted_settings \\ [], restart_pleroma? \\ true) do + with {_, true} <- {:configurable, Config.get(:configurable_from_database)} do # We need to restart applications for loaded settings take effect - in_db = Repo.all(ConfigDB) - - with_deleted = in_db ++ deleted # TODO: some problem with prometheus after restart! - reject = [nil, :prometheus] - - reject_for_restart = + reject_restart = if restart_pleroma? do - reject + [nil, :prometheus] else - [:pleroma | reject] + [:pleroma, nil, :prometheus] end started_applications = Application.started_applications() - with_deleted - |> Enum.map(&merge_and_update(&1)) + (Repo.all(ConfigDB) ++ deleted_settings) + |> Enum.map(&merge_and_update/1) |> Enum.uniq() - |> Enum.reject(&(&1 in reject_for_restart)) + |> Enum.reject(&(&1 in reject_restart)) |> maybe_set_pleroma_last() - |> Enum.each(&restart(started_applications, &1, Pleroma.Config.get(:env))) + |> Enum.each(&restart(started_applications, &1, Config.get(:env))) :ok else @@ -108,18 +104,14 @@ defp merge_and_update(setting) do key = ConfigDB.from_string(setting.key) group = ConfigDB.from_string(setting.group) - default = Pleroma.Config.Holder.config(group, key) + default = Config.Holder.config(group, key) value = ConfigDB.from_binary(setting.value) merged_value = - if Ecto.get_meta(setting, :state) == :deleted do - default - else - if can_be_merged?(default, value) do - ConfigDB.merge_group(group, key, default, value) - else - value - end + cond do + Ecto.get_meta(setting, :state) == :deleted -> default + can_be_merged?(default, value) -> ConfigDB.merge_group(group, key, default, value) + true -> value end :ok = update_env(group, key, merged_value) diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex index 57a847c30..20823a765 100644 --- a/lib/pleroma/gun/conn.ex +++ b/lib/pleroma/gun/conn.ex @@ -49,8 +49,6 @@ def open(%URI{} = uri, name, opts) do key = "#{uri.scheme}:#{uri.host}:#{uri.port}" - Logger.debug("opening new connection #{Connections.compose_uri_log(uri)}") - conn_pid = if Connections.count(name) < opts[:max_connection] do do_open(uri, opts) @@ -109,9 +107,9 @@ defp do_open(uri, %{proxy: {proxy_host, proxy_port}} = opts) do else error -> Logger.warn( - "Received error on opening connection with http proxy #{ - Connections.compose_uri_log(uri) - } #{inspect(error)}" + "Opening proxied connection to #{compose_uri_log(uri)} failed with error #{ + inspect(error) + }" ) error @@ -145,9 +143,9 @@ defp do_open(uri, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do else error -> Logger.warn( - "Received error on opening connection with socks proxy #{ - Connections.compose_uri_log(uri) - } #{inspect(error)}" + "Opening socks proxied connection to #{compose_uri_log(uri)} failed with error #{ + inspect(error) + }" ) error @@ -163,9 +161,7 @@ defp do_open(%URI{host: host, port: port} = uri, opts) do else error -> Logger.warn( - "Received error on opening connection #{Connections.compose_uri_log(uri)} #{ - inspect(error) - }" + "Opening connection to #{compose_uri_log(uri)} failed with error #{inspect(error)}" ) error @@ -184,16 +180,17 @@ defp add_http2_opts(opts, "https", tls_opts) do defp add_http2_opts(opts, _, _), do: opts defp close_least_used_and_do_open(name, uri, opts) do - Logger.debug("try to open conn #{Connections.compose_uri_log(uri)}") - - with [{close_key, least_used} | _conns] <- - Connections.get_unused_conns(name), - :ok <- Gun.close(least_used.conn) do - Connections.remove_conn(name, close_key) + with [{key, conn} | _conns] <- Connections.get_unused_conns(name), + :ok <- Gun.close(conn.conn) do + Connections.remove_conn(name, key) do_open(uri, opts) else [] -> {:error, :pool_overflowed} end end + + def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do + "#{scheme}://#{host}#{path}" + end end diff --git a/lib/pleroma/http/adapter_helper.ex b/lib/pleroma/http/adapter_helper.ex index 2c13666ec..510722ff9 100644 --- a/lib/pleroma/http/adapter_helper.ex +++ b/lib/pleroma/http/adapter_helper.ex @@ -7,7 +7,7 @@ defmodule Pleroma.HTTP.AdapterHelper do @type proxy :: {Connection.host(), pos_integer()} - | {Connection.proxy_type(), pos_integer()} + | {Connection.proxy_type(), Connection.host(), pos_integer()} @callback options(keyword(), URI.t()) :: keyword() @callback after_request(keyword()) :: :ok diff --git a/lib/pleroma/http/adapter_helper/gun.ex b/lib/pleroma/http/adapter_helper/gun.ex index 55c2b192a..f14b95c19 100644 --- a/lib/pleroma/http/adapter_helper/gun.ex +++ b/lib/pleroma/http/adapter_helper/gun.ex @@ -20,8 +20,8 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do ] @spec options(keyword(), URI.t()) :: keyword() - def options(connection_opts \\ [], %URI{} = uri) do - formatted_proxy = + def options(incoming_opts \\ [], %URI{} = uri) do + proxy = Pleroma.Config.get([:http, :proxy_url], nil) |> AdapterHelper.format_proxy() @@ -30,8 +30,8 @@ def options(connection_opts \\ [], %URI{} = uri) do @defaults |> Keyword.merge(config_opts) |> add_scheme_opts(uri) - |> AdapterHelper.maybe_add_proxy(formatted_proxy) - |> maybe_get_conn(uri, connection_opts) + |> AdapterHelper.maybe_add_proxy(proxy) + |> maybe_get_conn(uri, incoming_opts) end @spec after_request(keyword()) :: :ok @@ -43,44 +43,35 @@ def after_request(opts) do :ok end - defp add_scheme_opts(opts, %URI{scheme: "http"}), do: opts + defp add_scheme_opts(opts, %{scheme: "http"}), do: opts - defp add_scheme_opts(opts, %URI{scheme: "https"}) do + defp add_scheme_opts(opts, %{scheme: "https"}) do opts |> Keyword.put(:certificates_verification, true) - |> Keyword.put(:transport, :tls) |> Keyword.put(:tls_opts, log_level: :warning) end - defp maybe_get_conn(adapter_opts, uri, connection_opts) do + defp maybe_get_conn(adapter_opts, uri, incoming_opts) do {receive_conn?, opts} = adapter_opts - |> Keyword.merge(connection_opts) + |> Keyword.merge(incoming_opts) |> Keyword.pop(:receive_conn, true) if Connections.alive?(:gun_connections) and receive_conn? do - try_to_get_conn(uri, opts) + checkin_conn(uri, opts) else opts end end - defp try_to_get_conn(uri, opts) do + defp checkin_conn(uri, opts) do case Connections.checkin(uri, :gun_connections) do nil -> - Logger.debug( - "Gun connections pool checkin was not successful. Trying to open conn for next request." - ) - - Task.start(fn -> Pleroma.Gun.Conn.open(uri, :gun_connections, opts) end) + Task.start(Pleroma.Gun.Conn, :open, [uri, :gun_connections, opts]) opts conn when is_pid(conn) -> - Logger.debug("received conn #{inspect(conn)} #{Connections.compose_uri_log(uri)}") - - opts - |> Keyword.put(:conn, conn) - |> Keyword.put(:close_conn, false) + Keyword.merge(opts, conn: conn, close_conn: false) end end end diff --git a/lib/pleroma/http/connection.ex b/lib/pleroma/http/connection.ex index 0fc88f708..76de3fcfe 100644 --- a/lib/pleroma/http/connection.ex +++ b/lib/pleroma/http/connection.ex @@ -71,15 +71,15 @@ def parse_proxy(proxy) when is_binary(proxy) do {:ok, parse_host(host), port} else {_, _} -> - Logger.warn("parsing port in proxy fail #{inspect(proxy)}") + Logger.warn("Parsing port failed #{inspect(proxy)}") {:error, :invalid_proxy_port} :error -> - Logger.warn("parsing port in proxy fail #{inspect(proxy)}") + Logger.warn("Parsing port failed #{inspect(proxy)}") {:error, :invalid_proxy_port} _ -> - Logger.warn("parsing proxy fail #{inspect(proxy)}") + Logger.warn("Parsing proxy failed #{inspect(proxy)}") {:error, :invalid_proxy} end end @@ -89,7 +89,7 @@ def parse_proxy(proxy) when is_tuple(proxy) do {:ok, type, parse_host(host), port} else _ -> - Logger.warn("parsing proxy fail #{inspect(proxy)}") + Logger.warn("Parsing proxy failed #{inspect(proxy)}") {:error, :invalid_proxy} end end diff --git a/lib/pleroma/http/http.ex b/lib/pleroma/http/http.ex index 466a94adc..583b56484 100644 --- a/lib/pleroma/http/http.ex +++ b/lib/pleroma/http/http.ex @@ -56,10 +56,9 @@ def post(url, body, headers \\ [], options \\ []), {:ok, Env.t()} | {:error, any()} def request(method, url, body, headers, options) when is_binary(url) do uri = URI.parse(url) - received_adapter_opts = Keyword.get(options, :adapter, []) - adapter_opts = Connection.options(uri, received_adapter_opts) + adapter_opts = Connection.options(uri, options[:adapter] || []) options = put_in(options[:adapter], adapter_opts) - params = Keyword.get(options, :params, []) + params = options[:params] || [] request = build_request(method, headers, options, url, body, params) adapter = Application.get_env(:tesla, :adapter) diff --git a/lib/pleroma/pool/connections.ex b/lib/pleroma/pool/connections.ex index 7529e9240..772833509 100644 --- a/lib/pleroma/pool/connections.ex +++ b/lib/pleroma/pool/connections.ex @@ -87,18 +87,11 @@ def handle_cast({:add_conn, key, conn}, state) do @impl true def handle_cast({:checkout, conn_pid, pid}, state) do - Logger.debug("checkout #{inspect(conn_pid)}") - state = with true <- Process.alive?(conn_pid), {key, conn} <- find_conn(state.conns, conn_pid), used_by <- List.keydelete(conn.used_by, pid, 0) do - conn_state = - if used_by == [] do - :idle - else - conn.conn_state - end + conn_state = if used_by == [], do: :idle, else: conn.conn_state put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by}) else @@ -123,26 +116,23 @@ def handle_cast({:remove_conn, key}, state) do @impl true def handle_call({:checkin, uri}, from, state) do key = "#{uri.scheme}:#{uri.host}:#{uri.port}" - Logger.debug("checkin #{key}") case state.conns[key] do - %{conn: conn, gun_state: :up} = current_conn -> - Logger.debug("reusing conn #{key}") - + %{conn: pid, gun_state: :up} = conn -> time = :os.system_time(:second) - last_reference = time - current_conn.last_reference - current_crf = crf(last_reference, 100, current_conn.crf) + last_reference = time - conn.last_reference + crf = crf(last_reference, 100, conn.crf) state = put_in(state.conns[key], %{ - current_conn + conn | last_reference: time, - crf: current_crf, + crf: crf, conn_state: :active, - used_by: [from | current_conn.used_by] + used_by: [from | conn.used_by] }) - {:reply, conn, state} + {:reply, pid, state} %{gun_state: :down} -> {:reply, nil, state} @@ -164,50 +154,48 @@ def handle_call(:count, _from, state) do def handle_call(:unused_conns, _from, state) do unused_conns = state.conns - |> Enum.filter(fn {_k, v} -> - v.conn_state == :idle and v.used_by == [] - end) - |> Enum.sort(fn {_x_k, x}, {_y_k, y} -> - x.crf <= y.crf and x.last_reference <= y.last_reference - end) + |> Enum.filter(&filter_conns/1) + |> Enum.sort(&sort_conns/2) {:reply, unused_conns, state} end + defp filter_conns({_, %{conn_state: :idle, used_by: []}}), do: true + defp filter_conns(_), do: false + + defp sort_conns({_, c1}, {_, c2}) do + c1.crf <= c2.crf and c1.last_reference <= c2.last_reference + end + @impl true def handle_info({:gun_up, conn_pid, _protocol}, state) do - state = - with conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid), - {key, conn} <- find_conn(state.conns, conn_pid, conn_key), - {true, key} <- {Process.alive?(conn_pid), key} do - time = :os.system_time(:second) - last_reference = time - conn.last_reference - current_crf = crf(last_reference, 100, conn.crf) + %{origin_host: host, origin_scheme: scheme, origin_port: port} = Gun.info(conn_pid) + host = + case :inet.ntoa(host) do + {:error, :einval} -> host + ip -> ip + end + + key = "#{scheme}:#{host}:#{port}" + + state = + with {_key, conn} <- find_conn(state.conns, conn_pid, key), + {true, key} <- {Process.alive?(conn_pid), key} do put_in(state.conns[key], %{ conn | gun_state: :up, - last_reference: time, - crf: current_crf, conn_state: :active, retries: 0 }) else - :error_gun_info -> - Logger.debug(":gun.info caused error") - state - {false, key} -> - Logger.debug(":gun_up message for closed conn #{inspect(conn_pid)}") - put_in( state.conns, Map.delete(state.conns, key) ) nil -> - Logger.debug(":gun_up message for conn which is not found in state") - :ok = Gun.close(conn_pid) state @@ -224,7 +212,6 @@ def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do with {key, conn} <- find_conn(state.conns, conn_pid), {true, key} <- {Process.alive?(conn_pid), key} do if conn.retries == retries do - Logger.debug("closing conn if retries is eq #{inspect(conn_pid)}") :ok = Gun.close(conn.conn) put_in( @@ -240,18 +227,13 @@ def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do end else {false, key} -> - # gun can send gun_down for closed conn, maybe connection is not closed yet - Logger.debug(":gun_down message for closed conn #{inspect(conn_pid)}") - put_in( state.conns, Map.delete(state.conns, key) ) nil -> - Logger.debug(":gun_down message for conn which is not found in state") - - :ok = Gun.close(conn_pid) + Logger.debug(":gun_down for conn which isn't found in state") state end @@ -275,7 +257,7 @@ def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do ) else nil -> - Logger.debug(":DOWN message for conn which is not found in state") + Logger.debug(":DOWN for conn which isn't found in state") state end @@ -283,18 +265,6 @@ def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do {:noreply, state} end - defp compose_key_gun_info(pid) do - %{origin_host: origin_host, origin_scheme: scheme, origin_port: port} = Gun.info(pid) - - host = - case :inet.ntoa(origin_host) do - {:error, :einval} -> origin_host - ip -> ip - end - - "#{scheme}:#{host}:#{port}" - end - defp find_conn(conns, conn_pid) do Enum.find(conns, fn {_key, conn} -> conn.conn == conn_pid @@ -310,8 +280,4 @@ defp find_conn(conns, conn_pid, conn_key) do def crf(current, steps, crf) do 1 + :math.pow(0.5, current / steps) * crf end - - def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do - "#{scheme}://#{host}#{path}" - end end diff --git a/test/http/adapter_helper/gun_test.exs b/test/http/adapter_helper/gun_test.exs index 6af8be15d..18025b986 100644 --- a/test/http/adapter_helper/gun_test.exs +++ b/test/http/adapter_helper/gun_test.exs @@ -6,7 +6,6 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do use ExUnit.Case, async: true use Pleroma.Tests.Helpers - import ExUnit.CaptureLog import Mox alias Pleroma.Config @@ -63,7 +62,6 @@ test "https url with non standart port" do opts = Gun.options([receive_conn: false], uri) assert opts[:certificates_verification] - assert opts[:transport] == :tls end test "get conn on next request" do @@ -73,14 +71,12 @@ test "get conn on next request" do on_exit(fn -> Logger.configure(level: level) end) uri = URI.parse("http://some-domain2.com") - assert capture_log(fn -> - opts = Gun.options(uri) + opts = Gun.options(uri) - assert opts[:conn] == nil - assert opts[:close_conn] == nil - end) =~ - "Gun connections pool checkin was not successful. Trying to open conn for next request." + assert opts[:conn] == nil + assert opts[:close_conn] == nil + Process.sleep(50) opts = Gun.options(uri) assert is_pid(opts[:conn]) diff --git a/test/pool/connections_test.exs b/test/pool/connections_test.exs index 06f32b74e..aeda54875 100644 --- a/test/pool/connections_test.exs +++ b/test/pool/connections_test.exs @@ -355,7 +355,7 @@ test "connection can't get up", %{name: name} do refute Conn.open(url, name) refute Connections.checkin(url, name) end) =~ - "Received error on opening connection http://gun-not-up.com {:error, :timeout}" + "Opening connection to http://gun-not-up.com failed with error {:error, :timeout}" end test "process gun_down message and then gun_up", %{name: name} do