HTTP: radically simplify pool checkin/checkout

Use a custom tesla middleware instead of adapter helper function +
custom redirect middleware.

This will also fix "Client died before releasing the connection"
messages when the request pool is overloaded. Since the checkout is
now done after passing ConcurrentLimiter.

This is technically less efficient, since the connection needs to be
checked in/out every time the middleware is left or entered respectively.
But I don't think the nanoseconds we might lose on redirects
to the same host are worth the complexity.
This commit is contained in:
rinpatch 2020-09-03 23:15:22 +03:00
parent 9433311923
commit d34fe2840d
6 changed files with 48 additions and 146 deletions

View File

@ -19,7 +19,6 @@ defmodule Pleroma.HTTP.AdapterHelper do
| {Connection.proxy_type(), Connection.host(), pos_integer()} | {Connection.proxy_type(), Connection.host(), pos_integer()}
@callback options(keyword(), URI.t()) :: keyword() @callback options(keyword(), URI.t()) :: keyword()
@callback get_conn(URI.t(), keyword()) :: {:ok, term()} | {:error, term()}
@spec format_proxy(String.t() | tuple() | nil) :: proxy() | nil @spec format_proxy(String.t() | tuple() | nil) :: proxy() | nil
def format_proxy(nil), do: nil def format_proxy(nil), do: nil
@ -47,9 +46,6 @@ def options(%URI{} = uri, opts \\ []) do
|> adapter_helper().options(uri) |> adapter_helper().options(uri)
end end
@spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} | {:error, atom()}
def get_conn(uri, opts), do: adapter_helper().get_conn(uri, opts)
defp adapter, do: Application.get_env(:tesla, :adapter) defp adapter, do: Application.get_env(:tesla, :adapter)
defp adapter_helper do defp adapter_helper do

View File

@ -6,7 +6,6 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do
@behaviour Pleroma.HTTP.AdapterHelper @behaviour Pleroma.HTTP.AdapterHelper
alias Pleroma.Config alias Pleroma.Config
alias Pleroma.Gun.ConnectionPool
alias Pleroma.HTTP.AdapterHelper alias Pleroma.HTTP.AdapterHelper
require Logger require Logger
@ -57,14 +56,6 @@ def pool_timeout(pool) do
Config.get([:pools, pool, :timeout], default) Config.get([:pools, pool, :timeout], default)
end end
@spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} | {:error, atom()}
def get_conn(uri, opts) do
case ConnectionPool.get_conn(uri, opts) do
{:ok, conn_pid} -> {:ok, Keyword.merge(opts, conn: conn_pid, close_conn: false)}
err -> err
end
end
@prefix Pleroma.Gun.ConnectionPool @prefix Pleroma.Gun.ConnectionPool
def limiter_setup do def limiter_setup do
wait = Config.get([:connections_pool, :connection_acquisition_wait]) wait = Config.get([:connections_pool, :connection_acquisition_wait])

View File

@ -23,7 +23,4 @@ def options(connection_opts \\ [], %URI{} = uri) do
end end
defp add_scheme_opts(opts, _), do: opts defp add_scheme_opts(opts, _), do: opts
@spec get_conn(URI.t(), keyword()) :: {:ok, keyword()}
def get_conn(_uri, opts), do: {:ok, opts}
end end

View File

@ -62,8 +62,6 @@ def request(method, url, body, headers, options) when is_binary(url) do
uri = URI.parse(url) uri = URI.parse(url)
adapter_opts = AdapterHelper.options(uri, options[:adapter] || []) adapter_opts = AdapterHelper.options(uri, options[:adapter] || [])
case AdapterHelper.get_conn(uri, adapter_opts) do
{:ok, adapter_opts} ->
options = put_in(options[:adapter], adapter_opts) options = put_in(options[:adapter], adapter_opts)
params = options[:params] || [] params = options[:params] || []
request = build_request(method, headers, options, url, body, params) request = build_request(method, headers, options, url, body, params)
@ -79,11 +77,6 @@ def request(method, url, body, headers, options) when is_binary(url) do
adapter, adapter,
adapter_opts adapter_opts
) )
# Connection release is handled in a custom FollowRedirects middleware
err ->
err
end
end end
@spec request(Client.t(), keyword()) :: {:ok, Env.t()} | {:error, any()} @spec request(Client.t(), keyword()) :: {:ok, Env.t()} | {:error, any()}
@ -110,7 +103,7 @@ defp maybe_limit(fun, _, _) do
end end
defp adapter_middlewares(Tesla.Adapter.Gun) do defp adapter_middlewares(Tesla.Adapter.Gun) do
[Pleroma.HTTP.Middleware.FollowRedirects] [Tesla.Middleware.FollowRedirects, Pleroma.Tesla.Middleware.ConnectionPool]
end end
defp adapter_middlewares(_), do: [] defp adapter_middlewares(_), do: []

