diff --git a/lib/pleroma/web/websub/websub.ex b/lib/pleroma/web/websub/websub.ex index 5372416e6..4a35ca8fc 100644 --- a/lib/pleroma/web/websub/websub.ex +++ b/lib/pleroma/web/websub/websub.ex @@ -4,6 +4,7 @@ defmodule Pleroma.Web.Websub do alias Pleroma.Web.OStatus.FeedRepresenter alias Pleroma.Web.OStatus alias Pleroma.Web.XML + require Logger import Ecto.Query @@ -98,8 +99,8 @@ defp valid_topic(%{"hub.topic" => topic}, user) do end end - def subscribe(user, topic) do - # Race condition, use transactions + def subscribe(user, topic, requester \\ &request_subscription/1) do + # FIXME: Race condition, use transactions {:ok, subscription} = with subscription when not is_nil(subscription) <- Repo.get_by(WebsubClientSubscription, topic: topic) do subscribers = [user.ap_id, subscription.subcribers] |> Enum.uniq change = Ecto.Changeset.change(subscription, %{subscribers: subscribers}) @@ -109,11 +110,60 @@ def subscribe(user, topic) do topic: topic, subscribers: [user.ap_id], state: "requested", - secret: :crypto.strong_rand_bytes(8) |> Base.url_encode64 + secret: :crypto.strong_rand_bytes(8) |> Base.url_encode64, + user: user } Repo.insert(subscription) end + requester.(subscription) + end - {:ok, subscription} + def discover(topic, getter \\ &HTTPoison.get/1) do + with {:ok, response} <- getter.(topic), + status_code when status_code in 200..299 <- response.status_code, + body <- response.body, + doc <- XML.parse_document(body), + url when not is_nil(url) <- XML.string_from_xpath(~S{/feed/link[@rel="self"]/@href}, doc), + hub when not is_nil(hub) <- XML.string_from_xpath(~S{/feed/link[@rel="hub"]/@href}, doc) do + {:ok, %{url: url, hub: hub}} + else e -> + {:error, e} + end + end + + def request_subscription(websub, poster \\ &HTTPoison.post/3, timeout \\ 10_000) do + data = [ + "hub.mode": "subscribe", + "hub.topic": websub.topic, + "hub.secret": websub.secret, + "hub.callback": "https://social.heldscal.la/callback" + ] + + # This checks once a second if we are confirmed yet + websub_checker = fn -> + helper = fn (helper) -> + :timer.sleep(1000) + websub = Repo.get_by(WebsubClientSubscription, id: websub.id, state: "accepted") + if websub, do: websub, else: helper.(helper) + end + helper.(helper) + end + + task = Task.async(websub_checker) + + with {:ok, %{status_code: 202}} <- poster.(websub.hub, {:form, data}, ["Content-type": "application/x-www-form-urlencoded"]), + {:ok, websub} <- Task.yield(task, timeout) do + {:ok, websub} + else e -> + Task.shutdown(task) + + change = Ecto.Changeset.change(websub, %{state: "rejected"}) + {:ok, websub} = Repo.update(change) + + Logger.debug("Couldn't confirm subscription: #{inspect(websub)}") + Logger.debug("error: #{inspect(e)}") + + {:error, websub} + end end end diff --git a/test/support/factory.ex b/test/support/factory.ex index d037be4a6..ac276567a 100644 --- a/test/support/factory.ex +++ b/test/support/factory.ex @@ -76,4 +76,14 @@ def websub_subscription_factory do state: "requested" } end + + def websub_client_subscription_factory do + %Pleroma.Web.Websub.WebsubClientSubscription{ + topic: "http://example.org", + secret: "here's a secret", + valid_until: nil, + state: "requested", + subscribers: [] + } + end end diff --git a/test/web/websub/websub_test.exs b/test/web/websub/websub_test.exs index 7b77e696b..bf243ac91 100644 --- a/test/web/websub/websub_test.exs +++ b/test/web/websub/websub_test.exs @@ -77,7 +77,6 @@ test "an incoming subscription request for an existing subscription" do "hub.lease_seconds" => "100" } - {:ok, subscription } = Websub.incoming_subscription_request(user, data) assert subscription.topic == Pleroma.Web.OStatus.feed_path(user) assert subscription.state == sub.state @@ -87,14 +86,72 @@ test "an incoming subscription request for an existing subscription" do assert subscription.id == sub.id end + def accepting_verifier(subscription) do + {:ok, %{ subscription | state: "accepted" }} + end + test "initiate a subscription for a given user and topic" do user = insert(:user) topic = "http://example.org/some-topic.atom" - {:ok, websub} = Websub.subscribe(user, topic) + {:ok, websub} = Websub.subscribe(user, topic, &accepting_verifier/1) assert websub.subscribers == [user.ap_id] assert websub.topic == topic assert is_binary(websub.secret) + assert websub.user == user assert websub.state == "accepted" end + + test "discovers the hub and canonical url" do + topic = "https://mastodon.social/users/lambadalambda.atom" + + getter = fn(^topic) -> + doc = File.read!("test/fixtures/lambadalambda.atom") + {:ok, %{status_code: 200, body: doc}} + end + + {:ok, discovered} = Websub.discover(topic, getter) + assert %{hub: "https://mastodon.social/api/push", url: topic} == discovered + end + + test "calls the hub, requests topic" do + hub = "https://social.heldscal.la/main/push/hub" + topic = "https://social.heldscal.la/api/statuses/user_timeline/23211.atom" + websub = insert(:websub_client_subscription, %{hub: hub, topic: topic}) + + poster = fn (^hub, {:form, data}, _headers) -> + assert Keyword.get(data, :"hub.mode") == "subscribe" + {:ok, %{status_code: 202}} + end + + task = Task.async(fn -> Websub.request_subscription(websub, poster) end) + + change = Ecto.Changeset.change(websub, %{state: "accepted"}) + {:ok, _} = Repo.update(change) + + {:ok, websub} = Task.await(task) + + assert websub.state == "accepted" + end + + test "rejects the subscription if it can't be accepted" do + hub = "https://social.heldscal.la/main/push/hub" + topic = "https://social.heldscal.la/api/statuses/user_timeline/23211.atom" + websub = insert(:websub_client_subscription, %{hub: hub, topic: topic}) + + poster = fn (^hub, {:form, _data}, _headers) -> + {:ok, %{status_code: 202}} + end + + {:error, websub} = Websub.request_subscription(websub, poster, 1000) + assert websub.state == "rejected" + + websub = insert(:websub_client_subscription, %{hub: hub, topic: topic}) + poster = fn (^hub, {:form, _data}, _headers) -> + {:ok, %{status_code: 400}} + end + + {:error, websub} = Websub.request_subscription(websub, poster, 1000) + assert websub.state == "rejected" + end end