Skip to content

Commit

Permalink
fix: update cursor with current when no changes are returned (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-ince authored Jun 24, 2024
1 parent 161435c commit 1202fcf
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 13 deletions.
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
<assertj-core.version>3.26.0</assertj-core.version>
<build-resources.version>2024-05.1</build-resources.version>
<commons-collections4.version>4.4</commons-collections4.version>
<commons-lang3.version>3.14.0</commons-lang3.version>
<java.version>11</java.version>
<jreleaser-maven-plugin.version>1.12.0</jreleaser-maven-plugin.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down Expand Up @@ -113,6 +114,12 @@
<version>${reactor.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
13 changes: 11 additions & 2 deletions src/main/java/org/neo4j/cdc/client/CDCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,16 @@ public Flux<ChangeEvent> query(ChangeIdentifier from) {
.doOnComplete(() -> log.trace("subscription to cdc query completed"));
}

@Override
public Flux<ChangeEvent> stream(ChangeIdentifier from) {
var cursor = new AtomicReference<>(from);

var query = Flux.usingWhen(
Mono.fromSupplier(() -> driver.rxSession(sessionConfigSupplier.sessionConfig())),
(RxSession session) -> Flux.from(session.readTransaction(tx -> {
var current = Mono.from(tx.run("CALL cdc.current()").records())
.map(MapAccessor::asMap)
.map(ResultMapper::parseChangeIdentifier);

var params = Map.of(
"from",
cursor.get().getId(),
Expand All @@ -162,7 +165,13 @@ public Flux<ChangeEvent> stream(ChangeIdentifier from) {
log.trace("running cdc.query using parameters {}", params);
RxResult result = tx.run(CDC_QUERY_STATEMENT, params);

return Flux.from(result.records()).map(MapAccessor::asMap).map(ResultMapper::parseChangeEvent);
return current.flatMapMany(changeId -> Flux.from(result.records())
.map(MapAccessor::asMap)
.map(ResultMapper::parseChangeEvent)
.switchIfEmpty(Flux.defer(() -> {
cursor.set(changeId);
return Flux.empty();
})));
})),
RxSession::close);

Expand Down
3 changes: 0 additions & 3 deletions src/main/java/org/neo4j/cdc/client/ResultMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
import org.neo4j.cdc.client.model.Event;
import org.neo4j.cdc.client.model.Metadata;

/**
* @author Gerrit Meier
*/
public final class ResultMapper {
private static final String ID_FIELD = "id";
private static final String TX_ID_FIELD = "txId";
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/org/neo4j/cdc/client/SessionConfigSupplier.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
/**
* The implementation can provide a session config that
* will be called each time before a session gets created.
*
* @author Gerrit Meier
*/
@FunctionalInterface
public interface SessionConfigSupplier {
Expand Down
3 changes: 0 additions & 3 deletions src/test/java/org/neo4j/cdc/client/CDCClientIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@
import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.test.StepVerifier;

/**
* @author Gerrit Meier
*/
@Testcontainers
public abstract class CDCClientIT {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.cdc.client;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.neo4j.cdc.client.model.ChangeIdentifier;
import org.neo4j.cdc.client.selector.EntitySelector;
import org.neo4j.driver.*;
import org.neo4j.driver.exceptions.ClientException;
import org.testcontainers.containers.Neo4jContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.test.StepVerifier;

@Testcontainers
public class CDCClientWithAggressiveLogPruningIT {
@Container
private static final Neo4jContainer<?> neo4j = new Neo4jContainer<>("neo4j:5-enterprise")
.withEnv("NEO4J_ACCEPT_LICENSE_AGREEMENT", "yes")
.withNeo4jConfig("db.tx_log.rotation.retention_policy", "3 files")
.withNeo4jConfig("db.tx_log.rotation.size", String.format("%d", 128 * 1024))
.withAdminPassword("passw0rd");

private static Driver driver;

@BeforeAll
static void setup() {
driver = GraphDatabase.driver(neo4j.getBoltUrl(), AuthTokens.basic("neo4j", "passw0rd"));
}

@AfterAll
static void cleanup() {
driver.close();
}

@BeforeEach
void reset() {
try (var session = driver.session(SessionConfig.forDatabase("system"))) {
session.run(
"CREATE OR REPLACE DATABASE $db OPTIONS {txLogEnrichment: $mode} WAIT",
Map.of("db", "neo4j", "mode", "FULL"))
.consume();
}
}

@Test
void shouldFailWithInvalidChangeIdentifierAfterLogPruning() {
var current = current();

// make large changes that will cause at least 3 tx log file rotation
for (var i = 0; i < 1000; i++) {
try (var session = driver.session(SessionConfig.forDatabase("neo4j"))) {
session.run(
"CREATE (n:Test) SET n.id = $id, n.data = $data",
Map.of("id", i, "data", RandomStringUtils.random(1024, true, true)));
}
}

// checkpoint
try (var session = driver.session(SessionConfig.forDatabase("neo4j"))) {
session.run("CALL db.checkpoint()").consume();
}

StepVerifier.create(new CDCClient(driver, () -> SessionConfig.forDatabase("neo4j")).query(current))
.expectErrorSatisfies(t -> assertThat(t)
.isInstanceOf(ClientException.class)
.hasMessage("Unable to find the transaction entry for the given change identifier"))
.verify();
}

@Test
void shouldNotFailWithInvalidChangeIdentifierAfterLogPruningWhenStreaming()
throws ExecutionException, InterruptedException {
var executor = Executors.newSingleThreadExecutor();
var current = current();

var generateChanges = new AtomicBoolean(true);
var changes = executor.submit(() -> {
var counter = 0;

while (generateChanges.get()) {
// make changes
for (var j = 0; j < 10; j++) {
try (var session = driver.session(SessionConfig.forDatabase("neo4j"))) {
session.run(
"CREATE (n:Test) SET n.id = $id, n.data = $data",
Map.of("id", counter * 100 + j, "data", RandomStringUtils.random(1024, true, true)),
TransactionConfig.builder()
.withMetadata(Map.of("app", "hr"))
.build());
}
}

// checkpoint
try (var session = driver.session(SessionConfig.forDatabase("neo4j"))) {
session.run("CALL db.checkpoint()").consume();
}

counter++;
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
});

try {
StepVerifier.create(new CDCClient(
driver,
() -> SessionConfig.forDatabase("neo4j"),
EntitySelector.builder()
.withTxMetadata(Map.of("app", "no-match"))
.build())
.stream(current))
.expectTimeout(Duration.ofMinutes(1))
.verify();
} finally {
generateChanges.set(false);
changes.get();
}
}

private ChangeIdentifier current() {
try (var session = driver.session(SessionConfig.forDatabase("neo4j"))) {
return new ChangeIdentifier(
session.run("CALL cdc.current()").single().get(0).asString());
}
}
}
3 changes: 0 additions & 3 deletions src/test/java/org/neo4j/cdc/client/ResultMapperTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
import org.junit.jupiter.api.Test;
import org.neo4j.cdc.client.model.*;

/**
* @author Gerrit Meier
*/
public class ResultMapperTest {

@Test
Expand Down

0 comments on commit 1202fcf

Please sign in to comment.