[#582] Optimized federation retirement by reducing the number of SQL calls
(calling `Instances.set_reachable/1` only if instance had `unreachable_since`, calling `Instances.set_unreachable/1` only if instance had nil `unreachable_since`).
This commit is contained in:
parent
d5d91ae689
commit
b40b4bc4e5
|
@ -26,7 +26,7 @@ def changeset(struct, params \\ %{}) do
|
||||||
|> unique_constraint(:host)
|
|> unique_constraint(:host)
|
||||||
end
|
end
|
||||||
|
|
||||||
def filter_reachable([]), do: []
|
def filter_reachable([]), do: %{}
|
||||||
|
|
||||||
def filter_reachable(urls_or_hosts) when is_list(urls_or_hosts) do
|
def filter_reachable(urls_or_hosts) when is_list(urls_or_hosts) do
|
||||||
hosts =
|
hosts =
|
||||||
|
@ -34,17 +34,28 @@ def filter_reachable(urls_or_hosts) when is_list(urls_or_hosts) do
|
||||||
|> Enum.map(&(&1 && host(&1)))
|
|> Enum.map(&(&1 && host(&1)))
|
||||||
|> Enum.filter(&(to_string(&1) != ""))
|
|> Enum.filter(&(to_string(&1) != ""))
|
||||||
|
|
||||||
unreachable_hosts =
|
unreachable_since_by_host =
|
||||||
Repo.all(
|
Repo.all(
|
||||||
from(i in Instance,
|
from(i in Instance,
|
||||||
where:
|
where: i.host in ^hosts,
|
||||||
i.host in ^hosts and
|
select: {i.host, i.unreachable_since}
|
||||||
i.unreachable_since <= ^Instances.reachability_datetime_threshold(),
|
|
||||||
select: i.host
|
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|> Map.new(& &1)
|
||||||
|
|
||||||
Enum.filter(urls_or_hosts, &(&1 && host(&1) not in unreachable_hosts))
|
reachability_datetime_threshold = Instances.reachability_datetime_threshold()
|
||||||
|
|
||||||
|
for entry <- Enum.filter(urls_or_hosts, &is_binary/1) do
|
||||||
|
host = host(entry)
|
||||||
|
unreachable_since = unreachable_since_by_host[host]
|
||||||
|
|
||||||
|
if !unreachable_since ||
|
||||||
|
NaiveDateTime.compare(unreachable_since, reachability_datetime_threshold) == :gt do
|
||||||
|
{entry, unreachable_since}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|> Enum.filter(& &1)
|
||||||
|
|> Map.new(& &1)
|
||||||
end
|
end
|
||||||
|
|
||||||
def reachable?(url_or_host) when is_binary(url_or_host) do
|
def reachable?(url_or_host) when is_binary(url_or_host) do
|
||||||
|
|
|
@ -744,7 +744,7 @@ def publish(actor, activity) do
|
||||||
|
|
||||||
public = is_public?(activity)
|
public = is_public?(activity)
|
||||||
|
|
||||||
remote_inboxes =
|
reachable_inboxes_metadata =
|
||||||
(Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
|
(Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
|
||||||
|> Enum.filter(fn user -> User.ap_enabled?(user) end)
|
|> Enum.filter(fn user -> User.ap_enabled?(user) end)
|
||||||
|> Enum.map(fn %{info: %{source_data: data}} ->
|
|> Enum.map(fn %{info: %{source_data: data}} ->
|
||||||
|
@ -757,17 +757,18 @@ def publish(actor, activity) do
|
||||||
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
|
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
|
||||||
json = Jason.encode!(data)
|
json = Jason.encode!(data)
|
||||||
|
|
||||||
Enum.each(remote_inboxes, fn inbox ->
|
Enum.each(reachable_inboxes_metadata, fn {inbox, unreachable_since} ->
|
||||||
Federator.enqueue(:publish_single_ap, %{
|
Federator.enqueue(:publish_single_ap, %{
|
||||||
inbox: inbox,
|
inbox: inbox,
|
||||||
json: json,
|
json: json,
|
||||||
actor: actor,
|
actor: actor,
|
||||||
id: activity.data["id"]
|
id: activity.data["id"],
|
||||||
|
unreachable_since: unreachable_since
|
||||||
})
|
})
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
def publish_one(%{inbox: inbox, json: json, actor: actor, id: id}) do
|
def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
|
||||||
Logger.info("Federating #{id} to #{inbox}")
|
Logger.info("Federating #{id} to #{inbox}")
|
||||||
host = URI.parse(inbox).host
|
host = URI.parse(inbox).host
|
||||||
|
|
||||||
|
@ -791,11 +792,11 @@ def publish_one(%{inbox: inbox, json: json, actor: actor, id: id}) do
|
||||||
{"digest", digest}
|
{"digest", digest}
|
||||||
]
|
]
|
||||||
) do
|
) do
|
||||||
Instances.set_reachable(inbox)
|
if params[:unreachable_since], do: Instances.set_reachable(inbox)
|
||||||
result
|
result
|
||||||
else
|
else
|
||||||
{_post_result, response} ->
|
{_post_result, response} ->
|
||||||
Instances.set_unreachable(inbox)
|
unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
|
||||||
{:error, response}
|
{:error, response}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -124,8 +124,8 @@ def handle(:incoming_ap_doc, params) do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle(:publish_single_salmon, {user_or_url, feed, poster}) do
|
def handle(:publish_single_salmon, params) do
|
||||||
Salmon.send_to_user(user_or_url, feed, poster)
|
Salmon.send_to_user(params)
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle(:publish_single_ap, params) do
|
def handle(:publish_single_ap, params) do
|
||||||
|
|
|
@ -162,30 +162,29 @@ def remote_users(%{data: %{"to" => to} = data}) do
|
||||||
|> Enum.filter(fn user -> user && !user.local end)
|
|> Enum.filter(fn user -> user && !user.local end)
|
||||||
end
|
end
|
||||||
|
|
||||||
# push an activity to remote accounts
|
@doc "Pushes an activity to remote account."
|
||||||
#
|
def send_to_user(%{recipient: %{info: %{salmon: salmon}}} = params),
|
||||||
def send_to_user(%{info: %{salmon: salmon}}, feed, poster),
|
do: send_to_user(Map.put(params, :recipient, salmon))
|
||||||
do: send_to_user(salmon, feed, poster)
|
|
||||||
|
|
||||||
def send_to_user(url, feed, poster) when is_binary(url) do
|
def send_to_user(%{recipient: url, feed: feed, poster: poster} = params) when is_binary(url) do
|
||||||
with {:ok, %{status: code}} when code in 200..299 <-
|
with {:ok, %{status: code}} when code in 200..299 <-
|
||||||
poster.(
|
poster.(
|
||||||
url,
|
url,
|
||||||
feed,
|
feed,
|
||||||
[{"Content-Type", "application/magic-envelope+xml"}]
|
[{"Content-Type", "application/magic-envelope+xml"}]
|
||||||
) do
|
) do
|
||||||
Instances.set_reachable(url)
|
if params[:unreachable_since], do: Instances.set_reachable(url)
|
||||||
Logger.debug(fn -> "Pushed to #{url}, code #{code}" end)
|
Logger.debug(fn -> "Pushed to #{url}, code #{code}" end)
|
||||||
:ok
|
:ok
|
||||||
else
|
else
|
||||||
e ->
|
e ->
|
||||||
Instances.set_unreachable(url)
|
unless params[:unreachable_since], do: Instances.set_reachable(url)
|
||||||
Logger.debug(fn -> "Pushing Salmon to #{url} failed, #{inspect(e)}" end)
|
Logger.debug(fn -> "Pushing Salmon to #{url} failed, #{inspect(e)}" end)
|
||||||
:error
|
:error
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def send_to_user(_, _, _), do: :noop
|
def send_to_user(_), do: :noop
|
||||||
|
|
||||||
@supported_activities [
|
@supported_activities [
|
||||||
"Create",
|
"Create",
|
||||||
|
@ -218,13 +217,20 @@ def publish(%{info: %{keys: keys}} = user, %{data: %{"type" => type}} = activity
|
||||||
remote_users = remote_users(activity)
|
remote_users = remote_users(activity)
|
||||||
|
|
||||||
salmon_urls = Enum.map(remote_users, & &1.info.salmon)
|
salmon_urls = Enum.map(remote_users, & &1.info.salmon)
|
||||||
reachable_salmon_urls = Instances.filter_reachable(salmon_urls)
|
reachable_urls_metadata = Instances.filter_reachable(salmon_urls)
|
||||||
|
reachable_urls = Map.keys(reachable_urls_metadata)
|
||||||
|
|
||||||
remote_users
|
remote_users
|
||||||
|> Enum.filter(&(&1.info.salmon in reachable_salmon_urls))
|
|> Enum.filter(&(&1.info.salmon in reachable_urls))
|
||||||
|> Enum.each(fn remote_user ->
|
|> Enum.each(fn remote_user ->
|
||||||
Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
|
Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
|
||||||
Pleroma.Web.Federator.enqueue(:publish_single_salmon, {remote_user, feed, poster})
|
|
||||||
|
Pleroma.Web.Federator.enqueue(:publish_single_salmon, %{
|
||||||
|
recipient: remote_user,
|
||||||
|
feed: feed,
|
||||||
|
poster: poster,
|
||||||
|
unreachable_since: reachable_urls_metadata[remote_user.info.salmon]
|
||||||
|
})
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -70,7 +70,8 @@ def publish(topic, user, %{data: %{"type" => type}} = activity)
|
||||||
subscriptions = Repo.all(query)
|
subscriptions = Repo.all(query)
|
||||||
|
|
||||||
callbacks = Enum.map(subscriptions, & &1.callback)
|
callbacks = Enum.map(subscriptions, & &1.callback)
|
||||||
reachable_callbacks = Instances.filter_reachable(callbacks)
|
reachable_callbacks_metadata = Instances.filter_reachable(callbacks)
|
||||||
|
reachable_callbacks = Map.keys(reachable_callbacks_metadata)
|
||||||
|
|
||||||
subscriptions
|
subscriptions
|
||||||
|> Enum.filter(&(&1.callback in reachable_callbacks))
|
|> Enum.filter(&(&1.callback in reachable_callbacks))
|
||||||
|
@ -79,7 +80,8 @@ def publish(topic, user, %{data: %{"type" => type}} = activity)
|
||||||
xml: response,
|
xml: response,
|
||||||
topic: topic,
|
topic: topic,
|
||||||
callback: sub.callback,
|
callback: sub.callback,
|
||||||
secret: sub.secret
|
secret: sub.secret,
|
||||||
|
unreachable_since: reachable_callbacks_metadata[sub.callback]
|
||||||
}
|
}
|
||||||
|
|
||||||
Pleroma.Web.Federator.enqueue(:publish_single_websub, data)
|
Pleroma.Web.Federator.enqueue(:publish_single_websub, data)
|
||||||
|
@ -268,7 +270,7 @@ def refresh_subscriptions(delta \\ 60 * 60 * 24) do
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret}) do
|
def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret} = params) do
|
||||||
signature = sign(secret || "", xml)
|
signature = sign(secret || "", xml)
|
||||||
Logger.info(fn -> "Pushing #{topic} to #{callback}" end)
|
Logger.info(fn -> "Pushing #{topic} to #{callback}" end)
|
||||||
|
|
||||||
|
@ -281,12 +283,12 @@ def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret}) d
|
||||||
{"X-Hub-Signature", "sha1=#{signature}"}
|
{"X-Hub-Signature", "sha1=#{signature}"}
|
||||||
]
|
]
|
||||||
) do
|
) do
|
||||||
Instances.set_reachable(callback)
|
if params[:unreachable_since], do: Instances.set_reachable(callback)
|
||||||
Logger.info(fn -> "Pushed to #{callback}, code #{code}" end)
|
Logger.info(fn -> "Pushed to #{callback}, code #{code}" end)
|
||||||
{:ok, code}
|
{:ok, code}
|
||||||
else
|
else
|
||||||
{_post_result, response} ->
|
{_post_result, response} ->
|
||||||
Instances.set_unreachable(callback)
|
unless params[:unreachable_since], do: Instances.set_reachable(callback)
|
||||||
Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(response)}" end)
|
Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(response)}" end)
|
||||||
{:error, response}
|
{:error, response}
|
||||||
end
|
end
|
||||||
|
|
|
@ -163,8 +163,8 @@ test "with relays deactivated, it does not publish to the relay", %{
|
||||||
{:ok, _activity} =
|
{:ok, _activity} =
|
||||||
CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
|
CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
|
||||||
|
|
||||||
assert called(Federator.enqueue(:publish_single_salmon, {remote_user2, :_, :_}))
|
assert called(Federator.enqueue(:publish_single_salmon, %{recipient: remote_user2}))
|
||||||
refute called(Federator.enqueue(:publish_single_websub, {remote_user1, :_, :_}))
|
refute called(Federator.enqueue(:publish_single_websub, %{recipient: remote_user1}))
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -47,7 +47,7 @@ test "returns true on non-binary input" do
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "filter_reachable/1" do
|
describe "filter_reachable/1" do
|
||||||
test "keeps only reachable elements of supplied list" do
|
setup do
|
||||||
host = "consistently-unreachable.name"
|
host = "consistently-unreachable.name"
|
||||||
url1 = "http://eventually-unreachable.com/path"
|
url1 = "http://eventually-unreachable.com/path"
|
||||||
url2 = "http://domain.com/path"
|
url2 = "http://domain.com/path"
|
||||||
|
@ -55,7 +55,26 @@ test "keeps only reachable elements of supplied list" do
|
||||||
Instances.set_consistently_unreachable(host)
|
Instances.set_consistently_unreachable(host)
|
||||||
Instances.set_unreachable(url1)
|
Instances.set_unreachable(url1)
|
||||||
|
|
||||||
assert [url1, url2] == Instances.filter_reachable([host, url1, url2])
|
result = Instances.filter_reachable([host, url1, url2, nil])
|
||||||
|
%{result: result, url1: url1, url2: url2}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "returns a map with keys containing 'not marked consistently unreachable' elements of supplied list",
|
||||||
|
%{result: result, url1: url1, url2: url2} do
|
||||||
|
assert is_map(result)
|
||||||
|
assert Enum.sort([url1, url2]) == result |> Map.keys() |> Enum.sort()
|
||||||
|
end
|
||||||
|
|
||||||
|
test "returns a map with `unreachable_since` values for keys",
|
||||||
|
%{result: result, url1: url1, url2: url2} do
|
||||||
|
assert is_map(result)
|
||||||
|
assert %NaiveDateTime{} = result[url1]
|
||||||
|
assert is_nil(result[url2])
|
||||||
|
end
|
||||||
|
|
||||||
|
test "returns an empty map for empty list or list containing no hosts / url" do
|
||||||
|
assert %{} == Instances.filter_reachable([])
|
||||||
|
assert %{} == Instances.filter_reachable([nil])
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue