[#161] Limited replies depth on incoming federation in order to prevent memory leaks on recursive replies fetching.

This commit is contained in:
Ivan Tashkinov 2019-06-29 20:04:50 +03:00
parent 9b1e75bac8
commit 5b7b1040b3
8 changed files with 181 additions and 84 deletions

View File

@ -44,20 +44,20 @@ def get_by_ap_id(ap_id) do
Repo.one(from(object in Object, where: fragment("(?)->>'id' = ?", object.data, ^ap_id)))
end
def normalize(_, fetch_remote \\ true)
def normalize(_, fetch_remote \\ true, options \\ [])
# If we pass an Activity to Object.normalize(), we can try to use the preloaded object.
# Use this whenever possible, especially when walking graphs in an O(N) loop!
def normalize(%Object{} = object, _), do: object
def normalize(%Activity{object: %Object{} = object}, _), do: object
def normalize(%Object{} = object, _, _), do: object
def normalize(%Activity{object: %Object{} = object}, _, _), do: object
# A hack for fake activities
def normalize(%Activity{data: %{"object" => %{"fake" => true} = data}}, _) do
def normalize(%Activity{data: %{"object" => %{"fake" => true} = data}}, _, _) do
%Object{id: "pleroma:fake_object_id", data: data}
end
# Catch and log Object.normalize() calls where the Activity's child object is not
# preloaded.
def normalize(%Activity{data: %{"object" => %{"id" => ap_id}}}, fetch_remote) do
def normalize(%Activity{data: %{"object" => %{"id" => ap_id}}}, fetch_remote, _) do
Logger.debug(
"Object.normalize() called without preloaded object (#{ap_id}). Consider preloading the object!"
)
@ -67,7 +67,7 @@ def normalize(%Activity{data: %{"object" => %{"id" => ap_id}}}, fetch_remote) do
normalize(ap_id, fetch_remote)
end
def normalize(%Activity{data: %{"object" => ap_id}}, fetch_remote) do
def normalize(%Activity{data: %{"object" => ap_id}}, fetch_remote, _) do
Logger.debug(
"Object.normalize() called without preloaded object (#{ap_id}). Consider preloading the object!"
)
@ -78,10 +78,14 @@ def normalize(%Activity{data: %{"object" => ap_id}}, fetch_remote) do
end
# Old way, try fetching the object through cache.
def normalize(%{"id" => ap_id}, fetch_remote), do: normalize(ap_id, fetch_remote)
def normalize(ap_id, false) when is_binary(ap_id), do: get_cached_by_ap_id(ap_id)
def normalize(ap_id, true) when is_binary(ap_id), do: Fetcher.fetch_object_from_id!(ap_id)
def normalize(_, _), do: nil
def normalize(%{"id" => ap_id}, fetch_remote, _), do: normalize(ap_id, fetch_remote)
def normalize(ap_id, false, _) when is_binary(ap_id), do: get_cached_by_ap_id(ap_id)
def normalize(ap_id, true, options) when is_binary(ap_id) do
Fetcher.fetch_object_from_id!(ap_id, options)
end
def normalize(_, _, _), do: nil
# Owned objects can only be mutated by their owner
def authorize_mutation(%Object{data: %{"actor" => actor}}, %User{ap_id: ap_id}),

View File

@ -22,7 +22,7 @@ defp reinject_object(data) do
# TODO:
# This will create a Create activity, which we need internally at the moment.
def fetch_object_from_id(id) do
def fetch_object_from_id(id, options \\ []) do
if object = Object.get_cached_by_ap_id(id) do
{:ok, object}
else
@ -38,7 +38,7 @@ def fetch_object_from_id(id) do
"object" => data
},
:ok <- Containment.contain_origin(id, params),
{:ok, activity} <- Transmogrifier.handle_incoming(params),
{:ok, activity} <- Transmogrifier.handle_incoming(params, options),
{:object, _data, %Object{} = object} <-
{:object, data, Object.normalize(activity, false)} do
{:ok, object}
@ -63,8 +63,8 @@ def fetch_object_from_id(id) do
end
end
def fetch_object_from_id!(id) do
with {:ok, object} <- fetch_object_from_id(id) do
def fetch_object_from_id!(id, options \\ []) do
with {:ok, object} <- fetch_object_from_id(id, options) do
object
else
_e ->

