Skip to content

Commit

Permalink
feat: add sessionConfig supplier (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
meistermeier committed Oct 2, 2023
1 parent 8b69ce7 commit bb3af1e
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 3 deletions.
24 changes: 21 additions & 3 deletions src/main/java/org/neo4j/cdc/client/CDCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.neo4j.cdc.client.model.ChangeIdentifier;
import org.neo4j.cdc.client.selector.Selector;
import org.neo4j.driver.Driver;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.reactive.RxResult;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.types.MapAccessor;
Expand All @@ -46,6 +47,7 @@ public class CDCClient implements CDCService {
private static final String CDC_QUERY_STATEMENT = "call cdc.query($from, $selectors)";
private final Driver driver;
private final List<Selector> selectors;
private final SessionConfigSupplier sessionConfigSupplier;
private final Duration streamingPollInterval;

public CDCClient(Driver driver, Selector... selectors) {
Expand All @@ -54,6 +56,22 @@ public CDCClient(Driver driver, Selector... selectors) {

public CDCClient(Driver driver, Duration streamingPollInterval, Selector... selectors) {
this.driver = Objects.requireNonNull(driver);
this.sessionConfigSupplier = () -> SessionConfig.builder().build();
this.streamingPollInterval = Objects.requireNonNull(streamingPollInterval);
this.selectors = selectors == null ? List.of() : Arrays.asList(selectors);
}

public CDCClient(Driver driver, SessionConfigSupplier sessionConfigSupplier, Selector... selectors) {
this(driver, sessionConfigSupplier, Duration.ofSeconds(1), selectors);
}

public CDCClient(
Driver driver,
SessionConfigSupplier sessionConfigSupplier,
Duration streamingPollInterval,
Selector... selectors) {
this.driver = Objects.requireNonNull(driver);
this.sessionConfigSupplier = sessionConfigSupplier;
this.streamingPollInterval = Objects.requireNonNull(streamingPollInterval);
this.selectors = selectors == null ? List.of() : Arrays.asList(selectors);
}
Expand All @@ -71,7 +89,7 @@ public Mono<ChangeIdentifier> current() {
@Override
public Flux<ChangeEvent> query(ChangeIdentifier from) {
return Flux.usingWhen(
Mono.fromSupplier(driver::rxSession),
Mono.fromSupplier(() -> driver.rxSession(sessionConfigSupplier.sessionConfig())),
(RxSession session) -> Flux.from(session.readTransaction(tx -> {
var params = Map.of(
"from",
Expand All @@ -97,7 +115,7 @@ public Flux<ChangeEvent> stream(ChangeIdentifier from) {
var cursor = new AtomicReference<>(from);

var query = Flux.usingWhen(
Mono.fromSupplier(driver::rxSession),
Mono.fromSupplier(() -> driver.rxSession(sessionConfigSupplier.sessionConfig())),
(RxSession session) -> Flux.from(session.readTransaction(tx -> {
var params = Map.of(
"from",
Expand Down Expand Up @@ -136,7 +154,7 @@ private ChangeEvent applyPropertyFilters(ChangeEvent original) {

private Mono<ChangeIdentifier> queryForChangeIdentifier(String query, String description) {
return Mono.usingWhen(
Mono.fromSupplier(driver::rxSession),
Mono.fromSupplier(() -> driver.rxSession(sessionConfigSupplier.sessionConfig())),
(RxSession session) -> Mono.from(session.readTransaction(tx -> {
RxResult result = tx.run(query);
return Mono.from(result.records())
Expand Down
36 changes: 36 additions & 0 deletions src/main/java/org/neo4j/cdc/client/SessionConfigSupplier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.cdc.client;

import org.neo4j.driver.SessionConfig;

/**
* The implementation can provide a session config that
* will be called each time before a session gets created.
*
* @author Gerrit Meier
*/
@FunctionalInterface
public interface SessionConfigSupplier {

/**
* {@link SessionConfig} to be used with the current session.
*
* @return sessionConfig object.
*/
SessionConfig sessionConfig();
}
11 changes: 11 additions & 0 deletions src/test/java/org/neo4j/cdc/client/CDCClientIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.neo4j.cdc.client.selector.RelationshipNodeSelector;
import org.neo4j.cdc.client.selector.RelationshipSelector;
import org.neo4j.driver.*;
import org.neo4j.driver.exceptions.FatalDiscoveryException;
import org.testcontainers.containers.Neo4jContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
Expand Down Expand Up @@ -114,6 +115,16 @@ void changesCanBeQueried() {
.verifyComplete();
}

@Test
void respectsSessionConfigSupplier() {
var client = new CDCClient(
driver,
() -> SessionConfig.builder().withDatabase("unknownDatabase").build());
StepVerifier.create(client.current())
.expectError(FatalDiscoveryException.class)
.verify();
}

@Test
void shouldReturnCypherTypesWithoutConversion() {
var client = new CDCClient(driver, Duration.ZERO);
Expand Down

0 comments on commit bb3af1e

Please sign in to comment.