From b40b4bc4e5b49ac2b35746cee7b1db92428d3ee1 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sun, 3 Feb 2019 12:41:27 +0300 Subject: [PATCH 1/2] [#582] Optimized federation retirement by reducing the number of SQL calls (calling `Instances.set_reachable/1` only if instance had `unreachable_since`, calling `Instances.set_unreachable/1` only if instance had nil `unreachable_since`). --- lib/pleroma/instances/instance.ex | 25 ++++++++++++----- lib/pleroma/web/activity_pub/activity_pub.ex | 13 ++++----- lib/pleroma/web/federator/federator.ex | 4 +-- lib/pleroma/web/salmon/salmon.ex | 28 ++++++++++++-------- lib/pleroma/web/websub/websub.ex | 12 +++++---- test/web/federator_test.exs | 4 +-- test/web/instances/instances_test.exs | 23 ++++++++++++++-- 7 files changed, 74 insertions(+), 35 deletions(-) diff --git a/lib/pleroma/instances/instance.ex b/lib/pleroma/instances/instance.ex index a87590d8b..4a4ca26dd 100644 --- a/lib/pleroma/instances/instance.ex +++ b/lib/pleroma/instances/instance.ex @@ -26,7 +26,7 @@ def changeset(struct, params \\ %{}) do |> unique_constraint(:host) end - def filter_reachable([]), do: [] + def filter_reachable([]), do: %{} def filter_reachable(urls_or_hosts) when is_list(urls_or_hosts) do hosts = @@ -34,17 +34,28 @@ def filter_reachable(urls_or_hosts) when is_list(urls_or_hosts) do |> Enum.map(&(&1 && host(&1))) |> Enum.filter(&(to_string(&1) != "")) - unreachable_hosts = + unreachable_since_by_host = Repo.all( from(i in Instance, - where: - i.host in ^hosts and - i.unreachable_since <= ^Instances.reachability_datetime_threshold(), - select: i.host + where: i.host in ^hosts, + select: {i.host, i.unreachable_since} ) ) + |> Map.new(& &1) - Enum.filter(urls_or_hosts, &(&1 && host(&1) not in unreachable_hosts)) + reachability_datetime_threshold = Instances.reachability_datetime_threshold() + + for entry <- Enum.filter(urls_or_hosts, &is_binary/1) do + host = host(entry) + unreachable_since = unreachable_since_by_host[host] + + if 
!unreachable_since || + NaiveDateTime.compare(unreachable_since, reachability_datetime_threshold) == :gt do + {entry, unreachable_since} + end + end + |> Enum.filter(& &1) + |> Map.new(& &1) end def reachable?(url_or_host) when is_binary(url_or_host) do diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 06e8c3f1c..5f6c8e7d3 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -744,7 +744,7 @@ def publish(actor, activity) do public = is_public?(activity) - remote_inboxes = + reachable_inboxes_metadata = (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers) |> Enum.filter(fn user -> User.ap_enabled?(user) end) |> Enum.map(fn %{info: %{source_data: data}} -> @@ -757,17 +757,18 @@ def publish(actor, activity) do {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) json = Jason.encode!(data) - Enum.each(remote_inboxes, fn inbox -> + Enum.each(reachable_inboxes_metadata, fn {inbox, unreachable_since} -> Federator.enqueue(:publish_single_ap, %{ inbox: inbox, json: json, actor: actor, - id: activity.data["id"] + id: activity.data["id"], + unreachable_since: unreachable_since }) end) end - def publish_one(%{inbox: inbox, json: json, actor: actor, id: id}) do + def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do Logger.info("Federating #{id} to #{inbox}") host = URI.parse(inbox).host @@ -791,11 +792,11 @@ def publish_one(%{inbox: inbox, json: json, actor: actor, id: id}) do {"digest", digest} ] ) do - Instances.set_reachable(inbox) + if params[:unreachable_since], do: Instances.set_reachable(inbox) result else {_post_result, response} -> - Instances.set_unreachable(inbox) + unless params[:unreachable_since], do: Instances.set_unreachable(inbox) {:error, response} end end diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index 46f7a4973..bb7676cf0 100644 --- 
a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -124,8 +124,8 @@ def handle(:incoming_ap_doc, params) do end end - def handle(:publish_single_salmon, {user_or_url, feed, poster}) do - Salmon.send_to_user(user_or_url, feed, poster) + def handle(:publish_single_salmon, params) do + Salmon.send_to_user(params) end def handle(:publish_single_ap, params) do diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex index 07ca42a5f..4d519ece4 100644 --- a/lib/pleroma/web/salmon/salmon.ex +++ b/lib/pleroma/web/salmon/salmon.ex @@ -162,30 +162,29 @@ def remote_users(%{data: %{"to" => to} = data}) do |> Enum.filter(fn user -> user && !user.local end) end - # push an activity to remote accounts - # - def send_to_user(%{info: %{salmon: salmon}}, feed, poster), - do: send_to_user(salmon, feed, poster) + @doc "Pushes an activity to remote account." + def send_to_user(%{recipient: %{info: %{salmon: salmon}}} = params), + do: send_to_user(Map.put(params, :recipient, salmon)) - def send_to_user(url, feed, poster) when is_binary(url) do + def send_to_user(%{recipient: url, feed: feed, poster: poster} = params) when is_binary(url) do with {:ok, %{status: code}} when code in 200..299 <- poster.( url, feed, [{"Content-Type", "application/magic-envelope+xml"}] ) do - Instances.set_reachable(url) + if params[:unreachable_since], do: Instances.set_reachable(url) Logger.debug(fn -> "Pushed to #{url}, code #{code}" end) :ok else e -> - Instances.set_unreachable(url) + unless params[:unreachable_since], do: Instances.set_unreachable(url) Logger.debug(fn -> "Pushing Salmon to #{url} failed, #{inspect(e)}" end) :error end end - def send_to_user(_, _, _), do: :noop + def send_to_user(_), do: :noop @supported_activities [ "Create", @@ -218,13 +217,20 @@ def publish(%{info: %{keys: keys}} = user, %{data: %{"type" => type}} = activity remote_users = remote_users(activity) salmon_urls = Enum.map(remote_users, & &1.info.salmon) - 
reachable_salmon_urls = Instances.filter_reachable(salmon_urls) + reachable_urls_metadata = Instances.filter_reachable(salmon_urls) + reachable_urls = Map.keys(reachable_urls_metadata) remote_users - |> Enum.filter(&(&1.info.salmon in reachable_salmon_urls)) + |> Enum.filter(&(&1.info.salmon in reachable_urls)) |> Enum.each(fn remote_user -> Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end) - Pleroma.Web.Federator.enqueue(:publish_single_salmon, {remote_user, feed, poster}) + + Pleroma.Web.Federator.enqueue(:publish_single_salmon, %{ + recipient: remote_user, + feed: feed, + poster: poster, + unreachable_since: reachable_urls_metadata[remote_user.info.salmon] + }) end) end end diff --git a/lib/pleroma/web/websub/websub.ex b/lib/pleroma/web/websub/websub.ex index 8f7d53b03..cf51dce76 100644 --- a/lib/pleroma/web/websub/websub.ex +++ b/lib/pleroma/web/websub/websub.ex @@ -70,7 +70,8 @@ def publish(topic, user, %{data: %{"type" => type}} = activity) subscriptions = Repo.all(query) callbacks = Enum.map(subscriptions, & &1.callback) - reachable_callbacks = Instances.filter_reachable(callbacks) + reachable_callbacks_metadata = Instances.filter_reachable(callbacks) + reachable_callbacks = Map.keys(reachable_callbacks_metadata) subscriptions |> Enum.filter(&(&1.callback in reachable_callbacks)) @@ -79,7 +80,8 @@ def publish(topic, user, %{data: %{"type" => type}} = activity) xml: response, topic: topic, callback: sub.callback, - secret: sub.secret + secret: sub.secret, + unreachable_since: reachable_callbacks_metadata[sub.callback] } Pleroma.Web.Federator.enqueue(:publish_single_websub, data) @@ -268,7 +270,7 @@ def refresh_subscriptions(delta \\ 60 * 60 * 24) do end) end - def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret}) do + def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret} = params) do signature = sign(secret || "", xml) Logger.info(fn -> "Pushing #{topic} to #{callback}" end) @@ -281,12 +283,12 @@ 
def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret}) d {"X-Hub-Signature", "sha1=#{signature}"} ] ) do - Instances.set_reachable(callback) + if params[:unreachable_since], do: Instances.set_reachable(callback) Logger.info(fn -> "Pushed to #{callback}, code #{code}" end) {:ok, code} else {_post_result, response} -> - Instances.set_unreachable(callback) + unless params[:unreachable_since], do: Instances.set_unreachable(callback) Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(response)}" end) {:error, response} end diff --git a/test/web/federator_test.exs b/test/web/federator_test.exs index c6d10ef78..7bb249d74 100644 --- a/test/web/federator_test.exs +++ b/test/web/federator_test.exs @@ -163,8 +163,8 @@ test "with relays deactivated, it does not publish to the relay", %{ {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"}) - assert called(Federator.enqueue(:publish_single_salmon, {remote_user2, :_, :_})) - refute called(Federator.enqueue(:publish_single_websub, {remote_user1, :_, :_})) + assert called(Federator.enqueue(:publish_single_salmon, %{recipient: remote_user2})) + refute called(Federator.enqueue(:publish_single_websub, %{recipient: remote_user1})) end end diff --git a/test/web/instances/instances_test.exs b/test/web/instances/instances_test.exs index adb8560a7..2530c09fe 100644 --- a/test/web/instances/instances_test.exs +++ b/test/web/instances/instances_test.exs @@ -47,7 +47,7 @@ test "returns true on non-binary input" do end describe "filter_reachable/1" do - test "keeps only reachable elements of supplied list" do + setup do host = "consistently-unreachable.name" url1 = "http://eventually-unreachable.com/path" url2 = "http://domain.com/path" @@ -55,7 +55,26 @@ test "keeps only reachable elements of supplied list" do Instances.set_consistently_unreachable(host) Instances.set_unreachable(url1) - assert [url1, url2] == Instances.filter_reachable([host, url1, url2]) + result 
= Instances.filter_reachable([host, url1, url2, nil]) + %{result: result, url1: url1, url2: url2} + end + + test "returns a map with keys containing 'not marked consistently unreachable' elements of supplied list", + %{result: result, url1: url1, url2: url2} do + assert is_map(result) + assert Enum.sort([url1, url2]) == result |> Map.keys() |> Enum.sort() + end + + test "returns a map with `unreachable_since` values for keys", + %{result: result, url1: url1, url2: url2} do + assert is_map(result) + assert %NaiveDateTime{} = result[url1] + assert is_nil(result[url2]) + end + + test "returns an empty map for empty list or list containing no hosts / url" do + assert %{} == Instances.filter_reachable([]) + assert %{} == Instances.filter_reachable([nil]) end end From 3913b0196e47c829df90aa835ade2efdb7c43850 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sun, 3 Feb 2019 13:28:13 +0300 Subject: [PATCH 2/2] [#582] Made single-pub tasks call `Instances.set_reachable/1` if `unreachable_since` is not specified. Added tests. 
--- lib/pleroma/web/activity_pub/activity_pub.ex | 4 +- lib/pleroma/web/salmon/salmon.ex | 4 +- lib/pleroma/web/websub/websub.ex | 4 +- test/web/activity_pub/activity_pub_test.exs | 73 +++++++++++++++++++- test/web/federator_test.exs | 35 ++++++++-- 5 files changed, 108 insertions(+), 12 deletions(-) diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 5f6c8e7d3..4635e7fcd 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -792,7 +792,9 @@ def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do {"digest", digest} ] ) do - if params[:unreachable_since], do: Instances.set_reachable(inbox) + if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since], + do: Instances.set_reachable(inbox) + result else {_post_result, response} -> diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex index 4d519ece4..b1c2dc7fa 100644 --- a/lib/pleroma/web/salmon/salmon.ex +++ b/lib/pleroma/web/salmon/salmon.ex @@ -173,7 +173,9 @@ def send_to_user(%{recipient: url, feed: feed, poster: poster} = params) when is feed, [{"Content-Type", "application/magic-envelope+xml"}] ) do - if params[:unreachable_since], do: Instances.set_reachable(url) + if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since], + do: Instances.set_reachable(url) + Logger.debug(fn -> "Pushed to #{url}, code #{code}" end) :ok else diff --git a/lib/pleroma/web/websub/websub.ex b/lib/pleroma/web/websub/websub.ex index cf51dce76..90ba79962 100644 --- a/lib/pleroma/web/websub/websub.ex +++ b/lib/pleroma/web/websub/websub.ex @@ -283,7 +283,9 @@ def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret} = {"X-Hub-Signature", "sha1=#{signature}"} ] ) do - if params[:unreachable_since], do: Instances.set_reachable(callback) + if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since], + do: 
Instances.set_reachable(callback) + Logger.info(fn -> "Pushed to #{callback}, code #{code}" end) {:ok, code} else diff --git a/test/web/activity_pub/activity_pub_test.exs b/test/web/activity_pub/activity_pub_test.exs index 2ada4f2e5..a55961ac4 100644 --- a/test/web/activity_pub/activity_pub_test.exs +++ b/test/web/activity_pub/activity_pub_test.exs @@ -698,7 +698,57 @@ test "returned pinned statuses" do end describe "publish_one/1" do - test_with_mock "it calls `Instances.set_unreachable` on target inbox on non-2xx HTTP response code", + test_with_mock "calls `Instances.set_reachable` on successful federation if `unreachable_since` is not specified", + Instances, + [:passthrough], + [] do + actor = insert(:user) + inbox = "http://200.site/users/nick1/inbox" + + assert {:ok, _} = ActivityPub.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1}) + + assert called(Instances.set_reachable(inbox)) + end + + test_with_mock "calls `Instances.set_reachable` on successful federation if `unreachable_since` is set", + Instances, + [:passthrough], + [] do + actor = insert(:user) + inbox = "http://200.site/users/nick1/inbox" + + assert {:ok, _} = + ActivityPub.publish_one(%{ + inbox: inbox, + json: "{}", + actor: actor, + id: 1, + unreachable_since: NaiveDateTime.utc_now() + }) + + assert called(Instances.set_reachable(inbox)) + end + + test_with_mock "does NOT call `Instances.set_reachable` on successful federation if `unreachable_since` is nil", + Instances, + [:passthrough], + [] do + actor = insert(:user) + inbox = "http://200.site/users/nick1/inbox" + + assert {:ok, _} = + ActivityPub.publish_one(%{ + inbox: inbox, + json: "{}", + actor: actor, + id: 1, + unreachable_since: nil + }) + + refute called(Instances.set_reachable(inbox)) + end + + test_with_mock "calls `Instances.set_unreachable` on target inbox on non-2xx HTTP response code", Instances, [:passthrough], [] do @@ -724,7 +774,7 @@ test "returned pinned statuses" do assert 
called(Instances.set_unreachable(inbox)) end - test_with_mock "it does NOT call `Instances.set_unreachable` if target is reachable", + test_with_mock "does NOT call `Instances.set_unreachable` if target is reachable", Instances, [:passthrough], [] do @@ -735,6 +785,25 @@ test "returned pinned statuses" do refute called(Instances.set_unreachable(inbox)) end + + test_with_mock "does NOT call `Instances.set_unreachable` if target instance has non-nil `unreachable_since`", + Instances, + [:passthrough], + [] do + actor = insert(:user) + inbox = "http://connrefused.site/users/nick1/inbox" + + assert {:error, _} = + ActivityPub.publish_one(%{ + inbox: inbox, + json: "{}", + actor: actor, + id: 1, + unreachable_since: NaiveDateTime.utc_now() + }) + + refute called(Instances.set_unreachable(inbox)) + end end def data_uri do diff --git a/test/web/federator_test.exs b/test/web/federator_test.exs index 7bb249d74..05f813291 100644 --- a/test/web/federator_test.exs +++ b/test/web/federator_test.exs @@ -95,15 +95,18 @@ test "with relays deactivated, it does not publish to the relay", %{ info: %{ap_enabled: true, source_data: %{"inbox" => inbox2}} }) - Instances.set_unreachable( - URI.parse(inbox2).host, - Instances.reachability_datetime_threshold() - ) + dt = NaiveDateTime.utc_now() + Instances.set_unreachable(inbox1, dt) + + Instances.set_consistently_unreachable(URI.parse(inbox2).host) {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"}) - assert called(Federator.enqueue(:publish_single_ap, %{inbox: inbox1})) + assert called( + Federator.enqueue(:publish_single_ap, %{inbox: inbox1, unreachable_since: dt}) + ) + refute called(Federator.enqueue(:publish_single_ap, %{inbox: inbox2})) end @@ -128,11 +131,20 @@ test "with relays deactivated, it does not publish to the relay", %{ callback: "https://pleroma2.soykaf.com/cb" }) + dt = NaiveDateTime.utc_now() + Instances.set_unreachable(sub2.callback, dt) + 
Instances.set_consistently_unreachable(sub1.callback) {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI"}) - assert called(Federator.enqueue(:publish_single_websub, %{callback: sub2.callback})) + assert called( + Federator.enqueue(:publish_single_websub, %{ + callback: sub2.callback, + unreachable_since: dt + }) + ) + refute called(Federator.enqueue(:publish_single_websub, %{callback: sub1.callback})) end @@ -158,12 +170,21 @@ test "with relays deactivated, it does not publish to the relay", %{ info: %{salmon: "https://domain2.com/salmon"} }) + dt = NaiveDateTime.utc_now() + Instances.set_unreachable(remote_user2.ap_id, dt) + Instances.set_consistently_unreachable("domain.com") {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"}) - assert called(Federator.enqueue(:publish_single_salmon, %{recipient: remote_user2})) + assert called( + Federator.enqueue(:publish_single_salmon, %{ + recipient: remote_user2, + unreachable_since: dt + }) + ) + refute called(Federator.enqueue(:publish_single_websub, %{recipient: remote_user1})) end end