From cbee3ae795c1a5f316069df3245cfd4b6ded9b7e Mon Sep 17 00:00:00 2001 From: Alexandra Wolf Date: Thu, 5 Oct 2023 01:09:23 +0200 Subject: [PATCH] :construction: subscriber: Comment out handle_batch/4 and add a generic message passing based subscriber --- lib/gazet/subscriber.ex | 30 +++++++------ lib/gazet/subscriber/generic.ex | 75 +++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 13 deletions(-) create mode 100644 lib/gazet/subscriber/generic.ex diff --git a/lib/gazet/subscriber.ex b/lib/gazet/subscriber.ex index d9a182c..77cff95 100644 --- a/lib/gazet/subscriber.ex +++ b/lib/gazet/subscriber.ex @@ -41,19 +41,20 @@ defmodule Gazet.Subscriber do @type implementation :: module @type opts :: [unquote(Gazet.Options.typespec(schema))] + # TODO: Allow state modification? @type result :: :ok | :skip | {:error, reason :: any} @callback init(spec) :: {:ok, config()} | {:error, reason :: any} - @callback handle_batch( - topic :: Gazet.topic(), - batch :: - nonempty_list({ - Gazet.Message.data(), - Gazet.Message.metadata() - }), - config :: config - ) :: result + # @callback handle_batch( + # topic :: Gazet.topic(), + # batch :: + # nonempty_list({ + # Gazet.Message.data(), + # Gazet.Message.metadata() + # }), + # config :: config + # ) :: result @callback handle_message( topic :: Gazet.topic(), @@ -70,8 +71,6 @@ defmodule Gazet.Subscriber do config :: config ) :: result - @optional_callbacks handle_message: 4, handle_error: 5 - @spec spec(spec | opts) :: Gazet.Spec.result(__MODULE__) def spec(values), do: Gazet.Spec.build(__MODULE__, values) @spec spec!(spec | opts) :: spec | no_return @@ -119,9 +118,14 @@ defmodule Gazet.Subscriber do end @impl Gazet.Subscriber - def init(%Gazet.Subscriber{extra: extra}), do: {:ok, extra} + def init(%Gazet.Subscriber{config: config}), do: {:ok, config} + + @impl Gazet.Subscriber + def handle_error(reason, _topic, _message, _metadata, _config) do + {:error, reason} + end - defoverridable init: 1 + defoverridable init: 1, handle_error: 5 end end end diff --git a/lib/gazet/subscriber/generic.ex b/lib/gazet/subscriber/generic.ex new file mode 100644 index 0000000..aad506f --- /dev/null +++ b/lib/gazet/subscriber/generic.ex @@ -0,0 +1,75 @@ +defmodule Gazet.Subscriber.Generic do + use GenServer + + alias Gazet.Subscriber + + def child_spec(%Subscriber{adapter_opts: opts} = subscriber) do + Supervisor.child_spec( + %{ + id: subscriber.id, + start: {__MODULE__, :start_link, [subscriber]} + }, + opts[:child_spec] || [] + ) + end + + def start_link(%Subscriber{} = subscriber) do + GenServer.start_link(__MODULE__, subscriber, subscriber.adapter_opts[:start_opts] || []) + end + + def init(%Subscriber{module: module} = subscriber) do + with {:ok, config} <- module.init(subscriber) do + { + :ok, + %{subscriber | config: config}, + {:continue, {:on_start, subscriber.adapter_opts[:on_start]}} + } + end + end + + def handle_continue({:on_start, nil}, %Subscriber{} = subscriber) do + {:noreply, subscriber} + end + + def handle_continue({:on_start, on_start}, %Subscriber{} = subscriber) do + on_start + |> case do + function when is_function(function, 0) -> + function.() + + function when is_function(function, 1) -> + function.(subscriber) + + {module, function, args} -> + apply(module, function, [subscriber | args]) + end + |> case do + :ok -> {:noreply, subscriber} + {:ok, %Subscriber{} = subscriber} -> {:noreply, subscriber} + {:error, reason} -> {:stop, reason} + end + end + + def handle_info( + {:message, topic, %Gazet.Message{} = message}, + %Subscriber{module: module} = subscriber + ) do + case module.handle_message(topic, message.data, message.metadata, subscriber.config) do + fine when fine in [:ok, :skip] -> + {:noreply, subscriber} + + {:error, reason} -> + handle_error(reason, topic, message, subscriber) + end + end + + defp handle_error(reason, topic, message, %{module: module} = subscriber) do + case module.handle_error(reason, topic, message.data, message.metadata, subscriber.config) do + fine when fine in [:ok, :skip] -> + {:noreply, subscriber} + + {:error, reason} -> + {:stop, reason} + end + end +end