[#2074] OAuth scope checking in Streaming API.

This commit is contained in:
Ivan Tashkinov 2020-09-19 19:16:55 +03:00
parent 6c052bd5b6
commit 60b025b782
7 changed files with 332 additions and 183 deletions

View File

@ -53,7 +53,7 @@ def drop_auth_info(conn) do
|> assign(:token, nil) |> assign(:token, nil)
end end
@doc "Filters descendants of supported scopes" @doc "Keeps those of `scopes` which are descendants of `supported_scopes`"
def filter_descendants(scopes, supported_scopes) do def filter_descendants(scopes, supported_scopes) do
Enum.filter( Enum.filter(
scopes, scopes,

View File

@ -23,8 +23,8 @@ def init(%{qs: qs} = req, state) do
with params <- Enum.into(:cow_qs.parse_qs(qs), %{}), with params <- Enum.into(:cow_qs.parse_qs(qs), %{}),
sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil), sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil),
access_token <- Map.get(params, "access_token"), access_token <- Map.get(params, "access_token"),
{:ok, user} <- authenticate_request(access_token, sec_websocket), {:ok, user, oauth_token} <- authenticate_request(access_token, sec_websocket),
{:ok, topic} <- Streamer.get_topic(Map.get(params, "stream"), user, params) do {:ok, topic} <- Streamer.get_topic(params["stream"], user, oauth_token, params) do
req = req =
if sec_websocket do if sec_websocket do
:cowboy_req.set_resp_header("sec-websocket-protocol", sec_websocket, req) :cowboy_req.set_resp_header("sec-websocket-protocol", sec_websocket, req)
@ -117,7 +117,7 @@ def terminate(reason, _req, state) do
# Public streams without authentication. # Public streams without authentication.
defp authenticate_request(nil, nil) do defp authenticate_request(nil, nil) do
{:ok, nil} {:ok, nil, nil}
end end
# Authenticated streams. # Authenticated streams.
@ -125,9 +125,9 @@ defp authenticate_request(access_token, sec_websocket) do
token = access_token || sec_websocket token = access_token || sec_websocket
with true <- is_bitstring(token), with true <- is_bitstring(token),
%Token{user_id: user_id} <- Repo.get_by(Token, token: token), oauth_token = %Token{user_id: user_id} <- Repo.get_by(Token, token: token),
user = %User{} <- User.get_cached_by_id(user_id) do user = %User{} <- User.get_cached_by_id(user_id) do
{:ok, user} {:ok, user, oauth_token}
else else
_ -> {:error, :unauthorized} _ -> {:error, :unauthorized}
end end

View File

@ -11,10 +11,12 @@ defmodule Pleroma.Web.Streamer do
alias Pleroma.Conversation.Participation alias Pleroma.Conversation.Participation
alias Pleroma.Notification alias Pleroma.Notification
alias Pleroma.Object alias Pleroma.Object
alias Pleroma.Plugs.OAuthScopesPlug
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
alias Pleroma.Web.OAuth.Token
alias Pleroma.Web.StreamerView alias Pleroma.Web.StreamerView
@mix_env Mix.env() @mix_env Mix.env()
@ -26,53 +28,87 @@ def registry, do: @registry
@user_streams ["user", "user:notification", "direct", "user:pleroma_chat"] @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
@doc "Expands and authorizes a stream, and registers the process for streaming." @doc "Expands and authorizes a stream, and registers the process for streaming."
@spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, Map.t() | nil) :: @spec get_topic_and_add_socket(
stream :: String.t(),
User.t() | nil,
Token.t() | nil,
Map.t() | nil
) ::
{:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized} {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic_and_add_socket(stream, user, params \\ %{}) do def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
case get_topic(stream, user, params) do case get_topic(stream, user, oauth_token, params) do
{:ok, topic} -> add_socket(topic, user) {:ok, topic} -> add_socket(topic, user)
error -> error error -> error
end end
end end
@doc "Expand and authorizes a stream" @doc "Expand and authorizes a stream"
@spec get_topic(stream :: String.t(), User.t() | nil, Map.t()) :: @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) ::
{:ok, topic :: String.t()} | {:error, :bad_topic} {:ok, topic :: String.t()} | {:error, :bad_topic}
def get_topic(stream, user, params \\ %{}) def get_topic(stream, user, oauth_token, params \\ %{})
# Allow all public steams. # Allow all public steams.
def get_topic(stream, _, _) when stream in @public_streams do def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams do
{:ok, stream} {:ok, stream}
end end
# Allow all hashtags streams. # Allow all hashtags streams.
def get_topic("hashtag", _, %{"tag" => tag}) do def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params) do
{:ok, "hashtag:" <> tag} {:ok, "hashtag:" <> tag}
end end
# Expand user streams. # Expand user streams.
def get_topic(stream, %User{} = user, _) when stream in @user_streams do def get_topic(
{:ok, stream <> ":" <> to_string(user.id)} stream,
%User{id: user_id} = user,
%Token{user_id: token_user_id} = oauth_token,
_params
)
when stream in @user_streams and user_id == token_user_id do
# Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
required_scopes =
if stream == "user:notification" do
["read:notifications"]
else
["read:statuses"]
end
if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do
{:error, :unauthorized}
else
{:ok, stream <> ":" <> to_string(user.id)}
end
end end
def get_topic(stream, _, _) when stream in @user_streams do def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
{:error, :unauthorized} {:error, :unauthorized}
end end
# List streams. # List streams.
def get_topic("list", %User{} = user, %{"list" => id}) do def get_topic(
if Pleroma.List.get(id, user) do "list",
{:ok, "list:" <> to_string(id)} %User{id: user_id} = user,
else %Token{user_id: token_user_id} = oauth_token,
{:error, :bad_topic} %{"list" => id}
)
when user_id == token_user_id do
cond do
OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
{:error, :unauthorized}
Pleroma.List.get(id, user) ->
{:ok, "list:" <> to_string(id)}
true ->
{:error, :bad_topic}
end end
end end
def get_topic("list", _, _) do def get_topic("list", _user, _oauth_token, _params) do
{:error, :unauthorized} {:error, :unauthorized}
end end
def get_topic(_, _, _) do def get_topic(_stream, _user, _oauth_token, _params) do
{:error, :bad_topic} {:error, :bad_topic}
end end

