From c87936f05ce83c0c00070f203f6c633867fefd07 Mon Sep 17 00:00:00 2001
From: Swa-aham
Date: Thu, 23 May 2024 23:06:09 +0530
Subject: [PATCH] filters added

---
Review notes for this revision of the patch:
- basic_rule.py: to_float() and the rule value coercion also handle TypeError;
  RuleMatchStats.__eq__ returns NotImplemented for foreign types; the date case
  of the coercion uses a class pattern only.
- validation.py: validators log through the injected logger; semantic duplicates
  are keyed by the (field, rule, value) tuple instead of its hash; the schema
  requires string values for policy/rule; the __add__ error message reports the
  instance type.
- Docstrings were added; hunk sizes and diffstat are updated accordingly. The
  blob ids in the "index" lines are the ones from the original submission.

 connectors/filtering/__init__.py   |   5 +
 connectors/filtering/basic_rule.py | 378 +++++++++++++++++++++++++++++
 connectors/filtering/validation.py | 374 ++++++++++++++++++++++++++++
 3 files changed, 757 insertions(+)
 create mode 100644 connectors/filtering/__init__.py
 create mode 100644 connectors/filtering/basic_rule.py
 create mode 100644 connectors/filtering/validation.py

diff --git a/connectors/filtering/__init__.py b/connectors/filtering/__init__.py
new file mode 100644
index 0000000..1fa99ac
--- /dev/null
+++ b/connectors/filtering/__init__.py
@@ -0,0 +1,5 @@
+#
+# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+# or more contributor license agreements. Licensed under the Elastic License 2.0;
+# you may not use this file except in compliance with the Elastic License 2.0.
+#
diff --git a/connectors/filtering/basic_rule.py b/connectors/filtering/basic_rule.py
new file mode 100644
index 0000000..301339a
--- /dev/null
+++ b/connectors/filtering/basic_rule.py
@@ -0,0 +1,378 @@
+#
+# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+# or more contributor license agreements. Licensed under the Elastic License 2.0;
+# you may not use this file except in compliance with the Elastic License 2.0.
+#
+
+import datetime
+import re
+from enum import Enum
+
+from dateutil.parser import ParserError, parser
+
+from connectors.logger import logger
+from connectors.utils import Format, shorten_str
+
+IS_BOOL_FALSE = re.compile("^(false|f|no|n|off)$", re.I)
+IS_BOOL_TRUE = re.compile("^(true|t|yes|y|on)$", re.I)
+
+
+def parse(basic_rules_json):
+    """Parse a basic rules json array to BasicRule objects.
+
+    Arguments:
+    - `basic_rules_json`: an array of dicts or an empty array
+
+    The parser works in the following way:
+      - Map every raw basic rule in the json array to the corresponding BasicRule object
+      - Filter out every basic rule, which returns true for is_default_rule()
+      - Sort the result in ascending order according to their basic rule order (rules are executed in ascending order)
+    """
+    if not basic_rules_json:
+        return []
+
+    map_to_basic_rules_list = [
+        BasicRule.from_json(basic_rule_json) for basic_rule_json in basic_rules_json
+    ]
+
+    return sorted(
+        filter(
+            lambda basic_rule: not basic_rule.is_default_rule(),
+            map_to_basic_rules_list,
+        ),
+        key=lambda basic_rule: basic_rule.order,
+    )
+
+
+def to_float(value):
+    """Return `value` converted to float, or `value` unchanged if it can't be converted."""
+    try:
+        return float(value)
+    except (TypeError, ValueError):
+        # TypeError covers inputs float() can't handle at all, such as None
+        return value
+
+
+def to_datetime(value):
+    """Parse `value` with dateutil and return a datetime, or `value` unchanged on a ParserError."""
+    try:
+        date_parser = parser()
+        parsed_date_or_datetime = date_parser.parse(timestr=value)
+
+        if isinstance(parsed_date_or_datetime, datetime.datetime):
+            return parsed_date_or_datetime
+        elif isinstance(parsed_date_or_datetime, datetime.date):
+            # adds 00:00 to the date and returns datetime
+            return datetime.datetime.combine(parsed_date_or_datetime, datetime.time.min)
+
+        return value
+
+    except ParserError:
+        return value
+
+
+def to_bool(value):
+    """Map a boolean-like string to a bool.
+
+    An empty `value` or one matching IS_BOOL_FALSE yields False, one matching
+    IS_BOOL_TRUE yields True; anything else is returned unchanged.
+    """
+    if len(value) == 0 or IS_BOOL_FALSE.match(value):
+        return False
+
+    if IS_BOOL_TRUE.match(value):
+        return True
+
+    return value
+
+
+class RuleMatchStats:
+    """RuleMatchStats records how many documents a basic rule matched and which policy it used.
+
+    It's an internal class and is not expected to be used outside the module.
+    """
+
+    def __init__(self, policy, matches_count):
+        self.policy = policy
+        self.matches_count = matches_count
+
+    def __add__(self, other):
+        if other is None:
+            return self
+
+        if isinstance(other, int):
+            return RuleMatchStats(
+                policy=self.policy, matches_count=self.matches_count + other
+            )
+        else:
+            msg = f"__add__ is not implemented for '{type(other)}'"
+            raise NotImplementedError(msg)
+
+    def __eq__(self, other):
+        if not isinstance(other, RuleMatchStats):
+            # let Python fall back to its default comparison instead of raising AttributeError
+            return NotImplemented
+
+        return self.policy == other.policy and self.matches_count == other.matches_count
+
+
+class BasicRuleEngine:
+    """BasicRuleEngine matches a document against a list of basic rules in order.
+
+    The main concern of the engine is to decide, whether a document should be ingested during a sync or not:
+      - If a document matches a basic rule and the basic rule uses the `INCLUDE` policy the document will be ingested
+      - If a document matches a basic rule and the basic rule uses the `EXCLUDE` policy the document won't be ingested
+
+    It also records stats, which basic rule matched how many documents with a certain policy.
+    """
+
+    def __init__(self, rules):
+        self.rules = rules
+        self.rules_match_stats = {
+            BasicRule.DEFAULT_RULE_ID: RuleMatchStats(Policy.INCLUDE, 0)
+        }
+
+    def should_ingest(self, document):
+        """Check, whether a document should be ingested or not.
+
+        By default, the document will be ingested, if it doesn't match any rule.
+
+        Arguments:
+        - `document`: document matched against the basic rules
+        """
+        if not self.rules:
+            self.rules_match_stats[BasicRule.DEFAULT_RULE_ID] += 1
+            return True
+
+        for rule in self.rules:
+            if not rule:
+                continue
+
+            if rule.matches(document):
+                logger.debug(
+                    f"Document (id: '{document.get('id')}') matched basic rule (id: '{rule.id_}'). Document will be {rule.policy.value}d"
+                )
+
+                self.rules_match_stats.setdefault(
+                    rule.id_, RuleMatchStats(rule.policy, 0)
+                )
+                self.rules_match_stats[rule.id_] += 1
+
+                return rule.is_include()
+
+        # default behavior: ingest document, if no rule matches ("default rule")
+        self.rules_match_stats[BasicRule.DEFAULT_RULE_ID] += 1
+        logger.debug(
+            f"Document (id: '{document.get('id')}') didn't match any basic rule. Document will be included"
+        )
+        return True
+
+
+class InvalidRuleError(ValueError):
+    """Raised when a string can't be mapped to a Rule."""
+
+
+class Rule(Enum):
+    """Comparison operators a basic rule can apply to a document field."""
+
+    EQUALS = "equals"
+    STARTS_WITH = "starts_with"
+    ENDS_WITH = "ends_with"
+    CONTAINS = "contains"
+    REGEX = "regex"
+    GREATER_THAN = "greater_than"
+    LESS_THAN = "less_than"
+
+    RULES = [EQUALS, STARTS_WITH, ENDS_WITH, CONTAINS, REGEX, GREATER_THAN, LESS_THAN]
+
+    @classmethod
+    def is_string_rule(cls, string):
+        """Return True if `string` can be parsed by `from_string`, False otherwise."""
+        try:
+            cls.from_string(string)
+            return True
+        except InvalidRuleError:
+            return False
+
+    @classmethod
+    def from_string(cls, string):
+        """Map `string` (compared case-insensitively) to a Rule member.
+
+        GREATER_THAN and LESS_THAN are spelled ">" and "<"; unknown strings raise InvalidRuleError.
+        """
+        match string.casefold():
+            case "equals":
+                return Rule.EQUALS
+            case "contains":
+                return Rule.CONTAINS
+            case "ends_with":
+                return Rule.ENDS_WITH
+            case ">":
+                return Rule.GREATER_THAN
+            case "<":
+                return Rule.LESS_THAN
+            case "regex":
+                return Rule.REGEX
+            case "starts_with":
+                return Rule.STARTS_WITH
+            case _:
+                msg = f"'{string}' is an unknown value for the enum Rule. Allowed rules: {Rule.RULES}."
+                raise InvalidRuleError(msg)
+
+
+class InvalidPolicyError(ValueError):
+    """Raised when a string can't be mapped to a Policy."""
+
+
+class Policy(Enum):
+    """Whether documents matching a basic rule are included in or excluded from ingestion."""
+
+    INCLUDE = "include"
+    EXCLUDE = "exclude"
+
+    POLICIES = [INCLUDE, EXCLUDE]
+
+    @classmethod
+    def is_string_policy(cls, string):
+        """Return True if `string` can be parsed by `from_string`, False otherwise."""
+        try:
+            cls.from_string(string)
+            return True
+        except InvalidPolicyError:
+            return False
+
+    @classmethod
+    def from_string(cls, string):
+        """Map `string` (compared case-insensitively) to a Policy member or raise InvalidPolicyError."""
+        match string.casefold():
+            case "include":
+                return Policy.INCLUDE
+            case "exclude":
+                return Policy.EXCLUDE
+            case _:
+                msg = f"'{string}' is an unknown value for the enum Policy. Allowed policies: {Policy.POLICIES}"
+                raise InvalidPolicyError(msg)
+
+
+class BasicRule:
+    """A BasicRule is used to match documents based on different comparisons (see `matches` method)."""
+
+    DEFAULT_RULE_ID = "DEFAULT"
+    SHORTEN_UUID_BY = 26  # UUID: 32 random chars + 4 hyphens; keep 10 characters
+
+    def __init__(self, id_, order, policy, field, rule, value):
+        self.id_ = id_
+        self.order = order
+        self.policy = policy
+        self.field = field
+        self.rule = rule
+        self.value = value
+
+    @classmethod
+    def from_json(cls, basic_rule_json):
+        """Build a BasicRule from a dict with the keys id, order, policy, field, rule and value."""
+        return cls(
+            id_=basic_rule_json["id"],
+            order=basic_rule_json["order"],
+            policy=Policy.from_string(basic_rule_json["policy"]),
+            field=basic_rule_json["field"],
+            rule=Rule.from_string(basic_rule_json["rule"]),
+            value=basic_rule_json["value"],
+        )
+
+    def matches(self, document):
+        """Check whether a document matches the basic rule.
+
+        A basic rule matches or doesn't match a document based on the following comparisons:
+            - STARTS_WITH: Does the document's field value start with the basic rule's value?
+            - ENDS_WITH: Does the document's field value end with the basic rule's value?
+            - CONTAINS: Does the document's field value contain the basic rule's value?
+            - REGEX: Does the document's field value match the basic rule's regex?
+            - LESS_THAN: Is the document's field value less than the basic rule's value?
+            - GREATER_THAN: Is the document's field value greater than the basic rule's value?
+            - EQUALS: Is the document's field value equal to the basic rule's value?
+
+        If the basic rule is the default rule it's always a match (the default rule matches every document).
+        If the field is not in the document it's always a no match.
+
+        Arguments:
+        - `document`: document to check, if it matches
+        """
+        if self.is_default_rule():
+            return True
+
+        if self.field not in document:
+            return False
+
+        document_value = document[self.field]
+        coerced_rule_value = self.coerce_rule_value_based_on_document_value(
+            document_value
+        )
+
+        match self.rule:
+            case Rule.STARTS_WITH:
+                return str(document_value).startswith(self.value)
+            case Rule.ENDS_WITH:
+                return str(document_value).endswith(self.value)
+            case Rule.CONTAINS:
+                return self.value in str(document_value)
+            case Rule.REGEX:
+                return re.match(self.value, str(document_value)) is not None
+            case Rule.LESS_THAN:
+                return document_value < coerced_rule_value
+            case Rule.GREATER_THAN:
+                return document_value > coerced_rule_value
+            case Rule.EQUALS:
+                return document_value == coerced_rule_value
+
+    def is_default_rule(self):
+        return self.id_ == BasicRule.DEFAULT_RULE_ID
+
+    def is_include(self):
+        return self.policy == Policy.INCLUDE
+
+    def coerce_rule_value_based_on_document_value(self, doc_value):
+        """Coerce the value inside the basic rule.
+
+        This method tries to coerce the value inside the basic rule to the type used in the document.
+
+        Arguments:
+        - `doc_value`: value of the field in the document to coerce
+
+        """
+        try:
+            match doc_value:
+                case str():
+                    return str(self.value)
+                case bool():
+                    # NOTE(review): this yields a string such as "True"/"False", which never equals a bool document value - confirm intent
+                    return str(to_bool(self.value))
+                case float() | int():
+                    return float(self.value)
+                case datetime.date():
+                    # datetime.datetime is a subclass of datetime.date, so datetimes are matched by this class pattern too
+                    return to_datetime(self.value)
+                case _:
+                    return str(self.value)
+        except (TypeError, ValueError) as e:
+            logger.debug(
+                f"Failed to coerce value '{self.value}' ({type(self.value)}) based on document value '{doc_value}' ({type(doc_value)}) due to error: {type(e)}: {e}"
+            )
+            return str(self.value)
+
+    def __str__(self):
+        def _format_field(key, value):
+            if isinstance(value, Enum):
+                return f"{key}: {value.value}"
+            return f"{key}: {value}"
+
+        formatted_fields = [
+            _format_field(key, value) for key, value in self.__dict__.items()
+        ]
+        return "Basic rule: " + ", ".join(formatted_fields)
+
+    def __format__(self, format_spec):
+        if format_spec == Format.SHORT.value:
+            # order uses 0 based indexing
+            return f"Basic rule {self.order + 1} (id: '{shorten_str(self.id_, BasicRule.SHORTEN_UUID_BY)}')"
+        return str(self)
diff --git a/connectors/filtering/validation.py b/connectors/filtering/validation.py
new file mode 100644
index 0000000..d07a420
--- /dev/null
+++ b/connectors/filtering/validation.py
@@ -0,0 +1,374 @@
+#
+# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+# or more contributor license agreements. Licensed under the Elastic License 2.0;
+# you may not use this file except in compliance with the Elastic License 2.0.
+#
+from copy import deepcopy
+from enum import Enum
+
+import fastjsonschema
+
+from connectors.filtering.basic_rule import BasicRule, Policy, Rule
+from connectors.logger import logger
+from connectors.utils import Format
+
+
+class InvalidFilteringError(Exception):
+    pass
+
+
+class SyncRuleValidationResult:
+    """Represent the validation result for a basic or an advanced rule."""
+
+    ADVANCED_RULES = "advanced_snippet"
+
+    def __init__(self, rule_id, is_valid, validation_message):
+        self.rule_id = rule_id
+        self.is_valid = is_valid
+        self.validation_message = validation_message
+
+    @classmethod
+    def valid_result(cls, rule_id):
+        """Return a valid result for `rule_id` with the message "Valid rule"."""
+        return SyncRuleValidationResult(
+            rule_id=rule_id, is_valid=True, validation_message="Valid rule"
+        )
+
+    def __eq__(self, other):
+        # NOTE: comparing with another type raises TypeError instead of returning NotImplemented
+        if not isinstance(other, SyncRuleValidationResult):
+            msg = f"Can't compare SyncRuleValidationResult with {type(other)}"
+            raise TypeError(msg)
+
+        return (
+            self.rule_id == other.rule_id
+            and self.is_valid == other.is_valid
+            and self.validation_message == other.validation_message
+        )
+
+
+class FilterValidationError:
+    """Represent an error occurring during filtering validation.
+
+    FilterValidationError can contain multiple ids/messages (f.e. if two basic rules are valid for themselves but
+    not in the context of each other) -> both rules belong to one error.
+    """
+
+    def __init__(self, ids=None, messages=None):
+        if ids is None:
+            ids = []
+        if messages is None:
+            messages = []
+
+        self.ids = ids
+        self.messages = messages
+
+    def __eq__(self, other):
+        if other is None:
+            return False
+
+        return self.ids == other.ids and self.messages == other.messages
+
+    def __str__(self):
+        return f"(ids: {self.ids}, messages: {self.messages})"
+
+
+class FilteringValidationState(Enum):
+    """State of a filtering validation: valid, invalid or edited."""
+
+    VALID = "valid"
+    INVALID = "invalid"
+    EDITED = "edited"
+
+    @classmethod
+    def to_s(cls, value):
+        """Return the string for a FilteringValidationState member (None for anything else)."""
+        match value:
+            case FilteringValidationState.VALID:
+                return "valid"
+            case FilteringValidationState.INVALID:
+                return "invalid"
+            case FilteringValidationState.EDITED:
+                return "edited"
+
+
+class FilteringValidationResult:
+    """Composed of multiple FilterValidationErrors.
+
+    One FilteringValidationResult is composed of one or multiple FilterValidationErrors if the result is invalid.
+    These errors will be derived from a single SyncRuleValidationResult which can be added to a FilteringValidationResult.
+    """
+
+    def __init__(self, state=FilteringValidationState.VALID, errors=None):
+        if errors is None:
+            errors = []
+
+        self.state = state
+        self.errors = errors
+
+    def __add__(self, other):
+        """Fold a SyncRuleValidationResult into this result.
+
+        `None` and valid results leave this result unchanged; an invalid result yields a new
+        INVALID FilteringValidationResult with one more FilterValidationError.
+        Any other type raises NotImplementedError.
+        """
+        if other is None:
+            return self
+
+        if isinstance(other, SyncRuleValidationResult):
+            if not other.is_valid:
+                return FilteringValidationResult(
+                    state=FilteringValidationState.INVALID,
+                    errors=deepcopy(self.errors)
+                    + [
+                        FilterValidationError(
+                            ids=[other.rule_id], messages=[other.validation_message]
+                        )
+                    ],
+                )
+            else:
+                return self
+        else:
+            # report the type of this instance (type(FilteringValidationResult) would be `type`)
+            msg = f"Result of type '{type(other)}' cannot be added to '{type(self)}'"
+            raise NotImplementedError(msg)
+
+    def __eq__(self, other):
+        if other is None:
+            return False
+
+        return self.state == other.state and self.errors == other.errors
+
+    def to_dict(self):
+        return {
+            "state": FilteringValidationState.to_s(self.state),
+            "errors": [vars(error) for error in self.errors],
+        }
+
+
+class FilteringValidator:
+    """Facade for basic and advanced rule validators.
+
+    The FilteringValidator class acts as a facade for basic rule and advanced rule validators,
+    calling their validate methods and aggregating the result in one FilteringValidationResult.
+    """
+
+    def __init__(
+        self, basic_rules_validators=None, advanced_rules_validators=None, logger_=None
+    ):
+        self.basic_rules_validators = (
+            [] if basic_rules_validators is None else basic_rules_validators
+        )
+        self.advanced_rules_validators = (
+            [] if advanced_rules_validators is None else advanced_rules_validators
+        )
+        self._logger = logger_ or logger
+
+    async def validate(self, filtering):
+        """Validate `filtering` with all configured validators.
+
+        Basic rules are passed to the basic rules validators (as a whole set and/or rule by rule),
+        advanced rules to the advanced rules validators if `filtering.has_advanced_rules()`.
+        Returns the aggregated FilteringValidationResult.
+        """
+        def _is_valid_str(result):
+            if result is None:
+                return "Unknown (check validator implementation as it should never return 'None')"
+
+            return "valid" if result.is_valid else "invalid"
+
+        self._logger.info("Filtering validation started")
+        basic_rules = filtering.basic_rules
+        basic_rules_ids = [basic_rule["id"] for basic_rule in basic_rules]
+
+        filtering_validation_result = FilteringValidationResult()
+
+        for validator in self.basic_rules_validators:
+            if issubclass(validator, BasicRulesSetValidator):
+                # pass the whole set/list of rules at once (validate constraints between rules)
+                results = validator.validate(basic_rules)
+                for result in results:
+                    filtering_validation_result += result
+
+                    self._logger.debug(
+                        f"Basic rules set: '{basic_rules_ids}' validation result (Validator: {validator.__name__}): {_is_valid_str(result)}"
+                    )
+
+            if issubclass(validator, BasicRuleValidator):
+                for basic_rule in basic_rules:
+                    # pass rule by rule (validate rule in isolation)
+                    validator_result = validator.validate(basic_rule)
+                    filtering_validation_result += validator_result
+
+                    self._logger.debug(
+                        f"{str(basic_rule)} validation result (Validator: {validator.__name__}): {_is_valid_str(validator_result)}"
+                    )
+
+        if filtering.has_advanced_rules():
+            advanced_rules = filtering.get_advanced_rules()
+            advanced_rules_validators = (
+                self.advanced_rules_validators
+                if isinstance(self.advanced_rules_validators, list)
+                else [self.advanced_rules_validators]
+            )
+
+            for validator in advanced_rules_validators:
+                filtering_validation_result += await validator.validate(advanced_rules)
+
+        self._logger.info(
+            f"Filtering validation result: {filtering_validation_result.state}"
+        )
+
+        if filtering_validation_result.errors:
+            self._logger.error(
+                f"Filtering validation errors: {[str(error) for error in filtering_validation_result.errors]}"
+            )
+
+        return filtering_validation_result
+
+
+class BasicRulesSetValidator:
+    """Validate constraints between different rules."""
+
+    @classmethod
+    def validate(cls, rules):
+        raise NotImplementedError
+
+
+class BasicRulesSetSemanticValidator(BasicRulesSetValidator):
+    """BasicRulesSetSemanticValidator can be used to validate that a set of filtering rules does not contain semantic duplicates.
+
+    A semantic duplicate is defined as two basic rules having the same values for `field`, `rule` and `value`.
+    Therefore, two basic rules are also seen as semantic duplicates, if their `policy` values differ.
+
+    If a semantic duplicate is detected both rules will be marked as invalid.
+    """
+
+    @classmethod
+    def validate(cls, rules):
+        rules_dict = {}
+
+        for rule in rules:
+            basic_rule = BasicRule.from_json(rule)
+            # we want to check whether another rule already uses the exact same values for 'field', 'rule' and 'value'
+            # to detect semantic duplicates; the tuple itself is the key, so hash collisions can't cause false positives
+            field_rule_value = (basic_rule.field, basic_rule.rule, basic_rule.value)
+
+            if field_rule_value in rules_dict:
+                semantic_duplicate = rules_dict[field_rule_value]
+
+                return cls.semantic_duplicates_validation_results(
+                    basic_rule, semantic_duplicate
+                )
+
+            rules_dict[field_rule_value] = basic_rule
+
+        return [
+            SyncRuleValidationResult.valid_result(rule_id=rule.id_)
+            for rule in rules_dict.values()
+        ]
+
+    @classmethod
+    def semantic_duplicates_validation_results(cls, basic_rule, semantic_duplicate):
+        def semantic_duplicate_msg(rule_one, rule_two):
+            return f"{format(rule_one, Format.SHORT.value)} is semantically equal to {format(rule_two, Format.SHORT.value)}."
+
+        return [
+            # We need two error messages to highlight both rules in the UI
+            SyncRuleValidationResult(
+                rule_id=basic_rule.id_,
+                is_valid=False,
+                validation_message=semantic_duplicate_msg(
+                    basic_rule, semantic_duplicate
+                ),
+            ),
+            SyncRuleValidationResult(
+                rule_id=semantic_duplicate.id_,
+                is_valid=False,
+                validation_message=semantic_duplicate_msg(
+                    semantic_duplicate, basic_rule
+                ),
+            ),
+        ]
+
+
+class BasicRuleValidator:
+    """Validate a single rule in isolation."""
+
+    @classmethod
+    def validate(cls, rule):
+        raise NotImplementedError
+
+
+class BasicRuleNoMatchAllRegexValidator(BasicRuleValidator):
+    """BasicRuleNoMatchAllRegexValidator can be used to check that a basic rule does not use a match all regex."""
+
+    MATCH_ALL_REGEXPS = [".*", "(.*)"]
+
+    @classmethod
+    def validate(cls, basic_rule_json):
+        basic_rule = BasicRule.from_json(basic_rule_json)
+        # default rule uses match all regex, which is intended
+        if basic_rule.is_default_rule():
+            return SyncRuleValidationResult.valid_result(rule_id=basic_rule.id_)
+
+        if basic_rule.rule == Rule.REGEX and any(
+            match_all_regex == basic_rule.value
+            for match_all_regex in BasicRuleNoMatchAllRegexValidator.MATCH_ALL_REGEXPS
+        ):
+            return SyncRuleValidationResult(
+                rule_id=basic_rule.id_,
+                is_valid=False,
+                validation_message=f"{format(basic_rule, Format.SHORT.value)} uses a match all regexps {BasicRuleNoMatchAllRegexValidator.MATCH_ALL_REGEXPS}, which are not allowed.",
+            )
+
+        return SyncRuleValidationResult.valid_result(rule_id=basic_rule.id_)
+
+
+class BasicRuleAgainstSchemaValidator(BasicRuleValidator):
+    """BasicRuleAgainstSchemaValidator can be used to check if basic rule follows specified json schema."""
+
+    SCHEMA_DEFINITION = {
+        "type": "object",
+        "properties": {
+            "id": {"type": "string", "minLength": 1},
+            # "type": "string" is needed for policy/rule: "format" checks are only applied to string values
+            "policy": {"type": "string", "format": "policy"},
+            "field": {"type": "string", "minLength": 1},
+            "rule": {"type": "string", "format": "rule"},
+            "value": {"type": "string", "minLength": 1},
+            "order": {"type": "number"},
+        },
+        "required": ["id", "policy", "field", "rule", "value", "order"],
+    }
+
+    CUSTOM_FORMATS = {
+        "policy": lambda policy_string: Policy.is_string_policy(policy_string),
+        "rule": lambda rule_string: Rule.is_string_rule(rule_string),
+    }
+
+    SCHEMA = fastjsonschema.compile(
+        definition=SCHEMA_DEFINITION, formats=CUSTOM_FORMATS
+    )
+
+    @classmethod
+    def validate(cls, rule):
+        try:
+            BasicRuleAgainstSchemaValidator.SCHEMA(rule)
+
+            return SyncRuleValidationResult.valid_result(rule["id"])
+        except fastjsonschema.JsonSchemaValueException as e:
+            # id field could be missing
+            rule_id = rule["id"] if "id" in rule else None
+
+            return SyncRuleValidationResult(
+                rule_id=rule_id, is_valid=False, validation_message=e.message
+            )
+
+
+class AdvancedRulesValidator:
+    """Validate advanced rules."""
+
+    def validate(self, advanced_rules):
+        raise NotImplementedError