Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add methods to provide additional/finer config params #5

Merged
merged 5 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion src/main/java/org/neo4j/cdc/client/model/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class Metadata {
private static final String CONNECTION_SERVER = "connectionServer";
private static final String TX_START_TIME = "txStartTime";
private static final String TX_COMMIT_TIME = "txCommitTime";
private static final String TX_METADATA = "txMetadata";
private static final List<String> KNOWN_KEYS = List.of(
EXECUTING_USER,
AUTHENTICATED_USER,
Expand All @@ -42,7 +43,8 @@ public class Metadata {
CAPTURE_MODE,
SERVER_ID,
TX_COMMIT_TIME,
TX_START_TIME);
TX_START_TIME,
TX_METADATA);

private final String executingUser;
private final String connectionClient;
Expand All @@ -53,6 +55,7 @@ public class Metadata {
private final String connectionServer;
private final ZonedDateTime txStartTime;
private final ZonedDateTime txCommitTime;
private final Map<String, Object> txMetadata;
private final Map<String, Object> additionalEntries;

public Metadata(
Expand All @@ -65,6 +68,7 @@ public Metadata(
String connectionServer,
ZonedDateTime txStartTime,
ZonedDateTime txCommitTime,
Map<String, Object> txMetadata,
Map<String, Object> additionalEntries) {
this.executingUser = executingUser;
this.connectionClient = connectionClient;
Expand All @@ -75,6 +79,7 @@ public Metadata(
this.connectionServer = connectionServer;
this.txStartTime = Objects.requireNonNull(txStartTime);
this.txCommitTime = Objects.requireNonNull(txCommitTime);
this.txMetadata = txMetadata;
this.additionalEntries = additionalEntries;
}

Expand Down Expand Up @@ -118,6 +123,10 @@ public Map<String, Object> getAdditionalEntries() {
return additionalEntries;
}

public Map<String, Object> getTxMetadata() {
return txMetadata;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down Expand Up @@ -168,6 +177,7 @@ public String toString() {
additionalEntries);
}

@SuppressWarnings("unchecked")
public static Metadata fromMap(Map<?, ?> map) {
var cypherMap = ModelUtils.checkedMap(Objects.requireNonNull(map), String.class, Object.class);

Expand All @@ -180,6 +190,7 @@ public static Metadata fromMap(Map<?, ?> map) {
var connectionServer = MapUtils.getString(cypherMap, CONNECTION_SERVER);
var txStartTime = ModelUtils.getZonedDateTime(cypherMap, TX_START_TIME);
var txCommitTime = ModelUtils.getZonedDateTime(cypherMap, TX_COMMIT_TIME);
var txMetadata = (Map<String, Object>) MapUtils.getMap(cypherMap, TX_METADATA);
var unknownEntries = cypherMap.entrySet().stream()
.filter(e -> !KNOWN_KEYS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Expand All @@ -194,6 +205,7 @@ public static Metadata fromMap(Map<?, ?> map) {
connectionServer,
txStartTime,
txCommitTime,
txMetadata,
unknownEntries);
}
}
28 changes: 26 additions & 2 deletions src/main/java/org/neo4j/cdc/client/pattern/NodePattern.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
*/
package org.neo4j.cdc.client.pattern;

import java.util.Collections;
import static java.util.Collections.emptySet;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.jetbrains.annotations.NotNull;
import org.neo4j.cdc.client.model.EntityOperation;
import org.neo4j.cdc.client.selector.NodeSelector;
import org.neo4j.cdc.client.selector.Selector;

Expand All @@ -37,6 +40,12 @@ public class NodePattern implements Pattern {
@NotNull
private final Set<String> excludeProperties;

private final Map<String, Object> metadata = new HashMap<>();

private EntityOperation entityOperation;

private Set<String> changesTo = emptySet();

public NodePattern(
@NotNull Set<String> labels,
@NotNull Map<String, Object> keyFilters,
Expand Down Expand Up @@ -99,6 +108,21 @@ public String toString() {
@Override
public Set<Selector> toSelector() {
return Set.of(new NodeSelector(
null, Collections.emptySet(), labels, keyFilters, includeProperties, excludeProperties));
entityOperation, changesTo, labels, keyFilters, includeProperties, excludeProperties, metadata));
}

@Override
public void withOperation(EntityOperation operation) {
this.entityOperation = operation;
}

@Override
public void withMetadata(Map<String, Object> metadata) {
this.metadata.putAll(metadata);
}

@Override
public void withChangesTo(Set<String> changesTo) {
this.changesTo = changesTo;
}
}
8 changes: 8 additions & 0 deletions src/main/java/org/neo4j/cdc/client/pattern/Pattern.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,21 @@
package org.neo4j.cdc.client.pattern;

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.neo4j.cdc.client.model.EntityOperation;
import org.neo4j.cdc.client.selector.Selector;

public interface Pattern {

Set<Selector> toSelector();

void withOperation(EntityOperation operation);

void withMetadata(Map<String, Object> metadata);

void withChangesTo(Set<String> changesTo);

static List<Pattern> parse(String expression) {
return Visitors.parse(expression);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
*/
package org.neo4j.cdc.client.pattern;

import static java.util.Collections.emptySet;

import java.util.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.neo4j.cdc.client.model.EntityOperation;
import org.neo4j.cdc.client.selector.RelationshipNodeSelector;
import org.neo4j.cdc.client.selector.RelationshipSelector;
import org.neo4j.cdc.client.selector.Selector;
Expand All @@ -44,6 +47,12 @@ public class RelationshipPattern implements Pattern {
@NotNull
private final Set<String> excludeProperties;

private final Map<String, Object> metadata = new HashMap<>();

private EntityOperation entityOperation;

private Set<String> changesTo = emptySet();

public RelationshipPattern(
@Nullable String type,
@NotNull NodePattern start,
Expand Down Expand Up @@ -135,27 +144,44 @@ public Set<Selector> toSelector() {
var result = new HashSet<Selector>();

result.add(new RelationshipSelector(
null,
Collections.emptySet(),
entityOperation,
changesTo,
type,
new RelationshipNodeSelector(start.getLabels(), start.getKeyFilters()),
new RelationshipNodeSelector(end.getLabels(), end.getKeyFilters()),
keyFilters,
includeProperties,
excludeProperties));
excludeProperties,
metadata));

if (bidirectional) {
result.add(new RelationshipSelector(
null,
Collections.emptySet(),
entityOperation,
changesTo,
type,
new RelationshipNodeSelector(end.getLabels(), end.getKeyFilters()),
new RelationshipNodeSelector(start.getLabels(), start.getKeyFilters()),
keyFilters,
includeProperties,
excludeProperties));
excludeProperties,
metadata));
}

return result;
}

@Override
public void withOperation(EntityOperation operation) {
this.entityOperation = operation;
}

@Override
public void withMetadata(Map<String, Object> metadata) {
this.metadata.putAll(metadata);
}

@Override
public void withChangesTo(Set<String> changesTo) {
this.changesTo = changesTo;
}
}
64 changes: 61 additions & 3 deletions src/main/java/org/neo4j/cdc/client/selector/EntitySelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,22 @@
*/
package org.neo4j.cdc.client.selector;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;

import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.collections4.MapUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.neo4j.cdc.client.model.*;

public class EntitySelector implements Selector {

public static final String METADATA_KEY_AUTHENTICATED_USER = "authenticatedUser";
public static final String METADATA_KEY_EXECUTING_USER = "executingUser";
public static final String METADATA_KEY_TX_METADATA = "txMetadata";

@Nullable
private final EntityOperation change;

Expand All @@ -37,27 +44,37 @@ public class EntitySelector implements Selector {
@NotNull
private final Set<String> excludeProperties;

@NotNull
private final Map<String, Object> metadata;

public EntitySelector() {
this(null);
}

public EntitySelector(@Nullable EntityOperation change) {
this(change, emptySet());
this(change, emptySet(), emptyMap());
}

public EntitySelector(@Nullable EntityOperation change, @NotNull Set<String> changesTo) {
this(change, changesTo, emptySet(), emptySet());
this(change, changesTo, emptySet(), emptySet(), emptyMap());
}

public EntitySelector(
@Nullable EntityOperation change, @NotNull Set<String> changesTo, @NotNull Map<String, Object> metadata) {
this(change, changesTo, emptySet(), emptySet(), metadata);
}

public EntitySelector(
@Nullable EntityOperation change,
@NotNull Set<String> changesTo,
@NotNull Set<String> includeProperties,
@NotNull Set<String> excludeProperties) {
@NotNull Set<String> excludeProperties,
@NotNull Map<String, Object> metadata) {
this.change = change;
this.changesTo = Objects.requireNonNull(changesTo);
this.includeProperties = Objects.requireNonNull(includeProperties);
this.excludeProperties = Objects.requireNonNull(excludeProperties);
this.metadata = metadata;
}

public @Nullable EntityOperation getChange() {
Expand All @@ -76,6 +93,10 @@ public EntitySelector(
return excludeProperties;
}

public @NotNull Map<String, Object> getMetadata() {
return metadata;
}

@SuppressWarnings("unchecked")
@Override
public boolean matches(ChangeEvent e) {
Expand Down Expand Up @@ -123,6 +144,32 @@ public boolean matches(ChangeEvent e) {
break;
}
}
if (!metadata.isEmpty()) {
Object authenticatedUser = metadata.get(METADATA_KEY_AUTHENTICATED_USER);
if (authenticatedUser != null
&& !e.getMetadata().getAuthenticatedUser().equals(authenticatedUser)) {
return false;
}
Object executingUser = metadata.get(METADATA_KEY_EXECUTING_USER);
if (executingUser != null && !e.getMetadata().getExecutingUser().equals(executingUser)) {
return false;
}
var txMetadata = MapUtils.getMap(metadata, METADATA_KEY_TX_METADATA, emptyMap()).entrySet().stream()
.collect(Collectors.toMap(
entry -> {
if (entry.getKey() instanceof String) {
return (String) entry.getKey();
}

throw new IllegalArgumentException(String.format(
"expected map key to be a String but got '%s'.",
entry.getKey().getClass().getSimpleName()));
},
entry -> (Object) entry.getValue()));
if (!e.getMetadata().getTxMetadata().entrySet().containsAll(txMetadata.entrySet())) {
return false;
}
}

return true;
}
Expand Down Expand Up @@ -218,6 +265,15 @@ public Map<String, Object> asMap() {
if (!changesTo.isEmpty()) {
result.put("changesTo", changesTo);
}
if (metadata.containsKey(METADATA_KEY_AUTHENTICATED_USER)) {
result.put("authenticatedUser", metadata.get(METADATA_KEY_AUTHENTICATED_USER));
}
if (metadata.containsKey(METADATA_KEY_EXECUTING_USER)) {
result.put("executingUser", metadata.get(METADATA_KEY_EXECUTING_USER));
}
if (metadata.containsKey(METADATA_KEY_TX_METADATA)) {
result.put("txMetadata", metadata.get(METADATA_KEY_TX_METADATA));
}

return result;
}
Expand All @@ -232,6 +288,7 @@ public boolean equals(Object o) {
if (change != that.change) return false;
if (!changesTo.equals(that.changesTo)) return false;
if (!includeProperties.equals(that.includeProperties)) return false;
if (!metadata.equals(that.metadata)) return false;
return excludeProperties.equals(that.excludeProperties);
}

Expand All @@ -241,6 +298,7 @@ public int hashCode() {
result = 31 * result + changesTo.hashCode();
result = 31 * result + includeProperties.hashCode();
result = 31 * result + excludeProperties.hashCode();
result = 31 * result + metadata.hashCode();
return result;
}
}
Loading