View File

@ -78,7 +78,7 @@ test "receives well formatted events" do
Pleroma.Repo.insert( Pleroma.Repo.insert(
OAuth.App.register_changeset(%OAuth.App{}, %{ OAuth.App.register_changeset(%OAuth.App{}, %{
client_name: "client", client_name: "client",
scopes: ["scope"], scopes: ["read"],
redirect_uris: "url" redirect_uris: "url"
}) })
) )

View File

@ -179,17 +179,19 @@ test "does not create a notification for subscribed users if status is a reply"
describe "create_notification" do describe "create_notification" do
@tag needs_streamer: true @tag needs_streamer: true
test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do
user = insert(:user) %{user: user, token: oauth_token} = oauth_access(["read"])
task = task =
Task.async(fn -> Task.async(fn ->
Streamer.get_topic_and_add_socket("user", user) {:ok, _topic} = Streamer.get_topic_and_add_socket("user", user, oauth_token)
assert_receive {:render_with_user, _, _, _}, 4_000 assert_receive {:render_with_user, _, _, _}, 4_000
end) end)
task_user_notification = task_user_notification =
Task.async(fn -> Task.async(fn ->
Streamer.get_topic_and_add_socket("user:notification", user) {:ok, _topic} =
Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
assert_receive {:render_with_user, _, _, _}, 4_000 assert_receive {:render_with_user, _, _, _}, 4_000
end) end)

View File

@ -27,6 +27,21 @@ defmodule Pleroma.DataCase do
import Ecto.Query import Ecto.Query
import Pleroma.DataCase import Pleroma.DataCase
use Pleroma.Tests.Helpers use Pleroma.Tests.Helpers
# Sets up OAuth access with specified scopes
defp oauth_access(scopes, opts \\ []) do
user =
Keyword.get_lazy(opts, :user, fn ->
Pleroma.Factory.insert(:user)
end)
token =
Keyword.get_lazy(opts, :oauth_token, fn ->
Pleroma.Factory.insert(:oauth_token, user: user, scopes: scopes)
end)
%{user: user, token: token}
end
end end
end end

View File