View File

@ -14,6 +14,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.Federator
import Ecto.Query
@ -22,20 +23,20 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
@doc """
Modifies an incoming AP object (mastodon format) to our internal format.
"""
def fix_object(object) do
def fix_object(object, options \\ []) do
object
|> fix_actor
|> fix_url
|> fix_attachments
|> fix_context
|> fix_in_reply_to
|> fix_in_reply_to(options)
|> fix_emoji
|> fix_tag
|> fix_content_map
|> fix_likes
|> fix_addressing
|> fix_summary
|> fix_type
|> fix_type(options)
end
def fix_summary(%{"summary" => nil} = object) do
@ -164,7 +165,9 @@ def fix_likes(object) do
object
end
def fix_in_reply_to(%{"inReplyTo" => in_reply_to} = object)
def fix_in_reply_to(object, options \\ [])
def fix_in_reply_to(%{"inReplyTo" => in_reply_to} = object, options)
when not is_nil(in_reply_to) do
in_reply_to_id =
cond do
@ -182,7 +185,10 @@ def fix_in_reply_to(%{"inReplyTo" => in_reply_to} = object)
""
end
case get_obj_helper(in_reply_to_id) do
object = Map.put(object, "inReplyToAtomUri", in_reply_to_id)
if (options[:depth] || 1) <= Federator.max_replies_depth() do
case get_obj_helper(in_reply_to_id, options) do
{:ok, replied_object} ->
with %Activity{} = _activity <-
Activity.get_create_by_object_ap_id(replied_object.data["id"]) do
@ -201,9 +207,12 @@ def fix_in_reply_to(%{"inReplyTo" => in_reply_to} = object)
Logger.error("Couldn't fetch \"#{inspect(in_reply_to_id)}\", error: #{inspect(e)}")
object
end
else
object
end
end
def fix_in_reply_to(object), do: object
def fix_in_reply_to(object, _options), do: object
def fix_context(object) do
context = object["context"] || object["conversation"] || Utils.generate_context_id()
@ -336,8 +345,15 @@ def fix_content_map(%{"contentMap" => content_map} = object) do
def fix_content_map(object), do: object
def fix_type(%{"inReplyTo" => reply_id} = object) when is_binary(reply_id) do
reply = Object.normalize(reply_id)
def fix_type(object, options \\ [])
def fix_type(%{"inReplyTo" => reply_id} = object, options) when is_binary(reply_id) do
reply =
if (options[:depth] || 1) <= Federator.max_replies_depth() do
Object.normalize(reply_id, true)
else
nil
end
if reply && (reply.data["type"] == "Question" and object["name"]) do
Map.put(object, "type", "Answer")
@ -346,7 +362,7 @@ def fix_type(%{"inReplyTo" => reply_id} = object) when is_binary(reply_id) do
end
end
def fix_type(object), do: object
def fix_type(object, _), do: object
defp mastodon_follow_hack(%{"id" => id, "actor" => follower_id}, followed) do
with true <- id =~ "follows",
@ -374,9 +390,11 @@ defp get_follow_activity(follow_object, followed) do
end
end
def handle_incoming(data, options \\ [])
# Flag objects are placed ahead of the ID check because Mastodon 2.8 and earlier send them
# with nil ID.
def handle_incoming(%{"type" => "Flag", "object" => objects, "actor" => actor} = data) do
def handle_incoming(%{"type" => "Flag", "object" => objects, "actor" => actor} = data, _options) do
with context <- data["context"] || Utils.generate_context_id(),
content <- data["content"] || "",
%User{} = actor <- User.get_cached_by_ap_id(actor),
@ -409,15 +427,19 @@ def handle_incoming(%{"type" => "Flag", "object" => objects, "actor" => actor} =
end
# disallow objects with bogus IDs
def handle_incoming(%{"id" => nil}), do: :error
def handle_incoming(%{"id" => ""}), do: :error
def handle_incoming(%{"id" => nil}, _options), do: :error
def handle_incoming(%{"id" => ""}, _options), do: :error
# length of https:// = 8, should validate better, but good enough for now.
def handle_incoming(%{"id" => id}) when not (is_binary(id) and length(id) > 8), do: :error
def handle_incoming(%{"id" => id}, _options) when not (is_binary(id) and length(id) > 8),
do: :error
# TODO: validate those with a Ecto scheme
# - tags
# - emoji
def handle_incoming(%{"type" => "Create", "object" => %{"type" => objtype} = object} = data)
def handle_incoming(
%{"type" => "Create", "object" => %{"type" => objtype} = object} = data,
options
)
when objtype in ["Article", "Note", "Video", "Page", "Question", "Answer"] do
actor = Containment.get_actor(data)
@ -427,7 +449,8 @@ def handle_incoming(%{"type" => "Create", "object" => %{"type" => objtype} = obj
with nil <- Activity.get_create_by_object_ap_id(object["id"]),
{:ok, %User{} = user} <- User.get_or_fetch_by_ap_id(data["actor"]) do
object = fix_object(data["object"])
options = Keyword.put(options, :depth, (options[:depth] || 0) + 1)
object = fix_object(data["object"], options)
params = %{
to: data["to"],
@ -452,7 +475,8 @@ def handle_incoming(%{"type" => "Create", "object" => %{"type" => objtype} = obj
end
def handle_incoming(
%{"type" => "Follow", "object" => followed, "actor" => follower, "id" => id} = data
%{"type" => "Follow", "object" => followed, "actor" => follower, "id" => id} = data,
_options
) do
with %User{local: true} = followed <- User.get_cached_by_ap_id(followed),
{:ok, %User{} = follower} <- User.get_or_fetch_by_ap_id(follower),
@ -503,7 +527,8 @@ def handle_incoming(
end
def handle_incoming(
%{"type" => "Accept", "object" => follow_object, "actor" => _actor, "id" => _id} = data
%{"type" => "Accept", "object" => follow_object, "actor" => _actor, "id" => _id} = data,
_options
) do
with actor <- Containment.get_actor(data),
{:ok, %User{} = followed} <- User.get_or_fetch_by_ap_id(actor),
@ -524,7 +549,8 @@ def handle_incoming(
end
def handle_incoming(
%{"type" => "Reject", "object" => follow_object, "actor" => _actor, "id" => _id} = data
%{"type" => "Reject", "object" => follow_object, "actor" => _actor, "id" => _id} = data,
_options
) do
with actor <- Containment.get_actor(data),
{:ok, %User{} = followed} <- User.get_or_fetch_by_ap_id(actor),
@ -548,7 +574,8 @@ def handle_incoming(
end
def handle_incoming(
%{"type" => "Like", "object" => object_id, "actor" => _actor, "id" => id} = data
%{"type" => "Like", "object" => object_id, "actor" => _actor, "id" => id} = data,
_options
) do
with actor <- Containment.get_actor(data),
{:ok, %User{} = actor} <- User.get_or_fetch_by_ap_id(actor),
@ -561,7 +588,8 @@ def handle_incoming(
end
def handle_incoming(
%{"type" => "Announce", "object" => object_id, "actor" => _actor, "id" => id} = data
%{"type" => "Announce", "object" => object_id, "actor" => _actor, "id" => id} = data,
_options
) do
with actor <- Containment.get_actor(data),
{:ok, %User{} = actor} <- User.get_or_fetch_by_ap_id(actor),
@ -576,7 +604,8 @@ def handle_incoming(
def handle_incoming(
%{"type" => "Update", "object" => %{"type" => object_type} = object, "actor" => actor_id} =
data
data,
_options
)
when object_type in ["Person", "Application", "Service", "Organization"] do
with %User{ap_id: ^actor_id} = actor <- User.get_cached_by_ap_id(object["id"]) do
@ -614,7 +643,8 @@ def handle_incoming(
# an error or a tombstone. This would allow us to verify that a deletion actually took
# place.
def handle_incoming(
%{"type" => "Delete", "object" => object_id, "actor" => _actor, "id" => _id} = data
%{"type" => "Delete", "object" => object_id, "actor" => _actor, "id" => _id} = data,
_options
) do
object_id = Utils.get_ap_id(object_id)
@ -635,7 +665,8 @@ def handle_incoming(
"object" => %{"type" => "Announce", "object" => object_id},
"actor" => _actor,
"id" => id
} = data
} = data,
_options
) do
with actor <- Containment.get_actor(data),
{:ok, %User{} = actor} <- User.get_or_fetch_by_ap_id(actor),
@ -653,7 +684,8 @@ def handle_incoming(
"object" => %{"type" => "Follow", "object" => followed},
"actor" => follower,
"id" => id
} = _data
} = _data,
_options
) do
with %User{local: true} = followed <- User.get_cached_by_ap_id(followed),
{:ok, %User{} = follower} <- User.get_or_fetch_by_ap_id(follower),
@ -671,7 +703,8 @@ def handle_incoming(
"object" => %{"type" => "Block", "object" => blocked},
"actor" => blocker,
"id" => id
} = _data
} = _data,
_options
) do
with true <- Pleroma.Config.get([:activitypub, :accept_blocks]),
%User{local: true} = blocked <- User.get_cached_by_ap_id(blocked),
@ -685,7 +718,8 @@ def handle_incoming(
end
def handle_incoming(
%{"type" => "Block", "object" => blocked, "actor" => blocker, "id" => id} = _data
%{"type" => "Block", "object" => blocked, "actor" => blocker, "id" => id} = _data,
_options
) do
with true <- Pleroma.Config.get([:activitypub, :accept_blocks]),
%User{local: true} = blocked = User.get_cached_by_ap_id(blocked),
@ -705,7 +739,8 @@ def handle_incoming(
"object" => %{"type" => "Like", "object" => object_id},
"actor" => _actor,
"id" => id
} = data
} = data,
_options
) do
with actor <- Containment.get_actor(data),
{:ok, %User{} = actor} <- User.get_or_fetch_by_ap_id(actor),
@ -717,10 +752,10 @@ def handle_incoming(
end
end
def handle_incoming(_), do: :error
def handle_incoming(_, _), do: :error
def get_obj_helper(id) do
if object = Object.normalize(id), do: {:ok, object}, else: nil
def get_obj_helper(id, options \\ []) do
if object = Object.normalize(id, true, options), do: {:ok, object}, else: nil
end
def set_reply_to_uri(%{"inReplyTo" => in_reply_to} = object) when is_binary(in_reply_to) do

View File

@ -22,6 +22,12 @@ def init do
refresh_subscriptions()
end
@max_replies_depth 100
@doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)"
# credo:disable-for-previous-line Credo.Check.Readability.MaxLineLength
def max_replies_depth, do: @max_replies_depth
# Client API
def incoming_doc(doc) do

View File

@ -10,6 +10,7 @@ defmodule Pleroma.Web.OStatus.NoteHandler do
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Federator
alias Pleroma.Web.OStatus
alias Pleroma.Web.XML
@ -88,14 +89,15 @@ def add_external_url(note, entry) do
Map.put(note, "external_url", url)
end
def fetch_replied_to_activity(entry, in_reply_to) do
def fetch_replied_to_activity(entry, in_reply_to, options \\ []) do
with %Activity{} = activity <- Activity.get_create_by_object_ap_id(in_reply_to) do
activity
else
_e ->
with in_reply_to_href when not is_nil(in_reply_to_href) <-
with true <- (options[:depth] || 1) <= Federator.max_replies_depth(),
in_reply_to_href when not is_nil(in_reply_to_href) <-
XML.string_from_xpath("//thr:in-reply-to[1]/@href", entry),
{:ok, [activity | _]} <- OStatus.fetch_activity_from_url(in_reply_to_href) do
{:ok, [activity | _]} <- OStatus.fetch_activity_from_url(in_reply_to_href, options) do
activity
else
_e -> nil
@ -104,7 +106,7 @@ def fetch_replied_to_activity(entry, in_reply_to) do
end
# TODO: Clean this up a bit.
def handle_note(entry, doc \\ nil) do
def handle_note(entry, doc \\ nil, options \\ []) do
with id <- XML.string_from_xpath("//id", entry),
activity when is_nil(activity) <- Activity.get_create_by_object_ap_id_with_object(id),
[author] <- :xmerl_xpath.string('//author[1]', doc),
@ -112,7 +114,8 @@ def handle_note(entry, doc \\ nil) do
content_html <- OStatus.get_content(entry),
cw <- OStatus.get_cw(entry),
in_reply_to <- XML.string_from_xpath("//thr:in-reply-to[1]/@ref", entry),
in_reply_to_activity <- fetch_replied_to_activity(entry, in_reply_to),
options <- Keyword.put(options, :depth, (options[:depth] || 0) + 1),
in_reply_to_activity <- fetch_replied_to_activity(entry, in_reply_to, options),
in_reply_to_object <-
(in_reply_to_activity && Object.normalize(in_reply_to_activity)) || nil,
in_reply_to <- (in_reply_to_object && in_reply_to_object.data["id"]) || in_reply_to,

View File

@ -54,7 +54,7 @@ def remote_follow_path do
"#{Web.base_url()}/ostatus_subscribe?acct={uri}"
end
def handle_incoming(xml_string) do
def handle_incoming(xml_string, options \\ []) do
with doc when doc != :error <- parse_document(xml_string) do
with {:ok, actor_user} <- find_make_or_update_user(doc),
do: Pleroma.Instances.set_reachable(actor_user.ap_id)
@ -91,10 +91,12 @@ def handle_incoming(xml_string) do
_ ->
case object_type do
'http://activitystrea.ms/schema/1.0/note' ->
with {:ok, activity} <- NoteHandler.handle_note(entry, doc), do: activity
with {:ok, activity} <- NoteHandler.handle_note(entry, doc, options),
do: activity
'http://activitystrea.ms/schema/1.0/comment' ->
with {:ok, activity} <- NoteHandler.handle_note(entry, doc), do: activity
with {:ok, activity} <- NoteHandler.handle_note(entry, doc, options),
do: activity
_ ->
Logger.error("Couldn't parse incoming document")
@ -359,7 +361,7 @@ def get_atom_url(body) do
end
end
def fetch_activity_from_atom_url(url) do
def fetch_activity_from_atom_url(url, options \\ []) do
with true <- String.starts_with?(url, "http"),
{:ok, %{body: body, status: code}} when code in 200..299 <-
HTTP.get(
@ -367,7 +369,7 @@ def fetch_activity_from_atom_url(url) do
[{:Accept, "application/atom+xml"}]
) do
Logger.debug("Got document from #{url}, handling...")
handle_incoming(body)
handle_incoming(body, options)
else
e ->
Logger.debug("Couldn't get #{url}: #{inspect(e)}")
@ -375,13 +377,13 @@ def fetch_activity_from_atom_url(url) do
end
end
def fetch_activity_from_html_url(url) do
def fetch_activity_from_html_url(url, options \\ []) do
Logger.debug("Trying to fetch #{url}")
with true <- String.starts_with?(url, "http"),
{:ok, %{body: body}} <- HTTP.get(url, []),
{:ok, atom_url} <- get_atom_url(body) do
fetch_activity_from_atom_url(atom_url)
fetch_activity_from_atom_url(atom_url, options)
else
e ->
Logger.debug("Couldn't get #{url}: #{inspect(e)}")
@ -389,11 +391,11 @@ def fetch_activity_from_html_url(url) do
end
end
def fetch_activity_from_url(url) do
with {:ok, [_ | _] = activities} <- fetch_activity_from_atom_url(url) do
def fetch_activity_from_url(url, options \\ []) do
with {:ok, [_ | _] = activities} <- fetch_activity_from_atom_url(url, options) do
{:ok, activities}
else
_e -> fetch_activity_from_html_url(url)
_e -> fetch_activity_from_html_url(url, options)
end
rescue
e ->

View File

@ -11,12 +11,13 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.OStatus
alias Pleroma.Web.Websub.WebsubClientSubscription
import Mock
import Pleroma.Factory
import ExUnit.CaptureLog
alias Pleroma.Web.CommonAPI
setup_all do
Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
@ -46,12 +47,10 @@ test "it fetches replied-to activities if we don't have them" do
data["object"]
|> Map.put("inReplyTo", "https://shitposter.club/notice/2827873")
data =
data
|> Map.put("object", object)
data = Map.put(data, "object", object)
{:ok, returned_activity} = Transmogrifier.handle_incoming(data)
returned_object = Object.normalize(returned_activity.data["object"])
returned_object = Object.normalize(returned_activity.data["object"], false)
assert activity =
Activity.get_create_by_object_ap_id(
@ -61,6 +60,32 @@ test "it fetches replied-to activities if we don't have them" do
assert returned_object.data["inReplyToAtomUri"] == "https://shitposter.club/notice/2827873"
end
test "it does not fetch replied-to activities beyond max_replies_depth" do
data =
File.read!("test/fixtures/mastodon-post-activity.json")
|> Poison.decode!()
object =
data["object"]
|> Map.put("inReplyTo", "https://shitposter.club/notice/2827873")
data = Map.put(data, "object", object)
with_mock Pleroma.Web.Federator,
max_replies_depth: fn -> 0 end do
{:ok, returned_activity} = Transmogrifier.handle_incoming(data)
returned_object = Object.normalize(returned_activity.data["object"], false)
refute Activity.get_create_by_object_ap_id(
"tag:shitposter.club,2017-05-05:noticeId=2827873:objectType=comment"
)
assert returned_object.data["inReplyToAtomUri"] ==
"https://shitposter.club/notice/2827873"
end
end
test "it does not crash if the object in inReplyTo can't be fetched" do
data =
File.read!("test/fixtures/mastodon-post-activity.json")

View File

@ -11,8 +11,10 @@ defmodule Pleroma.Web.OStatusTest do
alias Pleroma.User
alias Pleroma.Web.OStatus
alias Pleroma.Web.XML
import Pleroma.Factory
import ExUnit.CaptureLog
import Mock
import Pleroma.Factory
setup_all do
Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
@ -266,10 +268,13 @@ test "handle incoming favorites with locally available object - GS, websub" do
assert favorited_activity.local
end
test "handle incoming replies" do
test_with_mock "handle incoming replies, fetching replied-to activities if we don't have them",
OStatus,
[:passthrough],
[] do
incoming = File.read!("test/fixtures/incoming_note_activity_answer.xml")
{:ok, [activity]} = OStatus.handle_incoming(incoming)
object = Object.normalize(activity.data["object"])
object = Object.normalize(activity.data["object"], false)
assert activity.data["type"] == "Create"
assert object.data["type"] == "Note"
@ -282,6 +287,23 @@ test "handle incoming replies" do
assert object.data["id"] == "tag:gs.example.org:4040,2017-04-25:noticeId=55:objectType=note"
assert "https://www.w3.org/ns/activitystreams#Public" in activity.data["to"]
assert called(OStatus.fetch_activity_from_url(object.data["inReplyTo"], :_))
end
test_with_mock "handle incoming replies, not fetching replied-to activities beyond max_replies_depth",
OStatus,
[:passthrough],
[] do
incoming = File.read!("test/fixtures/incoming_note_activity_answer.xml")
with_mock Pleroma.Web.Federator,
max_replies_depth: fn -> 0 end do
{:ok, [activity]} = OStatus.handle_incoming(incoming)
object = Object.normalize(activity.data["object"], false)
refute called(OStatus.fetch_activity_from_url(object.data["inReplyTo"], :_))
end
end
test "handle incoming follows" do