Skip to content

Commit

Permalink
TASK: Update neos/eventstore packages to 1.0
Browse files Browse the repository at this point in the history
Updates `neos/eventstore` and `neos/eventstore-doctrineadapter` packages to their first final release and adjusts code accordingly.

Furthermore this incorporates the following changes:

* Add `CheckpointStorageInterface` and its implementation `DbalCheckpointStorage` (aka `DoctrineCheckpointStorage`) from `neos/eventstore-doctrineadapter` (see neos/eventstore-doctrineadapter#13)
* Add `CatchUp` from `neos/eventstore` (see neos/eventstore#19)
  • Loading branch information
bwaidelich committed Jan 21, 2024
1 parent 9beb48c commit 23040e9
Show file tree
Hide file tree
Showing 22 changed files with 412 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
use Neos\ContentRepository\Core\Feature\NodeVariation\Event\NodeSpecializationVariantWasCreated;
use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateDimensionsWereUpdated;
use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateWithNodeWasCreated;
use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage;
use Neos\ContentRepository\Core\Infrastructure\DbalClientInterface;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\NodeType\NodeTypeName;
Expand All @@ -54,11 +55,8 @@
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId;
use Neos\ContentRepository\Core\SharedModel\Node\NodeName;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId;
use Neos\EventStore\CatchUp\CheckpointStorageInterface;
use Neos\EventStore\DoctrineAdapter\DoctrineCheckpointStorage;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\EventEnvelope;
use Neos\EventStore\Model\EventStore\SetupResult;

/**
* @implements ProjectionInterface<ContentGraph>
Expand All @@ -81,7 +79,7 @@ final class DoctrineDbalContentGraphProjection implements ProjectionInterface, W
*/
private ?ContentGraph $contentGraph = null;

private DoctrineCheckpointStorage $checkpointStorage;
private DbalCheckpointStorage $checkpointStorage;

public function __construct(
private readonly DbalClientInterface $dbalClient,
Expand All @@ -91,7 +89,7 @@ public function __construct(
private readonly ProjectionContentGraph $projectionContentGraph,
private readonly string $tableNamePrefix,
) {
$this->checkpointStorage = new DoctrineCheckpointStorage(
$this->checkpointStorage = new DbalCheckpointStorage(
$this->dbalClient->getConnection(),
$this->tableNamePrefix . '_checkpoint',
self::class
Expand All @@ -111,10 +109,10 @@ protected function getTableNamePrefix(): string
public function setUp(): void
{
$this->setupTables();
$this->checkpointStorage->setup();
$this->checkpointStorage->setUp();
}

private function setupTables(): SetupResult
private function setupTables(): void
{
$connection = $this->dbalClient->getConnection();
$schemaManager = $connection->getSchemaManager();
Expand All @@ -128,7 +126,6 @@ private function setupTables(): SetupResult
foreach ($schemaDiff->toSaveSql($connection->getDatabasePlatform()) as $statement) {
$connection->executeStatement($statement);
}
return SetupResult::success('');
}

public function reset(): void
Expand Down Expand Up @@ -202,7 +199,7 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void
};
}

public function getCheckpointStorage(): CheckpointStorageInterface
public function getCheckpointStorage(): DbalCheckpointStorage
{
return $this->checkpointStorage;
}
Expand Down Expand Up @@ -1239,7 +1236,8 @@ private function getDatabaseConnection(): Connection

private static function initiatingDateTime(EventEnvelope $eventEnvelope): \DateTimeImmutable
{
$result = $eventEnvelope->event->metadata->has('initiatingTimestamp') ? \DateTimeImmutable::createFromFormat(\DateTimeInterface::ATOM, $eventEnvelope->event->metadata->get('initiatingTimestamp')) : $eventEnvelope->recordedAt;
$initiatingTimestamp = $eventEnvelope->event->metadata?->get('initiatingTimestamp');
$result = $initiatingTimestamp !== null ? \DateTimeImmutable::createFromFormat(\DateTimeInterface::ATOM, $initiatingTimestamp) : $eventEnvelope->recordedAt;
if (!$result instanceof \DateTimeImmutable) {
throw new \RuntimeException(sprintf('Failed to extract initiating timestamp from event "%s"', $eventEnvelope->event->id->value), 1678902291);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,11 @@
use Neos\ContentRepository\Core\Feature\NodeVariation\Event\NodePeerVariantWasCreated;
use Neos\ContentRepository\Core\Feature\NodeVariation\Event\NodeSpecializationVariantWasCreated;
use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateWithNodeWasCreated;
use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\EventStore\CatchUp\CheckpointStorageInterface;
use Neos\EventStore\DoctrineAdapter\DoctrineCheckpointStorage;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\EventEnvelope;
use Neos\EventStore\Model\EventStore\SetupResult;

/**
* The alternate reality-aware hypergraph projector for the PostgreSQL backend via Doctrine DBAL
Expand All @@ -76,7 +74,7 @@ final class HypergraphProjection implements ProjectionInterface
* so that always the same instance is returned
*/
private ?ContentHypergraph $contentHypergraph = null;
private DoctrineCheckpointStorage $checkpointStorage;
private DbalCheckpointStorage $checkpointStorage;
private ProjectionHypergraph $projectionHypergraph;

public function __construct(
Expand All @@ -87,7 +85,7 @@ public function __construct(
private readonly string $tableNamePrefix,
) {
$this->projectionHypergraph = new ProjectionHypergraph($this->databaseClient, $this->tableNamePrefix);
$this->checkpointStorage = new DoctrineCheckpointStorage(
$this->checkpointStorage = new DbalCheckpointStorage(
$this->databaseClient->getConnection(),
$this->tableNamePrefix . '_checkpoint',
self::class
Expand All @@ -98,10 +96,10 @@ public function __construct(
public function setUp(): void
{
$this->setupTables();
$this->checkpointStorage->setup();
$this->checkpointStorage->setUp();
}

private function setupTables(): SetupResult
private function setupTables(): void
{
$connection = $this->databaseClient->getConnection();
HypergraphSchemaBuilder::registerTypes($connection->getDatabasePlatform());
Expand All @@ -124,8 +122,6 @@ private function setupTables(): SetupResult
create index if not exists restriction_affected
on ' . $this->tableNamePrefix . '_restrictionhyperrelation using gin (affectednodeaggregateids);
');

return SetupResult::success('');
}

public function reset(): void
Expand Down Expand Up @@ -212,7 +208,7 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void
};
}

public function getCheckpointStorage(): CheckpointStorageInterface
public function getCheckpointStorage(): DbalCheckpointStorage
{
return $this->checkpointStorage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
use Neos\ContentRepository\Core\EventStore\EventInterface;
use Neos\ContentRepository\Core\Projection\CatchUpHookInterface;
use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger;
use Neos\EventStore\DoctrineAdapter\DoctrineCheckpointStorage;

/**
* We had some race conditions in projections, where {@see DoctrineCheckpointStorage} was not working properly.
* We had some race conditions in projections, where {@see \Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage} was not working properly.
* We saw some non-deterministic, random errors when running the tests - unluckily only on Linux, not on OSX:
* On OSX, forking a new subprocess in {@see SubprocessProjectionCatchUpTrigger} is *WAY* slower than in Linux;
* and thus the race conditions which appears if two projector instances of the same class run concurrently
Expand Down
17 changes: 4 additions & 13 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
use Neos\ContentRepository\Core\Factory\ContentRepositoryFactory;
use Neos\ContentRepository\Core\Factory\ContentRepositoryId;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\Projection\CatchUp;
use Neos\ContentRepository\Core\Projection\CatchUpOptions;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphInterface;
use Neos\ContentRepository\Core\Projection\ContentStream\ContentStreamFinder;
Expand All @@ -36,13 +37,10 @@
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepository\Core\Projection\Workspace\WorkspaceFinder;
use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface;
use Neos\EventStore\CatchUp\CatchUp;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Model\Event\EventMetadata;
use Neos\EventStore\Model\EventEnvelope;
use Neos\EventStore\Model\EventStore\SetupResult;
use Neos\EventStore\Model\EventStream\VirtualStreamName;
use Neos\EventStore\ProvidesSetupInterface;
use Psr\Clock\ClockInterface;

/**
Expand Down Expand Up @@ -118,7 +116,7 @@ public function handle(CommandInterface $command): CommandResult
$initiatingUserId,
$initiatingTimestamp
) {
$metadata = $event instanceof DecoratedEvent ? $event->eventMetadata->value : [];
$metadata = $event instanceof DecoratedEvent ? $event->eventMetadata?->value ?? [] : [];
$metadata['initiatingUserId'] ??= $initiatingUserId;
$metadata['initiatingTimestamp'] ??= $initiatingTimestamp;
return DecoratedEvent::withMetadata($event, EventMetadata::fromArray($metadata));
Expand Down Expand Up @@ -193,19 +191,12 @@ public function catchUpProjection(string $projectionClassName, CatchUpOptions $o
$catchUpHook?->onAfterCatchUp();
}

public function setUp(): SetupResult
public function setUp(): void
{
if ($this->eventStore instanceof ProvidesSetupInterface) {
$result = $this->eventStore->setup();
// TODO better result object
if ($result->errors !== []) {
return $result;
}
}
$this->eventStore->setup();
foreach ($this->projectionsAndCatchUpHooks->projections as $projection) {
$projection->setUp();
}
return SetupResult::success('done');
}

public function resetProjectionStates(): void
Expand Down
24 changes: 18 additions & 6 deletions Neos.ContentRepository.Core/Classes/EventStore/DecoratedEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace Neos\ContentRepository\Core\EventStore;

use Neos\EventStore\Model\Event\CausationId;
use Neos\EventStore\Model\Event\CorrelationId;
use Neos\EventStore\Model\Event\EventId;
use Neos\EventStore\Model\Event\EventMetadata;

Expand All @@ -17,7 +19,9 @@ final class DecoratedEvent
private function __construct(
public readonly EventInterface $innerEvent,
public readonly EventId $eventId,
public readonly EventMetadata $eventMetadata,
public readonly ?EventMetadata $eventMetadata = null,
public readonly ?CausationId $causationId = null,
public readonly ?CorrelationId $correlationId = null,
) {
}

Expand All @@ -35,19 +39,27 @@ public static function withEventId(DecoratedEvent|EventInterface $event, EventId

public static function withCausationId(
DecoratedEvent|EventInterface $event,
EventId $causationId
EventId|CausationId $causationId
): self {
$event = self::wrapWithDecoratedEventIfNecessary($event);
$eventMetadata = $event->eventMetadata->value;
$eventMetadata['causationId'] = $causationId->value;
if ($causationId instanceof EventId) {
$causationId = CausationId::fromString($causationId->value);
}
return new self($event->innerEvent, $event->eventId, $event->eventMetadata, $causationId, $event->correlationId);
}

return new self($event->innerEvent, $event->eventId, EventMetadata::fromArray($eventMetadata));
public static function withCorrelationId(
DecoratedEvent|EventInterface $event,
CorrelationId $correlationId
): self {
$event = self::wrapWithDecoratedEventIfNecessary($event);
return new self($event->innerEvent, $event->eventId, $event->eventMetadata, $event->causationId, $correlationId);
}

private static function wrapWithDecoratedEventIfNecessary(EventInterface|DecoratedEvent $event): DecoratedEvent
{
if ($event instanceof EventInterface) {
$event = new self($event, EventId::create(), EventMetadata::none());
$event = new self($event, EventId::create());
}
return $event;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,18 @@ public function publishEvents(EventsToPublish $eventsToPublish): CommandResult

private function normalizeEvent(EventInterface|DecoratedEvent $event): Event
{
if ($event instanceof DecoratedEvent) {
$eventId = $event->eventId;
$eventMetadata = $event->eventMetadata;
$event = $event->innerEvent;
} else {
$eventId = EventId::create();
$eventMetadata = EventMetadata::none();
}
$eventId = $event instanceof DecoratedEvent ? $event->eventId : EventId::create();
$eventMetadata = $event instanceof DecoratedEvent ? $event->eventMetadata : null;
$causationId = $event instanceof DecoratedEvent ? $event->causationId : null;
$correlationId = $event instanceof DecoratedEvent ? $event->correlationId : null;
$event = $event instanceof DecoratedEvent ? $event->innerEvent : $event;
return new Event(
$eventId,
$this->eventNormalizer->getEventType($event),
$this->eventNormalizer->getEventData($event),
$eventMetadata,
$causationId,
$correlationId,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,7 @@ private function publishContentStream(
$copiedEvent = $event->createCopyForContentStream($baseContentStreamId);
// We need to add the event metadata here for rebasing in nested workspace situations
// (and for exporting)
$events[] = DecoratedEvent::withMetadata(
$copiedEvent,
$eventEnvelope->event->metadata
);
$events[] = $eventEnvelope->event->metadata !== null ? DecoratedEvent::withMetadata($copiedEvent, $eventEnvelope->event->metadata) : $copiedEvent;
}
}

Expand Down Expand Up @@ -464,7 +461,7 @@ private function extractCommandsFromContentStreamMetadata(

$commands = [];
foreach ($workspaceContentStream as $eventEnvelope) {
$metadata = $eventEnvelope->event->metadata->value;
$metadata = $eventEnvelope->event->metadata?->value ?? [];
// TODO: Add this logic to the NodeAggregateCommandHandler;
// so that we can be sure these can be parsed again.
if (isset($metadata['commandClass'])) {
Expand Down
Loading

0 comments on commit 23040e9

Please sign in to comment.