Skip to content

Commit

Permalink
Inline docs
Browse files Browse the repository at this point in the history
  • Loading branch information
bwaidelich committed Nov 7, 2023
1 parent e758380 commit cc66f11
Show file tree
Hide file tree
Showing 31 changed files with 225 additions and 28 deletions.
52 changes: 34 additions & 18 deletions src/CatchUp/CatchUp.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
declare(strict_types=1);
namespace Neos\EventStore\CatchUp;

use Neos\EventStore\DoctrineAdapter\DoctrineCheckpointStorage;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\EventEnvelope;
use Neos\EventStore\Model\EventStream\EventStreamInterface;
Expand All @@ -16,9 +15,8 @@
* It ensures that a given projection **never runs concurrently** and thus prevents race conditions where the same
* projector is accidentally running multiple times in parallel.
*
* If you use the {@see DoctrineCheckpointStorage}, and share the same database connection with your projection,
* this class **implements Exactly-Once Semantics for your projections**, to ensure each event is seen
* EXACTLY once in your projection.
* If you use the {@see \Neos\EventStore\DoctrineAdapter\DoctrineCheckpointStorage}, and share the same database connection with your projection,
* this class **implements Exactly-Once Semantics for your projections**, to ensure each event is seen EXACTLY once in your projection.
*
* ## How does it work?
*
Expand All @@ -29,9 +27,9 @@
* After every batchSize events (typically after every event), we update the sequence number and commit
* the transaction (via {@see CheckpointStorageInterface::updateAndReleaseLock()}). Then, we open a new transaction.
*
* In case of errors, we rollback the transaction.
*
* TODO: can you use own transactions in your projection code? I (SK) am currently not sure about this.
* In case of errors, the transaction is rolled back. So event listeners with their own state best share the same
* database connection.
* If a new transaction is started on the same connection within the event listener callback, the transaction will be nested (@see https://www.doctrine-project.org/projects/doctrine-dbal/en/3.7/reference/transactions.html#transaction-nesting)
*
* ## Example Usage (inside your projection)
*
Expand Down Expand Up @@ -90,9 +88,17 @@
* }
* }
* ```
*
* @api
*/
final class CatchUp
{
/**
* @param \Closure(EventEnvelope): void $eventHandler The callback that is invoked for every {@see EventEnvelope} that is processed
* @param CheckpointStorageInterface $checkpointStorage The checkpoint storage that saves the last processed {@see SequenceNumber}
* @param int $batchSize Number of events to process before the checkpoint is written (defaults to 1 in order to guarantee exactly-once semantics) – ({@see withBatchSize()})
* @param \Closure(): void|null $onBeforeBatchCompletedHook Optional callback that is invoked before the sequence number is updated ({@see withOnBeforeBatchCompleted()})
*/
private function __construct(
private readonly \Closure $eventHandler,
private readonly CheckpointStorageInterface $checkpointStorage,
Expand All @@ -102,16 +108,19 @@ private function __construct(
Assert::positiveInteger($batchSize);
}

public static function create(\Closure $eventApplier, CheckpointStorageInterface $checkpointStorage): self
/**
* @param \Closure(EventEnvelope): void $eventHandler The callback that is invoked for every {@see EventEnvelope} that is processed
* @param CheckpointStorageInterface $checkpointStorage The checkpoint storage that saves the last processed {@see SequenceNumber}
*/
public static function create(\Closure $eventHandler, CheckpointStorageInterface $checkpointStorage): self
{
return new self($eventApplier, $checkpointStorage, 1, null);
return new self($eventHandler, $checkpointStorage, 1, null);
}

/**
* After how many events should the (database) transaction be committed?
*
* @param int $batchSize
* @return $this
* @param int $batchSize Number of events to process before the checkpoint is written
*/
public function withBatchSize(int $batchSize): self
{
Expand All @@ -128,33 +137,40 @@ public function withBatchSize(int $batchSize): self
*
* Overrides all previously registered onBeforeBatchCompleted hooks.
*
* @param Closure $callback the hook being called before the batch is completed
* @return $this
* @param \Closure(): void $callback the hook being called before the batch is completed
*/
public function withOnBeforeBatchCompleted(\Closure $callback): self
{
return new self($this->eventHandler, $this->checkpointStorage, $this->batchSize, $callback);
}

/**
* Iterate over the $eventStream, invoke the specified event handler closure for every {@see EventEnvelope} and update
* the last processed sequence number in the {@see CheckpointStorageInterface}
*
* @param EventStreamInterface $eventStream The event stream to process
* @return SequenceNumber The last processed {@see SequenceNumber}
* @throws Throwable Exceptions that are thrown during callback handling are re-thrown
*/
public function run(EventStreamInterface $eventStream): SequenceNumber
{
$highestAppliedSequenceNumber = $this->checkpointStorage->acquireLock();
$iteration = 0;
try {
foreach ($eventStream->withMinimumSequenceNumber($highestAppliedSequenceNumber->next()) as $event) {
if ($event->sequenceNumber->value <= $highestAppliedSequenceNumber->value) {
foreach ($eventStream->withMinimumSequenceNumber($highestAppliedSequenceNumber->next()) as $eventEnvelope) {
if ($eventEnvelope->sequenceNumber->value <= $highestAppliedSequenceNumber->value) {
continue;
}
($this->eventHandler)($event);
($this->eventHandler)($eventEnvelope);
$iteration ++;
if ($this->batchSize === 1 || $iteration % $this->batchSize === 0) {
if ($this->onBeforeBatchCompletedHook) {
($this->onBeforeBatchCompletedHook)();
}
$this->checkpointStorage->updateAndReleaseLock($event->sequenceNumber);
$this->checkpointStorage->updateAndReleaseLock($eventEnvelope->sequenceNumber);
$highestAppliedSequenceNumber = $this->checkpointStorage->acquireLock();
} else {
$highestAppliedSequenceNumber = $event->sequenceNumber;
$highestAppliedSequenceNumber = $eventEnvelope->sequenceNumber;
}
}
} finally {
Expand Down
30 changes: 30 additions & 0 deletions src/CatchUp/CheckpointStorageInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,43 @@
namespace Neos\EventStore\CatchUp;

use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\ProvidesSetupInterface;

/**
* Contract for a central authority that keeps track of which event has been processed by a single event listener to prevent
* the same event to be applied multiple times.
*
* Implementations of this interface should start an exclusive lock with {@see self::acquireLock()} in order to prevent a
* separate instance (potentially in a separate process) to return the same {@see SequenceNumber}.
*
* An instance of this class is always ever responsible for a single event handler.
* If both, the event handler and its checkpoint storage, use the same backend (for example the same database connection)
* to manage their state, Exactly-Once Semantics can be guaranteed.
*
* See {@see CatchUp} for an explanation what this class does in detail.
* @api
*/
interface CheckpointStorageInterface
{
/**
* Obtain an exclusive lock (to prevent multiple instances from being executed simultaneously)
* and return the highest {@see SequenceNumber} that was processed by this checkpoint storage.
*
* Note: Some implementations require to be initialized once ({@see ProvidesSetupInterface})
*
* @return SequenceNumber The sequence number that was previously set via {@see updateAndReleaseLock()} or SequenceNumber(0) if it was not updated before
*/
public function acquireLock(): SequenceNumber;

/**
* Store the new {@see SequenceNumber} and release the lock
*
* @param SequenceNumber $sequenceNumber The sequence number to store – usually after the corresponding event was processed by a listener or when a projection was reset
*/
public function updateAndReleaseLock(SequenceNumber $sequenceNumber): void;

/**
* @return SequenceNumber the last {@see SequenceNumber} that was set via {@see updateAndReleaseLock()} without acquiring a lock
*/
public function getHighestAppliedSequenceNumber(): SequenceNumber;
}
32 changes: 27 additions & 5 deletions src/EventStoreInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
namespace Neos\EventStore;

use Neos\EventStore\Exception\ConcurrencyException;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\Event\Version;
use Neos\EventStore\Model\EventStore\CommitResult;
use Neos\EventStore\Model\EventStream\EventStreamFilter;
use Neos\EventStore\Model\EventStream\EventStreamInterface;
Expand All @@ -11,17 +13,37 @@
use Neos\EventStore\Model\EventStream\VirtualStreamName;
use Neos\EventStore\Model\Events;

/**
* Common interface for an event store backend
* @api
*/
interface EventStoreInterface
{
/**
* Load events from the specified stream (or virtual stream) in the order they were persisted, optionally applying a filter
*
* @param StreamName|VirtualStreamName $streamName The stream or virtual stream to fetch events from
* @param EventStreamFilter|null $filter Optional filter that allows to skip certain events
* @return EventStreamInterface The resulting event stream that can be iterated
*/
public function load(StreamName|VirtualStreamName $streamName, EventStreamFilter $filter = null): EventStreamInterface;

/**
* @param StreamName $streamName
* @param Events $events
* @param ExpectedVersion $expectedVersion
* @return CommitResult
* @throws ConcurrencyException in case the expectedVersion does not match
* Append one or more events to the specified stream
*
* @param StreamName $streamName Name of the stream to append the event(s) to
* @param Events $events The events to append to the stream
* @param ExpectedVersion $expectedVersion The expected {@see Version} of the last event in the specified stream
* @return CommitResult The result of this call that contains information about the committed {@see Version} and {@see SequenceNumber}
* @throws ConcurrencyException in case that the $expectedVersion check fails
*/
public function commit(StreamName $streamName, Events $events, ExpectedVersion $expectedVersion): CommitResult;

/**
* Permanently remove all events from the specified stream
* Note: Not all implementations might support this!
*
* @param StreamName $streamName Name of the stream to prune
*/
public function deleteStream(StreamName $streamName): void;
}
7 changes: 7 additions & 0 deletions src/Exception/CheckpointException.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@
declare(strict_types=1);
namespace Neos\EventStore\Exception;

use Neos\EventStore\CatchUp\CheckpointStorageInterface;
use Neos\EventStore\Model\Event\SequenceNumber;

/**
* Exception that can occur when acquiring or updating a {@see SequenceNumber} via {@see CheckpointStorageInterface}
* @api
*/
final class CheckpointException extends \RuntimeException
{
}
7 changes: 7 additions & 0 deletions src/Exception/ConcurrencyException.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@
declare(strict_types=1);
namespace Neos\EventStore\Exception;

use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Model\EventStream\ExpectedVersion;

/**
* Exception that can occur when the {@see ExpectedVersion} is not satisfied in a {@see EventStoreInterface::commit()} call
* @api
*/
final class ConcurrencyException extends \RuntimeException
{
}
14 changes: 14 additions & 0 deletions src/Helper/BatchEventStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@
use Neos\EventStore\Model\EventStream\EventStreamInterface;
use Neos\EventStore\Model\Event\SequenceNumber;

/**
* A wrapper that allows to process any instance of {@see EventStreamInterface} in batches
* This can be used to stream over a large amount of events without having to load each event individually (or to load all events into memory even)
*
* Usage:
*
* $stream = BatchEventStream::create($originalStream, 100); // for a batch size of 100
*
* @api
*/
final class BatchEventStream implements EventStreamInterface
{
private function __construct(
Expand All @@ -20,6 +30,10 @@ private function __construct(
}
}

/**
* @param EventStreamInterface $wrappedEventStream The original event stream that will be processed in batches
* @param int $batchSize Number of events to load at once
*/
public static function create(EventStreamInterface $wrappedEventStream, int $batchSize): self
{
return new self($wrappedEventStream, $batchSize, null, null, null, false);
Expand Down
9 changes: 9 additions & 0 deletions src/Helper/ClosureEventStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@
declare(strict_types=1);
namespace Neos\EventStore\Helper;

use Neos\EventStore\Model\EventEnvelope;
use Neos\EventStore\Model\EventStream\EventStreamInterface;
use Neos\EventStore\Model\Event\SequenceNumber;

/**
* Implementation of an event stream that forwards iteration to a custom \Closure
*
* @internal This helper is mostly useful for testing purposes and should not be used in production
*/
final class ClosureEventStream implements EventStreamInterface
{

Expand All @@ -17,6 +23,9 @@ private function __construct(
) {
}

/**
* @param \Closure(?SequenceNumber, ?SequenceNumber, ?int, bool): \Traversable<EventEnvelope> $closure
*/
public static function create(\Closure $closure): self
{
return new self($closure, null, null, null, false);
Expand Down
5 changes: 5 additions & 0 deletions src/Helper/InMemoryCheckpointStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
use Neos\EventStore\Exception\CheckpointException;
use Neos\EventStore\Model\Event\SequenceNumber;

/**
* In-memory implementation of a checkpoint storage
*
* @internal This helper is mostly useful for testing purposes and should not be used in production
*/
final class InMemoryCheckpointStorage implements CheckpointStorageInterface
{

Expand Down
5 changes: 5 additions & 0 deletions src/Helper/InMemoryEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
use Neos\EventStore\Model\EventStream\VirtualStreamType;
use Neos\EventStore\Model\Events;

/**
* In-memorry implementation of an event store
*
* @internal This helper is mostly useful for testing purposes and should not be used in production
*/
final class InMemoryEventStore implements EventStoreInterface
{
/**
Expand Down
5 changes: 5 additions & 0 deletions src/Helper/InMemoryEventStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
use Neos\EventStore\Model\EventEnvelope;
use Neos\EventStore\Model\Event\SequenceNumber;

/**
* In-memory implementation of an event stream
*
* @internal This helper is mostly useful for testing purposes and should not be used in production
*/
final class InMemoryEventStream implements EventStreamInterface
{

Expand Down
3 changes: 2 additions & 1 deletion src/Model/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
use Neos\EventStore\Model\Event\EventType;

/**
* Main model for reading and writing (when reading, it is wrapped in {@see EventEnvelope}.
* Main model for reading and writing (when reading, it is wrapped in {@see EventEnvelope})
* @api
*/
final class Event
{
Expand Down
5 changes: 5 additions & 0 deletions src/Model/Event/EventData.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
declare(strict_types=1);
namespace Neos\EventStore\Model\Event;

/**
* The actual payload of an event, usually serialized as JSON
*
* @api
*/
final class EventData
{
private function __construct(
Expand Down
5 changes: 5 additions & 0 deletions src/Model/Event/EventId.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@

use Ramsey\Uuid\Uuid;

/**
* Globally unique id of an event, usually in the form of a UUID
*
* @api
*/
final class EventId
{
private function __construct(
Expand Down
5 changes: 5 additions & 0 deletions src/Model/Event/EventMetadata.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@

use Webmozart\Assert\Assert;

/**
* Arbitrary metadata that can be attached to events, serialized as JSON
*
* @api
*/
final class EventMetadata
{
/**
Expand Down
6 changes: 5 additions & 1 deletion src/Model/Event/EventType.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
declare(strict_types=1);
namespace Neos\EventStore\Model\Event;

/// TODO make flyweight
/**
* The type of event, for example "CustomerHasSignedUp"
*
* @api
*/
final class EventType
{
private function __construct(
Expand Down
Loading

0 comments on commit cc66f11

Please sign in to comment.