Skip to content

Commit

Permalink
test: add integration and unit tests for metadata selectors
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-ince committed Nov 15, 2023
1 parent 1939625 commit 5f4b377
Show file tree
Hide file tree
Showing 3 changed files with 356 additions and 20 deletions.
26 changes: 22 additions & 4 deletions src/main/java/org/neo4j/cdc/client/selector/EntitySelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.collections4.MapUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.neo4j.cdc.client.model.*;
Expand Down Expand Up @@ -153,8 +154,19 @@ public boolean matches(ChangeEvent e) {
if (executingUser != null && !e.getMetadata().getExecutingUser().equals(executingUser)) {
return false;
}
Object txMetadata = metadata.get(METADATA_KEY_TX_METADATA);
if (txMetadata != null && !e.getMetadata().getTxMetadata().equals(txMetadata)) {
var txMetadata = MapUtils.getMap(metadata, METADATA_KEY_TX_METADATA, emptyMap()).entrySet().stream()
.collect(Collectors.toMap(
entry -> {
if (entry.getKey() instanceof String) {
return (String) entry.getKey();
}

throw new IllegalArgumentException(String.format(
"expected map key to be a String but got '%s'.",
entry.getKey().getClass().getSimpleName()));
},
entry -> (Object) entry.getValue()));
if (!e.getMetadata().getTxMetadata().entrySet().containsAll(txMetadata.entrySet())) {
return false;
}
}
Expand Down Expand Up @@ -253,8 +265,14 @@ public Map<String, Object> asMap() {
if (!changesTo.isEmpty()) {
result.put("changesTo", changesTo);
}
if (!metadata.isEmpty()) {
result.put("metadata", metadata);
if (metadata.containsKey(METADATA_KEY_AUTHENTICATED_USER)) {
result.put("authenticatedUser", metadata.get(METADATA_KEY_AUTHENTICATED_USER));
}
if (metadata.containsKey(METADATA_KEY_EXECUTING_USER)) {
result.put("executingUser", metadata.get(METADATA_KEY_EXECUTING_USER));
}
if (metadata.containsKey(METADATA_KEY_TX_METADATA)) {
result.put("txMetadata", metadata.get(METADATA_KEY_TX_METADATA));
}

return result;
Expand Down
210 changes: 206 additions & 4 deletions src/test/java/org/neo4j/cdc/client/CDCClientIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
*/
package org.neo4j.cdc.client;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;

Expand All @@ -29,6 +28,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.neo4j.cdc.client.model.*;
import org.neo4j.cdc.client.selector.EntitySelector;
import org.neo4j.cdc.client.selector.NodeSelector;
import org.neo4j.cdc.client.selector.RelationshipNodeSelector;
import org.neo4j.cdc.client.selector.RelationshipSelector;
Expand All @@ -45,13 +45,12 @@
@Testcontainers
public class CDCClientIT {

private static final String NEO4J_VERSION = "5.11";
private static final String NEO4J_VERSION = "5";

@SuppressWarnings("resource")
@Container
private static final Neo4jContainer<?> neo4j = new Neo4jContainer<>("neo4j:" + NEO4J_VERSION + "-enterprise")
.withEnv("NEO4J_ACCEPT_LICENSE_AGREEMENT", "yes")
.withNeo4jConfig("internal.dbms.change_data_capture", "true")
.withAdminPassword("passw0rd");

private static Driver driver;
Expand Down Expand Up @@ -726,4 +725,207 @@ void selectorsDoFilteringCorrectly() {
.verifyComplete();
}
}

@Test
void userSelectorsFilterCorrectly() {
// prepare
try (var session = driver.session()) {
session.run(
"CREATE OR REPLACE USER $user SET PLAINTEXT PASSWORD $pwd CHANGE NOT REQUIRED",
Map.of("user", "test", "pwd", "passw0rd"))
.consume();
session.run("CREATE OR REPLACE ROLE $role", Map.of("role", "imp")).consume();
session.run("GRANT IMPERSONATE ($user) ON DBMS TO $role", Map.of("user", "neo4j", "role", "imp"))
.consume();
session.run("GRANT ROLE publisher, imp to $user", Map.of("user", "test"))
.consume();
}

// make changes with test user
try (var driver = GraphDatabase.driver(neo4j.getBoltUrl(), AuthTokens.basic("test", "passw0rd"));
var session = driver.session();
var impersonatedSession = driver.session(
SessionConfig.builder().withImpersonatedUser("neo4j").build())) {
session.run("UNWIND range(1, 100) AS n CREATE (:Test {id: n})").consume();

// also make change with impersonation
impersonatedSession
.run("UNWIND range(1, 100) AS n CREATE (:Impersonated {id: n})")
.consume();
}

// make changes with neo4j user
try (var session = driver.session()) {
session.run("UNWIND range(1, 100) AS n CREATE (:Neo4j {id: n})").consume();
}

// verify authenticatedUser = test
StepVerifier.create(new CDCClient(
driver,
new EntitySelector(
null,
emptySet(),
Map.of(EntitySelector.METADATA_KEY_AUTHENTICATED_USER, "test")))
.query(current))
.recordWith(ArrayList::new)
.thenConsumeWhile(x -> true)
.consumeRecordedWith(coll -> assertThat(coll).hasSize(200).allSatisfy(e -> assertThat(e)
.satisfies(c -> assertThat(c.getMetadata().getAuthenticatedUser())
.isEqualTo("test"))
.extracting("event.after.labels", InstanceOfAssertFactories.list(String.class))
.containsAnyOf("Test", "Impersonated")))
.verifyComplete();

// verify authenticatedUser = neo4j
StepVerifier.create(new CDCClient(
driver,
new EntitySelector(
null,
emptySet(),
Map.of(EntitySelector.METADATA_KEY_AUTHENTICATED_USER, "neo4j")))
.query(current))
.recordWith(ArrayList::new)
.thenConsumeWhile(x -> true)
.consumeRecordedWith(coll -> assertThat(coll).hasSize(100).allSatisfy(e -> assertThat(e)
.satisfies(c -> assertThat(c.getMetadata().getAuthenticatedUser())
.isEqualTo("neo4j"))
.extracting("event.after.labels", InstanceOfAssertFactories.list(String.class))
.containsAnyOf("Neo4j")))
.verifyComplete();

// verify executingUser = neo4j
StepVerifier.create(new CDCClient(
driver,
new EntitySelector(
null, emptySet(), Map.of(EntitySelector.METADATA_KEY_EXECUTING_USER, "neo4j")))
.query(current))
.recordWith(ArrayList::new)
.thenConsumeWhile(x -> true)
.consumeRecordedWith(coll -> assertThat(coll).hasSize(200).allSatisfy(e -> assertThat(e)
.satisfies(c ->
assertThat(c.getMetadata().getExecutingUser()).isEqualTo("neo4j"))
.extracting("event.after.labels", InstanceOfAssertFactories.list(String.class))
.containsAnyOf("Neo4j", "Impersonated")))
.verifyComplete();

// verify executingUser = test
StepVerifier.create(new CDCClient(
driver,
new EntitySelector(
null, emptySet(), Map.of(EntitySelector.METADATA_KEY_EXECUTING_USER, "test")))
.query(current))
.recordWith(ArrayList::new)
.thenConsumeWhile(x -> true)
.consumeRecordedWith(coll -> assertThat(coll).hasSize(100).allSatisfy(e -> assertThat(e)
.satisfies(c ->
assertThat(c.getMetadata().getExecutingUser()).isEqualTo("test"))
.extracting("event.after.labels", InstanceOfAssertFactories.list(String.class))
.containsAnyOf("Test")))
.verifyComplete();

// verify authenticatedUser = test, executingUser = neo4j
StepVerifier.create(new CDCClient(
driver,
new EntitySelector(
null,
emptySet(),
Map.of(
EntitySelector.METADATA_KEY_AUTHENTICATED_USER,
"test",
EntitySelector.METADATA_KEY_EXECUTING_USER,
"neo4j")))
.query(current))
.recordWith(ArrayList::new)
.thenConsumeWhile(x -> true)
.consumeRecordedWith(coll -> assertThat(coll).hasSize(100).allSatisfy(e -> assertThat(e)
.satisfies(c -> assertThat(c.getMetadata().getAuthenticatedUser())
.isEqualTo("test"))
.satisfies(c ->
assertThat(c.getMetadata().getExecutingUser()).isEqualTo("neo4j"))
.extracting("event.after.labels", InstanceOfAssertFactories.list(String.class))
.containsAnyOf("Impersonated")))
.verifyComplete();
}

@Test
void txMetadataSelectorFiltersCorrectly() {
try (var session = driver.session()) {
session.run(
"UNWIND range(1, 100) AS n CREATE (:Test {id: n})",
TransactionConfig.builder()
.withMetadata(Map.of("app", "Test"))
.build())
.consume();
}

try (var session = driver.session()) {
session.run(
"UNWIND range(1, 100) AS n CREATE (:Other {id: n})",
TransactionConfig.builder()
.withMetadata(Map.of("app", "Other"))
.build())
.consume();
}

try (var session = driver.session()) {
session.run(
"UNWIND range(1, 100) AS n CREATE (:Another {id: n})",
TransactionConfig.builder()
.withMetadata(Map.of("app", "Other", "appUser", "test"))
.build())
.consume();
}

StepVerifier.create(new CDCClient(
driver,
new EntitySelector(
null,
emptySet(),
Map.of(EntitySelector.METADATA_KEY_TX_METADATA, Map.of("app", "Test"))))
.query(current))
.recordWith(ArrayList::new)
.thenConsumeWhile(x -> true)
.consumeRecordedWith(coll -> assertThat(coll).hasSize(100).allSatisfy(e -> assertThat(e)
.satisfies(
c -> assertThat(c.getMetadata().getTxMetadata()).contains(Map.entry("app", "Test")))
.extracting("event.after.labels", InstanceOfAssertFactories.list(String.class))
.containsOnly("Test")))
.verifyComplete();

StepVerifier.create(new CDCClient(
driver,
new EntitySelector(
null,
emptySet(),
Map.of(EntitySelector.METADATA_KEY_TX_METADATA, Map.of("app", "Other"))))
.query(current))
.recordWith(ArrayList::new)
.thenConsumeWhile(x -> true)
.consumeRecordedWith(coll -> assertThat(coll).hasSize(200).allSatisfy(e -> assertThat(e)
.satisfies(
c -> assertThat(c.getMetadata().getTxMetadata()).contains(Map.entry("app", "Other")))
.extracting("event.after.labels", InstanceOfAssertFactories.list(String.class))
.containsAnyOf("Other", "Another")
.doesNotContain("Test")))
.verifyComplete();

StepVerifier.create(new CDCClient(
driver,
new EntitySelector(
null,
emptySet(),
Map.of(
EntitySelector.METADATA_KEY_TX_METADATA,
Map.of("app", "Other", "appUser", "test"))))
.query(current))
.recordWith(ArrayList::new)
.thenConsumeWhile(x -> true)
.consumeRecordedWith(coll -> assertThat(coll).hasSize(100).allSatisfy(e -> assertThat(e)
.satisfies(c -> assertThat(c.getMetadata().getTxMetadata())
.contains(Map.entry("app", "Other"), Map.entry("appUser", "test")))
.extracting("event.after.labels", InstanceOfAssertFactories.list(String.class))
.contains("Another")
.doesNotContain("Test", "Other")))
.verifyComplete();
}
}
Loading

0 comments on commit 5f4b377

Please sign in to comment.