Skip to content

Commit

Permalink
🚧 subscriber: Comment out handle_batch/4 and add a generic message pa…
Browse files Browse the repository at this point in the history
…ssing based subscriber
  • Loading branch information
Alexandra Wolf committed Oct 4, 2023
1 parent 2c347d9 commit cbee3ae
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 13 deletions.
30 changes: 17 additions & 13 deletions lib/gazet/subscriber.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,20 @@ defmodule Gazet.Subscriber do
@type implementation :: module

@type opts :: [unquote(Gazet.Options.typespec(schema))]
# TODO: Allow state modification?
@type result :: :ok | :skip | {:error, reason :: any}

@callback init(spec) :: {:ok, config()} | {:error, reason :: any}

@callback handle_batch(
topic :: Gazet.topic(),
batch ::
nonempty_list({
Gazet.Message.data(),
Gazet.Message.metadata()
}),
config :: config
) :: result
# @callback handle_batch(
# topic :: Gazet.topic(),
# batch ::
# nonempty_list({
# Gazet.Message.data(),
# Gazet.Message.metadata()
# }),
# config :: config
# ) :: result

@callback handle_message(
topic :: Gazet.topic(),
Expand All @@ -70,8 +71,6 @@ defmodule Gazet.Subscriber do
config :: config
) :: result

@optional_callbacks handle_message: 4, handle_error: 5

@spec spec(spec | opts) :: Gazet.Spec.result(__MODULE__)
def spec(values), do: Gazet.Spec.build(__MODULE__, values)
@spec spec!(spec | opts) :: spec | no_return
Expand Down Expand Up @@ -119,9 +118,14 @@ defmodule Gazet.Subscriber do
end

@impl Gazet.Subscriber
def init(%Gazet.Subscriber{extra: extra}), do: {:ok, extra}
def init(%Gazet.Subscriber{config: config}), do: {:ok, config}

@impl Gazet.Subscriber
def handle_error(reason, _topic, _message, _metadata, _config) do
{:error, reason}
end

defoverridable init: 1
defoverridable init: 1, handle_error: 5
end
end
end
75 changes: 75 additions & 0 deletions lib/gazet/subscriber/generic.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
defmodule Gazet.Subscriber.Generic do
use GenServer

alias Gazet.Subscriber

def child_spec(%Subscriber{adapter_opts: opts} = subscriber) do
Supervisor.child_spec(
%{
id: subscriber.id,
start: {__MODULE__, :start_link, [subscriber]}
},
opts[:child_spec] || []
)
end

def start_link(%Subscriber{} = subscriber) do
GenServer.start_link(__MODULE__, subscriber, subscriber.adapter_opts[:start_opts] || [])
end

def init(%Subscriber{module: module} = subscriber) do
with {:ok, config} <- module.init(subscriber) do
{
:ok,
%{subscriber | config: config},
{:continue, {:on_start, subscriber.adapter_opts[:on_start]}}
}
end
end

def handle_continue({:on_start, nil}, %Subscriber{} = subscriber) do
{:noreply, subscriber}
end

def handle_continue({:on_start, on_start}, %Subscriber{} = subscriber) do
on_start
|> case do
function when is_function(function, 0) ->
function.()

function when is_function(function, 1) ->
function.(subscriber)

{module, function, args} ->
apply(module, function, [subscriber | args])
end
|> case do
:ok -> {:noreply, subscriber}
{:ok, %Subscriber{} = subscriber} -> {:noreply, subscriber}
{:error, reason} -> {:stop, reason}
end
end

def handle_info(
{:message, topic, %Gazet.Message{} = message},
%Subscriber{module: module} = subscriber
) do
case module.handle_message(topic, message.data, message.metadata, subscriber.config) do
fine when fine in [:ok, :skip] ->
{:noreply, subscriber}

{:error, reason} ->
handle_error(reason, topic, message, subscriber)
end
end

defp handle_error(reason, topic, message, %{module: module} = subscriber) do
case module.handle_error(reason, topic, message.data, message.metadata, subscriber.config) do
fine when fine in [:ok, :skip] ->
{:noreply, subscriber}

{:error, reason} ->
{:stop, reason}
end
end
end

0 comments on commit cbee3ae

Please sign in to comment.