From 23040e9616bf1d1264478951f587f8666a076d52 Mon Sep 17 00:00:00 2001 From: Bastian Waidelich Date: Sun, 21 Jan 2024 12:55:49 +0100 Subject: [PATCH] TASK: Update neos/eventstore packages to 1.0 Updates `neos/eventstore` and `neos/eventstore-doctrineadapter` packages to their first final release and adjusts code accordingly. Furthermore this incorporates the following changes: * Add `CheckpointStorageInterface` and its implementation `DbalCheckpointStorage` (aka `DoctrineCheckpointStorage`) from `neos/eventstore-doctrineadapter` (see https://github.com/neos/eventstore-doctrineadapter/pull/13) * Add `CatchUp` from `neos/eventstore` (see https://github.com/neos/eventstore/pull/19) --- .../DoctrineDbalContentGraphProjection.php | 18 ++- .../Projection/HypergraphProjection.php | 16 +-- .../RaceTrackerCatchUpHook.php | 3 +- .../Classes/ContentRepository.php | 17 +-- .../Classes/EventStore/DecoratedEvent.php | 24 +++- .../Classes/EventStore/EventPersister.php | 15 +- .../Feature/WorkspaceCommandHandler.php | 7 +- .../Infrastructure/DbalCheckpointStorage.php | 125 +++++++++++++++++ .../Classes/Projection/CatchUp.php | 132 ++++++++++++++++++ .../Projection/CheckpointStorageInterface.php | 56 ++++++++ .../ContentGraph/ContentGraphProjection.php | 2 +- .../ContentStream/ContentStreamProjection.php | 13 +- .../NodeHiddenStateProjection.php | 11 +- .../Projection/ProjectionInterface.php | 3 +- .../Workspace/WorkspaceProjection.php | 11 +- .../src/Event/ValueObject/ExportedEvent.php | 2 +- .../Processors/EventStoreImportProcessor.php | 15 +- .../Projection/AssetUsageProjection.php | 13 +- .../Projection/AssetUsageRepository.php | 17 ++- .../Projection/DocumentUriPathProjection.php | 15 +- .../ChangeProjection.php | 11 +- composer.json | 4 +- 22 files changed, 412 insertions(+), 118 deletions(-) create mode 100644 Neos.ContentRepository.Core/Classes/Infrastructure/DbalCheckpointStorage.php create mode 100644 Neos.ContentRepository.Core/Classes/Projection/CatchUp.php create mode 100644 Neos.ContentRepository.Core/Classes/Projection/CheckpointStorageInterface.php diff --git a/Neos.ContentGraph.DoctrineDbalAdapter/src/DoctrineDbalContentGraphProjection.php b/Neos.ContentGraph.DoctrineDbalAdapter/src/DoctrineDbalContentGraphProjection.php index 7a724ed2846..0502fa6b8fa 100644 --- a/Neos.ContentGraph.DoctrineDbalAdapter/src/DoctrineDbalContentGraphProjection.php +++ b/Neos.ContentGraph.DoctrineDbalAdapter/src/DoctrineDbalContentGraphProjection.php @@ -44,6 +44,7 @@ use Neos\ContentRepository\Core\Feature\NodeVariation\Event\NodeSpecializationVariantWasCreated; use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateDimensionsWereUpdated; use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateWithNodeWasCreated; +use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage; use Neos\ContentRepository\Core\Infrastructure\DbalClientInterface; use Neos\ContentRepository\Core\NodeType\NodeTypeManager; use Neos\ContentRepository\Core\NodeType\NodeTypeName; @@ -54,11 +55,8 @@ use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId; use Neos\ContentRepository\Core\SharedModel\Node\NodeName; use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; -use Neos\EventStore\CatchUp\CheckpointStorageInterface; -use Neos\EventStore\DoctrineAdapter\DoctrineCheckpointStorage; use Neos\EventStore\Model\Event\SequenceNumber; use Neos\EventStore\Model\EventEnvelope; -use Neos\EventStore\Model\EventStore\SetupResult; /** * @implements ProjectionInterface @@ -81,7 +79,7 @@ final class DoctrineDbalContentGraphProjection implements ProjectionInterface, W */ private ?ContentGraph $contentGraph = null; - private DoctrineCheckpointStorage $checkpointStorage; + private DbalCheckpointStorage $checkpointStorage; public function __construct( private readonly DbalClientInterface $dbalClient, @@ -91,7 +89,7 @@ public function __construct( private readonly ProjectionContentGraph $projectionContentGraph, private readonly string $tableNamePrefix, ) { - $this->checkpointStorage = new DoctrineCheckpointStorage( + $this->checkpointStorage = new DbalCheckpointStorage( $this->dbalClient->getConnection(), $this->tableNamePrefix . '_checkpoint', self::class @@ -111,10 +109,10 @@ protected function getTableNamePrefix(): string public function setUp(): void { $this->setupTables(); - $this->checkpointStorage->setup(); + $this->checkpointStorage->setUp(); } - private function setupTables(): SetupResult + private function setupTables(): void { $connection = $this->dbalClient->getConnection(); $schemaManager = $connection->getSchemaManager(); @@ -128,7 +126,6 @@ private function setupTables(): SetupResult foreach ($schemaDiff->toSaveSql($connection->getDatabasePlatform()) as $statement) { $connection->executeStatement($statement); } - return SetupResult::success(''); } public function reset(): void @@ -202,7 +199,7 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void }; } - public function getCheckpointStorage(): CheckpointStorageInterface + public function getCheckpointStorage(): DbalCheckpointStorage { return $this->checkpointStorage; } @@ -1239,7 +1236,8 @@ private function getDatabaseConnection(): Connection private static function initiatingDateTime(EventEnvelope $eventEnvelope): \DateTimeImmutable { - $result = $eventEnvelope->event->metadata->has('initiatingTimestamp') ? \DateTimeImmutable::createFromFormat(\DateTimeInterface::ATOM, $eventEnvelope->event->metadata->get('initiatingTimestamp')) : $eventEnvelope->recordedAt; + $initiatingTimestamp = $eventEnvelope->event->metadata?->get('initiatingTimestamp'); + $result = $initiatingTimestamp !== null ? \DateTimeImmutable::createFromFormat(\DateTimeInterface::ATOM, $initiatingTimestamp) : $eventEnvelope->recordedAt; if (!$result instanceof \DateTimeImmutable) { throw new \RuntimeException(sprintf('Failed to extract initiating timestamp from event "%s"', $eventEnvelope->event->id->value), 1678902291); } diff --git a/Neos.ContentGraph.PostgreSQLAdapter/src/Domain/Projection/HypergraphProjection.php b/Neos.ContentGraph.PostgreSQLAdapter/src/Domain/Projection/HypergraphProjection.php index bf8d017432b..5932e236157 100644 --- a/Neos.ContentGraph.PostgreSQLAdapter/src/Domain/Projection/HypergraphProjection.php +++ b/Neos.ContentGraph.PostgreSQLAdapter/src/Domain/Projection/HypergraphProjection.php @@ -45,13 +45,11 @@ use Neos\ContentRepository\Core\Feature\NodeVariation\Event\NodePeerVariantWasCreated; use Neos\ContentRepository\Core\Feature\NodeVariation\Event\NodeSpecializationVariantWasCreated; use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateWithNodeWasCreated; +use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage; use Neos\ContentRepository\Core\NodeType\NodeTypeManager; use Neos\ContentRepository\Core\Projection\ProjectionInterface; -use Neos\EventStore\CatchUp\CheckpointStorageInterface; -use Neos\EventStore\DoctrineAdapter\DoctrineCheckpointStorage; use Neos\EventStore\Model\Event\SequenceNumber; use Neos\EventStore\Model\EventEnvelope; -use Neos\EventStore\Model\EventStore\SetupResult; /** * The alternate reality-aware hypergraph projector for the PostgreSQL backend via Doctrine DBAL @@ -76,7 +74,7 @@ final class HypergraphProjection implements ProjectionInterface * so that always the same instance is returned */ private ?ContentHypergraph $contentHypergraph = null; - private DoctrineCheckpointStorage $checkpointStorage; + private DbalCheckpointStorage $checkpointStorage; private ProjectionHypergraph $projectionHypergraph; public function __construct( @@ -87,7 +85,7 @@ public function __construct( private readonly string $tableNamePrefix, ) { $this->projectionHypergraph = new ProjectionHypergraph($this->databaseClient, $this->tableNamePrefix); - $this->checkpointStorage = new DoctrineCheckpointStorage( + $this->checkpointStorage = new DbalCheckpointStorage( $this->databaseClient->getConnection(), $this->tableNamePrefix . '_checkpoint', self::class @@ -98,10 +96,10 @@ public function __construct( public function setUp(): void { $this->setupTables(); - $this->checkpointStorage->setup(); + $this->checkpointStorage->setUp(); } - private function setupTables(): SetupResult + private function setupTables(): void { $connection = $this->databaseClient->getConnection(); HypergraphSchemaBuilder::registerTypes($connection->getDatabasePlatform()); @@ -124,8 +122,6 @@ private function setupTables(): SetupResult create index if not exists restriction_affected on ' . $this->tableNamePrefix . '_restrictionhyperrelation using gin (affectednodeaggregateids); '); - - return SetupResult::success(''); } public function reset(): void @@ -212,7 +208,7 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void }; } - public function getCheckpointStorage(): CheckpointStorageInterface + public function getCheckpointStorage(): DbalCheckpointStorage { return $this->checkpointStorage; } diff --git a/Neos.ContentRepository.BehavioralTests/Classes/ProjectionRaceConditionTester/RaceTrackerCatchUpHook.php b/Neos.ContentRepository.BehavioralTests/Classes/ProjectionRaceConditionTester/RaceTrackerCatchUpHook.php index 1bc16309a1d..34f089877af 100644 --- a/Neos.ContentRepository.BehavioralTests/Classes/ProjectionRaceConditionTester/RaceTrackerCatchUpHook.php +++ b/Neos.ContentRepository.BehavioralTests/Classes/ProjectionRaceConditionTester/RaceTrackerCatchUpHook.php @@ -21,10 +21,9 @@ use Neos\ContentRepository\Core\EventStore\EventInterface; use Neos\ContentRepository\Core\Projection\CatchUpHookInterface; use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger; -use Neos\EventStore\DoctrineAdapter\DoctrineCheckpointStorage; /** - * We had some race conditions in projections, where {@see DoctrineCheckpointStorage} was not working properly. + * We had some race conditions in projections, where {@see \Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage} was not working properly. * We saw some non-deterministic, random errors when running the tests - unluckily only on Linux, not on OSX: * On OSX, forking a new subprocess in {@see SubprocessProjectionCatchUpTrigger} is *WAY* slower than in Linux; * and thus the race conditions which appears if two projector instances of the same class run concurrently diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index f0221f5f79d..61e6ff1533d 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -28,6 +28,7 @@ use Neos\ContentRepository\Core\Factory\ContentRepositoryFactory; use Neos\ContentRepository\Core\Factory\ContentRepositoryId; use Neos\ContentRepository\Core\NodeType\NodeTypeManager; +use Neos\ContentRepository\Core\Projection\CatchUp; use Neos\ContentRepository\Core\Projection\CatchUpOptions; use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphInterface; use Neos\ContentRepository\Core\Projection\ContentStream\ContentStreamFinder; @@ -36,13 +37,10 @@ use Neos\ContentRepository\Core\Projection\ProjectionStateInterface; use Neos\ContentRepository\Core\Projection\Workspace\WorkspaceFinder; use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface; -use Neos\EventStore\CatchUp\CatchUp; use Neos\EventStore\EventStoreInterface; use Neos\EventStore\Model\Event\EventMetadata; use Neos\EventStore\Model\EventEnvelope; -use Neos\EventStore\Model\EventStore\SetupResult; use Neos\EventStore\Model\EventStream\VirtualStreamName; -use Neos\EventStore\ProvidesSetupInterface; use Psr\Clock\ClockInterface; /** @@ -118,7 +116,7 @@ public function handle(CommandInterface $command): CommandResult $initiatingUserId, $initiatingTimestamp ) { - $metadata = $event instanceof DecoratedEvent ? $event->eventMetadata->value : []; + $metadata = $event instanceof DecoratedEvent ? $event->eventMetadata?->value ?? [] : []; $metadata['initiatingUserId'] ??= $initiatingUserId; $metadata['initiatingTimestamp'] ??= $initiatingTimestamp; return DecoratedEvent::withMetadata($event, EventMetadata::fromArray($metadata)); @@ -193,19 +191,12 @@ public function catchUpProjection(string $projectionClassName, CatchUpOptions $o $catchUpHook?->onAfterCatchUp(); } - public function setUp(): SetupResult + public function setUp(): void { - if ($this->eventStore instanceof ProvidesSetupInterface) { - $result = $this->eventStore->setup(); - // TODO better result object - if ($result->errors !== []) { - return $result; - } - } + $this->eventStore->setup(); foreach ($this->projectionsAndCatchUpHooks->projections as $projection) { $projection->setUp(); } - return SetupResult::success('done'); } public function resetProjectionStates(): void diff --git a/Neos.ContentRepository.Core/Classes/EventStore/DecoratedEvent.php b/Neos.ContentRepository.Core/Classes/EventStore/DecoratedEvent.php index 0dbf8ad2d61..f09c26ceed9 100644 --- a/Neos.ContentRepository.Core/Classes/EventStore/DecoratedEvent.php +++ b/Neos.ContentRepository.Core/Classes/EventStore/DecoratedEvent.php @@ -4,6 +4,8 @@ namespace Neos\ContentRepository\Core\EventStore; +use Neos\EventStore\Model\Event\CausationId; +use Neos\EventStore\Model\Event\CorrelationId; use Neos\EventStore\Model\Event\EventId; use Neos\EventStore\Model\Event\EventMetadata; @@ -17,7 +19,9 @@ final class DecoratedEvent private function __construct( public readonly EventInterface $innerEvent, public readonly EventId $eventId, - public readonly EventMetadata $eventMetadata, + public readonly ?EventMetadata $eventMetadata = null, + public readonly ?CausationId $causationId = null, + public readonly ?CorrelationId $correlationId = null, ) { } @@ -35,19 +39,27 @@ public static function withEventId(DecoratedEvent|EventInterface $event, EventId public static function withCausationId( DecoratedEvent|EventInterface $event, - EventId $causationId + EventId|CausationId $causationId ): self { $event = self::wrapWithDecoratedEventIfNecessary($event); - $eventMetadata = $event->eventMetadata->value; - $eventMetadata['causationId'] = $causationId->value; + if ($causationId instanceof EventId) { + $causationId = CausationId::fromString($causationId->value); + } + return new self($event->innerEvent, $event->eventId, $event->eventMetadata, $causationId, $event->correlationId); + } - return new self($event->innerEvent, $event->eventId, EventMetadata::fromArray($eventMetadata)); + public static function withCorrelationId( + DecoratedEvent|EventInterface $event, + CorrelationId $correlationId + ): self { + $event = self::wrapWithDecoratedEventIfNecessary($event); + return new self($event->innerEvent, $event->eventId, $event->eventMetadata, $event->causationId, $correlationId); } private static function wrapWithDecoratedEventIfNecessary(EventInterface|DecoratedEvent $event): DecoratedEvent { if ($event instanceof EventInterface) { - $event = new self($event, EventId::create(), EventMetadata::none()); + $event = new self($event, EventId::create()); } return $event; } diff --git a/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php b/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php index 42e17b5e591..8974211cdae 100644 --- a/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php +++ b/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php @@ -75,19 +75,18 @@ public function publishEvents(EventsToPublish $eventsToPublish): CommandResult private function normalizeEvent(EventInterface|DecoratedEvent $event): Event { - if ($event instanceof DecoratedEvent) { - $eventId = $event->eventId; - $eventMetadata = $event->eventMetadata; - $event = $event->innerEvent; - } else { - $eventId = EventId::create(); - $eventMetadata = EventMetadata::none(); - } + $eventId = $event instanceof DecoratedEvent ? $event->eventId : EventId::create(); + $eventMetadata = $event instanceof DecoratedEvent ? $event->eventMetadata : null; + $causationId = $event instanceof DecoratedEvent ? $event->causationId : null; + $correlationId = $event instanceof DecoratedEvent ? $event->correlationId : null; + $event = $event instanceof DecoratedEvent ? $event->innerEvent : $event; return new Event( $eventId, $this->eventNormalizer->getEventType($event), $this->eventNormalizer->getEventData($event), $eventMetadata, + $causationId, + $correlationId, ); } } diff --git a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php index 7fc7d47f176..f7878e58048 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php +++ b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php @@ -317,10 +317,7 @@ private function publishContentStream( $copiedEvent = $event->createCopyForContentStream($baseContentStreamId); // We need to add the event metadata here for rebasing in nested workspace situations // (and for exporting) - $events[] = DecoratedEvent::withMetadata( - $copiedEvent, - $eventEnvelope->event->metadata - ); + $events[] = $eventEnvelope->event->metadata !== null ? DecoratedEvent::withMetadata($copiedEvent, $eventEnvelope->event->metadata) : $copiedEvent; } } @@ -464,7 +461,7 @@ private function extractCommandsFromContentStreamMetadata( $commands = []; foreach ($workspaceContentStream as $eventEnvelope) { - $metadata = $eventEnvelope->event->metadata->value; + $metadata = $eventEnvelope->event->metadata?->value ?? []; // TODO: Add this logic to the NodeAggregateCommandHandler; // so that we can be sure these can be parsed again. if (isset($metadata['commandClass'])) { diff --git a/Neos.ContentRepository.Core/Classes/Infrastructure/DbalCheckpointStorage.php b/Neos.ContentRepository.Core/Classes/Infrastructure/DbalCheckpointStorage.php new file mode 100644 index 00000000000..d79396cc7c0 --- /dev/null +++ b/Neos.ContentRepository.Core/Classes/Infrastructure/DbalCheckpointStorage.php @@ -0,0 +1,125 @@ +connection->getDatabasePlatform(); + if (!($platform instanceof MySqlPlatform || $platform instanceof PostgreSqlPlatform)) { + throw new \InvalidArgumentException(sprintf('The %s only supports the platforms %s and %s currently. Given: %s', $this::class, MySqlPlatform::class, PostgreSqlPlatform::class, get_debug_type($platform)), 1660556004); + } + if (strlen($this->subscriberId) > 255) { + throw new \InvalidArgumentException('The subscriberId must not exceed 255 characters', 1705673456); + } + $this->platform = $platform; + } + + public function setUp(): void + { + $schemaManager = $this->connection->getSchemaManager(); + if (!$schemaManager instanceof AbstractSchemaManager) { + throw new \RuntimeException('Failed to retrieve Schema Manager', 1652269057); + } + $schema = new Schema(); + $table = $schema->createTable($this->tableName); + $table->addColumn('subscriberid', Types::STRING, ['length' => 255]); + $table->addColumn('appliedsequencenumber', Types::INTEGER); + $table->setPrimaryKey(['subscriberid']); + + $schemaDiff = (new Comparator())->compare($schemaManager->createSchema(), $schema); + foreach ($schemaDiff->toSaveSql($this->platform) as $statement) { + $this->connection->executeStatement($statement); + } + try { + $this->connection->insert($this->tableName, ['subscriberid' => $this->subscriberId, 'appliedsequencenumber' => 0]); + } catch (UniqueConstraintViolationException $e) { + // table and row already exists, ignore + } + } + + public function acquireLock(): SequenceNumber + { + if ($this->connection->isTransactionActive()) { + throw new \RuntimeException(sprintf('Failed to acquire checkpoint lock for subscriber "%s" because a transaction is active already', $this->subscriberId), 1652268416); + } + $this->connection->beginTransaction(); + try { + $highestAppliedSequenceNumber = $this->connection->fetchOne('SELECT appliedsequencenumber FROM ' . $this->connection->quoteIdentifier($this->tableName) . ' WHERE subscriberid = :subscriberId ' . $this->platform->getForUpdateSQL() . ' NOWAIT', [ + 'subscriberId' => $this->subscriberId + ]); + } catch (DBALException $exception) { + $this->connection->rollBack(); + if ($exception instanceof LockWaitTimeoutException || ($exception instanceof DBALDriverException && ($exception->getErrorCode() === 3572 || $exception->getErrorCode() === 7))) { + throw new \RuntimeException(sprintf('Failed to acquire checkpoint lock for subscriber "%s" because it is acquired already', $this->subscriberId), 1652279016); + } + throw new \RuntimeException($exception->getMessage(), 1544207778, $exception); + } + if (!is_numeric($highestAppliedSequenceNumber)) { + $this->connection->rollBack(); + throw new \RuntimeException(sprintf('Failed to fetch highest applied sequence number for subscriber "%s". Please run %s::setUp()', $this->subscriberId, $this::class), 1652279139); + } + $this->lockedSequenceNumber = SequenceNumber::fromInteger((int)$highestAppliedSequenceNumber); + return $this->lockedSequenceNumber; + } + + public function updateAndReleaseLock(SequenceNumber $sequenceNumber): void + { + if ($this->lockedSequenceNumber === null) { + throw new \RuntimeException(sprintf('Failed to update and commit checkpoint for subscriber "%s" because the lock has not been acquired successfully before', $this->subscriberId), 1660556344); + } + if (!$this->connection->isTransactionActive()) { + throw new \RuntimeException(sprintf('Failed to update and commit checkpoint for subscriber "%s" because no transaction is active', $this->subscriberId), 1652279314); + } + try { + if (!$this->lockedSequenceNumber->equals($sequenceNumber)) { + $this->connection->update($this->tableName, ['appliedsequencenumber' => $sequenceNumber->value], ['subscriberid' => $this->subscriberId]); + } + $this->connection->commit(); + } catch (DBALException $exception) { + $this->connection->rollBack(); + throw new \RuntimeException(sprintf('Failed to update and commit highest applied sequence number for subscriber "%s". Please run %s::setUp()', $this->subscriberId, $this::class), 1652279375, $exception); + } finally { + $this->lockedSequenceNumber = null; + } + } + + public function getHighestAppliedSequenceNumber(): SequenceNumber + { + $highestAppliedSequenceNumber = $this->connection->fetchOne('SELECT appliedsequencenumber FROM ' . $this->connection->quoteIdentifier($this->tableName) . ' WHERE subscriberid = :subscriberId ', [ + 'subscriberId' => $this->subscriberId + ]); + if (!is_numeric($highestAppliedSequenceNumber)) { + throw new \RuntimeException(sprintf('Failed to fetch highest applied sequence number for subscriber "%s". Please run %s::setUp()', $this->subscriberId, $this::class), 1652279427); + } + return SequenceNumber::fromInteger((int)$highestAppliedSequenceNumber); + } + +} diff --git a/Neos.ContentRepository.Core/Classes/Projection/CatchUp.php b/Neos.ContentRepository.Core/Classes/Projection/CatchUp.php new file mode 100644 index 00000000000..962c8a1a46e --- /dev/null +++ b/Neos.ContentRepository.Core/Classes/Projection/CatchUp.php @@ -0,0 +1,132 @@ +batchSize < 1) { + throw new \InvalidArgumentException(sprintf('batch size must be a positive integer, given: %d', $this->batchSize), 1705672467); + } + } + + /** + * @param \Closure(EventEnvelope): void $eventHandler The callback that is invoked for every {@see EventEnvelope} that is processed + * @param CheckpointStorageInterface $checkpointStorage The checkpoint storage that saves the last processed {@see SequenceNumber} + */ + public static function create(\Closure $eventHandler, CheckpointStorageInterface $checkpointStorage): self + { + return new self($eventHandler, $checkpointStorage, 1, null); + } + + /** + * After how many events should the (database) transaction be committed? + * + * @param int $batchSize Number of events to process before the checkpoint is written + */ + public function withBatchSize(int $batchSize): self + { + if ($batchSize === $this->batchSize) { + return $this; + } + return new self($this->eventHandler, $this->checkpointStorage, $batchSize, $this->onBeforeBatchCompletedHook); + } + + /** + * This hook is called directly before the sequence number is persisted back in CheckpointStorage. + * Use this to trigger any operation which need to happen BEFORE the sequence number update is made + * visible to the outside. + * + * Overrides all previously registered onBeforeBatchCompleted hooks. + * + * @param \Closure(): void $callback the hook being called before the batch is completed + */ + public function withOnBeforeBatchCompleted(\Closure $callback): self + { + return new self($this->eventHandler, $this->checkpointStorage, $this->batchSize, $callback); + } + + /** + * Iterate over the $eventStream, invoke the specified event handler closure for every {@see EventEnvelope} and update + * the last processed sequence number in the {@see CheckpointStorageInterface} + * + * @param EventStreamInterface $eventStream The event stream to process + * @return SequenceNumber The last processed {@see SequenceNumber} + * @throws \Throwable Exceptions that are thrown during callback handling are re-thrown + */ + public function run(EventStreamInterface $eventStream): SequenceNumber + { + $highestAppliedSequenceNumber = $this->checkpointStorage->acquireLock(); + $iteration = 0; + try { + foreach ($eventStream->withMinimumSequenceNumber($highestAppliedSequenceNumber->next()) as $eventEnvelope) { + if ($eventEnvelope->sequenceNumber->value <= $highestAppliedSequenceNumber->value) { + continue; + } + ($this->eventHandler)($eventEnvelope); + $iteration++; + if ($this->batchSize === 1 || $iteration % $this->batchSize === 0) { + if ($this->onBeforeBatchCompletedHook) { + ($this->onBeforeBatchCompletedHook)(); + } + $this->checkpointStorage->updateAndReleaseLock($eventEnvelope->sequenceNumber); + $highestAppliedSequenceNumber = $this->checkpointStorage->acquireLock(); + } else { + $highestAppliedSequenceNumber = $eventEnvelope->sequenceNumber; + } + } + } finally { + try { + if ($this->onBeforeBatchCompletedHook) { + ($this->onBeforeBatchCompletedHook)(); + } + } catch (\Throwable $e) { + $this->checkpointStorage->updateAndReleaseLock($highestAppliedSequenceNumber); + throw $e; + } + $this->checkpointStorage->updateAndReleaseLock($highestAppliedSequenceNumber); + } + return $highestAppliedSequenceNumber; + } +} diff --git a/Neos.ContentRepository.Core/Classes/Projection/CheckpointStorageInterface.php b/Neos.ContentRepository.Core/Classes/Projection/CheckpointStorageInterface.php new file mode 100644 index 00000000000..4ca7f5b4606 --- /dev/null +++ b/Neos.ContentRepository.Core/Classes/Projection/CheckpointStorageInterface.php @@ -0,0 +1,56 @@ +checkpointStorage = new DoctrineCheckpointStorage( + $this->checkpointStorage = new DbalCheckpointStorage( $this->dbalClient->getConnection(), $this->tableName . '_checkpoint', self::class @@ -76,7 +75,7 @@ public function __construct( public function setUp(): void { $this->setupTables(); - $this->checkpointStorage->setup(); + $this->checkpointStorage->setUp(); } private function setupTables(): void diff --git a/Neos.ContentRepository.Core/Classes/Projection/NodeHiddenState/NodeHiddenStateProjection.php b/Neos.ContentRepository.Core/Classes/Projection/NodeHiddenState/NodeHiddenStateProjection.php index 8fe377c4054..2e5a17ea9bb 100644 --- a/Neos.ContentRepository.Core/Classes/Projection/NodeHiddenState/NodeHiddenStateProjection.php +++ b/Neos.ContentRepository.Core/Classes/Projection/NodeHiddenState/NodeHiddenStateProjection.php @@ -26,11 +26,10 @@ use Neos\ContentRepository\Core\Feature\DimensionSpaceAdjustment\Event\DimensionSpacePointWasMoved; use Neos\ContentRepository\Core\Feature\NodeDisabling\Event\NodeAggregateWasDisabled; use Neos\ContentRepository\Core\Feature\NodeDisabling\Event\NodeAggregateWasEnabled; +use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage; use Neos\ContentRepository\Core\Infrastructure\DbalClientInterface; use Neos\ContentRepository\Core\Infrastructure\DbalSchemaFactory; use Neos\ContentRepository\Core\Projection\ProjectionInterface; -use Neos\EventStore\CatchUp\CheckpointStorageInterface; -use Neos\EventStore\DoctrineAdapter\DoctrineCheckpointStorage; use Neos\EventStore\Model\Event\SequenceNumber; use Neos\EventStore\Model\EventEnvelope; @@ -42,13 +41,13 @@ class NodeHiddenStateProjection implements ProjectionInterface { private ?NodeHiddenStateFinder $nodeHiddenStateFinder; - private DoctrineCheckpointStorage $checkpointStorage; + private DbalCheckpointStorage $checkpointStorage; public function __construct( private readonly DbalClientInterface $dbalClient, private readonly string $tableName ) { - $this->checkpointStorage = new DoctrineCheckpointStorage( + $this->checkpointStorage = new DbalCheckpointStorage( $this->dbalClient->getConnection(), $this->tableName . '_checkpoint', self::class @@ -58,7 +57,7 @@ public function __construct( public function setUp(): void { $this->setupTables(); - $this->checkpointStorage->setup(); + $this->checkpointStorage->setUp(); } private function setupTables(): void @@ -116,7 +115,7 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void }; } - public function getCheckpointStorage(): CheckpointStorageInterface + public function getCheckpointStorage(): DbalCheckpointStorage { return $this->checkpointStorage; } diff --git a/Neos.ContentRepository.Core/Classes/Projection/ProjectionInterface.php b/Neos.ContentRepository.Core/Classes/Projection/ProjectionInterface.php index 249683514cf..8c722561d83 100644 --- a/Neos.ContentRepository.Core/Classes/Projection/ProjectionInterface.php +++ b/Neos.ContentRepository.Core/Classes/Projection/ProjectionInterface.php @@ -6,7 +6,6 @@ use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\EventStore\EventInterface; -use Neos\EventStore\CatchUp\CheckpointStorageInterface; use Neos\EventStore\Model\EventEnvelope; /** @@ -22,7 +21,7 @@ interface ProjectionInterface { /** - * Set up the projection state (create databases, call CheckpointStorage::setup()). + * Set up the projection state (create databases, call {@see CheckpointStorageInterface::setUp()}). */ public function setUp(): void; diff --git a/Neos.ContentRepository.Core/Classes/Projection/Workspace/WorkspaceProjection.php b/Neos.ContentRepository.Core/Classes/Projection/Workspace/WorkspaceProjection.php index 85ed95bbc4d..52c792466e7 100644 --- a/Neos.ContentRepository.Core/Classes/Projection/Workspace/WorkspaceProjection.php +++ b/Neos.ContentRepository.Core/Classes/Projection/Workspace/WorkspaceProjection.php @@ -34,14 +34,13 @@ use Neos\ContentRepository\Core\Feature\WorkspacePublication\Event\WorkspaceWasPublished; use Neos\ContentRepository\Core\Feature\WorkspaceRebase\Event\WorkspaceRebaseFailed; use Neos\ContentRepository\Core\Feature\WorkspaceRebase\Event\WorkspaceWasRebased; +use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage; use Neos\ContentRepository\Core\Infrastructure\DbalClientInterface; use Neos\ContentRepository\Core\Infrastructure\DbalSchemaFactory; use Neos\ContentRepository\Core\Projection\ProjectionInterface; use Neos\ContentRepository\Core\Projection\WithMarkStaleInterface; use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName; -use Neos\EventStore\CatchUp\CheckpointStorageInterface; -use Neos\EventStore\DoctrineAdapter\DoctrineCheckpointStorage; use Neos\EventStore\Model\Event\SequenceNumber; use Neos\EventStore\Model\EventEnvelope; @@ -58,14 +57,14 @@ class WorkspaceProjection implements ProjectionInterface, WithMarkStaleInterface * so that always the same instance is returned */ private ?WorkspaceFinder $workspaceFinder = null; - private DoctrineCheckpointStorage $checkpointStorage; + private DbalCheckpointStorage $checkpointStorage; private WorkspaceRuntimeCache $workspaceRuntimeCache; public function __construct( private readonly DbalClientInterface $dbalClient, private readonly string $tableName, ) { - $this->checkpointStorage = new DoctrineCheckpointStorage( + $this->checkpointStorage = new DbalCheckpointStorage( $this->dbalClient->getConnection(), $this->tableName . '_checkpoint', self::class @@ -76,7 +75,7 @@ public function __construct( public function setUp(): void { $this->setupTables(); - $this->checkpointStorage->setup(); + $this->checkpointStorage->setUp(); } private function setupTables(): void @@ -149,7 +148,7 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void }; } - public function getCheckpointStorage(): CheckpointStorageInterface + public function getCheckpointStorage(): DbalCheckpointStorage { return $this->checkpointStorage; } diff --git a/Neos.ContentRepository.Export/src/Event/ValueObject/ExportedEvent.php b/Neos.ContentRepository.Export/src/Event/ValueObject/ExportedEvent.php index ac765f5d3e5..8e9c7d9ff66 100644 --- a/Neos.ContentRepository.Export/src/Event/ValueObject/ExportedEvent.php +++ b/Neos.ContentRepository.Export/src/Event/ValueObject/ExportedEvent.php @@ -23,7 +23,7 @@ public static function fromRawEvent(Event $event): self $event->id->value, $event->type->value, \json_decode($event->data->value, true), - $event->metadata->value, + $event->metadata?->value ?? [], ); } diff --git a/Neos.ContentRepository.Export/src/Processors/EventStoreImportProcessor.php b/Neos.ContentRepository.Export/src/Processors/EventStoreImportProcessor.php index 9afa36dc0cc..23606b1942f 100644 --- a/Neos.ContentRepository.Export/src/Processors/EventStoreImportProcessor.php +++ b/Neos.ContentRepository.Export/src/Processors/EventStoreImportProcessor.php @@ -159,19 +159,18 @@ public function run(): ProcessorResult */ private function normalizeEvent(EventInterface|DecoratedEvent $event): Event { - if ($event instanceof DecoratedEvent) { - $eventId = $event->eventId; - $eventMetadata = $event->eventMetadata; - $event = $event->innerEvent; - } else { - $eventId = EventId::create(); - $eventMetadata = EventMetadata::none(); - } + $eventId = $event instanceof DecoratedEvent ? $event->eventId : EventId::create(); + $eventMetadata = $event instanceof DecoratedEvent ? $event->eventMetadata : null; + $causationId = $event instanceof DecoratedEvent ? $event->causationId : null; + $correlationId = $event instanceof DecoratedEvent ? $event->correlationId : null; + $event = $event instanceof DecoratedEvent ? $event->innerEvent : $event; return new Event( $eventId, $this->eventNormalizer->getEventType($event), $this->eventNormalizer->getEventData($event), $eventMetadata, + $causationId, + $correlationId, ); } diff --git a/Neos.Neos/Classes/AssetUsage/Projection/AssetUsageProjection.php b/Neos.Neos/Classes/AssetUsage/Projection/AssetUsageProjection.php index d3b25a3e10f..0238af9be13 100644 --- a/Neos.Neos/Classes/AssetUsage/Projection/AssetUsageProjection.php +++ b/Neos.Neos/Classes/AssetUsage/Projection/AssetUsageProjection.php @@ -20,9 +20,8 @@ use Neos\ContentRepository\Core\Feature\WorkspacePublication\Event\WorkspaceWasPartiallyPublished; use Neos\ContentRepository\Core\Feature\WorkspacePublication\Event\WorkspaceWasPublished; use Neos\ContentRepository\Core\Feature\WorkspaceRebase\Event\WorkspaceWasRebased; +use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage; use Neos\ContentRepository\Core\Projection\ProjectionInterface; -use Neos\EventStore\CatchUp\CheckpointStorageInterface; -use Neos\EventStore\DoctrineAdapter\DoctrineCheckpointStorage; use Neos\EventStore\Model\Event\SequenceNumber; use Neos\EventStore\Model\EventEnvelope; use Neos\Media\Domain\Model\AssetInterface; @@ -43,7 +42,7 @@ final class AssetUsageProjection implements ProjectionInterface { private ?AssetUsageFinder $stateAccessor = null; private AssetUsageRepository $repository; - private DoctrineCheckpointStorage $checkpointStorage; + private DbalCheckpointStorage $checkpointStorage; /** @var array */ private array $originalAssetIdMappingRuntimeCache = []; @@ -54,7 +53,7 @@ public function __construct( AssetUsageRepositoryFactory $assetUsageRepositoryFactory, ) { $this->repository = $assetUsageRepositoryFactory->build($contentRepositoryId); - $this->checkpointStorage = new DoctrineCheckpointStorage( + $this->checkpointStorage = new DbalCheckpointStorage( $dbal, $this->repository->getTableNamePrefix() . '_checkpoint', self::class @@ -232,8 +231,8 @@ private function extractAssetIds(string $type, mixed $value): array public function setUp(): void { - $this->repository->setup(); - $this->checkpointStorage->setup(); + $this->repository->setUp(); + $this->checkpointStorage->setUp(); } public function canHandle(EventInterface $event): bool @@ -271,7 +270,7 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void }; } - public function getCheckpointStorage(): CheckpointStorageInterface + public function getCheckpointStorage(): DbalCheckpointStorage { return $this->checkpointStorage; } diff --git a/Neos.Neos/Classes/AssetUsage/Projection/AssetUsageRepository.php b/Neos.Neos/Classes/AssetUsage/Projection/AssetUsageRepository.php index 517fd56637a..12b2bcca192 100644 --- a/Neos.Neos/Classes/AssetUsage/Projection/AssetUsageRepository.php +++ b/Neos.Neos/Classes/AssetUsage/Projection/AssetUsageRepository.php @@ -9,24 +9,23 @@ use Doctrine\DBAL\Exception\UniqueConstraintViolationException; use Doctrine\DBAL\ForwardCompatibility\Result; use Doctrine\DBAL\Platforms\AbstractPlatform; +use Doctrine\DBAL\Schema\AbstractSchemaManager; use Doctrine\DBAL\Schema\Column; +use Doctrine\DBAL\Schema\Comparator; use Doctrine\DBAL\Schema\Table; use Doctrine\DBAL\Types\Type; +use Doctrine\DBAL\Types\Types; use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePointSet; +use Neos\ContentRepository\Core\DimensionSpace\OriginDimensionSpacePoint; use Neos\ContentRepository\Core\Infrastructure\DbalSchemaFactory; -use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId; -use Neos\Neos\AssetUsage\Dto\AssetUsageNodeAddress; -use Neos\ContentRepository\Core\DimensionSpace\OriginDimensionSpacePoint; +use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; +use Neos\Neos\AssetUsage\Dto\AssetIdAndOriginalAssetId; use Neos\Neos\AssetUsage\Dto\AssetIdsByProperty; use Neos\Neos\AssetUsage\Dto\AssetUsage; use Neos\Neos\AssetUsage\Dto\AssetUsageFilter; +use Neos\Neos\AssetUsage\Dto\AssetUsageNodeAddress; use Neos\Neos\AssetUsage\Dto\AssetUsages; -use Doctrine\DBAL\Schema\AbstractSchemaManager; -use Doctrine\DBAL\Schema\Comparator; -use Doctrine\DBAL\Schema\Schema; -use Doctrine\DBAL\Types\Types; -use Neos\Neos\AssetUsage\Dto\AssetIdAndOriginalAssetId; /** * @internal Not meant to be used in user land code. In order to look up asset usages the AssetUsageFinder can be used @@ -39,7 +38,7 @@ public function __construct( ) { } - public function setup(): void + public function setUp(): void { $schemaManager = $this->dbal->getSchemaManager(); if (!$schemaManager instanceof AbstractSchemaManager) { diff --git a/Neos.Neos/Classes/FrontendRouting/Projection/DocumentUriPathProjection.php b/Neos.Neos/Classes/FrontendRouting/Projection/DocumentUriPathProjection.php index b0b9fa03f4f..9185975b296 100644 --- a/Neos.Neos/Classes/FrontendRouting/Projection/DocumentUriPathProjection.php +++ b/Neos.Neos/Classes/FrontendRouting/Projection/DocumentUriPathProjection.php @@ -30,16 +30,14 @@ use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateDimensionsWereUpdated; use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateWithNodeWasCreated; use Neos\ContentRepository\Core\Feature\WorkspaceCreation\Event\RootWorkspaceWasCreated; +use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage; use Neos\ContentRepository\Core\NodeType\NodeTypeManager; use Neos\ContentRepository\Core\NodeType\NodeTypeName; use Neos\ContentRepository\Core\Projection\ProjectionInterface; use Neos\ContentRepository\Core\Projection\WithMarkStaleInterface; use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId; -use Neos\EventStore\CatchUp\CheckpointStorageInterface; -use Neos\EventStore\DoctrineAdapter\DoctrineCheckpointStorage; use Neos\EventStore\Model\Event\SequenceNumber; use Neos\EventStore\Model\EventEnvelope; -use Neos\EventStore\Model\EventStore\SetupResult; use Neos\Neos\Domain\Model\SiteNodeName; use Neos\Neos\Domain\Service\NodeTypeNameFactory; use Neos\Neos\FrontendRouting\Exception\NodeNotFoundException; @@ -53,7 +51,7 @@ final class DocumentUriPathProjection implements ProjectionInterface, WithMarkSt 'shortcutTarget' => Types::JSON, ]; - private DoctrineCheckpointStorage $checkpointStorage; + private DbalCheckpointStorage $checkpointStorage; private ?DocumentUriPathFinder $stateAccessor = null; /** @@ -66,7 +64,7 @@ public function __construct( private readonly Connection $dbal, private readonly string $tableNamePrefix, ) { - $this->checkpointStorage = new DoctrineCheckpointStorage( + $this->checkpointStorage = new DbalCheckpointStorage( $this->dbal, $this->tableNamePrefix . '_checkpoint', self::class @@ -76,10 +74,10 @@ public function __construct( public function setUp(): void { $this->setupTables(); - $this->checkpointStorage->setup(); + $this->checkpointStorage->setUp(); } - private function setupTables(): SetupResult + private function setupTables(): void { $connection = $this->dbal; $schemaManager = $connection->getSchemaManager(); @@ -92,7 +90,6 @@ private function setupTables(): SetupResult foreach ($schemaDiff->toSaveSql($connection->getDatabasePlatform()) as $statement) { $connection->executeStatement($statement); } - return SetupResult::success(''); } @@ -158,7 +155,7 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void }; } - public function getCheckpointStorage(): CheckpointStorageInterface + public function getCheckpointStorage(): DbalCheckpointStorage { return $this->checkpointStorage; } diff --git a/Neos.Neos/Classes/PendingChangesProjection/ChangeProjection.php b/Neos.Neos/Classes/PendingChangesProjection/ChangeProjection.php index 31cd1ecabf1..64218c28e85 100644 --- a/Neos.Neos/Classes/PendingChangesProjection/ChangeProjection.php +++ b/Neos.Neos/Classes/PendingChangesProjection/ChangeProjection.php @@ -36,13 +36,12 @@ use Neos\ContentRepository\Core\Feature\NodeVariation\Event\NodePeerVariantWasCreated; use Neos\ContentRepository\Core\Feature\NodeVariation\Event\NodeSpecializationVariantWasCreated; use Neos\ContentRepository\Core\Feature\WorkspaceCreation\Event\RootWorkspaceWasCreated; +use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage; use Neos\ContentRepository\Core\Infrastructure\DbalClientInterface; use Neos\ContentRepository\Core\Infrastructure\DbalSchemaFactory; use Neos\ContentRepository\Core\Projection\ProjectionInterface; use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId; use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; -use Neos\EventStore\CatchUp\CheckpointStorageInterface; -use Neos\EventStore\DoctrineAdapter\DoctrineCheckpointStorage; use Neos\EventStore\Model\Event\SequenceNumber; use Neos\EventStore\Model\EventEnvelope; @@ -65,13 +64,13 @@ class ChangeProjection implements ProjectionInterface * @var array|null */ private ?array $liveContentStreamIdsRuntimeCache = null; - private DoctrineCheckpointStorage $checkpointStorage; + private DbalCheckpointStorage $checkpointStorage; public function __construct( private readonly DbalClientInterface $dbalClient, private readonly string $tableNamePrefix, ) { - $this->checkpointStorage = new DoctrineCheckpointStorage( + $this->checkpointStorage = new DbalCheckpointStorage( $this->dbalClient->getConnection(), $this->tableNamePrefix . '_checkpoint', self::class @@ -82,7 +81,7 @@ public function __construct( public function setUp(): void { $this->setupTables(); - $this->checkpointStorage->setup(); + $this->checkpointStorage->setUp(); } private function setupTables(): void @@ -186,7 +185,7 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void }; } - public function getCheckpointStorage(): CheckpointStorageInterface + public function getCheckpointStorage(): DbalCheckpointStorage { return $this->checkpointStorage; } diff --git a/composer.json b/composer.json index 8f8f059a8e5..da3a15ec7e8 100644 --- a/composer.json +++ b/composer.json @@ -9,8 +9,8 @@ "neos/flow-development-collection": "9.0.x-dev", "doctrine/dbal": "^2.8", "doctrine/migrations": "*", - "neos/eventstore": "~1.0.0", - "neos/eventstore-doctrineadapter": "~1.0.0", + "neos/eventstore": "^1", + "neos/eventstore-doctrineadapter": "^1", "php": "^8.2", "neos/error-messages": "*", "neos/utility-objecthandling": "*",