@ -21,92 +21,148 @@ defmodule Pleroma.Web.StreamerTest do
setup do: clear_config([:instance, :skip_thread_containment]) setup do: clear_config([:instance, :skip_thread_containment])
describe "get_topic without an user" do describe "get_topic/_ (unauthenticated)" do
test "allows public" do test "allows public" do
assert {:ok, "public"} = Streamer.get_topic("public", nil) assert {:ok, "public"} = Streamer.get_topic("public", nil, nil)
assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil) assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil, nil)
assert {:ok, "public:media"} = Streamer.get_topic("public:media", nil) assert {:ok, "public:media"} = Streamer.get_topic("public:media", nil, nil)
assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", nil) assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", nil, nil)
end end
test "allows hashtag streams" do test "allows hashtag streams" do
assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", nil, %{"tag" => "cofe"}) assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", nil, nil, %{"tag" => "cofe"})
end end
test "disallows user streams" do test "disallows user streams" do
assert {:error, _} = Streamer.get_topic("user", nil) assert {:error, _} = Streamer.get_topic("user", nil, nil)
assert {:error, _} = Streamer.get_topic("user:notification", nil) assert {:error, _} = Streamer.get_topic("user:notification", nil, nil)
assert {:error, _} = Streamer.get_topic("direct", nil) assert {:error, _} = Streamer.get_topic("direct", nil, nil)
end end
test "disallows list streams" do test "disallows list streams" do
assert {:error, _} = Streamer.get_topic("list", nil, %{"list" => 42}) assert {:error, _} = Streamer.get_topic("list", nil, nil, %{"list" => 42})
end end
end end
describe "get_topic with an user" do describe "get_topic/_ (authenticated)" do
setup do setup do: oauth_access(["read"])
user = insert(:user)
{:ok, %{user: user}} test "allows public streams (regardless of OAuth token scopes)", %{
user: user,
token: read_oauth_token
} do
with oauth_token <- [nil, read_oauth_token] do
assert {:ok, "public"} = Streamer.get_topic("public", user, oauth_token)
assert {:ok, "public:local"} = Streamer.get_topic("public:local", user, oauth_token)
assert {:ok, "public:media"} = Streamer.get_topic("public:media", user, oauth_token)
assert {:ok, "public:local:media"} =
Streamer.get_topic("public:local:media", user, oauth_token)
end
end end
test "allows public streams", %{user: user} do test "allows user streams (with proper OAuth token scopes)", %{
assert {:ok, "public"} = Streamer.get_topic("public", user) user: user,
assert {:ok, "public:local"} = Streamer.get_topic("public:local", user) token: read_oauth_token
assert {:ok, "public:media"} = Streamer.get_topic("public:media", user) } do
assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", user) %{token: read_notifications_token} = oauth_access(["read:notifications"], user: user)
end %{token: read_statuses_token} = oauth_access(["read:statuses"], user: user)
%{token: badly_scoped_token} = oauth_access(["irrelevant:scope"], user: user)
test "allows user streams", %{user: user} do
expected_user_topic = "user:#{user.id}" expected_user_topic = "user:#{user.id}"
expected_notif_topic = "user:notification:#{user.id}" expected_notification_topic = "user:notification:#{user.id}"
expected_direct_topic = "direct:#{user.id}" expected_direct_topic = "direct:#{user.id}"
assert {:ok, ^expected_user_topic} = Streamer.get_topic("user", user) expected_pleroma_chat_topic = "user:pleroma_chat:#{user.id}"
assert {:ok, ^expected_notif_topic} = Streamer.get_topic("user:notification", user)
assert {:ok, ^expected_direct_topic} = Streamer.get_topic("direct", user) for valid_user_token <- [read_oauth_token, read_statuses_token] do
assert {:ok, ^expected_user_topic} = Streamer.get_topic("user", user, valid_user_token)
assert {:ok, ^expected_direct_topic} =
Streamer.get_topic("direct", user, valid_user_token)
assert {:ok, ^expected_pleroma_chat_topic} =
Streamer.get_topic("user:pleroma_chat", user, valid_user_token)
end
for invalid_user_token <- [read_notifications_token, badly_scoped_token],
user_topic <- ["user", "direct", "user:pleroma_chat"] do
assert {:error, :unauthorized} = Streamer.get_topic(user_topic, user, invalid_user_token)
end
for valid_notification_token <- [read_oauth_token, read_notifications_token] do
assert {:ok, ^expected_notification_topic} =
Streamer.get_topic("user:notification", user, valid_notification_token)
end
for invalid_notification_token <- [read_statuses_token, badly_scoped_token] do
assert {:error, :unauthorized} =
Streamer.get_topic("user:notification", user, invalid_notification_token)
end
end end
test "allows hashtag streams", %{user: user} do test "allows hashtag streams (regardless of OAuth token scopes)", %{
assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", user, %{"tag" => "cofe"}) user: user,
token: read_oauth_token
} do
for oauth_token <- [nil, read_oauth_token] do
assert {:ok, "hashtag:cofe"} =
Streamer.get_topic("hashtag", user, oauth_token, %{"tag" => "cofe"})
end
end end
test "disallows registering to an user stream", %{user: user} do test "disallows registering to another user's stream", %{user: user, token: read_oauth_token} do
another_user = insert(:user) another_user = insert(:user)
assert {:error, _} = Streamer.get_topic("user:#{another_user.id}", user) assert {:error, _} = Streamer.get_topic("user:#{another_user.id}", user, read_oauth_token)
assert {:error, _} = Streamer.get_topic("user:notification:#{another_user.id}", user)
assert {:error, _} = Streamer.get_topic("direct:#{another_user.id}", user) assert {:error, _} =
Streamer.get_topic("user:notification:#{another_user.id}", user, read_oauth_token)
assert {:error, _} = Streamer.get_topic("direct:#{another_user.id}", user, read_oauth_token)
end end
test "allows list stream that are owned by the user", %{user: user} do test "allows list stream that are owned by the user (with `read` or `read:lists` scopes)", %{
user: user,
token: read_oauth_token
} do
%{token: read_lists_token} = oauth_access(["read:lists"], user: user)
%{token: invalid_token} = oauth_access(["irrelevant:scope"], user: user)
{:ok, list} = List.create("Test", user) {:ok, list} = List.create("Test", user)
assert {:error, _} = Streamer.get_topic("list:#{list.id}", user)
assert {:ok, _} = Streamer.get_topic("list", user, %{"list" => list.id}) assert {:error, _} = Streamer.get_topic("list:#{list.id}", user, read_oauth_token)
for valid_token <- [read_oauth_token, read_lists_token] do
assert {:ok, _} = Streamer.get_topic("list", user, valid_token, %{"list" => list.id})
end
assert {:error, _} = Streamer.get_topic("list", user, invalid_token, %{"list" => list.id})
end end
test "disallows list stream that are not owned by the user", %{user: user} do test "disallows list stream that are not owned by the user", %{user: user, token: oauth_token} do
another_user = insert(:user) another_user = insert(:user)
{:ok, list} = List.create("Test", another_user) {:ok, list} = List.create("Test", another_user)
assert {:error, _} = Streamer.get_topic("list:#{list.id}", user)
assert {:error, _} = Streamer.get_topic("list", user, %{"list" => list.id}) assert {:error, _} = Streamer.get_topic("list:#{list.id}", user, oauth_token)
assert {:error, _} = Streamer.get_topic("list", user, oauth_token, %{"list" => list.id})
end end
end end
describe "user streams" do describe "user streams" do
setup do setup do
user = insert(:user) %{user: user, token: token} = oauth_access(["read"])
notify = insert(:notification, user: user, activity: build(:note_activity)) notify = insert(:notification, user: user, activity: build(:note_activity))
{:ok, %{user: user, notify: notify}} {:ok, %{user: user, notify: notify, token: token}}
end end
test "it streams the user's post in the 'user' stream", %{user: user} do test "it streams the user's post in the 'user' stream", %{user: user, token: oauth_token} do
Streamer.get_topic_and_add_socket("user", user) Streamer.get_topic_and_add_socket("user", user, oauth_token)
{:ok, activity} = CommonAPI.post(user, %{status: "hey"}) {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
assert_receive {:render_with_user, _, _, ^activity} assert_receive {:render_with_user, _, _, ^activity}
refute Streamer.filtered_by_user?(user, activity) refute Streamer.filtered_by_user?(user, activity)
end end
test "it streams boosts of the user in the 'user' stream", %{user: user} do test "it streams boosts of the user in the 'user' stream", %{user: user, token: oauth_token} do
Streamer.get_topic_and_add_socket("user", user) Streamer.get_topic_and_add_socket("user", user, oauth_token)
other_user = insert(:user) other_user = insert(:user)
{:ok, activity} = CommonAPI.post(other_user, %{status: "hey"}) {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})
@ -117,9 +173,10 @@ test "it streams boosts of the user in the 'user' stream", %{user: user} do
end end
test "it does not stream announces of the user's own posts in the 'user' stream", %{ test "it does not stream announces of the user's own posts in the 'user' stream", %{
user: user user: user,
token: oauth_token
} do } do
Streamer.get_topic_and_add_socket("user", user) Streamer.get_topic_and_add_socket("user", user, oauth_token)
other_user = insert(:user) other_user = insert(:user)
{:ok, activity} = CommonAPI.post(user, %{status: "hey"}) {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
@ -129,9 +186,10 @@ test "it does not stream announces of the user's own posts in the 'user' stream"
end end
test "it does stream notifications announces of the user's own posts in the 'user' stream", %{ test "it does stream notifications announces of the user's own posts in the 'user' stream", %{
user: user user: user,
token: oauth_token
} do } do
Streamer.get_topic_and_add_socket("user", user) Streamer.get_topic_and_add_socket("user", user, oauth_token)
other_user = insert(:user) other_user = insert(:user)
{:ok, activity} = CommonAPI.post(user, %{status: "hey"}) {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
@ -145,8 +203,11 @@ test "it does stream notifications announces of the user's own posts in the 'use
refute Streamer.filtered_by_user?(user, notification) refute Streamer.filtered_by_user?(user, notification)
end end
test "it streams boosts of mastodon user in the 'user' stream", %{user: user} do test "it streams boosts of mastodon user in the 'user' stream", %{
Streamer.get_topic_and_add_socket("user", user) user: user,
token: oauth_token
} do
Streamer.get_topic_and_add_socket("user", user, oauth_token)
other_user = insert(:user) other_user = insert(:user)
{:ok, activity} = CommonAPI.post(other_user, %{status: "hey"}) {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})
@ -164,21 +225,34 @@ test "it streams boosts of mastodon user in the 'user' stream", %{user: user} do
refute Streamer.filtered_by_user?(user, announce) refute Streamer.filtered_by_user?(user, announce)
end end
test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do test "it sends notify to in the 'user' stream", %{
Streamer.get_topic_and_add_socket("user", user) user: user,
token: oauth_token,
notify: notify
} do
Streamer.get_topic_and_add_socket("user", user, oauth_token)
Streamer.stream("user", notify) Streamer.stream("user", notify)
assert_receive {:render_with_user, _, _, ^notify} assert_receive {:render_with_user, _, _, ^notify}
refute Streamer.filtered_by_user?(user, notify) refute Streamer.filtered_by_user?(user, notify)
end end
test "it sends notify to in the 'user:notification' stream", %{user: user, notify: notify} do test "it sends notify to in the 'user:notification' stream", %{
Streamer.get_topic_and_add_socket("user:notification", user) user: user,
token: oauth_token,
notify: notify
} do
Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
Streamer.stream("user:notification", notify) Streamer.stream("user:notification", notify)
assert_receive {:render_with_user, _, _, ^notify} assert_receive {:render_with_user, _, _, ^notify}
refute Streamer.filtered_by_user?(user, notify) refute Streamer.filtered_by_user?(user, notify)
end end
test "it sends chat messages to the 'user:pleroma_chat' stream", %{user: user} do test "it sends chat messages to the 'user:pleroma_chat' stream", %{
user: user,
token: oauth_token
} do
other_user = insert(:user) other_user = insert(:user)
{:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno") {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno")
@ -187,7 +261,7 @@ test "it sends chat messages to the 'user:pleroma_chat' stream", %{user: user} d
cm_ref = MessageReference.for_chat_and_object(chat, object) cm_ref = MessageReference.for_chat_and_object(chat, object)
cm_ref = %{cm_ref | chat: chat, object: object} cm_ref = %{cm_ref | chat: chat, object: object}
Streamer.get_topic_and_add_socket("user:pleroma_chat", user) Streamer.get_topic_and_add_socket("user:pleroma_chat", user, oauth_token)
Streamer.stream("user:pleroma_chat", {user, cm_ref}) Streamer.stream("user:pleroma_chat", {user, cm_ref})
text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
@ -196,7 +270,7 @@ test "it sends chat messages to the 'user:pleroma_chat' stream", %{user: user} d
assert_receive {:text, ^text} assert_receive {:text, ^text}
end end
test "it sends chat messages to the 'user' stream", %{user: user} do test "it sends chat messages to the 'user' stream", %{user: user, token: oauth_token} do
other_user = insert(:user) other_user = insert(:user)
{:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno") {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno")
@ -205,7 +279,7 @@ test "it sends chat messages to the 'user' stream", %{user: user} do
cm_ref = MessageReference.for_chat_and_object(chat, object) cm_ref = MessageReference.for_chat_and_object(chat, object)
cm_ref = %{cm_ref | chat: chat, object: object} cm_ref = %{cm_ref | chat: chat, object: object}
Streamer.get_topic_and_add_socket("user", user) Streamer.get_topic_and_add_socket("user", user, oauth_token)
Streamer.stream("user", {user, cm_ref}) Streamer.stream("user", {user, cm_ref})
text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
@ -214,7 +288,10 @@ test "it sends chat messages to the 'user' stream", %{user: user} do
assert_receive {:text, ^text} assert_receive {:text, ^text}
end end
test "it sends chat message notifications to the 'user:notification' stream", %{user: user} do test "it sends chat message notifications to the 'user:notification' stream", %{
user: user,
token: oauth_token
} do
other_user = insert(:user) other_user = insert(:user)
{:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey") {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")
@ -223,19 +300,21 @@ test "it sends chat message notifications to the 'user:notification' stream", %{
Repo.get_by(Pleroma.Notification, user_id: user.id, activity_id: create_activity.id) Repo.get_by(Pleroma.Notification, user_id: user.id, activity_id: create_activity.id)
|> Repo.preload(:activity) |> Repo.preload(:activity)
Streamer.get_topic_and_add_socket("user:notification", user) Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
Streamer.stream("user:notification", notify) Streamer.stream("user:notification", notify)
assert_receive {:render_with_user, _, _, ^notify} assert_receive {:render_with_user, _, _, ^notify}
refute Streamer.filtered_by_user?(user, notify) refute Streamer.filtered_by_user?(user, notify)
end end
test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{ test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
user: user user: user,
token: oauth_token
} do } do
blocked = insert(:user) blocked = insert(:user)
{:ok, _user_relationship} = User.block(user, blocked) {:ok, _user_relationship} = User.block(user, blocked)
Streamer.get_topic_and_add_socket("user:notification", user) Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
{:ok, activity} = CommonAPI.post(user, %{status: ":("}) {:ok, activity} = CommonAPI.post(user, %{status: ":("})
{:ok, _} = CommonAPI.favorite(blocked, activity.id) {:ok, _} = CommonAPI.favorite(blocked, activity.id)
@ -244,14 +323,15 @@ test "it doesn't send notify to the 'user:notification' stream when a user is bl
end end
test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{ test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
user: user user: user,
token: oauth_token
} do } do
user2 = insert(:user) user2 = insert(:user)
{:ok, activity} = CommonAPI.post(user, %{status: "super hot take"}) {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
{:ok, _} = CommonAPI.add_mute(user, activity) {:ok, _} = CommonAPI.add_mute(user, activity)
Streamer.get_topic_and_add_socket("user:notification", user) Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
{:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id) {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
@ -260,12 +340,13 @@ test "it doesn't send notify to the 'user:notification' stream when a thread is
end end
test "it sends favorite to 'user:notification' stream'", %{ test "it sends favorite to 'user:notification' stream'", %{
user: user user: user,
token: oauth_token
} do } do
user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"}) user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
{:ok, activity} = CommonAPI.post(user, %{status: "super hot take"}) {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
Streamer.get_topic_and_add_socket("user:notification", user) Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
{:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id) {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
assert_receive {:render_with_user, _, "notification.json", notif} assert_receive {:render_with_user, _, "notification.json", notif}
@ -274,13 +355,14 @@ test "it sends favorite to 'user:notification' stream'", %{
end end
test "it doesn't send the 'user:notification' stream' when a domain is blocked", %{ test "it doesn't send the 'user:notification' stream' when a domain is blocked", %{
user: user user: user,
token: oauth_token
} do } do
user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"}) user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
{:ok, user} = User.block_domain(user, "hecking-lewd-place.com") {:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
{:ok, activity} = CommonAPI.post(user, %{status: "super hot take"}) {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
Streamer.get_topic_and_add_socket("user:notification", user) Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
{:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id) {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
refute_receive _ refute_receive _
@ -288,7 +370,8 @@ test "it doesn't send the 'user:notification' stream' when a domain is blocked",
end end
test "it sends follow activities to the 'user:notification' stream", %{ test "it sends follow activities to the 'user:notification' stream", %{
user: user user: user,
token: oauth_token
} do } do
user_url = user.ap_id user_url = user.ap_id
user2 = insert(:user) user2 = insert(:user)
@ -303,7 +386,7 @@ test "it sends follow activities to the 'user:notification' stream", %{
%Tesla.Env{status: 200, body: body} %Tesla.Env{status: 200, body: body}
end) end)
Streamer.get_topic_and_add_socket("user:notification", user) Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
{:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user) {:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user)
assert_receive {:render_with_user, _, "notification.json", notif} assert_receive {:render_with_user, _, "notification.json", notif}
@ -312,51 +395,53 @@ test "it sends follow activities to the 'user:notification' stream", %{
end end
end end
test "it sends to public authenticated" do describe "public streams" do
user = insert(:user) test "it sends to public (authenticated)" do
other_user = insert(:user) %{user: user, token: oauth_token} = oauth_access(["read"])
other_user = insert(:user)
Streamer.get_topic_and_add_socket("public", other_user) Streamer.get_topic_and_add_socket("public", user, oauth_token)
{:ok, activity} = CommonAPI.post(user, %{status: "Test"}) {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
assert_receive {:render_with_user, _, _, ^activity} assert_receive {:render_with_user, _, _, ^activity}
refute Streamer.filtered_by_user?(user, activity) refute Streamer.filtered_by_user?(other_user, activity)
end
test "it sends to public (unauthenticated)" do
user = insert(:user)
Streamer.get_topic_and_add_socket("public", nil, nil)
{:ok, activity} = CommonAPI.post(user, %{status: "Test"})
activity_id = activity.id
assert_receive {:text, event}
assert %{"event" => "update", "payload" => payload} = Jason.decode!(event)
assert %{"id" => ^activity_id} = Jason.decode!(payload)
{:ok, _} = CommonAPI.delete(activity.id, user)
assert_receive {:text, event}
assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
end
test "handles deletions" do
%{user: user, token: oauth_token} = oauth_access(["read"])
other_user = insert(:user)
{:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
Streamer.get_topic_and_add_socket("public", user, oauth_token)
{:ok, _} = CommonAPI.delete(activity.id, other_user)
activity_id = activity.id
assert_receive {:text, event}
assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
end
end end
test "works for deletions" do describe "thread_containment/2" do
user = insert(:user)
other_user = insert(:user)
{:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
Streamer.get_topic_and_add_socket("public", user)
{:ok, _} = CommonAPI.delete(activity.id, other_user)
activity_id = activity.id
assert_receive {:text, event}
assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
end
test "it sends to public unauthenticated" do
user = insert(:user)
Streamer.get_topic_and_add_socket("public", nil)
{:ok, activity} = CommonAPI.post(user, %{status: "Test"})
activity_id = activity.id
assert_receive {:text, event}
assert %{"event" => "update", "payload" => payload} = Jason.decode!(event)
assert %{"id" => ^activity_id} = Jason.decode!(payload)
{:ok, _} = CommonAPI.delete(activity.id, user)
assert_receive {:text, event}
assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
end
describe "thread_containment" do
test "it filters to user if recipients invalid and thread containment is enabled" do test "it filters to user if recipients invalid and thread containment is enabled" do
Pleroma.Config.put([:instance, :skip_thread_containment], false) Pleroma.Config.put([:instance, :skip_thread_containment], false)
author = insert(:user) author = insert(:user)
user = insert(:user) %{user: user, token: oauth_token} = oauth_access(["read"])
User.follow(user, author, :follow_accept) User.follow(user, author, :follow_accept)
activity = activity =
@ -368,7 +453,7 @@ test "it filters to user if recipients invalid and thread containment is enabled
) )
) )
Streamer.get_topic_and_add_socket("public", user) Streamer.get_topic_and_add_socket("public", user, oauth_token)
Streamer.stream("public", activity) Streamer.stream("public", activity)
assert_receive {:render_with_user, _, _, ^activity} assert_receive {:render_with_user, _, _, ^activity}
assert Streamer.filtered_by_user?(user, activity) assert Streamer.filtered_by_user?(user, activity)
@ -377,7 +462,7 @@ test "it filters to user if recipients invalid and thread containment is enabled
test "it sends message if recipients invalid and thread containment is disabled" do test "it sends message if recipients invalid and thread containment is disabled" do
Pleroma.Config.put([:instance, :skip_thread_containment], true) Pleroma.Config.put([:instance, :skip_thread_containment], true)
author = insert(:user) author = insert(:user)
user = insert(:user) %{user: user, token: oauth_token} = oauth_access(["read"])
User.follow(user, author, :follow_accept) User.follow(user, author, :follow_accept)
activity = activity =
@ -389,7 +474,7 @@ test "it sends message if recipients invalid and thread containment is disabled"
) )
) )
Streamer.get_topic_and_add_socket("public", user) Streamer.get_topic_and_add_socket("public", user, oauth_token)
Streamer.stream("public", activity) Streamer.stream("public", activity)
assert_receive {:render_with_user, _, _, ^activity} assert_receive {:render_with_user, _, _, ^activity}
@ -400,6 +485,7 @@ test "it sends message if recipients invalid and thread containment is enabled b
Pleroma.Config.put([:instance, :skip_thread_containment], false) Pleroma.Config.put([:instance, :skip_thread_containment], false)
author = insert(:user) author = insert(:user)
user = insert(:user, skip_thread_containment: true) user = insert(:user, skip_thread_containment: true)
%{token: oauth_token} = oauth_access(["read"], user: user)
User.follow(user, author, :follow_accept) User.follow(user, author, :follow_accept)
activity = activity =
@ -411,7 +497,7 @@ test "it sends message if recipients invalid and thread containment is enabled b
) )
) )
Streamer.get_topic_and_add_socket("public", user) Streamer.get_topic_and_add_socket("public", user, oauth_token)
Streamer.stream("public", activity) Streamer.stream("public", activity)
assert_receive {:render_with_user, _, _, ^activity} assert_receive {:render_with_user, _, _, ^activity}
@ -420,23 +506,26 @@ test "it sends message if recipients invalid and thread containment is enabled b
end end
describe "blocks" do describe "blocks" do
test "it filters messages involving blocked users" do setup do: oauth_access(["read"])
user = insert(:user)
test "it filters messages involving blocked users", %{user: user, token: oauth_token} do
blocked_user = insert(:user) blocked_user = insert(:user)
{:ok, _user_relationship} = User.block(user, blocked_user) {:ok, _user_relationship} = User.block(user, blocked_user)
Streamer.get_topic_and_add_socket("public", user) Streamer.get_topic_and_add_socket("public", user, oauth_token)
{:ok, activity} = CommonAPI.post(blocked_user, %{status: "Test"}) {:ok, activity} = CommonAPI.post(blocked_user, %{status: "Test"})
assert_receive {:render_with_user, _, _, ^activity} assert_receive {:render_with_user, _, _, ^activity}
assert Streamer.filtered_by_user?(user, activity) assert Streamer.filtered_by_user?(user, activity)
end end
test "it filters messages transitively involving blocked users" do test "it filters messages transitively involving blocked users", %{
blocker = insert(:user) user: blocker,
token: blocker_token
} do
blockee = insert(:user) blockee = insert(:user)
friend = insert(:user) friend = insert(:user)
Streamer.get_topic_and_add_socket("public", blocker) Streamer.get_topic_and_add_socket("public", blocker, blocker_token)
{:ok, _user_relationship} = User.block(blocker, blockee) {:ok, _user_relationship} = User.block(blocker, blockee)
@ -458,8 +547,9 @@ test "it filters messages transitively involving blocked users" do
end end
describe "lists" do describe "lists" do
test "it doesn't send unwanted DMs to list" do setup do: oauth_access(["read"])
user_a = insert(:user)
test "it doesn't send unwanted DMs to list", %{user: user_a, token: user_a_token} do
user_b = insert(:user) user_b = insert(:user)
user_c = insert(:user) user_c = insert(:user)
@ -468,7 +558,7 @@ test "it doesn't send unwanted DMs to list" do
{:ok, list} = List.create("Test", user_a) {:ok, list} = List.create("Test", user_a)
{:ok, list} = List.follow(list, user_b) {:ok, list} = List.follow(list, user_b)
Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id}) Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
{:ok, _activity} = {:ok, _activity} =
CommonAPI.post(user_b, %{ CommonAPI.post(user_b, %{
@ -479,14 +569,13 @@ test "it doesn't send unwanted DMs to list" do
refute_receive _ refute_receive _
end end
test "it doesn't send unwanted private posts to list" do test "it doesn't send unwanted private posts to list", %{user: user_a, token: user_a_token} do
user_a = insert(:user)
user_b = insert(:user) user_b = insert(:user)
{:ok, list} = List.create("Test", user_a) {:ok, list} = List.create("Test", user_a)
{:ok, list} = List.follow(list, user_b) {:ok, list} = List.follow(list, user_b)
Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id}) Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
{:ok, _activity} = {:ok, _activity} =
CommonAPI.post(user_b, %{ CommonAPI.post(user_b, %{
@ -497,8 +586,7 @@ test "it doesn't send unwanted private posts to list" do
refute_receive _ refute_receive _
end end
test "it sends wanted private posts to list" do test "it sends wanted private posts to list", %{user: user_a, token: user_a_token} do
user_a = insert(:user)
user_b = insert(:user) user_b = insert(:user)
{:ok, user_a} = User.follow(user_a, user_b) {:ok, user_a} = User.follow(user_a, user_b)
@ -506,7 +594,7 @@ test "it sends wanted private posts to list" do
{:ok, list} = List.create("Test", user_a) {:ok, list} = List.create("Test", user_a)
{:ok, list} = List.follow(list, user_b) {:ok, list} = List.follow(list, user_b)
Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id}) Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
{:ok, activity} = {:ok, activity} =
CommonAPI.post(user_b, %{ CommonAPI.post(user_b, %{
@ -520,8 +608,9 @@ test "it sends wanted private posts to list" do
end end
describe "muted reblogs" do describe "muted reblogs" do
test "it filters muted reblogs" do setup do: oauth_access(["read"])
user1 = insert(:user)
test "it filters muted reblogs", %{user: user1, token: user1_token} do
user2 = insert(:user) user2 = insert(:user)
user3 = insert(:user) user3 = insert(:user)
CommonAPI.follow(user1, user2) CommonAPI.follow(user1, user2)
@ -529,34 +618,38 @@ test "it filters muted reblogs" do
{:ok, create_activity} = CommonAPI.post(user3, %{status: "I'm kawen"}) {:ok, create_activity} = CommonAPI.post(user3, %{status: "I'm kawen"})
Streamer.get_topic_and_add_socket("user", user1) Streamer.get_topic_and_add_socket("user", user1, user1_token)
{:ok, announce_activity} = CommonAPI.repeat(create_activity.id, user2) {:ok, announce_activity} = CommonAPI.repeat(create_activity.id, user2)
assert_receive {:render_with_user, _, _, ^announce_activity} assert_receive {:render_with_user, _, _, ^announce_activity}
assert Streamer.filtered_by_user?(user1, announce_activity) assert Streamer.filtered_by_user?(user1, announce_activity)
end end
test "it filters reblog notification for reblog-muted actors" do test "it filters reblog notification for reblog-muted actors", %{
user1 = insert(:user) user: user1,
token: user1_token
} do
user2 = insert(:user) user2 = insert(:user)
CommonAPI.follow(user1, user2) CommonAPI.follow(user1, user2)
CommonAPI.hide_reblogs(user1, user2) CommonAPI.hide_reblogs(user1, user2)
{:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"}) {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"})
Streamer.get_topic_and_add_socket("user", user1) Streamer.get_topic_and_add_socket("user", user1, user1_token)
{:ok, _announce_activity} = CommonAPI.repeat(create_activity.id, user2) {:ok, _announce_activity} = CommonAPI.repeat(create_activity.id, user2)
assert_receive {:render_with_user, _, "notification.json", notif} assert_receive {:render_with_user, _, "notification.json", notif}
assert Streamer.filtered_by_user?(user1, notif) assert Streamer.filtered_by_user?(user1, notif)
end end
test "it send non-reblog notification for reblog-muted actors" do test "it send non-reblog notification for reblog-muted actors", %{
user1 = insert(:user) user: user1,
token: user1_token
} do
user2 = insert(:user) user2 = insert(:user)
CommonAPI.follow(user1, user2) CommonAPI.follow(user1, user2)
CommonAPI.hide_reblogs(user1, user2) CommonAPI.hide_reblogs(user1, user2)
{:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"}) {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"})
Streamer.get_topic_and_add_socket("user", user1) Streamer.get_topic_and_add_socket("user", user1, user1_token)
{:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id) {:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id)
assert_receive {:render_with_user, _, "notification.json", notif} assert_receive {:render_with_user, _, "notification.json", notif}
@ -564,27 +657,28 @@ test "it send non-reblog notification for reblog-muted actors" do
end end
end end
test "it filters posts from muted threads" do describe "muted threads" do
user = insert(:user) test "it filters posts from muted threads" do
user2 = insert(:user) user = insert(:user)
Streamer.get_topic_and_add_socket("user", user2) %{user: user2, token: user2_token} = oauth_access(["read"])
{:ok, user2, user, _activity} = CommonAPI.follow(user2, user) Streamer.get_topic_and_add_socket("user", user2, user2_token)
{:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
{:ok, _} = CommonAPI.add_mute(user2, activity) {:ok, user2, user, _activity} = CommonAPI.follow(user2, user)
assert_receive {:render_with_user, _, _, ^activity} {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
assert Streamer.filtered_by_user?(user2, activity) {:ok, _} = CommonAPI.add_mute(user2, activity)
assert_receive {:render_with_user, _, _, ^activity}
assert Streamer.filtered_by_user?(user2, activity)
end
end end
describe "direct streams" do describe "direct streams" do
setup do setup do: oauth_access(["read"])
:ok
end
test "it sends conversation update to the 'direct' stream", %{} do test "it sends conversation update to the 'direct' stream", %{user: user, token: oauth_token} do
user = insert(:user)
another_user = insert(:user) another_user = insert(:user)
Streamer.get_topic_and_add_socket("direct", user) Streamer.get_topic_and_add_socket("direct", user, oauth_token)
{:ok, _create_activity} = {:ok, _create_activity} =
CommonAPI.post(another_user, %{ CommonAPI.post(another_user, %{
@ -602,11 +696,11 @@ test "it sends conversation update to the 'direct' stream", %{} do
assert last_status["pleroma"]["direct_conversation_id"] == participation.id assert last_status["pleroma"]["direct_conversation_id"] == participation.id
end end
test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted",
user = insert(:user) %{user: user, token: oauth_token} do
another_user = insert(:user) another_user = insert(:user)
Streamer.get_topic_and_add_socket("direct", user) Streamer.get_topic_and_add_socket("direct", user, oauth_token)
{:ok, create_activity} = {:ok, create_activity} =
CommonAPI.post(another_user, %{ CommonAPI.post(another_user, %{
@ -629,10 +723,12 @@ test "it doesn't send conversation update to the 'direct' stream when the last m
refute_receive _ refute_receive _
end end
test "it sends conversation update to the 'direct' stream when a message is deleted" do test "it sends conversation update to the 'direct' stream when a message is deleted", %{
user = insert(:user) user: user,
token: oauth_token
} do
another_user = insert(:user) another_user = insert(:user)
Streamer.get_topic_and_add_socket("direct", user) Streamer.get_topic_and_add_socket("direct", user, oauth_token)
{:ok, create_activity} = {:ok, create_activity} =
CommonAPI.post(another_user, %{ CommonAPI.post(another_user, %{