diff --git a/src/main/java/org/neo4j/cdc/client/selector/EntitySelector.java b/src/main/java/org/neo4j/cdc/client/selector/EntitySelector.java index 56595bd..5b3a945 100644 --- a/src/main/java/org/neo4j/cdc/client/selector/EntitySelector.java +++ b/src/main/java/org/neo4j/cdc/client/selector/EntitySelector.java @@ -21,6 +21,7 @@ import java.util.*; import java.util.stream.Collectors; +import org.apache.commons.collections4.MapUtils; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.neo4j.cdc.client.model.*; @@ -153,8 +154,19 @@ public boolean matches(ChangeEvent e) { if (executingUser != null && !e.getMetadata().getExecutingUser().equals(executingUser)) { return false; } - Object txMetadata = metadata.get(METADATA_KEY_TX_METADATA); - if (txMetadata != null && !e.getMetadata().getTxMetadata().equals(txMetadata)) { + var txMetadata = MapUtils.getMap(metadata, METADATA_KEY_TX_METADATA, emptyMap()).entrySet().stream() + .collect(Collectors.toMap( + entry -> { + if (entry.getKey() instanceof String) { + return (String) entry.getKey(); + } + + throw new IllegalArgumentException(String.format( + "expected map key to be a String but got '%s'.", + entry.getKey().getClass().getSimpleName())); + }, + entry -> (Object) entry.getValue())); + if (!e.getMetadata().getTxMetadata().entrySet().containsAll(txMetadata.entrySet())) { return false; } } @@ -253,8 +265,14 @@ public Map asMap() { if (!changesTo.isEmpty()) { result.put("changesTo", changesTo); } - if (!metadata.isEmpty()) { - result.put("metadata", metadata); + if (metadata.containsKey(METADATA_KEY_AUTHENTICATED_USER)) { + result.put("authenticatedUser", metadata.get(METADATA_KEY_AUTHENTICATED_USER)); + } + if (metadata.containsKey(METADATA_KEY_EXECUTING_USER)) { + result.put("executingUser", metadata.get(METADATA_KEY_EXECUTING_USER)); + } + if (metadata.containsKey(METADATA_KEY_TX_METADATA)) { + result.put("txMetadata", metadata.get(METADATA_KEY_TX_METADATA)); } return result; diff --git a/src/test/java/org/neo4j/cdc/client/CDCClientIT.java b/src/test/java/org/neo4j/cdc/client/CDCClientIT.java index c1d1c25..8e4e097 100644 --- a/src/test/java/org/neo4j/cdc/client/CDCClientIT.java +++ b/src/test/java/org/neo4j/cdc/client/CDCClientIT.java @@ -16,8 +16,7 @@ */ package org.neo4j.cdc.client; -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; +import static java.util.Collections.*; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -29,6 +28,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.neo4j.cdc.client.model.*; +import org.neo4j.cdc.client.selector.EntitySelector; import org.neo4j.cdc.client.selector.NodeSelector; import org.neo4j.cdc.client.selector.RelationshipNodeSelector; import org.neo4j.cdc.client.selector.RelationshipSelector; @@ -45,13 +45,12 @@ @Testcontainers public class CDCClientIT { - private static final String NEO4J_VERSION = "5.11"; + private static final String NEO4J_VERSION = "5"; @SuppressWarnings("resource") @Container private static final Neo4jContainer neo4j = new Neo4jContainer<>("neo4j:" + NEO4J_VERSION + "-enterprise") .withEnv("NEO4J_ACCEPT_LICENSE_AGREEMENT", "yes") - .withNeo4jConfig("internal.dbms.change_data_capture", "true") .withAdminPassword("passw0rd"); private static Driver driver; @@ -726,4 +725,207 @@ void selectorsDoFilteringCorrectly() { .verifyComplete(); } } + + @Test + void userSelectorsFilterCorrectly() { + // prepare + try (var session = driver.session()) { + session.run( + "CREATE OR REPLACE USER $user SET PLAINTEXT PASSWORD $pwd CHANGE NOT REQUIRED", + Map.of("user", "test", "pwd", "passw0rd")) + .consume(); + session.run("CREATE OR REPLACE ROLE $role", Map.of("role", "imp")).consume(); + session.run("GRANT IMPERSONATE ($user) ON DBMS TO $role", Map.of("user", "neo4j", "role", "imp")) + .consume(); + session.run("GRANT ROLE publisher, imp to $user", Map.of("user", "test")) + .consume(); + } + + // make changes with test user + try (var driver = GraphDatabase.driver(neo4j.getBoltUrl(), AuthTokens.basic("test", "passw0rd")); + var session = driver.session(); + var impersonatedSession = driver.session( + SessionConfig.builder().withImpersonatedUser("neo4j").build())) { + session.run("UNWIND range(1, 100) AS n CREATE (:Test {id: n})").consume(); + + // also make change with impersonation + impersonatedSession + .run("UNWIND range(1, 100) AS n CREATE (:Impersonated {id: n})") + .consume(); + } + + // make changes with neo4j user + try (var session = driver.session()) { + session.run("UNWIND range(1, 100) AS n CREATE (:Neo4j {id: n})").consume(); + } + + // verify authenticatedUser = test + StepVerifier.create(new CDCClient( + driver, + new EntitySelector( + null, + emptySet(), + Map.of(EntitySelector.METADATA_KEY_AUTHENTICATED_USER, "test"))) + .query(current)) + .recordWith(ArrayList::new) + .thenConsumeWhile(x -> true) + .consumeRecordedWith(coll -> assertThat(coll).hasSize(200).allSatisfy(e -> assertThat(e) + .satisfies(c -> assertThat(c.getMetadata().getAuthenticatedUser()) + .isEqualTo("test")) + .extracting("event.after.labels", InstanceOfAssertFactories.list(String.class)) + .containsAnyOf("Test", "Impersonated"))) + .verifyComplete(); + + // verify authenticatedUser = neo4j + StepVerifier.create(new CDCClient( + driver, + new EntitySelector( + null, + emptySet(), + Map.of(EntitySelector.METADATA_KEY_AUTHENTICATED_USER, "neo4j"))) + .query(current)) + .recordWith(ArrayList::new) + .thenConsumeWhile(x -> true) + .consumeRecordedWith(coll -> assertThat(coll).hasSize(100).allSatisfy(e -> assertThat(e) + .satisfies(c -> assertThat(c.getMetadata().getAuthenticatedUser()) + .isEqualTo("neo4j")) + .extracting("event.after.labels", InstanceOfAssertFactories.list(String.class)) + .containsAnyOf("Neo4j"))) + .verifyComplete(); + + // verify executingUser = neo4j + StepVerifier.create(new CDCClient( + driver, + new EntitySelector( + null, emptySet(), Map.of(EntitySelector.METADATA_KEY_EXECUTING_USER, "neo4j"))) + .query(current)) + .recordWith(ArrayList::new) + .thenConsumeWhile(x -> true) + .consumeRecordedWith(coll -> assertThat(coll).hasSize(200).allSatisfy(e -> assertThat(e) + .satisfies(c -> + assertThat(c.getMetadata().getExecutingUser()).isEqualTo("neo4j")) + .extracting("event.after.labels", InstanceOfAssertFactories.list(String.class)) + .containsAnyOf("Neo4j", "Impersonated"))) + .verifyComplete(); + + // verify executingUser = test + StepVerifier.create(new CDCClient( + driver, + new EntitySelector( + null, emptySet(), Map.of(EntitySelector.METADATA_KEY_EXECUTING_USER, "test"))) + .query(current)) + .recordWith(ArrayList::new) + .thenConsumeWhile(x -> true) + .consumeRecordedWith(coll -> assertThat(coll).hasSize(100).allSatisfy(e -> assertThat(e) + .satisfies(c -> + assertThat(c.getMetadata().getExecutingUser()).isEqualTo("test")) + .extracting("event.after.labels", InstanceOfAssertFactories.list(String.class)) + .containsAnyOf("Test"))) + .verifyComplete(); + + // verify authenticatedUser = test, executingUser = neo4j + StepVerifier.create(new CDCClient( + driver, + new EntitySelector( + null, + emptySet(), + Map.of( + EntitySelector.METADATA_KEY_AUTHENTICATED_USER, + "test", + EntitySelector.METADATA_KEY_EXECUTING_USER, + "neo4j"))) + .query(current)) + .recordWith(ArrayList::new) + .thenConsumeWhile(x -> true) + .consumeRecordedWith(coll -> assertThat(coll).hasSize(100).allSatisfy(e -> assertThat(e) + .satisfies(c -> assertThat(c.getMetadata().getAuthenticatedUser()) + .isEqualTo("test")) + .satisfies(c -> + assertThat(c.getMetadata().getExecutingUser()).isEqualTo("neo4j")) + .extracting("event.after.labels", InstanceOfAssertFactories.list(String.class)) + .containsAnyOf("Impersonated"))) + .verifyComplete(); + } + + @Test + void txMetadataSelectorFiltersCorrectly() { + try (var session = driver.session()) { + session.run( + "UNWIND range(1, 100) AS n CREATE (:Test {id: n})", + TransactionConfig.builder() + .withMetadata(Map.of("app", "Test")) + .build()) + .consume(); + } + + try (var session = driver.session()) { + session.run( + "UNWIND range(1, 100) AS n CREATE (:Other {id: n})", + TransactionConfig.builder() + .withMetadata(Map.of("app", "Other")) + .build()) + .consume(); + } + + try (var session = driver.session()) { + session.run( + "UNWIND range(1, 100) AS n CREATE (:Another {id: n})", + TransactionConfig.builder() + .withMetadata(Map.of("app", "Other", "appUser", "test")) + .build()) + .consume(); + } + + StepVerifier.create(new CDCClient( + driver, + new EntitySelector( + null, + emptySet(), + Map.of(EntitySelector.METADATA_KEY_TX_METADATA, Map.of("app", "Test")))) + .query(current)) + .recordWith(ArrayList::new) + .thenConsumeWhile(x -> true) + .consumeRecordedWith(coll -> assertThat(coll).hasSize(100).allSatisfy(e -> assertThat(e) + .satisfies( + c -> assertThat(c.getMetadata().getTxMetadata()).contains(Map.entry("app", "Test"))) + .extracting("event.after.labels", InstanceOfAssertFactories.list(String.class)) + .containsOnly("Test"))) + .verifyComplete(); + + StepVerifier.create(new CDCClient( + driver, + new EntitySelector( + null, + emptySet(), + Map.of(EntitySelector.METADATA_KEY_TX_METADATA, Map.of("app", "Other")))) + .query(current)) + .recordWith(ArrayList::new) + .thenConsumeWhile(x -> true) + .consumeRecordedWith(coll -> assertThat(coll).hasSize(200).allSatisfy(e -> assertThat(e) + .satisfies( + c -> assertThat(c.getMetadata().getTxMetadata()).contains(Map.entry("app", "Other"))) + .extracting("event.after.labels", InstanceOfAssertFactories.list(String.class)) + .containsAnyOf("Other", "Another") + .doesNotContain("Test"))) + .verifyComplete(); + + StepVerifier.create(new CDCClient( + driver, + new EntitySelector( + null, + emptySet(), + Map.of( + EntitySelector.METADATA_KEY_TX_METADATA, + Map.of("app", "Other", "appUser", "test")))) + .query(current)) + .recordWith(ArrayList::new) + .thenConsumeWhile(x -> true) + .consumeRecordedWith(coll -> assertThat(coll).hasSize(100).allSatisfy(e -> assertThat(e) + .satisfies(c -> assertThat(c.getMetadata().getTxMetadata()) + .contains(Map.entry("app", "Other"), Map.entry("appUser", "test"))) + .extracting("event.after.labels", InstanceOfAssertFactories.list(String.class)) + .contains("Another") + .doesNotContain("Test", "Other"))) + .verifyComplete(); + } } diff --git a/src/test/java/org/neo4j/cdc/client/selector/SelectorTest.java b/src/test/java/org/neo4j/cdc/client/selector/SelectorTest.java index 0bcb816..75555b2 100644 --- a/src/test/java/org/neo4j/cdc/client/selector/SelectorTest.java +++ b/src/test/java/org/neo4j/cdc/client/selector/SelectorTest.java @@ -318,6 +318,122 @@ void relationshipSelectorMatches() { assertThat(new RelationshipSelector().matches(nodeCreateEvent())).isFalse(); } + @Test + void metadataSelectorMatches() { + List.of(nodeCreateEvent(), nodeUpdateEvent(), nodeDeleteEvent()).forEach(event -> { + assertThat(new EntitySelector( + null, emptySet(), Map.of(EntitySelector.METADATA_KEY_AUTHENTICATED_USER, "neo4j")) + .matches(event)) + .isTrue(); + + assertThat(new EntitySelector(null, emptySet(), Map.of(EntitySelector.METADATA_KEY_EXECUTING_USER, "test")) + .matches(event)) + .isTrue(); + + assertThat(new EntitySelector( + null, + emptySet(), + Map.of( + EntitySelector.METADATA_KEY_AUTHENTICATED_USER, + "neo4j", + EntitySelector.METADATA_KEY_EXECUTING_USER, + "test")) + .matches(event)) + .isTrue(); + + assertThat(new EntitySelector( + null, + emptySet(), + Map.of(EntitySelector.METADATA_KEY_TX_METADATA, Map.of("app.name", "my-super-app"))) + .matches(event)) + .isTrue(); + + assertThat(new EntitySelector( + null, + emptySet(), + Map.of( + EntitySelector.METADATA_KEY_TX_METADATA, + Map.of("app.name", "my-super-app", "app.version", "1.0"))) + .matches(event)) + .isTrue(); + + assertThat(new EntitySelector( + null, + emptySet(), + Map.of( + EntitySelector.METADATA_KEY_AUTHENTICATED_USER, + "neo4j", + EntitySelector.METADATA_KEY_EXECUTING_USER, + "test", + EntitySelector.METADATA_KEY_TX_METADATA, + Map.of("app.name", "my-super-app", "app.version", "1.0"))) + .matches(event)) + .isTrue(); + + assertThat(new EntitySelector( + null, + emptySet(), + Map.of( + EntitySelector.METADATA_KEY_TX_METADATA, + Map.of( + "app.name", + "my-super-app", + "app.version", + "1.0", + "app.user", + "unknown"))) + .matches(event)) + .isFalse(); + + assertThat(new EntitySelector( + null, + emptySet(), + Map.of( + EntitySelector.METADATA_KEY_AUTHENTICATED_USER, + "neo4j", + EntitySelector.METADATA_KEY_EXECUTING_USER, + "test", + EntitySelector.METADATA_KEY_TX_METADATA, + Map.of("app.name", "my-super-app", "app.version", "2.0"))) + .matches(event)) + .isFalse(); + + assertThat(new EntitySelector( + null, + emptySet(), + Map.of( + EntitySelector.METADATA_KEY_AUTHENTICATED_USER, + "neo4j", + EntitySelector.METADATA_KEY_EXECUTING_USER, + "neo4j", + EntitySelector.METADATA_KEY_TX_METADATA, + Map.of("app.name", "my-super-app", "app.version", "1.0"))) + .matches(event)) + .isFalse(); + + assertThat(new EntitySelector( + null, + emptySet(), + Map.of( + EntitySelector.METADATA_KEY_AUTHENTICATED_USER, + "neo4j", + EntitySelector.METADATA_KEY_EXECUTING_USER, + "test", + EntitySelector.METADATA_KEY_TX_METADATA, + Map.of( + "app.name", + "my-super-app", + "app.version", + "1.0", + "app.user", + "test", + "app.code", + "no-code"))) + .matches(event)) + .isFalse(); + }); + } + @Test void applyFiltersShouldArrangeProperties() { List.of(nodeCreateEvent(), relationshipCreateEvent()).forEach(event -> { @@ -454,7 +570,7 @@ private static ChangeEvent nodeCreateEvent() { 0, new Metadata( "neo4j", - "neo4j", + "test", "server-1", CaptureMode.DIFF, "bolt", @@ -462,7 +578,7 @@ private static ChangeEvent nodeCreateEvent() { "127.0.0.1:7687", ZonedDateTime.now().minusSeconds(5), ZonedDateTime.now(), - emptyMap(), + Map.of("app.name", "my-super-app", "app.version", "1.0"), emptyMap()), new NodeEvent( "db:1", @@ -482,7 +598,7 @@ private static ChangeEvent nodeDeleteEvent() { 0, new Metadata( "neo4j", - "neo4j", + "test", "server-1", CaptureMode.DIFF, "bolt", @@ -490,7 +606,7 @@ private static ChangeEvent nodeDeleteEvent() { "127.0.0.1:7687", ZonedDateTime.now().minusSeconds(5), ZonedDateTime.now(), - Map.of("txMetadata.app", "neo4j-browser"), + Map.of("app.name", "my-super-app", "app.version", "1.0", "app.user", "test"), emptyMap()), new NodeEvent( "db:1", @@ -510,7 +626,7 @@ private static ChangeEvent nodeUpdateEvent() { 0, new Metadata( "neo4j", - "neo4j", + "test", "server-1", CaptureMode.DIFF, "bolt", @@ -518,7 +634,7 @@ private static ChangeEvent nodeUpdateEvent() { "127.0.0.1:7687", ZonedDateTime.now().minusSeconds(5), ZonedDateTime.now(), - Map.of("txMetadata.app", "neo4j-browser"), + Map.of("app.name", "my-super-app", "app.version", "1.0", "app.user", "test"), emptyMap()), new NodeEvent( "db:1", @@ -540,7 +656,7 @@ private static ChangeEvent relationshipCreateEvent() { 1, new Metadata( "neo4j", - "neo4j", + "test", "server-1", CaptureMode.DIFF, "bolt", @@ -548,7 +664,7 @@ private static ChangeEvent relationshipCreateEvent() { "127.0.0.1:7687", ZonedDateTime.now().minusSeconds(5), ZonedDateTime.now(), - Map.of("txMetadata.app", "neo4j-browser"), + Map.of("app.name", "my-super-app", "app.version", "1.0"), emptyMap()), new RelationshipEvent( "db:2", @@ -569,7 +685,7 @@ private static ChangeEvent relationshipDeleteEvent() { 1, new Metadata( "neo4j", - "neo4j", + "test", "server-1", CaptureMode.DIFF, "bolt", @@ -577,7 +693,7 @@ private static ChangeEvent relationshipDeleteEvent() { "127.0.0.1:7687", ZonedDateTime.now().minusSeconds(5), ZonedDateTime.now(), - Map.of("txMetadata.app", "neo4j-browser"), + Map.of("app.name", "my-super-app", "app.version", "1.0", "app.user", "another"), emptyMap()), new RelationshipEvent( "db:2", @@ -598,7 +714,7 @@ private static ChangeEvent relationshipUpdateEvent() { 1, new Metadata( "neo4j", - "neo4j", + "test", "server-1", CaptureMode.DIFF, "bolt", @@ -606,7 +722,7 @@ private static ChangeEvent relationshipUpdateEvent() { "127.0.0.1:7687", ZonedDateTime.now().minusSeconds(5), ZonedDateTime.now(), - Map.of("txMetadata.app", "neo4j-browser"), + Map.of("app.name", "my-super-app", "app.version", "1.0", "app.user", "another"), emptyMap()), new RelationshipEvent( "db:2",