Skip to content

Commit

Permalink
Use an UPSERT SQL query to insert or update the projection version
Browse files Browse the repository at this point in the history
  • Loading branch information
slashdotdash committed Jan 18, 2024
1 parent d77c087 commit 6aa182b
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 34 deletions.
53 changes: 28 additions & 25 deletions lib/projections/ecto.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,37 +59,40 @@ defmodule Commanded.Projections.Ecto do
projection_name = Map.fetch!(metadata, :handler_name)
event_number = Map.fetch!(metadata, :event_number)

changeset =
%ProjectionVersion{projection_name: projection_name}
|> ProjectionVersion.changeset(%{last_seen_event_number: event_number})
projection_version = %ProjectionVersion{
projection_name: projection_name,
last_seen_event_number: event_number
}

prefix = schema_prefix(event, metadata)

# Query to update an existing projection version with the last seen event number with
# a check to ensure that the event has not already been projected.
update_projection_version =
from(pv in ProjectionVersion,
where:
pv.projection_name == ^projection_name and pv.last_seen_event_number < ^event_number,
update: [set: [last_seen_event_number: ^event_number]]
)

multi =
Ecto.Multi.new()
|> Ecto.Multi.run(:verify_projection_version, fn repo, _changes ->
version =
case repo.get(ProjectionVersion, projection_name, prefix: prefix) do
nil ->
repo.insert!(
%ProjectionVersion{
projection_name: projection_name,
last_seen_event_number: 0
},
prefix: prefix
)

version ->
version
end

if version.last_seen_event_number < event_number do
{:ok, %{version: version}}
else
{:error, :already_seen_event}
|> Ecto.Multi.run(:track_projection_version, fn repo, _changes ->
try do
repo.insert(projection_version,
prefix: prefix,
on_conflict: update_projection_version,
conflict_target: [:projection_name]
)
rescue
exception in Ecto.StaleEntryError ->
# Attempted to insert a projection version for an already seen event
{:error, :already_seen_event}

exception ->
reraise exception, __STACKTRACE__
end
end)
|> Ecto.Multi.update(:projection_version, changeset, prefix: prefix)

with %Ecto.Multi{} = multi <- apply(multi_fn, [multi]),
{:ok, changes} <- transaction(multi) do
Expand All @@ -99,7 +102,7 @@ defmodule Commanded.Projections.Ecto do
:ok
end
else
{:error, :verify_projection_version, :already_seen_event, _changes} -> :ok
{:error, :track_projection_version, :already_seen_event, _changes} -> :ok
{:error, _stage, error, _changes} -> {:error, error}
{:error, _error} = reply -> reply
end
Expand Down
6 changes: 3 additions & 3 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ defmodule Commanded.Projections.Ecto.Mixfile do
{:postgrex, ">= 0.0.0", only: :test},

# Optional dependencies
{:jason, "~> 1.3", optional: true},
{:jason, "~> 1.4", optional: true},

# Test & build tooling
{:dialyxir, "~> 1.2", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 1.4", only: [:dev, :test], runtime: false},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
{:mix_test_watch, "~> 1.1", only: :dev, runtime: false},
{:mox, "~> 1.0", only: :test}
{:mox, "~> 1.1", only: :test}
]
end

Expand Down
21 changes: 21 additions & 0 deletions test/projections/ecto_projection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,27 @@ defmodule Commanded.Projections.EctoProjectionTest do
assert_seen_event("Projector", 3)
end

test "should prevent an event being projected more than once" do
Projector.handle(%AnEvent{name: "Event1"}, %{handler_name: "Projector", event_number: 1})
Projector.handle(%AnEvent{name: "Event2"}, %{handler_name: "Projector", event_number: 2})

tasks =
Enum.map(1..3, fn _index ->
Task.async(fn ->
:timer.sleep(:rand.uniform(10))

Projector.handle(%AnEvent{name: "Event3"}, %{handler_name: "Projector", event_number: 3})
end)
end)

results = Task.await_many(tasks)

assert Enum.uniq(results) == [:ok]

assert_projections(Projection, ["Event1", "Event2", "Event3"])
assert_seen_event("Projector", 3)
end

test "should return an error on failure" do
assert {:error, :failure} ==
Projector.handle(%ErrorEvent{}, %{handler_name: "Projector", event_number: 1})
Expand Down
29 changes: 23 additions & 6 deletions test/projections/runtime_config_projector_test.exs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
defmodule Commanded.Projections.RuntimeConfigProjectorTest do
use ExUnit.Case

import Commanded.Projections.ProjectionAssertions

alias Commanded.EventStore.Adapters.Mock, as: MockEventStore
alias Commanded.EventStore.RecordedEvent
alias Commanded.Projections.Events.AnEvent
alias Commanded.Projections.Projection
alias Commanded.Projections.Repo
alias Commanded.Projections.RuntimeConfigProjector
alias Commanded.Projections.{Projection, ProjectionAssertions, Repo, RuntimeConfigProjector}
alias Commanded.UUID

import Mox
import ProjectionAssertions

setup [:set_mox_global, :stub_event_store, :verify_on_exit!]

setup do
start_supervised!(TestApplication)
start_supervised!({TestApplication, event_store: [adapter: MockEventStore]})
Ecto.Adapters.SQL.Sandbox.checkout(Repo)
end

Expand Down Expand Up @@ -54,4 +56,19 @@ defmodule Commanded.Projections.RuntimeConfigProjectorTest do
defp send_events(projector, events) do
send(projector, {:events, events})
end

defp stub_event_store(_context) do
stub(MockEventStore, :ack_event, fn _adapter_meta, _pid, _event -> :ok end)

stub(MockEventStore, :child_spec, fn _application, _config ->
{:ok, [], %{}}
end)

stub(MockEventStore, :subscribe_to, fn
_event_store, :all, _handler_name, _handler, _subscribe_from, _opts ->
{:ok, self()}
end)

:ok
end
end

0 comments on commit 6aa182b

Please sign in to comment.