diff --git a/pom.xml b/pom.xml
index 958835b..1a4efb2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,6 +39,7 @@
3.26.0
2024-05.1
4.4
+ 3.14.0
11
1.12.0
5.10.0
@@ -113,6 +114,12 @@
${reactor.version}
test
+
+ org.apache.commons
+ commons-lang3
+ ${commons-lang3.version}
+ test
+
org.assertj
assertj-core
diff --git a/src/main/java/org/neo4j/cdc/client/CDCClient.java b/src/main/java/org/neo4j/cdc/client/CDCClient.java
index 08d1287..4311381 100644
--- a/src/main/java/org/neo4j/cdc/client/CDCClient.java
+++ b/src/main/java/org/neo4j/cdc/client/CDCClient.java
@@ -146,13 +146,16 @@ public Flux query(ChangeIdentifier from) {
.doOnComplete(() -> log.trace("subscription to cdc query completed"));
}
- @Override
public Flux stream(ChangeIdentifier from) {
var cursor = new AtomicReference<>(from);
var query = Flux.usingWhen(
Mono.fromSupplier(() -> driver.rxSession(sessionConfigSupplier.sessionConfig())),
(RxSession session) -> Flux.from(session.readTransaction(tx -> {
+ var current = Mono.from(tx.run("CALL cdc.current()").records())
+ .map(MapAccessor::asMap)
+ .map(ResultMapper::parseChangeIdentifier);
+
var params = Map.of(
"from",
cursor.get().getId(),
@@ -162,7 +165,13 @@ public Flux stream(ChangeIdentifier from) {
log.trace("running cdc.query using parameters {}", params);
RxResult result = tx.run(CDC_QUERY_STATEMENT, params);
- return Flux.from(result.records()).map(MapAccessor::asMap).map(ResultMapper::parseChangeEvent);
+ return current.flatMapMany(changeId -> Flux.from(result.records())
+ .map(MapAccessor::asMap)
+ .map(ResultMapper::parseChangeEvent)
+ .switchIfEmpty(Flux.defer(() -> {
+ cursor.set(changeId);
+ return Flux.empty();
+ })));
})),
RxSession::close);
diff --git a/src/main/java/org/neo4j/cdc/client/ResultMapper.java b/src/main/java/org/neo4j/cdc/client/ResultMapper.java
index 97ff1bb..932955b 100644
--- a/src/main/java/org/neo4j/cdc/client/ResultMapper.java
+++ b/src/main/java/org/neo4j/cdc/client/ResultMapper.java
@@ -23,9 +23,6 @@
import org.neo4j.cdc.client.model.Event;
import org.neo4j.cdc.client.model.Metadata;
-/**
- * @author Gerrit Meier
- */
public final class ResultMapper {
private static final String ID_FIELD = "id";
private static final String TX_ID_FIELD = "txId";
diff --git a/src/main/java/org/neo4j/cdc/client/SessionConfigSupplier.java b/src/main/java/org/neo4j/cdc/client/SessionConfigSupplier.java
index e5e0bc0..f6ff323 100644
--- a/src/main/java/org/neo4j/cdc/client/SessionConfigSupplier.java
+++ b/src/main/java/org/neo4j/cdc/client/SessionConfigSupplier.java
@@ -21,8 +21,6 @@
/**
* The implementation can provide a session config that
* will be called each time before a session gets created.
- *
- * @author Gerrit Meier
*/
@FunctionalInterface
public interface SessionConfigSupplier {
diff --git a/src/test/java/org/neo4j/cdc/client/CDCClientIT.java b/src/test/java/org/neo4j/cdc/client/CDCClientIT.java
index d5af27d..bde5854 100644
--- a/src/test/java/org/neo4j/cdc/client/CDCClientIT.java
+++ b/src/test/java/org/neo4j/cdc/client/CDCClientIT.java
@@ -35,9 +35,6 @@
import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.test.StepVerifier;
-/**
- * @author Gerrit Meier
- */
@Testcontainers
public abstract class CDCClientIT {
diff --git a/src/test/java/org/neo4j/cdc/client/CDCClientWithAggressiveLogPruningIT.java b/src/test/java/org/neo4j/cdc/client/CDCClientWithAggressiveLogPruningIT.java
new file mode 100644
index 0000000..e18d0d2
--- /dev/null
+++ b/src/test/java/org/neo4j/cdc/client/CDCClientWithAggressiveLogPruningIT.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright (c) "Neo4j"
+ * Neo4j Sweden AB [https://neo4j.com]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.cdc.client;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.neo4j.cdc.client.model.ChangeIdentifier;
+import org.neo4j.cdc.client.selector.EntitySelector;
+import org.neo4j.driver.*;
+import org.neo4j.driver.exceptions.ClientException;
+import org.testcontainers.containers.Neo4jContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import reactor.test.StepVerifier;
+
+@Testcontainers
+public class CDCClientWithAggressiveLogPruningIT {
+ @Container
+ private static final Neo4jContainer> neo4j = new Neo4jContainer<>("neo4j:5-enterprise")
+ .withEnv("NEO4J_ACCEPT_LICENSE_AGREEMENT", "yes")
+ .withNeo4jConfig("db.tx_log.rotation.retention_policy", "3 files")
+ .withNeo4jConfig("db.tx_log.rotation.size", String.format("%d", 128 * 1024))
+ .withAdminPassword("passw0rd");
+
+ private static Driver driver;
+
+ @BeforeAll
+ static void setup() {
+ driver = GraphDatabase.driver(neo4j.getBoltUrl(), AuthTokens.basic("neo4j", "passw0rd"));
+ }
+
+ @AfterAll
+ static void cleanup() {
+ driver.close();
+ }
+
+ @BeforeEach
+ void reset() {
+ try (var session = driver.session(SessionConfig.forDatabase("system"))) {
+ session.run(
+ "CREATE OR REPLACE DATABASE $db OPTIONS {txLogEnrichment: $mode} WAIT",
+ Map.of("db", "neo4j", "mode", "FULL"))
+ .consume();
+ }
+ }
+
+ @Test
+ void shouldFailWithInvalidChangeIdentifierAfterLogPruning() {
+ var current = current();
+
+ // make large changes that will cause at least 3 tx log file rotation
+ for (var i = 0; i < 1000; i++) {
+ try (var session = driver.session(SessionConfig.forDatabase("neo4j"))) {
+ session.run(
+ "CREATE (n:Test) SET n.id = $id, n.data = $data",
+ Map.of("id", i, "data", RandomStringUtils.random(1024, true, true)));
+ }
+ }
+
+ // checkpoint
+ try (var session = driver.session(SessionConfig.forDatabase("neo4j"))) {
+ session.run("CALL db.checkpoint()").consume();
+ }
+
+ StepVerifier.create(new CDCClient(driver, () -> SessionConfig.forDatabase("neo4j")).query(current))
+ .expectErrorSatisfies(t -> assertThat(t)
+ .isInstanceOf(ClientException.class)
+ .hasMessage("Unable to find the transaction entry for the given change identifier"))
+ .verify();
+ }
+
+ @Test
+ void shouldNotFailWithInvalidChangeIdentifierAfterLogPruningWhenStreaming()
+ throws ExecutionException, InterruptedException {
+ var executor = Executors.newSingleThreadExecutor();
+ var current = current();
+
+ var generateChanges = new AtomicBoolean(true);
+ var changes = executor.submit(() -> {
+ var counter = 0;
+
+ while (generateChanges.get()) {
+ // make changes
+ for (var j = 0; j < 10; j++) {
+ try (var session = driver.session(SessionConfig.forDatabase("neo4j"))) {
+ session.run(
+ "CREATE (n:Test) SET n.id = $id, n.data = $data",
+ Map.of("id", counter * 100 + j, "data", RandomStringUtils.random(1024, true, true)),
+ TransactionConfig.builder()
+ .withMetadata(Map.of("app", "hr"))
+ .build());
+ }
+ }
+
+ // checkpoint
+ try (var session = driver.session(SessionConfig.forDatabase("neo4j"))) {
+ session.run("CALL db.checkpoint()").consume();
+ }
+
+ counter++;
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ try {
+ StepVerifier.create(new CDCClient(
+ driver,
+ () -> SessionConfig.forDatabase("neo4j"),
+ EntitySelector.builder()
+ .withTxMetadata(Map.of("app", "no-match"))
+ .build())
+ .stream(current))
+ .expectTimeout(Duration.ofMinutes(1))
+ .verify();
+ } finally {
+ generateChanges.set(false);
+ changes.get();
+ }
+ }
+
+ private ChangeIdentifier current() {
+ try (var session = driver.session(SessionConfig.forDatabase("neo4j"))) {
+ return new ChangeIdentifier(
+ session.run("CALL cdc.current()").single().get(0).asString());
+ }
+ }
+}
diff --git a/src/test/java/org/neo4j/cdc/client/ResultMapperTest.java b/src/test/java/org/neo4j/cdc/client/ResultMapperTest.java
index 2043a15..38cf54f 100644
--- a/src/test/java/org/neo4j/cdc/client/ResultMapperTest.java
+++ b/src/test/java/org/neo4j/cdc/client/ResultMapperTest.java
@@ -29,9 +29,6 @@
import org.junit.jupiter.api.Test;
import org.neo4j.cdc.client.model.*;
-/**
- * @author Gerrit Meier
- */
public class ResultMapperTest {
@Test