diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex index 884d399d0..0c5709356 100644 --- a/lib/pleroma/web/activity_pub/side_effects.ex +++ b/lib/pleroma/web/activity_pub/side_effects.ex @@ -140,11 +140,15 @@ def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do |> Enum.each(fn [user, other_user] -> if user.local do {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id) - ChatMessageReference.create(chat, object, user.ap_id == actor.ap_id) + {:ok, cm_ref} = ChatMessageReference.create(chat, object, user.ap_id == actor.ap_id) + + Streamer.stream( + ["user", "user:pleroma_chat"], + {user, %{cm_ref | chat: chat, object: object}} + ) end end) - Streamer.stream(["user", "user:pleroma_chat"], object) {:ok, object, meta} end end diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex index 2201cbfef..5e37e2cf2 100644 --- a/lib/pleroma/web/streamer/streamer.ex +++ b/lib/pleroma/web/streamer/streamer.ex @@ -6,11 +6,11 @@ defmodule Pleroma.Web.Streamer do require Logger alias Pleroma.Activity + alias Pleroma.ChatMessageReference alias Pleroma.Config alias Pleroma.Conversation.Participation alias Pleroma.Notification alias Pleroma.Object - alias Pleroma.Repo alias Pleroma.User alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.Visibility @@ -201,22 +201,15 @@ defp do_stream(topic, %Notification{} = item) end) end - defp do_stream(topic, %{data: %{"type" => "ChatMessage"}} = object) + defp do_stream(topic, {user, %ChatMessageReference{} = cm_ref}) when topic in ["user", "user:pleroma_chat"] do - recipients = [object.data["actor"] | object.data["to"]] + topic = "#{topic}:#{user.id}" - topics = - %{ap_id: recipients, local: true} - |> Pleroma.User.Query.build() - |> Repo.all() - |> Enum.map(fn %{id: id} = user -> {user, "#{topic}:#{id}"} end) + text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) - Enum.each(topics, fn {user, topic} -> - Registry.dispatch(@registry, topic, fn list -> - Enum.each(list, fn {pid, _auth} -> - text = StreamerView.render("chat_update.json", object, user, recipients) - send(pid, {:text, text}) - end) + Registry.dispatch(@registry, topic, fn list -> + Enum.each(list, fn {pid, _auth} -> + send(pid, {:text, text}) end) end) end diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex index 616e0c4f2..a6efd0109 100644 --- a/lib/pleroma/web/views/streamer_view.ex +++ b/lib/pleroma/web/views/streamer_view.ex @@ -6,36 +6,11 @@ defmodule Pleroma.Web.StreamerView do use Pleroma.Web, :view alias Pleroma.Activity - alias Pleroma.Chat - alias Pleroma.ChatMessageReference alias Pleroma.Conversation.Participation alias Pleroma.Notification alias Pleroma.User alias Pleroma.Web.MastodonAPI.NotificationView - def render("chat_update.json", object, user, recipients) do - chat = Chat.get(user.id, hd(recipients -- [user.ap_id])) - - # Explicitly giving the cmr for the object here, so we don't accidentally - # send a later 'last_message' that was inserted between inserting this and - # streaming it out - cm_ref = ChatMessageReference.for_chat_and_object(chat, object) - - representation = - Pleroma.Web.PleromaAPI.ChatView.render( - "show.json", - %{last_message: cm_ref, chat: chat} - ) - - %{ - event: "pleroma:chat_update", - payload: - representation - |> Jason.encode!() - } - |> Jason.encode!() - end - def render("update.json", %Activity{} = activity, %User{} = user) do %{ event: "update", @@ -76,6 +51,27 @@ def render("update.json", %Activity{} = activity) do |> Jason.encode!() end + def render("chat_update.json", %{chat_message_reference: cm_ref}) do + # Explicitly giving the cmr for the object here, so we don't accidentally + # send a later 'last_message' that was inserted between inserting this and + # streaming it out + Logger.debug("Trying to stream out #{inspect(cm_ref)}") + + representation = + Pleroma.Web.PleromaAPI.ChatView.render( + "show.json", + %{last_message: cm_ref, chat: cm_ref.chat} + ) + + %{ + event: "pleroma:chat_update", + payload: + representation + |> Jason.encode!() + } + |> Jason.encode!() + end + def render("conversation.json", %Participation{} = participation) do %{ event: "conversation", diff --git a/test/web/activity_pub/side_effects_test.exs b/test/web/activity_pub/side_effects_test.exs index f2fa062b4..92c266d84 100644 --- a/test/web/activity_pub/side_effects_test.exs +++ b/test/web/activity_pub/side_effects_test.exs @@ -325,9 +325,8 @@ test "it streams the created ChatMessage" do {:ok, _create_activity, _meta} = SideEffects.handle(create_activity, local: false, object_data: chat_message_data) - object = Object.normalize(create_activity, false) - - assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], object)) + assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], {author, :_})) + assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], {recipient, :_})) end end diff --git a/test/web/streamer/streamer_test.exs b/test/web/streamer/streamer_test.exs index bcb05a02d..893ae5449 100644 --- a/test/web/streamer/streamer_test.exs +++ b/test/web/streamer/streamer_test.exs @@ -7,6 +7,8 @@ defmodule Pleroma.Web.StreamerTest do import Pleroma.Factory + alias Pleroma.Chat + alias Pleroma.ChatMessageReference alias Pleroma.Conversation.Participation alias Pleroma.List alias Pleroma.Object @@ -150,22 +152,36 @@ test "it sends notify to in the 'user:notification' stream", %{user: user, notif test "it sends chat messages to the 'user:pleroma_chat' stream", %{user: user} do other_user = insert(:user) - {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey") + {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno") object = Object.normalize(create_activity, false) + chat = Chat.get(user.id, other_user.ap_id) + cm_ref = ChatMessageReference.for_chat_and_object(chat, object) + cm_ref = %{cm_ref | chat: chat, object: object} + Streamer.get_topic_and_add_socket("user:pleroma_chat", user) - Streamer.stream("user:pleroma_chat", object) - text = StreamerView.render("chat_update.json", object, user, [user.ap_id, other_user.ap_id]) + Streamer.stream("user:pleroma_chat", {user, cm_ref}) + + text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) + + assert text =~ "hey cirno" assert_receive {:text, ^text} end test "it sends chat messages to the 'user' stream", %{user: user} do other_user = insert(:user) - {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey") + {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno") object = Object.normalize(create_activity, false) + chat = Chat.get(user.id, other_user.ap_id) + cm_ref = ChatMessageReference.for_chat_and_object(chat, object) + cm_ref = %{cm_ref | chat: chat, object: object} + Streamer.get_topic_and_add_socket("user", user) - Streamer.stream("user", object) - text = StreamerView.render("chat_update.json", object, user, [user.ap_id, other_user.ap_id]) + Streamer.stream("user", {user, cm_ref}) + + text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) + + assert text =~ "hey cirno" assert_receive {:text, ^text} end