View File

@ -0,0 +1,35 @@
# Pleroma: A lightweight social networking server
# Copyright © 2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Tesla.Middleware.ConnectionPool do
@moduledoc """
Middleware to get/release connections from `Pleroma.Gun.ConnectionPool`
"""
@behaviour Tesla.Middleware
alias Pleroma.Gun.ConnectionPool
@impl Tesla.Middleware
def call(%Tesla.Env{url: url, opts: opts} = env, next, _) do
uri = URI.parse(url)
case ConnectionPool.get_conn(uri, opts[:adapter]) do
{:ok, conn_pid} ->
adapter_opts = Keyword.merge(opts[:adapter], conn: conn_pid, close_conn: false)
opts = Keyword.put(opts, :adapter, adapter_opts)
env = %{env | opts: opts}
res = Tesla.run(env, next)
unless opts[:adapter][:body_as] == :chunks do
ConnectionPool.release_conn(conn_pid)
end
res
err ->
err
end
end
end

View File

@ -1,110 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2015-2020 Tymon Tobolski <https://github.com/teamon/tesla/blob/master/lib/tesla/middleware/follow_redirects.ex>
# Copyright © 2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.HTTP.Middleware.FollowRedirects do
@moduledoc """
Pool-aware version of https://github.com/teamon/tesla/blob/master/lib/tesla/middleware/follow_redirects.ex
Follow 3xx redirects
## Options
- `:max_redirects` - limit number of redirects (default: `5`)
"""
alias Pleroma.Gun.ConnectionPool
@behaviour Tesla.Middleware
@max_redirects 5
@redirect_statuses [301, 302, 303, 307, 308]
@impl Tesla.Middleware
def call(env, next, opts \\ []) do
max = Keyword.get(opts, :max_redirects, @max_redirects)
redirect(env, next, max)
end
defp redirect(env, next, left) do
opts = env.opts[:adapter]
case Tesla.run(env, next) do
{:ok, %{status: status} = res} when status in @redirect_statuses and left > 0 ->
release_conn(opts)
case Tesla.get_header(res, "location") do
nil ->
{:ok, res}
location ->
location = parse_location(location, res)
case get_conn(location, opts) do
{:ok, opts} ->
%{env | opts: Keyword.put(env.opts, :adapter, opts)}
|> new_request(res.status, location)
|> redirect(next, left - 1)
e ->
e
end
end
{:ok, %{status: status}} when status in @redirect_statuses ->
release_conn(opts)
{:error, {__MODULE__, :too_many_redirects}}
{:error, _} = e ->
release_conn(opts)
e
other ->
unless opts[:body_as] == :chunks do
release_conn(opts)
end
other
end
end
defp get_conn(location, opts) do
uri = URI.parse(location)
case ConnectionPool.get_conn(uri, opts) do
{:ok, conn} ->
{:ok, Keyword.merge(opts, conn: conn)}
e ->
e
end
end
defp release_conn(opts) do
ConnectionPool.release_conn(opts[:conn])
end
# The 303 (See Other) redirect was added in HTTP/1.1 to indicate that the originally
# requested resource is not available, however a related resource (or another redirect)
# available via GET is available at the specified location.
# https://tools.ietf.org/html/rfc7231#section-6.4.4
defp new_request(env, 303, location), do: %{env | url: location, method: :get, query: []}
# The 307 (Temporary Redirect) status code indicates that the target
# resource resides temporarily under a different URI and the user agent
# MUST NOT change the request method (...)
# https://tools.ietf.org/html/rfc7231#section-6.4.7
defp new_request(env, 307, location), do: %{env | url: location}
defp new_request(env, _, location), do: %{env | url: location, query: []}
defp parse_location("https://" <> _rest = location, _env), do: location
defp parse_location("http://" <> _rest = location, _env), do: location
defp parse_location(location, env) do
env.url
|> URI.parse()
|> URI.merge(location)
|> URI.to_string()
end
end