From ee31efdbc34a669b0d7e0675531982cc01bcc9fe Mon Sep 17 00:00:00 2001 From: Robin Linacre Date: Thu, 7 Nov 2024 14:47:08 +0000 Subject: [PATCH 01/22] improve compare two records --- .../internals/linker_components/inference.py | 87 +++++++++---------- 1 file changed, 39 insertions(+), 48 deletions(-) diff --git a/splink/internals/linker_components/inference.py b/splink/internals/linker_components/inference.py index d14771494..c4b69f85b 100644 --- a/splink/internals/linker_components/inference.py +++ b/splink/internals/linker_components/inference.py @@ -683,7 +683,16 @@ def compare_two_records( SplinkDataFrame: Pairwise comparison with scored prediction """ - cache = self._linker._intermediate_table_cache + linker = self._linker + + retain_matching_columns = linker._settings_obj._retain_matching_columns + retain_intermediate_calculation_columns = ( + linker._settings_obj._retain_intermediate_calculation_columns + ) + linker._settings_obj._retain_matching_columns = True + linker._settings_obj._retain_intermediate_calculation_columns = True + + cache = linker._intermediate_table_cache uid = ascii_uid(8) @@ -698,7 +707,7 @@ def compare_two_records( else: to_register_right = record_2 - df_records_left = self._linker.table_management.register_table( + df_records_left = linker.table_management.register_table( to_register_left, f"__splink__compare_two_records_left_{uid}", overwrite=True, @@ -706,7 +715,7 @@ def compare_two_records( df_records_left.templated_name = "__splink__compare_two_records_left" - df_records_right = self._linker.table_management.register_table( + df_records_right = linker.table_management.register_table( to_register_right, f"__splink__compare_two_records_right_{uid}", overwrite=True, @@ -719,7 +728,7 @@ def compare_two_records( nodes_with_tf = cache.get_with_logging("__splink__df_concat_with_tf") pipeline.append_input_dataframe(nodes_with_tf) - for tf_col in self._linker._settings_obj._term_frequency_columns: + for tf_col in linker._settings_obj._term_frequency_columns: tf_table_name = colname_to_tf_tablename(tf_col) if tf_table_name in cache: tf_table = cache.get_with_logging(tf_table_name) @@ -734,67 +743,49 @@ def compare_two_records( ) sql_join_tf = _join_new_table_to_df_concat_with_tf_sql( - self._linker, "__splink__compare_two_records_left" + linker, "__splink__compare_two_records_left" ) pipeline.enqueue_sql(sql_join_tf, "__splink__compare_two_records_left_with_tf") sql_join_tf = _join_new_table_to_df_concat_with_tf_sql( - self._linker, "__splink__compare_two_records_right" + linker, "__splink__compare_two_records_right" ) pipeline.enqueue_sql(sql_join_tf, "__splink__compare_two_records_right_with_tf") - source_dataset_ic = ( - self._linker._settings_obj.column_info_settings.source_dataset_input_column - ) - uid_ic = self._linker._settings_obj.column_info_settings.unique_id_input_column - - pipeline = add_unique_id_and_source_dataset_cols_if_needed( - self._linker, - df_records_left, - pipeline, - in_tablename="__splink__compare_two_records_left_with_tf", - out_tablename="__splink__compare_two_records_left_with_tf_uid_fix", - uid_str="_left", - ) - pipeline = add_unique_id_and_source_dataset_cols_if_needed( - self._linker, - df_records_right, - pipeline, - in_tablename="__splink__compare_two_records_right_with_tf", - out_tablename="__splink__compare_two_records_right_with_tf_uid_fix", - uid_str="_right", - ) - - sqls = block_using_rules_sqls( - input_tablename_l="__splink__compare_two_records_left_with_tf_uid_fix", - 
input_tablename_r="__splink__compare_two_records_right_with_tf_uid_fix", - blocking_rules=[BlockingRule("1=1")], - link_type=self._linker._settings_obj._link_type, - source_dataset_input_column=source_dataset_ic, - unique_id_input_column=uid_ic, - ) - pipeline.enqueue_list_of_sqls(sqls) + cols_to_select = self._linker._settings_obj._columns_to_select_for_blocking + select_expr = ", ".join(cols_to_select) + sql = f""" + select {select_expr}, 0 as match_key + from __splink__compare_two_records_left_with_tf as l + cross join __splink__compare_two_records_right_with_tf as r + """ + pipeline.enqueue_sql(sql, "__splink__compare_two_records_blocked") - sqls = compute_comparison_vector_values_from_id_pairs_sqls( - self._linker._settings_obj._columns_to_select_for_blocking, - self._linker._settings_obj._columns_to_select_for_comparison_vector_values, - input_tablename_l="__splink__compare_two_records_left_with_tf_uid_fix", - input_tablename_r="__splink__compare_two_records_right_with_tf_uid_fix", - source_dataset_input_column=source_dataset_ic, - unique_id_input_column=uid_ic, + cols_to_select = ( + linker._settings_obj._columns_to_select_for_comparison_vector_values ) - pipeline.enqueue_list_of_sqls(sqls) + select_expr = ", ".join(cols_to_select) + sql = f""" + select {select_expr} + from __splink__compare_two_records_blocked + """ + pipeline.enqueue_sql(sql, "__splink__df_comparison_vectors") sqls = predict_from_comparison_vectors_sqls_using_settings( - self._linker._settings_obj, - sql_infinity_expression=self._linker._infinity_expression, + linker._settings_obj, + sql_infinity_expression=linker._infinity_expression, ) pipeline.enqueue_list_of_sqls(sqls) - predictions = self._linker._db_api.sql_pipeline_to_splink_dataframe( + predictions = linker._db_api.sql_pipeline_to_splink_dataframe( pipeline, use_cache=False ) + linker._settings_obj._retain_matching_columns = retain_matching_columns + linker._settings_obj._retain_intermediate_calculation_columns = ( + retain_intermediate_calculation_columns + ) + return predictions From 511a644648697840b5147d26e641cc303d394200 Mon Sep 17 00:00:00 2001 From: Robin Linacre Date: Fri, 8 Nov 2024 13:53:13 +0000 Subject: [PATCH 02/22] works with tf columns --- .../internals/linker_components/inference.py | 30 +++++++++++++--- splink/internals/term_frequencies.py | 36 +++++++++++++------ 2 files changed, 50 insertions(+), 16 deletions(-) diff --git a/splink/internals/linker_components/inference.py b/splink/internals/linker_components/inference.py index c4b69f85b..4691b6c90 100644 --- a/splink/internals/linker_components/inference.py +++ b/splink/internals/linker_components/inference.py @@ -728,7 +728,9 @@ def compare_two_records( nodes_with_tf = cache.get_with_logging("__splink__df_concat_with_tf") pipeline.append_input_dataframe(nodes_with_tf) - for tf_col in linker._settings_obj._term_frequency_columns: + tf_cols = linker._settings_obj._term_frequency_columns + + for tf_col in tf_cols: tf_table_name = colname_to_tf_tablename(tf_col) if tf_table_name in cache: tf_table = cache.get_with_logging(tf_table_name) @@ -743,23 +745,41 @@ def compare_two_records( ) sql_join_tf = _join_new_table_to_df_concat_with_tf_sql( - linker, "__splink__compare_two_records_left" + linker, "__splink__compare_two_records_left", df_records_left ) pipeline.enqueue_sql(sql_join_tf, "__splink__compare_two_records_left_with_tf") sql_join_tf = _join_new_table_to_df_concat_with_tf_sql( - linker, "__splink__compare_two_records_right" + linker, "__splink__compare_two_records_right", 
df_records_right ) pipeline.enqueue_sql(sql_join_tf, "__splink__compare_two_records_right_with_tf") + pipeline = add_unique_id_and_source_dataset_cols_if_needed( + linker, + df_records_left, + pipeline, + in_tablename="__splink__compare_two_records_left_with_tf", + out_tablename="__splink__compare_two_records_left_with_tf_uid_fix", + uid_str="_left", + ) + pipeline = add_unique_id_and_source_dataset_cols_if_needed( + linker, + df_records_right, + pipeline, + in_tablename="__splink__compare_two_records_right_with_tf", + out_tablename="__splink__compare_two_records_right_with_tf_uid_fix", + uid_str="_right", + ) + cols_to_select = self._linker._settings_obj._columns_to_select_for_blocking + select_expr = ", ".join(cols_to_select) sql = f""" select {select_expr}, 0 as match_key - from __splink__compare_two_records_left_with_tf as l - cross join __splink__compare_two_records_right_with_tf as r + from __splink__compare_two_records_left_with_tf_uid_fix as l + cross join __splink__compare_two_records_right_with_tf_uid_fix as r """ pipeline.enqueue_sql(sql, "__splink__compare_two_records_blocked") diff --git a/splink/internals/term_frequencies.py b/splink/internals/term_frequencies.py index 250873e1d..091d3fcae 100644 --- a/splink/internals/term_frequencies.py +++ b/splink/internals/term_frequencies.py @@ -4,7 +4,7 @@ # https://github.com/moj-analytical-services/splink/pull/107 import logging import warnings -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Optional from numpy import arange, ceil, floor, log2 from pandas import concat, cut @@ -16,6 +16,7 @@ ) from splink.internals.input_column import InputColumn from splink.internals.pipeline import CTEPipeline +from splink.internals.splink_dataframe import SplinkDataFrame # https://stackoverflow.com/questions/39740632/python-type-hinting-without-cyclic-imports if TYPE_CHECKING: @@ -79,33 +80,46 @@ def _join_tf_to_df_concat_sql(linker: Linker) -> str: return sql -def _join_new_table_to_df_concat_with_tf_sql(linker: Linker, new_tablename: str) -> str: +def _join_new_table_to_df_concat_with_tf_sql( + linker: Linker, + input_tablename: str, + input_table: Optional[SplinkDataFrame] = None, +) -> str: """ - Joins any required tf columns onto new_tablename + Joins any required tf columns onto input_tablename This is needed e.g. 
when using linker.compare_two_records or linker.inference.find_matches_to_new_records in which the user provides new records which need tf adjustments computed """ + tf_cols_already_populated = [ + c.unquote().name + for c in input_table.columns + if c.unquote().name.startswith("tf_") + ] + tf_cols_not_already_populated = [ + c + for c in linker._settings_obj._term_frequency_columns + if c.unquote().tf_name not in tf_cols_already_populated + ] + cache = linker._intermediate_table_cache - settings_obj = linker._settings_obj - tf_cols = settings_obj._term_frequency_columns - select_cols = [f"{new_tablename}.*"] + select_cols = [f"{input_tablename}.*"] - for col in tf_cols: + for col in tf_cols_not_already_populated: tbl = colname_to_tf_tablename(col) if tbl in cache: select_cols.append(f"{tbl}.{col.tf_name}") - template = "left join {tbl} on " + new_tablename + ".{col} = {tbl}.{col}" + template = "left join {tbl} on " + input_tablename + ".{col} = {tbl}.{col}" template_with_alias = ( - "left join ({subquery}) as {_as} on " + new_tablename + ".{col} = {_as}.{col}" + "left join ({subquery}) as {_as} on " + input_tablename + ".{col} = {_as}.{col}" ) left_joins = [] - for i, col in enumerate(tf_cols): + for i, col in enumerate(tf_cols_not_already_populated): tbl = colname_to_tf_tablename(col) if tbl in cache: sql = template.format(tbl=tbl, col=col.name) @@ -127,7 +141,7 @@ def _join_new_table_to_df_concat_with_tf_sql(linker: Linker, new_tablename: str) sql = f""" select {select_cols_str} - from {new_tablename} + from {input_tablename} {left_joins_str} """ From 0bc58f4a069a529a544a287d0cf7343d76535735 Mon Sep 17 00:00:00 2001 From: Robin Linacre Date: Fri, 8 Nov 2024 14:39:32 +0000 Subject: [PATCH 03/22] add test of compare two records --- tests/test_compare_two_records.py | 169 ++++++++++++++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 tests/test_compare_two_records.py diff --git a/tests/test_compare_two_records.py b/tests/test_compare_two_records.py new file mode 100644 index 000000000..c947271c3 --- /dev/null +++ b/tests/test_compare_two_records.py @@ -0,0 +1,169 @@ +import datetime + +import numpy as np +import pandas as pd +import pytest + +import splink.internals.comparison_library as cl +from splink import SettingsCreator +from splink.internals.blocking_rule_library import block_on +from splink.internals.pipeline import CTEPipeline +from splink.internals.vertically_concatenate import compute_df_concat_with_tf + +from .decorator import mark_with_dialects_excluding + + +@mark_with_dialects_excluding() +def test_compare_two_records_1(test_helpers, dialect): + # This one tests the following cases + # - User provides a city tf tble + # - But first_name tf table derived from input data + helper = test_helpers[dialect] + Linker = helper.Linker + + df = helper.load_frame_from_csv("./tests/datasets/fake_1000_from_splink_demos.csv") + # Cast the dob column to date + df["dob"] = df["dob"].astype("datetime64[s]") + df["first_name"] = df["first_name"].str.strip() + + settings = SettingsCreator( + link_type="dedupe_only", + comparisons=[ + cl.ExactMatch("first_name").configure(term_frequency_adjustments=True), + cl.ExactMatch("surname"), + cl.DateOfBirthComparison("dob", input_is_string=False), + cl.ExactMatch("city").configure(term_frequency_adjustments=True), + cl.ExactMatch("email"), + ], + blocking_rules_to_generate_predictions=[ + block_on("first_name"), + block_on("surname"), + ], + max_iterations=2, + retain_intermediate_calculation_columns=True, + 
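        # Both retain settings are enabled so the scored output keeps the
        # tf_* and comparison columns that the assertions below inspect.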
retain_matching_columns=True, + ) + + linker = Linker(df, settings, **helper.extra_linker_args()) + + city_tf = pd.DataFrame( + [ + {"city": "London", "tf_city": 0.2}, + {"city": "Liverpool", "tf_city": 0.8}, + ] + ) + linker.table_management.register_term_frequency_lookup(city_tf, "city") + + # Compute the df_concat_with_tf so it's cached + pipeline = CTEPipeline() + compute_df_concat_with_tf(linker, pipeline) + + # Test with dictionary inputs + r1 = { + "first_name": "Julia", + "surname": "Taylor", + "dob": datetime.date(2015, 10, 29), + "city": "London", + "email": "hannah88@powers.com", + } + + r2 = { + "first_name": "Julia", + "surname": "Taylor", + "dob": datetime.date(2015, 10, 29), + "city": "London", + "email": "hannah88@powers.com", + } + + res = linker.inference.compare_two_records(r1, r2) + res_pd = res.as_pandas_dataframe() + + # Verify term frequencies match in the comparison result + assert res_pd["tf_city_l"].iloc[0] == 0.2 + assert res_pd["tf_city_r"].iloc[0] == 0.2 + # This is the tf value as derived from the input data + assert pytest.approx(res_pd["tf_first_name_l"].iloc[0]) == np.float64( + 0.00444444444444 + ) + assert pytest.approx(res_pd["tf_first_name_r"].iloc[0]) == np.float64( + 0.00444444444444 + ) + + +@mark_with_dialects_excluding() +def test_compare_two_records_2(test_helpers, dialect): + # This one tests the following cases + # - User provides a city and first_name tf tables + # - But specific values provided in input data, which take precedence + + helper = test_helpers[dialect] + Linker = helper.Linker + + df = helper.load_frame_from_csv("./tests/datasets/fake_1000_from_splink_demos.csv") + # Cast the dob column to date + df["dob"] = df["dob"].astype("datetime64[s]") + df["first_name"] = df["first_name"].str.strip() + + settings = SettingsCreator( + link_type="dedupe_only", + comparisons=[ + cl.ExactMatch("first_name").configure(term_frequency_adjustments=True), + cl.ExactMatch("surname"), + cl.DateOfBirthComparison("dob", input_is_string=False), + cl.ExactMatch("city").configure(term_frequency_adjustments=True), + cl.ExactMatch("email"), + ], + blocking_rules_to_generate_predictions=[ + block_on("first_name"), + block_on("surname"), + ], + max_iterations=2, + retain_intermediate_calculation_columns=True, + retain_matching_columns=True, + ) + + linker = Linker(df, settings, **helper.extra_linker_args()) + + city_tf = pd.DataFrame( + [ + {"city": "London", "tf_city": 0.2}, + {"city": "Liverpool", "tf_city": 0.8}, + ] + ) + linker.table_management.register_term_frequency_lookup(city_tf, "city") + + first_name_tf = pd.DataFrame( + [ + {"first_name": "Julia", "tf_first_name": 0.3}, + {"first_name": "Robert", "tf_first_name": 0.8}, + ] + ) + linker.table_management.register_term_frequency_lookup(first_name_tf, "first_name") + + # Test with dictionary inputs + r1 = { + "first_name": "Julia", + "surname": "Taylor", + "dob": datetime.date(2015, 10, 29), + "city": "London", + "email": "hannah88@powers.com", + "tf_city": 0.5, + } + + r2 = { + "first_name": "Julia", + "surname": "Taylor", + "dob": datetime.date(2015, 10, 29), + "city": "London", + "email": "hannah88@powers.com", + "tf_first_name": 0.4, + } + + res = linker.inference.compare_two_records(r1, r2) + res_pd = res.as_pandas_dataframe() + + # Verify term frequencies match in the comparison result + assert res_pd["tf_city_l"].iloc[0] == 0.5 + assert res_pd["tf_city_r"].iloc[0] == 0.2 + assert res_pd["tf_first_name_l"].iloc[0] == 0.3 + assert res_pd["tf_first_name_r"].iloc[0] == 0.4 From 
704c68e8fb6e8440ca7935492884afc6a7761ee3 Mon Sep 17 00:00:00 2001 From: Robin Linacre Date: Fri, 8 Nov 2024 14:44:35 +0000 Subject: [PATCH 04/22] maintain compat with previous code --- splink/internals/term_frequencies.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/splink/internals/term_frequencies.py b/splink/internals/term_frequencies.py index 091d3fcae..5cb03d9db 100644 --- a/splink/internals/term_frequencies.py +++ b/splink/internals/term_frequencies.py @@ -92,12 +92,14 @@ def _join_new_table_to_df_concat_with_tf_sql( or linker.inference.find_matches_to_new_records in which the user provides new records which need tf adjustments computed """ + tf_cols_already_populated = [] - tf_cols_already_populated = [ - c.unquote().name - for c in input_table.columns - if c.unquote().name.startswith("tf_") - ] + if input_table is not None: + tf_cols_already_populated = [ + c.unquote().name + for c in input_table.columns + if c.unquote().name.startswith("tf_") + ] tf_cols_not_already_populated = [ c for c in linker._settings_obj._term_frequency_columns From 0df455ba60a20a3193f896c6c152ceb155882814 Mon Sep 17 00:00:00 2001 From: Robin Linacre Date: Mon, 11 Nov 2024 06:47:00 +0000 Subject: [PATCH 05/22] add real time --- splink/real_time.py | 80 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 splink/real_time.py diff --git a/splink/real_time.py b/splink/real_time.py new file mode 100644 index 000000000..f6e430739 --- /dev/null +++ b/splink/real_time.py @@ -0,0 +1,80 @@ +from typing import Any, Dict + +from .internals import similarity_analysis +from .internals.completeness import completeness_chart +from .internals.database_api import DatabaseAPISubClass +from .internals.pipeline import CTEPipeline +from .internals.profile_data import profile_columns +from .internals.splink_dataframe import SplinkDataFrame + +__all__ = [ + "compare_records", +] + + +def compare_records( + record_1: Dict[str, Any], + record_2: Dict[str, Any], + db_api: DatabaseAPISubClass, +) -> SplinkDataFrame: + """Compare two records and compute similarity scores without requiring a Linker. + Assumes any required term frequency values are provided in the input records. 
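    For example, if "city" has term frequency adjustments enabled, each input
    record should already carry a pre-computed value such as
    {"city": "London", "tf_city": 0.2}.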
+ + Args: + record_1 (dict): First record to compare + record_2 (dict): Second record to compare + db_api (DatabaseAPISubClass): Database API to use for computations + + Returns: + SplinkDataFrame: Comparison results + """ + uid = ascii_uid(8) + + if isinstance(record_1, dict): + to_register_left = [record_1] + else: + to_register_left = record_1 + + if isinstance(record_2, dict): + to_register_right = [record_2] + else: + to_register_right = record_2 + + df_records_left = db_api.register_table( + to_register_left, + f"__splink__compare_records_left_{uid}", + overwrite=True, + ) + df_records_left.templated_name = "__splink__compare_records_left" + + df_records_right = db_api.register_table( + to_register_right, + f"__splink__compare_records_right_{uid}", + overwrite=True, + ) + df_records_right.templated_name = "__splink__compare_records_right" + + pipeline = CTEPipeline([df_records_left, df_records_right]) + + # Cross join the records + cols_to_select = [c.name for c in df_records_left.columns] + select_expr = ", ".join(cols_to_select) + + sql = f""" + select {select_expr}, 0 as match_key + from __splink__compare_records_left_{uid} as l + cross join __splink__compare_records_right_{uid} as r + """ + pipeline.enqueue_sql(sql, "__splink__compare_records_blocked") + + # Select comparison columns + sql = f""" + select {select_expr} + from __splink__compare_records_blocked + """ + pipeline.enqueue_sql(sql, "__splink__df_comparison_vectors") + + # Execute pipeline and return results + predictions = db_api.sql_pipeline_to_splink_dataframe(pipeline, use_cache=False) + + return predictions From ff6b7c5497e2ddb971f7e0104e7237ec6e8e7b37 Mon Sep 17 00:00:00 2001 From: Robin Linacre Date: Mon, 11 Nov 2024 08:52:16 +0000 Subject: [PATCH 06/22] first attempt at realtime --- splink/{real_time.py => realtime.py} | 48 +++++++++++++++++++++------- 1 file changed, 37 insertions(+), 11 deletions(-) rename splink/{real_time.py => realtime.py} (60%) diff --git a/splink/real_time.py b/splink/realtime.py similarity index 60% rename from splink/real_time.py rename to splink/realtime.py index f6e430739..77fa795ae 100644 --- a/splink/real_time.py +++ b/splink/realtime.py @@ -1,10 +1,13 @@ +from pathlib import Path from typing import Any, Dict -from .internals import similarity_analysis -from .internals.completeness import completeness_chart from .internals.database_api import DatabaseAPISubClass +from .internals.misc import ascii_uid from .internals.pipeline import CTEPipeline -from .internals.profile_data import profile_columns +from .internals.predict import ( + predict_from_comparison_vectors_sqls_using_settings, +) +from .internals.settings_creator import SettingsCreator from .internals.splink_dataframe import SplinkDataFrame __all__ = [ @@ -15,6 +18,7 @@ def compare_records( record_1: Dict[str, Any], record_2: Dict[str, Any], + settings: SettingsCreator | dict[str, Any] | Path | str, db_api: DatabaseAPISubClass, ) -> SplinkDataFrame: """Compare two records and compute similarity scores without requiring a Linker. 
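
    A minimal usage sketch (illustrative only: the DuckDBAPI backend and the
    comparison choices are assumptions, not part of this patch):

        import splink.comparison_library as cl
        from splink import DuckDBAPI, SettingsCreator
        from splink.realtime import compare_records

        db_api = DuckDBAPI()
        settings = SettingsCreator(
            link_type="dedupe_only",
            comparisons=[cl.ExactMatch("first_name"), cl.ExactMatch("surname")],
        )
        rec_1 = {"unique_id": 1, "first_name": "Julia", "surname": "Taylor"}
        rec_2 = {"unique_id": 2, "first_name": "Julia", "surname": "Taylor"}
        scored = compare_records(rec_1, rec_2, settings, db_api)
        match_weight = scored.as_record_dict()[0]["match_weight"]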
@@ -28,6 +32,16 @@ def compare_records( Returns: SplinkDataFrame: Comparison results """ + + + if not isinstance(settings, SettingsCreator): + settings_creator = SettingsCreator.from_path_or_dict(settings) + else: + settings_creator = settings + + settings_obj = settings_creator.get_settings(db_api.sql_dialect.sql_dialect_str) + + uid = ascii_uid(8) if isinstance(record_1, dict): @@ -57,24 +71,36 @@ def compare_records( pipeline = CTEPipeline([df_records_left, df_records_right]) # Cross join the records - cols_to_select = [c.name for c in df_records_left.columns] - select_expr = ", ".join(cols_to_select) + cols_to_select = settings_obj._columns_to_select_for_blocking + select_expr = ", ".join(cols_to_select) sql = f""" select {select_expr}, 0 as match_key - from __splink__compare_records_left_{uid} as l - cross join __splink__compare_records_right_{uid} as r + from __splink__compare_records_left as l + cross join __splink__compare_records_right as r """ - pipeline.enqueue_sql(sql, "__splink__compare_records_blocked") + pipeline.enqueue_sql(sql, "__splink__compare_two_records_blocked") - # Select comparison columns + cols_to_select = ( + settings_obj._columns_to_select_for_comparison_vector_values + ) + select_expr = ", ".join(cols_to_select) sql = f""" select {select_expr} - from __splink__compare_records_blocked + from __splink__compare_two_records_blocked """ pipeline.enqueue_sql(sql, "__splink__df_comparison_vectors") - # Execute pipeline and return results + sqls = predict_from_comparison_vectors_sqls_using_settings( + settings_obj, + sql_infinity_expression=db_api.sql_dialect.infinity_expression, + ) + pipeline.enqueue_list_of_sqls(sqls) + + predictions = db_api.sql_pipeline_to_splink_dataframe( + pipeline, use_cache=False + ) + predictions = db_api.sql_pipeline_to_splink_dataframe(pipeline, use_cache=False) return predictions From c32d2f9f9da664c0fd223701cd03912dde89a3f4 Mon Sep 17 00:00:00 2001 From: Robin Linacre Date: Mon, 11 Nov 2024 09:15:59 +0000 Subject: [PATCH 07/22] remove double call --- splink/realtime.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/splink/realtime.py b/splink/realtime.py index 77fa795ae..8d2b63d3d 100644 --- a/splink/realtime.py +++ b/splink/realtime.py @@ -34,6 +34,7 @@ def compare_records( """ + if not isinstance(settings, SettingsCreator): settings_creator = SettingsCreator.from_path_or_dict(settings) else: @@ -70,7 +71,6 @@ def compare_records( pipeline = CTEPipeline([df_records_left, df_records_right]) - # Cross join the records cols_to_select = settings_obj._columns_to_select_for_blocking select_expr = ", ".join(cols_to_select) @@ -97,10 +97,8 @@ def compare_records( ) pipeline.enqueue_list_of_sqls(sqls) - predictions = db_api.sql_pipeline_to_splink_dataframe( - pipeline, use_cache=False - ) + predictions = db_api.sql_pipeline_to_splink_dataframe(pipeline) - predictions = db_api.sql_pipeline_to_splink_dataframe(pipeline, use_cache=False) + sql_used_to_create = pipeline.sql_used_to_create return predictions From ada32ccb182fb6e8c55e8384da4112ec99255fcc Mon Sep 17 00:00:00 2001 From: Robin Linacre Date: Mon, 11 Nov 2024 09:44:13 +0000 Subject: [PATCH 08/22] caching seems to work --- splink/realtime.py | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/splink/realtime.py b/splink/realtime.py index 8d2b63d3d..69f497111 100644 --- a/splink/realtime.py +++ b/splink/realtime.py @@ -15,11 +15,14 @@ ] +_sql_used_for_compare_records_cache = {"sql": None, "uid": None} + def 
compare_records( record_1: Dict[str, Any], record_2: Dict[str, Any], settings: SettingsCreator | dict[str, Any] | Path | str, db_api: DatabaseAPISubClass, + use_sql_from_cache: bool = True, ) -> SplinkDataFrame: """Compare two records and compute similarity scores without requiring a Linker. Assumes any required term frequency values are provided in the input records. @@ -32,16 +35,7 @@ def compare_records( Returns: SplinkDataFrame: Comparison results """ - - - - if not isinstance(settings, SettingsCreator): - settings_creator = SettingsCreator.from_path_or_dict(settings) - else: - settings_creator = settings - - settings_obj = settings_creator.get_settings(db_api.sql_dialect.sql_dialect_str) - + global _sql_used_for_compare_records_cache uid = ascii_uid(8) @@ -69,8 +63,24 @@ def compare_records( ) df_records_right.templated_name = "__splink__compare_records_right" + if _sql_used_for_compare_records_cache["sql"] is not None and use_sql_from_cache: + sql = _sql_used_for_compare_records_cache["sql"] + uid_in_sql = _sql_used_for_compare_records_cache["uid"] + sql = sql.replace(uid_in_sql, uid) + return db_api._execute_sql_against_backend(sql) + + + if not isinstance(settings, SettingsCreator): + settings_creator = SettingsCreator.from_path_or_dict(settings) + else: + settings_creator = settings + + + settings_obj = settings_creator.get_settings(db_api.sql_dialect.sql_dialect_str) + pipeline = CTEPipeline([df_records_left, df_records_right]) + cols_to_select = settings_obj._columns_to_select_for_blocking select_expr = ", ".join(cols_to_select) @@ -99,6 +109,7 @@ def compare_records( predictions = db_api.sql_pipeline_to_splink_dataframe(pipeline) - sql_used_to_create = pipeline.sql_used_to_create + _sql_used_for_compare_records_cache["sql"] = predictions.sql_used_to_create + _sql_used_for_compare_records_cache["uid"] = uid return predictions From 20345f070e6c901e0fb20b34c47f92c2cd941777 Mon Sep 17 00:00:00 2001 From: Robin Linacre Date: Mon, 11 Nov 2024 13:55:58 +0000 Subject: [PATCH 09/22] test realtime --- splink/realtime.py | 14 +-- tests/test_realtime.py | 215 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 222 insertions(+), 7 deletions(-) create mode 100644 tests/test_realtime.py diff --git a/splink/realtime.py b/splink/realtime.py index 69f497111..7e858d6c4 100644 --- a/splink/realtime.py +++ b/splink/realtime.py @@ -17,6 +17,7 @@ _sql_used_for_compare_records_cache = {"sql": None, "uid": None} + def compare_records( record_1: Dict[str, Any], record_2: Dict[str, Any], @@ -67,20 +68,21 @@ def compare_records( sql = _sql_used_for_compare_records_cache["sql"] uid_in_sql = _sql_used_for_compare_records_cache["uid"] sql = sql.replace(uid_in_sql, uid) - return db_api._execute_sql_against_backend(sql) - + return db_api._sql_to_splink_dataframe( + sql, + templated_name="__splink__realtime_compare_records", + physical_name=f"__splink__realtime_compare_records_{uid}", + ) if not isinstance(settings, SettingsCreator): settings_creator = SettingsCreator.from_path_or_dict(settings) else: settings_creator = settings - settings_obj = settings_creator.get_settings(db_api.sql_dialect.sql_dialect_str) pipeline = CTEPipeline([df_records_left, df_records_right]) - cols_to_select = settings_obj._columns_to_select_for_blocking select_expr = ", ".join(cols_to_select) @@ -91,9 +93,7 @@ def compare_records( """ pipeline.enqueue_sql(sql, "__splink__compare_two_records_blocked") - cols_to_select = ( - settings_obj._columns_to_select_for_comparison_vector_values - ) + cols_to_select = 
settings_obj._columns_to_select_for_comparison_vector_values select_expr = ", ".join(cols_to_select) sql = f""" select {select_expr} diff --git a/tests/test_realtime.py b/tests/test_realtime.py new file mode 100644 index 000000000..7c0010e93 --- /dev/null +++ b/tests/test_realtime.py @@ -0,0 +1,215 @@ +import pandas as pd +import pytest + +import splink.comparison_library as cl +from splink import SettingsCreator, block_on +from splink.realtime import compare_records + +from .decorator import mark_with_dialects_excluding + + +@mark_with_dialects_excluding() +def test_realtime_cache_two_records(test_helpers, dialect): + # Test that you get the same result whether you cache the SQL + # or not with different records + + helper = test_helpers[dialect] + + db_api = helper.extra_linker_args()["db_api"] + + df1 = pd.DataFrame( + [ + { + "unique_id": 0, + "first_name": "Julia ", + "surname": "Taylor", + "city": "London", + "email": "hannah88@powers.com", + "tf_city": 0.2, + "tf_first_name": 0.1, + } + ] + ) + + df2 = pd.DataFrame( + [ + { + "unique_id": 2, + "first_name": "Julia ", + "surname": "Taylor", + "city": "London", + "email": "hannah88@powers.com", + "cluster": 0, + "tf_city": 0.2, + "tf_first_name": 0.1, + }, + ] + ) + + df3 = pd.DataFrame( + [ + { + "unique_id": 4, + "first_name": "Noah", + "surname": "Watson", + "city": "Bolton", + "email": "matthew78@ballard-mcdonald.net", + "cluster": 1, + "tf_city": 0.01, + "tf_first_name": 0.01, + }, + ] + ) + + settings = SettingsCreator( + link_type="dedupe_only", + comparisons=[ + cl.ExactMatch("first_name").configure(term_frequency_adjustments=True), + cl.ExactMatch("surname"), + cl.ExactMatch("city").configure(term_frequency_adjustments=True), + cl.ExactMatch("email"), + ], + blocking_rules_to_generate_predictions=[ + block_on("first_name"), + block_on("surname"), + ], + max_iterations=2, + retain_intermediate_calculation_columns=True, + retain_matching_columns=True, + ) + + res1_2_first = compare_records(df1, df2, settings, db_api).as_record_dict()[0][ + "match_weight" + ] + + res1_2_not_from_cache = compare_records( + df1, df2, settings, db_api, use_sql_from_cache=False + ).as_record_dict()[0]["match_weight"] + + res1_2_from_cache = compare_records( + df1, df2, settings, db_api, use_sql_from_cache=True + ).as_record_dict()[0]["match_weight"] + + assert res1_2_first == pytest.approx(res1_2_not_from_cache) + assert res1_2_first == pytest.approx(res1_2_from_cache) + + res1_3_first = compare_records(df1, df3, settings, db_api).as_record_dict()[0][ + "match_weight" + ] + res1_3_not_from_cache = compare_records( + df1, df3, settings, db_api, use_sql_from_cache=False + ).as_record_dict()[0]["match_weight"] + res1_3_from_cache = compare_records( + df1, df3, settings, db_api, use_sql_from_cache=True + ).as_record_dict()[0]["match_weight"] + + assert res1_3_first == pytest.approx(res1_3_not_from_cache) + assert res1_3_first == pytest.approx(res1_3_from_cache) + + assert res1_2_first != pytest.approx(res1_3_first) + + +@mark_with_dialects_excluding() +def test_realtime_cache_multiple_records(test_helpers, dialect): + # Test that you get the same result whether you cache the SQL + # or not with multiple records in each DataFrame + + helper = test_helpers[dialect] + db_api = helper.extra_linker_args()["db_api"] + + # Load data from CSV instead of hard-coding + df = helper.load_frame_from_csv("./tests/datasets/fake_1000_from_splink_demos.csv") + + # Select subsets of records for testing + df1 = df.iloc[[0, 1]].copy() # First two records + df2 = 
df.iloc[[2, 3]].copy() # Next two records + df3 = df.iloc[[4, 5, 6]].copy() # Next three records + + # Add required columns if they don't exist + for frame in [df1, df2, df3]: + if "tf_city" not in frame.columns: + frame["tf_city"] = 0.2 + if "tf_first_name" not in frame.columns: + frame["tf_first_name"] = 0.1 + if "cluster" not in frame.columns and frame is not df1: + frame["cluster"] = range(len(frame)) + + settings = SettingsCreator( + link_type="dedupe_only", + comparisons=[ + cl.ExactMatch("first_name").configure(term_frequency_adjustments=True), + cl.ExactMatch("surname"), + cl.ExactMatch("city").configure(term_frequency_adjustments=True), + cl.ExactMatch("email"), + ], + blocking_rules_to_generate_predictions=[ + block_on("first_name"), + block_on("surname"), + ], + max_iterations=2, + retain_intermediate_calculation_columns=True, + retain_matching_columns=True, + ) + + # Compare df1 and df2 + res1_2_first = compare_records(df1, df2, settings, db_api).as_pandas_dataframe() + res1_2_not_from_cache = compare_records( + df1, df2, settings, db_api, use_sql_from_cache=False + ).as_pandas_dataframe() + res1_2_from_cache = compare_records( + df1, df2, settings, db_api, use_sql_from_cache=True + ).as_pandas_dataframe() + + # Compare match weights using pandas merge + merged = res1_2_first.merge( + res1_2_not_from_cache, + on=["unique_id_l", "unique_id_r"], + suffixes=("_first", "_not_cache"), + ) + pd.testing.assert_series_equal( + merged["match_weight_first"], + merged["match_weight_not_cache"], + check_names=False, + ) + + merged = res1_2_first.merge( + res1_2_from_cache, + on=["unique_id_l", "unique_id_r"], + suffixes=("_first", "_from_cache"), + ) + pd.testing.assert_series_equal( + merged["match_weight_first"], + merged["match_weight_from_cache"], + check_names=False, + ) + + # Compare df1 and df3 (similar pattern) + res1_3_first = compare_records(df1, df3, settings, db_api).as_pandas_dataframe() + res1_3_not_from_cache = compare_records( + df1, df3, settings, db_api, use_sql_from_cache=False + ).as_pandas_dataframe() + res1_3_from_cache = compare_records( + df1, df3, settings, db_api, use_sql_from_cache=True + ).as_pandas_dataframe() + + merged = res1_3_first.merge( + res1_3_not_from_cache, + on=["unique_id_l", "unique_id_r"], + suffixes=("_first", "_not_cache"), + ) + pd.testing.assert_series_equal( + merged["match_weight_first"], + merged["match_weight_not_cache"], + check_names=False, + ) + + merged = res1_3_first.merge( + res1_3_from_cache, + on=["unique_id_l", "unique_id_r"], + suffixes=("_first", "_from_cache"), + ) + pd.testing.assert_series_equal( + merged["match_weight_first"], + merged["match_weight_from_cache"], + check_names=False, + ) From 7ab22413e4e1f3678f6350043b0da682ca2723fb Mon Sep 17 00:00:00 2001 From: Robin Linacre Date: Mon, 11 Nov 2024 14:01:35 +0000 Subject: [PATCH 10/22] test with a pd merge --- tests/test_realtime.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_realtime.py b/tests/test_realtime.py index 7c0010e93..8efbb2e27 100644 --- a/tests/test_realtime.py +++ b/tests/test_realtime.py @@ -183,7 +183,6 @@ def test_realtime_cache_multiple_records(test_helpers, dialect): check_names=False, ) - # Compare df1 and df3 (similar pattern) res1_3_first = compare_records(df1, df3, settings, db_api).as_pandas_dataframe() res1_3_not_from_cache = compare_records( df1, df3, settings, db_api, use_sql_from_cache=False From 20a39f61d352e0240a29f1ec4d0930ad8d050826 Mon Sep 17 00:00:00 2001 From: Robin Linacre Date: Mon, 11 Nov 2024 16:07:09 +0000 Subject: 
[PATCH 11/22] 3.8 support --- tests/test_realtime.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_realtime.py b/tests/test_realtime.py index 8efbb2e27..973197c1f 100644 --- a/tests/test_realtime.py +++ b/tests/test_realtime.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import pandas as pd import pytest From 0bed1588a40a1148e5412fb73b1e50f49ca6be70 Mon Sep 17 00:00:00 2001 From: Robin Linacre Date: Mon, 11 Nov 2024 16:10:03 +0000 Subject: [PATCH 12/22] 3.8 support --- splink/realtime.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/splink/realtime.py b/splink/realtime.py index 7e858d6c4..10faab076 100644 --- a/splink/realtime.py +++ b/splink/realtime.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from pathlib import Path from typing import Any, Dict From e91d0da3ca986027a6e75fcd5b29f00bb1ad3d06 Mon Sep 17 00:00:00 2001 From: Robin Linacre Date: Mon, 11 Nov 2024 16:24:58 +0000 Subject: [PATCH 13/22] date types in sqlite don't work --- tests/test_compare_two_records.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/tests/test_compare_two_records.py b/tests/test_compare_two_records.py index c947271c3..3b0ee04de 100644 --- a/tests/test_compare_two_records.py +++ b/tests/test_compare_two_records.py @@ -13,7 +13,7 @@ from .decorator import mark_with_dialects_excluding -@mark_with_dialects_excluding() +@mark_with_dialects_excluding("sqlite") def test_compare_two_records_1(test_helpers, dialect): # This one tests the following cases # - User provides a city tf tble @@ -21,10 +21,9 @@ def test_compare_two_records_1(test_helpers, dialect): helper = test_helpers[dialect] Linker = helper.Linker - df = helper.load_frame_from_csv("./tests/datasets/fake_1000_from_splink_demos.csv") - # Cast the dob column to date - df["dob"] = df["dob"].astype("datetime64[s]") - df["first_name"] = df["first_name"].str.strip() + df = helper.load_frame_from_parquet( + "./tests/datasets/fake_1000_from_splink_demos_strip_datetypes.parquet" + ) settings = SettingsCreator( link_type="dedupe_only", @@ -90,7 +89,7 @@ def test_compare_two_records_1(test_helpers, dialect): ) -@mark_with_dialects_excluding() +@mark_with_dialects_excluding("sqlite") def test_compare_two_records_2(test_helpers, dialect): # This one tests the following cases # - User provides a city and first_name tf tables @@ -99,10 +98,9 @@ def test_compare_two_records_2(test_helpers, dialect): helper = test_helpers[dialect] Linker = helper.Linker - df = helper.load_frame_from_csv("./tests/datasets/fake_1000_from_splink_demos.csv") - # Cast the dob column to date - df["dob"] = df["dob"].astype("datetime64[s]") - df["first_name"] = df["first_name"].str.strip() + df = helper.load_frame_from_parquet( + "./tests/datasets/fake_1000_from_splink_demos_strip_datetypes.parquet" + ) settings = SettingsCreator( link_type="dedupe_only", From 1650a2b7686c2dd722284df6949caa017ff883a9 Mon Sep 17 00:00:00 2001 From: Robin Linacre Date: Mon, 11 Nov 2024 16:28:33 +0000 Subject: [PATCH 14/22] add with datetypes --- ...00_from_splink_demos_strip_datetypes.parquet | Bin 0 -> 31471 bytes 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 tests/datasets/fake_1000_from_splink_demos_strip_datetypes.parquet diff --git a/tests/datasets/fake_1000_from_splink_demos_strip_datetypes.parquet b/tests/datasets/fake_1000_from_splink_demos_strip_datetypes.parquet new file mode 100644 index 0000000000000000000000000000000000000000..84563de25f0c4870d8233d492b2a395358e80f76 GIT binary patch literal 31471 
zcmeFZd0Z3M|NlEjGdjTm0|X2hHBrK%fQYDQQ4=5t2#SJ=)+$3Xkby}Cld#q*RB>sw z(yFanEvUHLYHO`+wJsIKweDJLRowTbb*oi-UxWSpe&2iV=Z|~;|K;J+NirvA&VJ78 zyx)fm)9@aC3ct$t{7w!?^E=wskMg0YodYP!lTgHyp;~~JKmlmb3bY0cPy!Fo2C%>r zc!9P+1-yX|@C9n%2mC=hpaB6O5VQwc&;fJ=oq!JLL1)kf1OWpG1|c96gn_Q08wdvx zAQE&3QJ@Ei20cLx=mmO%J|GtK1^vKVAP)2g1HeFF1SSv<5BT zkPe1`p&$bc1H(Zk$O0q4NH7XygVA6NcpHob?|^Y&JjekY$ORT)1w6)@1rx!0U=o-NDnS)^A4~yLK{c2Lrh^$^Cinn+ z2xfuV;3F^x)PRq{Ctxm^2j+uM!DnCrSO^w@#b62e94rN2fLc%omVxD91*iur!78vC ztO0AmIb`oz>i=X*baUIJHSq`3+x6zgFT=D z>;?NkBiIiPfP>%=I1G+}U%*jt3>*h1z)5floCasWS@0`32Yv(R!SCP#xCkzRCU6;C z0aw8_a2?zLH^D7%8{7eR!98#vJOB^DBk%|K6Fde_z*F!HJO_V)7vOL361)Ph!9SkN zqrVh{D82tr_eXIgfJ9&hNnj921}PvF3-90Uv`;z+5m7%m<%>&%gq( z5G(?V!4mK}SPH%XwV)0x1Ixh*P!CpuRbVw(1J;6dU_ICXz64)^jbIb_8hisbgKxnW z@EzC+z6U>mAHg=T9sC4#fSq6$*bROLdq4x&3-*CVupb-%2f-n57#sn=fTQ3TI1WyL zli(CM4bFhG;8$=C{07d0-@yfN5nKXI;4-)Zu7YdeI=BIDf?MD=xC8Ejd*D8J03L!z z;1BR8cnqF^r{EcQ4*mi!z~A5{cm-aAe>|CvkJ?fs@UOQ2r~jio5BTkPe1`p&$bc1H(Zk$O0q4NH7XygVA6NcpHob?|^Y&JjekY$ORT)1w6)@1rx!0U=o-NDnS)^A4~yLK{c2Lrh^$^ zCinn+2xfuV;3F^x)PRq{Ctxm^2j+uM!DnCrSO^w@#b62e94rN2fLc%omVxD91*iur z!78vCtO0AmIb`oz>i=X*baUIJHSq`3+x6z zgFT=D>;?NkBiIiPfP>%=I1G+}U%*jt3>*h1z)5floCasWS@0`32Yv(R!SCP#xCkzR zCU6;C0aw8_a2?zLH^D7%8{7eR!98#vJOB^DBk%|K6Fde_z*F!HJO_V)7vOL361)Ph z!9VE#x7wlqKWg`X`ajAe0VD!5NCJaEGDrcbU@%An>0k&L3NpYjFdSrpEHDC$1fxJU z7!AgNx4~HO4j2c$cr!4xnRRD)??I+y`wf)BulU>2ASJ_2(<4fq&*0_K8wU_SU1 zdb;!BX%As0DRk8CVWhfO@bJtOBdS8n70u1M9&C@Fn;PYy_LY*Weql z8GH-2fbYOo@ICke{0O#z?cgV{1MCF5z;5s}*aI5CUa$`|g8kqCI0z1b!{7+`1sny( zz;SQ_oCK%9X>bOd1;2uG;5Tp{{0=UFi{KJy0++!Ra1~thWa6&3q?%th{=NE9erJ9C zF8hYd(p}}(%CD7H!~Ex_hItd1qWr1Q|MD&F-c>%nqmbh|+Lm}srWCrJltP`elQ2QG zJ6o_Zmv**PCF*u5)LA*ZXpOaYS1Yai(yrFJsk+@v(7c@8%Fq?HyFJ1;U)tR!YLD(` zHs)l`&z`YYYk&5Ndwl8Vwnl}1k1A2k?eR_ys@vm}7SpuHHzQHspw7zT8vL@Ybq)Sw z-AxVca;EC{YOM3Py#clrb$bKF%}smT7wysS)4ES``#O|et=rde;^U@$ohlWb8+B9F zxsCc6LCYFD&x*O+*rg`1^ZuZDS-JZS3#`lb2QP77-XBsswex|{74vcrgson&>_FG` zn=c>ewsBAAgW;P`<{pgLdUe^s$Zd}=AMCzU(dAIo9<}9AkH(34VYl_PIG+|%WkxW^|gzx028 zb@?v?UOv9^%Rq__I%-t-S&y0+!-}KvY_F?F6IAA)V~Oe!)?;Q3zv5Vuw)pC?LAvUo zsK644gdD)@xf6IK_}8;PFYW+$6i};Vo2PRt0#sUX~W5kL_hxIuw+C1 z$>C|euAR)xFdI%~WsTrZjmYNfPmLT~eC^bzoNB}AZ0mgf^k`ds{pm5{x7SX;UDRMW zGuC~IKl4u6wfZyTCO)}#W_%?bd^TsQU*1`6hGFH|+*!S@pS9GOgMYQo8%ZpLRtKN6t(c#8PFP*P@?63CZ?B&#+}IHOo4EN@-f#A;*H-@K*!JZ5Z_-XW zdfyRpQ5TDb9f+4VIymQH+n>&6$At-9W< zo$6n3vu;N4+MCN}^}c;`c}-H+TPx;`EVxy_AaCuhl}k!)-&$2Wt?TX8D?Tl_y=L{w zwYS%<-*Wr*x{Z6g-dVr-bithsTd%LZ^X0aux9@zlvsJgd8~6AZ-rdv~yzcJThkD<+ z`_0j$Zud5y99ekp+p~G=?rk|=a_8Q6m!@^QzxC>;h4;U|xpLk8AMS3sbN|POd%Hc@ z_V{$+gYD0+uY2&*%cpl9?4VkQKisKkCqCT8gsgwKo9%P=;m@i;;g9yHM~RObH2Lcv z?bVjveY8(EJ^YWxpwGlV_J^)o|Hpyw@9zF_Flt};pNC@3h<_fAy|Mn!BXQ5}{`rfs zb;RSNiS6uPnoR9#KLG@05r7ESqF6@*Ea7R1^iiHNHsa+=6K3NQ^p}b2t*@a#+m zun9;H8OoU54rMlLw&@&nMyT0s;~er_WG1%yy2XpK>w&p6XddaN0Bb+ zTa#41H|?wMM{)FW2AMO=P;kjg25-yhVwOu3czUrqk#h*VNXDv1aY%>7pjVr%rJU1B zGF4W*F%zXKjW0#9gblFG(A zrBcHp#So6%B);qrH^+G@85_qDA2vbcob*eT8O1Nrv8*HtW?D&nX{8D&H%r9A4svqk zWQjVJcewB?DDSPTpz_8^;cH zSvYzx>*Ab4Nf2Xp*$t(v$)3v_;#HY;!EH1BW|+rHZa1&=WJD)NJWVY<1HD-l>eTHf zY0U6)&Oq3#Vy8p!Q3;4N$?z`f%VjGzn+)R^xh14((Ud3i=~f5X}`NogKvGI@7RlZJz0~Lv!lPp^G;{Ep`(JcDqB-_x|2d@=f6^$a)o?O zFvKc7$bpDYNhp(5F4EiO|7Y${h~;5pRVJs!CeRHk6D~TXhstTu@q)=qkzuRaMHSHr zb))GDiboNp&%VPVuRO_R%z~Z%S&>$(JjkZ$%8SYOs!TWf8c9nTgXa@gGhDEsuUHK# zxzAt-AY)=vO3FRZqY{c84#AOcQ~t%8#5{rKh*GbWJE1%50~I;Qx`^ zsgCce&9rb%o!s&U6P6MKSklJaP0d?4G1E=QDokR;a8){zU^6ppy3?)Oe@Net9M#_q zdtZ^{95#&2;6!`aYBrg7mg^5G#&G&*a|d<^UuvJ(zz)@+tF2{+IxQTpr$*^FDmc4g zv`XR!Ir9z2kPRt6r4c=m=NxN#vza!ol#HQ$4_B}f&xzOisuDS;g{K{CIw!b1zfef7 
z!!OyvC?P{XZMj1p!BfihMs)%w^0}>=K36$~e02V|nN(g^7fk$6eQsMZ{kAGz65UdD zm|}=Tw`T`QPV2pRRwxm4w3@}>!-xIIq)GY8dn^VZmk-ULp<+mzzz!Eg-k?>XM0w}T zT$UHa;Y6#-Uj$DJ~8N|2b>6+qt?l)euy%)U?mkE-IgMq{&l? zL*&boNP;3sAZ^G|G)M#w`%SJ1i}^EV5dPn;NG&vfh-%YWNH@(;U9J-`6Qjz^m0Fz* zS9jf~ti)F>2;wL->ClKxreSOvSL&?FBkk0fLEJXpP)ph{I=+EloG` z25U*Takp|+=@uK$@h>ppQA5&T>AB++oW_`GWeE2ZmBj0_Hiv z;X;cu+eN|uGE+A|xTnhC+!(Ldafk3QEBzg2Lz~X-%_=Mw(``cp#t4^PSmVW-1ec9B zYLV3f&Mr7eF*_I&jk&K$q@$TpeD1JZMItvdiD=1M#>pjK%dG2PJw$C19c7#~<&=7O zvCGAa_LB--A&F9?ax*`b=QYD^HcN-;tVx}OMr;wPk21+lC*7WjmmG%A70J2{xl9@_ znNCw_20_hpxh7FuV{It%>EWR;Icb(QDiWly8EA`9_4Aogl61|uYZ>DbMRJeIsM|pn zVop`|RwQ%E0J){`^fAim|D6YAGSxK9V{>`8O)nBphR@}yhp}1ZI-b5sWknod`SL6$ z8Kp3n_>V(iuNNMNpXwi0cT{mUDa%D(v&oV*-?M|lZnEbPh9Amcl@Rt&ZQ?~8ClCy; zlFi{6rOLttQf|_i{!)r;Jo;SKQ&Oo&afJmj@jPc*PQp~f`C?2Yw=pV~E1xLhxv-UZ z%NhK=a+GE}n<*99l+$RBshN&;L-Oy&FlW zLaum?!lbivXsmOTk3Uk(!JBZ>@^Lh2IVp~`yx>FVrN8@-mM?-yoBP`@UcBf}=nJn@ zq~+^iLce}xB=plA8lqgeJDR;`^|0khvuk226t|8oA2g?P$|Cj5;q|HW!{7PA ztvq4q8yEekW_eCki}XdkCtN>Z?Ya<)T_{c!{~1lXZpy+^LCu$_sbL7 zw43wxcaE^edSg`!+GA0Sa6Ea`+~D+*Uc&LI?Y*Y2Y%~{*HfDc zZ`|(?R}{OmDZ_hCwTIsyR}aq(8CpqIJidvt{c7pt+EtBh)#_?|P0p_O@z7{n%|cV&A+}@OjE6b9r+WAXk8Rw@;YN$Tta`?j zj{Vzu8e=H$&WSC(4XI;$Ot`$q&o^ZFlCovvcTDn&SesFIx%1u+19~|x)%39JovnSV z&<9%P(5B{v%$o1KeoDClst8!X-piFfkLtBX`;#<4$PGavc5}@XqGA`thQO`$rI^F zb+gvfk&&uoms4<(PTGGppN@3;kLc?pYdxi}s#*HcVJsGhPBMTLrkNa8CtvE4b!Iwe zu}Th3w30r=M7fAwlg^cuB4K0%l}>w*j#xg}dAg&#)GqKm1qs5Z(pm<~Pr3q&sB(^e zz@$PDc&id5zDOb||B-tUDko%L0(tL$=|TDg(HY`Y&1#QM#3IP)Y^hf%oSd>XJ&?_U z*rSKP(R%a?Xg%*2s&r&cB0QTeAs^jY$6QuCilFhriH!xuMiCWS0U1km^gWt zegk=57KTbS-F**cdbCP(EX1Ef|Sak1P`V zYnnx%z#9ZM3G1J*233+^gPyKvUN45de4_wKI^(trSZQY3+_FBA)uA)u8daq1-K_hR zDJ(>w16AiNcBHJ+%4-2pUZVeC=t;Sh$KI$xB^KDFSR*ztr9*k-aVTGE%Fz8s80v_p z_FpB))1LoPfMgC^2ua{M9K~B`U(G@ZmebQ2C_o`jm0D=wTxLP-;4CgFmAr+=6uLOt zj1`_UpW`p1YV$dBfeH(CXOW&_C1-poVX*Xa(w$i;*ENXlrH1~KoBUGmvy_hPP^9tZ z^nZ2WG?gU97rIE1!ff~c48`p%G}o}1cDD`7H%P-UPu3>5Gf9?qkWG<(`7-9CjYNaOkvd` zhsCpngkdA6f#QfqvtSGx^hO?%p>I^uAH>Jt&p5I4V-&oz`m*MYOcd$y5vxh8DVu(T zs&olttnys~HIa|ZE9*>@&J?1_u%XEbMSQthz`KkhI+4Y8N+B&`Q5n|FTgAjXOZlMY zO`P<_tSk%Z5whkjr)O*A6`)*jIn*dJ9x{o}QQL&!(24vp2oZ~ou=-)z;er)KZ}F(w zsQXtcLJQnvARowdx}An6&8ksQDiWNYzL4>G8*VV@!uvaD2FuFBTCQ*X zMlqWE;I2THywQz=^nrT2p%-Iu?To zlVp{wCf&rLlD9pa1~141r%RftRb;pzbvm&bE~hlJFV@PE*wFY!NrpM}lTDYH1QbmK z1No@DqsYMijA{r5F9(^R8p1p5rNn@1shu;pWl{MKRZjo(Mj_tSGz&(Q`#*|ITZ6wM zldIRWNxVzqr?E*G4?Jfwf=ftQhYv1y%H1GYG0I|ba5-s(XyU2CYrL7koI~GIG1w-Y zVN;8(JpGrtSuHy0>1>+h$S22`G>4>5GYz;WG)u-cmGpITf->(qM~1UD(KU*At7N@s zBe7Hh`RCRN6po!!dNixX{a;|f#P*x{3ze)79k)6`>TJkk5{0sqdD>=m=tAroN+PU6 zbQ$JBut6!B(T^xG{b$V(L9$syiN1S;D2!ZS=ojSJ7Gy<>se}cXCz;Qb=TEp^$Ir zufhgNl8Y8qNfKJVvJK_ZKUY|Xle4NMj60UYf;@h>o#u_GN0CpNkNE>F9oJ^1Sba^E z;1H2s%vzElrhLK1yURGkr(QlvbEmqu8BCXCF(gn?p=S~22}{}qu_)y-) zu~Ea9@KuLN>c8R=TagYf$%3uNJcSf*4#b${u#&;Dz%*}TF@k3j6Lx6I0S+Rk|ReM2hfJiTo{p5_ya&@}{o; z?;dPAc2v#%*)pz_SN36(i*5PUN7>@!d>gIRzXgl0MwMh;af0!eb5O&?Cib;8{hK9th0DS?9GH|VjO2YPO7P+BrmXr?q z7wrEU-iPc&63<{Hj7j7m?7`|1N!?9yz-U-Rwu@rCMC* z*AHZs7bvSlNX6I9Qc*?dm%quXE|Ro-uEsSAvhe|w>g#u(LtDNkVQGKj`m`&)x%djQ zP?nvqkB~Sfl(eK3R7biGNuw3IEjrTX^~Gm9UXvi5nza7u)dPg3gNg9CyP`LKJNZGh ztVcr$J?WVav2;W%rOon@Q5lUCAqqwlRoN^bG_ggL2|`UBp0lyGYDz0FZCoWq@P&7f zyvW}*Rp}d?RvQH+*v2o)y0g;Ma+;?$w&>=iswvak>SX!YV$Tc}OV#YGBne8kMcR@m z{m!YBIxK5__NLqh5C4d)t!0l|?EO#^Z7o}1TAt+DUgOBwS=H34m9{UnI>Qv-y0XJt zWp{6GN~oUGX`r$4`J*e)uuvPy7K#})`uM3iJB^B#tyuFcttlEQA}v3Gj7;4B2?P-y zVQugTW>h-c4IjPPy8jdN;7=pQZV`Vy z&L8yej@5hAqiegSv~DX$dflYyVcAoo&zv?(6P?$%#5AQE;_YJDKad|wB 
zmYdGFRzBg_06^>bEFxhmtKZF|A7m{k(*Hb-?GQb&v8L&x4sVqh&u_Z2w576Qi~IZ%tPan1+}^dUb^eegl>5Gds)V5I|L)Vr zx4sFP)~@@wZO{MuGvsKGfuBXZDUIN*4gW(~wsyhpf_2CnS^0IM|DxF78DEp&rL&2) z_y2pi`>)!vwuj%%&@{h!t(uLBM>SE@`Kuc<h+W^LWKVnh8JpsOKHI`fqkh%|!_Hpqrc=P-IlNY-_z59w6+3gFpR* zOWdd1v9ri*zG$JNSW#NLlBn2?O}bSy!#uct906kXQ=Lc=yJv4N9X{Q>JZvy7IcYp{EBhgwwrvAK z#9g`3nuM~}2kls*VJEWhcpwDj*z*@YWoM?oKtQp(3IU@EW?GOV?WnRI2%jZ|MB|qF zR5`L2ZW=)sF)vN`=^RG!c5ei(9O!?6#Ii@-9>2>EGzGQ9r|e~yXnY!*=C8ozdFrz? zqGM)kx{MgMJ!*s&t347j7>~U<{LXmTg7$0}jPJH?>o*RMJ^#FmT(~!XtV7D*XNM>u-20X!o&+eAk8iwF;&mI{<*`15V=OV(40wTA* z@Rl`euw$(WIj|O=LPw8Ths%rumQg5&mv5P}@u||j<`QCFecSsoKAr4n{0*0fzC+I8 zvSLf;^SGS7kd;oN7^`9zQu6Zd+B778#Zm_*~wT7vgfLwRIpJ%S6rThg%yL|LTWJ zQNd+3;?r(;RIOu|%u>p&$z8Y)6~QbSb{#R(-o@Y(uW7%09% z99BHyl*=OPKs!E}+4jvM)OcvWpIad&W{tTQ!d>e2drt&-Ib;&r=hdN$3=i3i{P71! z2Aj3|!hYPjduOX5n|(O{{z5z|>(pC|@Lk3$Cqkc8eum_oP4!3e!{>$XLyFgrKZdVj zwbSWjCT-5AxRGT`)RHnL{LXLnGuZRr-e@n!(qdr&iDk|%8B!g}6n%5BI+V@+y=4ou zf))L`Q;0vg!({`-?#!DJjfz&g&4{OHkZTowI&tMDxtS=AockQ%vJ;0GbnHask1iCy zd-^s*EmPah8%fJ<^I5fxnHt>-sd>KK#}LbG-qnI%z{ZSM-^Y`K-fo9L$<%kaO$B~O z*D}d}4>a4@qnj4?C$&sXySsRz*tzl_gdID*FTIsr^7WJs$lt76Yv_keP1i8o-*$dW z7RoWUZ%QUoBnI!AgwXTYONevUpK{4)I=)LkXM&oZn;G`;+#Vy)4DVLlLiHMdzm=+Cw0WY&6e*L9S|<7stoz({N(sIyYN+! zZ%;h`;l{)r$hN|}9e;^U>+SP4S2)VV?2KZ34HiAIf~yV zDn~UuWVQ`|#Xn>hTwOgGIe3^;R)ss;be|yBxDod%5$n9=$RcxeZXK;*=1m!EUciR- zN*jTSbcgLSL@~O1{jO!OF(c-fFg!ktQdv%rJ!a1%${BgQPeT7jL6ibq@JTmHgh)OPJJblwXU*$Cvz#k zcl}QG{KldJJWTc0#R_!h*=CN3X54vuGfDh_^*+|JtA46-@*AkE8qw!edyZ&vI zhw)%$jQzs=keTs(ym>2oa_hV>M5o^NoW8^!{o?Or#E^We&K$)U`>M=FCiz?L+H+=B z$XVosZMcOlIQHWZi(I!;lo4*vd5CKj2e|*&t`NPZ@^Lm(!7qySnb1 zk&IC9Ro}`5`2BD9aM}3d%DcEQQg$I}WbgK%B9O1jc)h$ZNZ#D7`XRf`pzMi+=z866 zILodu`S|03>&J{iCW8_VwL^Nf9X>Qm9ll(#d*X6jHh(y>9+z0sV+Af#Pn|u4Xix4PQ?FxB zzO!N^a-H~WD^d~Ld(}4lc;25+wj+RIwujHrHnF1TneFmQFQ(_cp;#zFYoGM?JL3I6 z%(QS5ZZ21f!aKFcXn8s5Plf)MZ*k)`^SFx0%Ud`MD+8z&IBA=+lWIY;Y-|)oU>j8j zC2!1u)szj4k_}ctLafrR=e>NRXy-y9~etz z3+qguVA$j)EK5|%YB%TDZdfOMx>J*7-AuY*T_0sVR4ZEw1AB=O0ax~VYWu?Oy2-N7 ztK(ZGkIyzi+I1RO`I6~uWj^j~{y^O$ch3yiNoQ%^7%5?AGJTS`-zPYIL6mIk^o8t; zt78R3JIBoLhkCYr-;v zjgUzP7Qhip7i@D& zn-%Ze5a`u+A3wZxW#2EIWOXdd_F7rC2g=qt(-g+LYlKM*=!YEZ6KDA7<5w`=%gi%0 zJ#tSXlY{1H`-Tl=J`PIP4<+XwP8hb>!4!>W1tqVHGDX#`}pk@uWp}OH>$h-%g6m&v}_f=pxGkc1?AU6wn@HKa#!=UbsyB- zN^xOy9*VdSW&KUQoTXL~i<*Vj;@HXh-oc>_kPw&2Xdhf~^w3bOPU%vYLS=$_AWea(e zMx|@s<tvOZboiiK3#?^NuH3~Fw-|; z%(gOb%idXmy&NYSH(3VF>G0MhxoxA-qbY_BUjZ|@T>Z|V{htI6U6!`S$)~<4*-wzm zh-YGI;JDJUOe(1 zg8uroyqWOk-<{JNU^~CKyZFuC!<7SX8i(`Rk@~mK+<#|V*`Jpi*0p41R0w3M~&m-4=mD4YDetX-c`JHdkET^}tnoYIXq=zY#ctkZjcR6J9k6RLzO4)wr zkciL3cu{gYSbUwq!<9Bya!T+TlnuCpM9!5jl@g-{@1*=vh8Wahxm_xD(|S#YST5Pz z#l);m<9Vw~LEMQv?~*OhnZ>e$uLw6^CG7=YEHB34XiGd}-IApcmYj$zM})-lsz_go z_%ry8cL)h)q{0Z|OY-0wXicN{(&tEiP`u!TH&i~nH_7MP_-2!(@go)5gQ1#=~&DEhrs+Q#Rfu`xm#T+)6J+I!`?5_9Rd_h(VEt zja9%L;!R231uz_2!!q@A86Ml-%b84`qrXx5vJnUs;G+y;Gb0?lp`9+X*eW{V!e@nj zI}t@)Ztz#bjTzNJW~iJfwM=~j<)x?9a741ff0qv!u7S178sT=35)!9@Vaf`FpFSDy z@`OBR8)aWDoQ&;7C|2^EUZmoR>~4pCv)-2#V77n@C9>c`cFa9JVLq|a0dQLk(51-^ z)1T!st}(&$xMaY-D@vZNW2wU?yxgdX0J0a z9$U==DSLW06|ZldlE&D%eB#NXy5h}gv?mo$hst)jUsc6+8|NlBBEEzv7B)gFnXAbx zMT&S%-(Jm^I$W8Yz8E|7Rw<2qpv!a@@>ysf5#$49@T61}qSY;;rum%x%S8nx4 zZ$0eUE?Q;kzzmU`ksVr4Lx>maK=JeIm?5Q3&-p5RUn*3W&>jjrZ3>!6n({B?(19Fo zQy}vZ&a#X6$6sET9D9lVccJ4#aeGcHF&q(7zLcqKj)f6C<@ zu6DcNzZu~jZcbdoi>vPWmd9^d>aX1oV=j-K~7lMlJy-GPM&|pS`_w!6g zr?*BJKTRel=E13y42Ey0l;^BYKzDbf@u##-w8*p~NrZVJ({0+SMl0Z*jx~jFRFzyJ zF*EV`LfuzfoNPnd;7Kx#iRejux$HzDSj{Mje0niPn(-u7k;T*R=}VDY7y7Q~Hq=z5 zu~vc4BYoM-ygXZ70XfNvLb}_~he{_;uv3=~SLk~9mMh|gU5Rkp;E&Uu3a8D}6NeH5 
z^>c>}VjZX*Q!p#K^!F>HoMW#r41&xNiUS3g- zcJ2>T6uUQER=6dD5lxG?)cvD3szym-2~R$*8qLLH^l;Q?@WbI>=~n8flCbX;@$N@g z$bI+>VBB^n1=fKk9(jnOcvFrsE5cFE%bAu(DS42>>i1AvziD3oWA)qn90{c{qDT%6 z^PgUgUtDkfIGKEeH!Z)@!If~wv-<5E-ble*<6p7Cu zQM7L|C8vNIhA)R_QduKLjvAS*)MSmuYs~1;Z{zDQxVE*T-Wi7sD799K8b3aV!z*|2 zV2j0S!Hdt!&!;GxQfCzk3JVJ6xGeA zn_mfyKbNE=jHXmW2vtTCI*SRR)HLy&qG~Q`LdW>ZlYKcw;mH;6(w;;!@jVTtn$(i+ zi%2@Csg~ZDfXccK0it0=;gd|Tv3vS!CRja8;cXf63G@uWmrjuY5;_K3d zvJRCU*%Ixn*@VCc#MgJigb5!_Agtjl1%9!crRGpI zNN721&a*zAlJN@nd{Ks4#D)OP5bb-|)Gu=1rd8S-?}~1Q9a!(;!rJR8*8G z&Ys;tuKhVtQJ*dFT}T977{d503iViQzMw*&C7GnNCyDCP<@2Rq)YjG3)-GGVjF7DH z<5y5r{mNDKc#+j)O$UlvyH2D@)_PGCk&_Kw5KZ%Io&10BmbF*|{>L#-VG-QrZx(^u zdid`S4nDuQMeyS(Ewul&1OM0M-sT$OcRM@Hul9{=-lF8O@|=Q0M-I*w8*40*N_nR% z(jwV?I>?hsy9E}by(2={o1NfG`@|Y^<%2{{YlQt@Z@EbLOLU+-JF9YWmv`tX1<#QL zs+1%|$5RjEwD_^;6f7>Td5PyTx&*$2*6`9*ZEw^f6kw5|XXr>}oXacC26# zxByMIWN(iIT5R%?OTeKn8(##EGb??Ia_AS4K5_8hffI?1i|t{wH6IeuP&)nBW1Cli zg~d*{(Jh~&az?=8sv@6v>X))8+fE&VP_{voGct%`@R@8#D}W=5fb>T9GRl5Vk-0*J zO8;pP%>Pj_#saKh3;|Tz-fD_P-MDC9b(F(V9%J+lru@t?G0s9PIB}}WZOoU?pbX6N zN1AgvC;oAge7MbsLj!IbSw)gUd8gor>R~L$ghPB8FXYvim5k+4N*{elmSC6QmTz>+ z)3P%nUvgBSGAhg=RG|I=W0!bkN6I;}BNf@7B5{fe-Y<@-7=rOcAe{_VtQ#kTI7(y0 zX(_?M>ARQ`nUYe0J{SZ?G2Rv7M$LqE-?dYt4G@sC0&7V~|3I4#9S zEH=hdlKy?#UnBPIVYC!GJ=>9oD&7uXIzx!YC}6GRhzOWO9tL#6`K?li(4iygALPOc z*~vxsHgbG6R@M>eA1Y9;{pn7A9a$^HL&};C9X8;WX2Y$@F8 zIOM}2pGpyVeQHV%eNVO9Ddh?dQ?e?bb5?Nr9Lk$Z4V4edIB@#SDDtM|^hRw(k>rF> zFnaeS5hM|=GcHe!f|vZ-`9)GiFbB0WokkK6!j+4K6m)^nB9(auq@PyA#^CmRQzug& zRW24;&S>-^jvu2gD7KdOjIt93%?BN)pt#%~WpwrA)bbZqe0Y(q*j{8qeHd*VoH8Q{ zB%Hw1OZv+lbd;ND0(2DDuM?%_P+$(Uqagz*<+y-%s$yeZ7A)N?yc6!2Zr)|2U%RlkHHl+}%fXUvnhjtZOP7g%*m77=uy+|*5G5(6<5ohQ5nNv(9oo}<)Hb6F9YWb#|18@r zS*;Kw4kLz~LLA?Tz>*r(N+xj6m7ORlx)W-G?nri_5lZBJ)>z7kRo^yM=^KMe1ZTeB z5V+W=T~*w6f^}kO1&1?4&T>0f?%Ln~aF!>0)&ew!-EsMaZyX7YB?4BO5Tf08iRvP* zSd8g=_^BdS;0LJkB|BeXDr=gekO~{jWQ8JM8q(T) zP;3u+Fdb0m4y_Q&A*_uU1;qWgxLo2zx%QvcAzvk&ldp*Ayd8f&(aV582!fUjqcK%7 zKu4)Xj4Ki!Z=|SVZzChR>`g;^>nX1VjMpn=tSGoR6uuqZ3b;)}{QD{S9Z`tR5!DM{ zIWcGGBdd<3;S?EE0i=OjaNCXfI1b52*gXP=DE&IK7*^eIE6k7Vf`OPZLNs-upgr)B zJL|t*sg8~@+Bwl{a2thHDjphSE#@3N4s7u*|18Gz9=9b96P|!^1ygUa6D1zu^_7Pj z6R;%Y>lejh+~?#2!F^(k`Jz-3R&8ph3<%IDERky_MU`^-Xp=77=8d9PDC3jqL4;U0 zJP4i9vBpxKgJ6${_<{VR&kE?Mu#|Y^c^J;Hk(^|&Iv9smiE+gWKUZTBHUlD>{v>_L zpJ?_)M2kPPq9yIk6Uco{R4isSr^9O<{z{6$Q+Y;V=Q{+VGjk+B){#Iew_l*#umcCgP-IcSP=%#bIYg1`G`(Q43^0-*o-yQ+ zS27!Abwu?xx{BRyiHo$<52O7l&fDMTA;X=4G1*ycx~?r0u;YRSgHLZyjU59Awl_?s za3>Z)PWe1>q{TsxV_g{dO+(a1j0AY?Y#SY$lG_gSTUw65=JXy+O z?6nJC-zoK7P5CVDIO02E3(q1cJ4E-R6n1VvKP<=s`r&WbWFMZVbnN7rq@xsw$P{Sw^wfJA6=FWo=`ZNYteA9c zLVkYJM-3tgH0@WE37&zJwd3bra_|6!HNvkaJQTgQczFi+eB`MpDE5l*QW@>#yptYG z`Rv-GM&7v|(Z`Yz-mpg@Mq^Orh=1Q-mEEwHL`1mnsr?r)5_J zeOZ&sTMEm2#Cl!w2>rg*4!D zES7i?GyST(Oar3z6vh$38{EPn|Xrt>>wnvSoYYx_OekpVPY(`SS0LJS_<_?^nI$A z?^?j(OM4{LfJEGyT&KNM-lgh*`J@9?b;Uo#E6@Xf+m-aM?n`203I&UVZOq(aYyvrr z+-0vp9<$r}U-pdbNa^R|aK&5E$f^tHfsGPkR7ts6_A4j zV8Zb_>=AC|U3_7$*nA-u9V~=v>ErNWdc_)vA5PHtasQ~$DvE4gDQ&f-d~PnrTQ~WG z6k|ppTj@ak=UR)C6Y??f)^*n!;o)9{$xi6TWl^>!H)ZrnZ^J}bxnA#f-s>4!AfJ52 zSckkDh$*CZ2P#)fF<2LRJ@%qRpHG5OPU`w|`X#4(6jy^(VYO^@+mdT!iJl5s56# z^O~z>M5$utMg2%^sS{lo^G?;5KAlyZ5BZEK#W`6GWe@D9A{y*2VxHn%4t;>vC@+R{ zMfz8=-tDk1Xh-peE&9)|{Xs=~-SEQt2crbM#mfVK^r8bPE;^7RR&q6*E8!~eH#sUW z%sBiHvL=5|f5u|WC4PzuslFvsmRIbR(uOfQC7)m~;tEXn^$+&yJlvO@47Ce|d`xeY zk;BcsqU+iJtG#OvZ>mi5=Rg8Un;eoulBQ`&liC&#+D+37v;&xv(xxqKX$v%Mm64O= zv`L#AN!pMC0!keL#}OSbh{9B;=zurC1>Jp6aBvtGuXq_}b%Al!(Q$RX%`lAM{JxVG z3PpE)#)oJAnD6O#a=!a}zRUaFe(zhXMkr>=J4-)owo@H+I{I5WMR}w}{l7T)sXl=H2nO=WuO%TLe_Ct6U@#oMhj+^$=*(zY 
zS!tO~_q2k_hbjQV*UZNTDwJj_C^u837%}ENN?eX-3I4#xZ?P278c#G`$I$JWt) z6Ba!|RDhRf)+%${*iv@qcSiVL`B&zTEjWql#E8ct(gRR`v3aXHO%kvvb*wjKFeL{& z$OwYM5#NU@bFT^O!Du&jSv-G3U^V??iIe<`xBdO<=IWe5d(sU9nQn2TQCr zv{so~f=TRH8dh1sP(d1oxK5o)xIs^}*rNM4)#8Bp5!xsz7BVv^i=b6w(Tmo5LeFi- zvZ+9aYnD=J2D6)$<#)O(pEia9r`fsQWCW>U0UV{Zg?v1HD93t8}GJQ-g{mX&Bxe}urrll$Lza| zCNAFG7z_pzHmVS?##IQLqBG2pYoa~S%MmdwR8ziz3Wir3tSS7L1)wHK9x>e)>AKx& z3$_X~F$xeDEmSf~ZnS4m%|LW{O%O^i4`jXR25x1l4j=9aP^zyCjlWL|5)s z=CU@qc?|QpPUa~rV;mFpSfAy)LUF_9i6?*Z(>`8goKmtPvcUbfi`bQWJMQc8< z%B0Mv^AAuK=9>xf2YRxZcMVJ`W7wk!IA2(-!Bt5l;b+EZ3r`LH4=+_`q#Goz0$nxB zP^_wlFSOWx>M;BTuc{KMBbHA({M^>@YJnG9YoY`?OXBIv%;LsLI>Jm?F7t(>0SOfw+_~m|>DgG=u=@)}#fNwdnQCPg(`!)9!C}ZOHGS3)lA+Uf;+oEOn2`vHEAKbZ70fSE+0&%E zMub4ZW|%2w`ct^)6gZn$#%xqm5yM#p9TL_KKm&$g&!iyqQ~ygU%5Gu|%M5Hhv)?(e zn9DI)64r>V6T1?=(eNfi>s66N&}u2bRMc8k`-XxQlO&yjL_@hArhlH0m^4=oy;lK z$t%pK%Q3AB>67OC2q$5fkeS7DDmKtS+1T+aRem>`tHh{G*Te;Mv~H!I`9`Mk$3y-O zVbbks5_J4kscdx`6+8L70<(@y*P=A1(W?gEVGhXAC>RW(ytn1%C#e{7Tr1&yb|XcX zQT|t&uowYLiRSH6t=UiYZJeO6^+Iszg%o<}n$_tUR1en4y}qxQR{5G2HC}5W)Il7a zKu(wZ^gN>=>IlNXzuJVg)P5@Pn=^X&;e;jn$~9aq-9bO4EY)TRpOEot)5|=3oOxfP zz+#+d6)E%z3E1naMh%t@5H@4$vd|b$$kaNjtN%Kh<1IQFBjI`Gg>)GYdD(;_njhxr z{pPReeG;#5uu=ook3BB4L1j2K(L6?-fhA^!0!wsP0oL^qWeFwdrfZnV%ygWr`LK2J z_9TK{+Rc2Ul4?{~>GsUjPNag`$=Hf%ICeUCi$4^_Cd!qFyxv3gaqs9}OqKWrLw72c zTgSP_*hm)IXzByzQ!*UkFs(9K&==K~qpc*-P;)uOy%5Nh3B_fGJF}PqIkQ9g&`CCu zF0ojdLk#Xw7>ynS;dXvC6=H5n%NtaJ7oOG_c4^jZ!HG(~Ij-=bX4F|!*1`Fz9_+OP zx-zETkbNY@utSEeB2`cmWyTwcOfJ*MFpYA33^%rXF!AI!+7wwn)Z93@jB?C8J<41~ zA9lOa9J7oyo7M%{62;*AT>6w4v=t>V@4I*(vp6BW#`T|*{6ZDjc;(1myf^`YbzJw4tMb_(h@fdP=+7dS_IDZAR z`daqe+X>I2$9H^sK-6+UG+4ZufK1cbzuIuw^Yzp59K5fDpwoeD2s-__5w}ZkY&~nF z6<^vUMqD33km-|`Rzm#f)IJ?8e`DXLSNUn_fj5bUj58)${`r~E@dD)e^l?N!fJ>ya zM0v(_^G&$23W(Q9Tfc(*(pf!pZk#>yXXxBeo<4K-%+nL(l^{$;q=e`(<5W|YoPe$g zIXk8CvDHB75c#AN&m0^Jbd2pL6MsmOqRnds-0ONN05Z@XaT2&MJw%BJqDN^c4gUcz z<0k-i1ANo7K!VwCQkj!TtXrov9U{69YJWbHs9Jy^|Mtd>F2G};QDeyvW(D9C0{jxy zQv6YVxd$8$Qpuc8B-hO#0L4VC&0lbt*9}#n!i}LJi z6=ti4XvqNjYY1bO)3kdn#GHU<5+hJ> z3gnZRq;0SNtwRK;bM;fP&t4t;k2H&lr&f{ADk8KANA3oZL{rAg+ne@uC^we`M2H&D zOQ93btRs3wNF@$IfZjhkP$XiNK6>r(1s^b5pfUrB3MS770_;y8zqP0HrAc#^pZ@a0 zV+79zgwkKW_~50jSD|fEa=fU{)hX##puh$yZepjl|H|96t#X5LcHfsW$jXSYH^@BI z-hO4j<`rY1jkY?Fj+C#Jq&b9k_06h8{XK}1|Sd+TL2I31BM4J_Q*9)#Xfu$ z${HR4?B}!JzrORXN3VV6difBD>PGdn4uS*|YqL%if65113W_pbAgy|oeDxM?gN`88 zpeO^fXG4&%mWS_oq8r7bT!W||V+eb+zbA=3dgtF)j!k=UGX%+=TbJRu<(jhS2QLrK z+Yww`-b=t_1TGc-^u*x}lS7k4nWdebkN(%}6@NkP?we0)_w;A)lGlcX9*zXXPCLiVh?8~H>1M0W4Utvu+eIfQVeG`wQd*zKPF78kPLnPpO1Q4VN_oK^wdF>5yb=b(tI zU~G8jK$RV~0+Lj7VJeYt72FO`Q(awMgVeS2=GCd`S@kdt4XOs|e3-_@CONHdZf<55 z(vcD!E=dkyGXjNc!_UJJVfZeB}ETy7NNK_Wp->pMC+ym z!Cg{Xqv;+Gzz1jyhHx6MrKc}ls#>aw|kHo;RY%pWyc<*mZbcPt%GG8#dgGSAa99TR`p4?PeW@7BjK zBMd9X9q;z{5FBT6PS}#CJF3guJ~%e-SDVHHbUV7+5^;gizSN~AgY=tADE&ABHVg1t z2AXnn$BfX!OKBBwk5IytOVtq+h3GvjK|Ab6k`iceE?ajqYb4QzB8pg%;aDWYBjUGmq#z~52|bG2No~B43O&|7SVG#7crMc!Jd*-7WMVwiA|ujpK!O~lR-J^P%x z>ujVG0g*=!k$HwnQ+Z(WMP}UQ`k)ONjZgeMC>l+CoZgi2uz897Gch;x4MHpS!Iqx5~w4v!|ba^8JE-LZ+c2aTZK)3+V zEcDyu*?u3cTp|fF7Q)c`4FCiw9XNuhFBBxcg)<5)1(s~|?(CtWM+90CR+PPx(kr;; z=K0R@T%7_TqBCd+#xF%R)=5bsxXYV9_om@qy=x4WI5FRIJi^QqbA_*X?L^~w^|!|UDYiySFqkU-)DEVE{J#51cvENAB zzPbv|*BBz6{T-fae|)H% zuq)VPZK$Xr^u*d*o4rVHX(Kc^N(x<$Wo300nUN8!WBXWI96srD*dwOTul|vTo2q^6Bw73*$mdnMG)|59Lz^%w($(%L^VRrBpJ)%aJC}72*9~c-b74Wd7yZQRE*x10 z&0deA#ln-i71N_#TJ0qx+nBT&+AfSXvNlx3`dX9q`TuIqHmlzvB;#MA9UH=)>Ta?B zU9R?PcNThiXZ+jxyx--l3cHg1A>QUJ>27On7}=l6xWTw;{?@v3S!sM!C36Vxb8z6B)+Zv+AxkWCj4%n*pDwo*T`{piGJ?y@yu=e 
From a495811834ab9eea7d9af556c77ccfa4f49e6c86 Mon Sep 17 00:00:00 2001
From: Robin Linacre
Date: Mon, 11 Nov 2024 16:37:08 +0000
Subject: [PATCH 15/22] allow found by blocking rules

---
 splink/internals/accuracy.py                  |  7 ++--
 .../internals/linker_components/inference.py  | 15 +++++++-
 splink/realtime.py                            | 35 +++++++++++++++----
 3 files changed, 47 insertions(+), 10 deletions(-)

diff --git a/splink/internals/accuracy.py b/splink/internals/accuracy.py
index b7d342b5f..7ec766b5c 100644
--- a/splink/internals/accuracy.py
+++ b/splink/internals/accuracy.py
@@ -20,6 +20,7 @@

 if TYPE_CHECKING:
     from splink.internals.linker import Linker
+    from splink.internals.settings import Settings


 def truth_space_table_from_labels_with_predictions_sqls(
@@ -289,8 +290,8 @@ def truth_space_table_from_labels_with_predictions_sqls(
     return sqls


-def _select_found_by_blocking_rules(linker: "Linker") -> str:
-    brs = linker._settings_obj._blocking_rules_to_generate_predictions
+def _select_found_by_blocking_rules(settings_obj: "Settings") -> str:
+    brs = settings_obj._blocking_rules_to_generate_predictions

     if brs:
         br_strings = [
@@ -425,7 +426,7 @@ def predictions_from_sample_of_pairwise_labels_sql(linker, labels_tablename):
     )
     sqls.extend(sqls_2)

-    br_col = _select_found_by_blocking_rules(linker)
+    br_col = _select_found_by_blocking_rules(linker._settings_obj)

     sql = f"""
     select *, {br_col}

diff --git a/splink/internals/linker_components/inference.py b/splink/internals/linker_components/inference.py
index 4691b6c90..1556ede0d 100644
--- a/splink/internals/linker_components/inference.py
+++ b/splink/internals/linker_components/inference.py
@@ -4,6 +4,7 @@
 import time
 from typing import TYPE_CHECKING, Any

+from splink.internals.accuracy import _select_found_by_blocking_rules
 from splink.internals.blocking import (
     BlockingRule,
     block_using_rules_sqls,
@@ -639,7 +640,10 @@ def find_matches_to_new_records(
         return predictions

     def compare_two_records(
-        self, record_1: dict[str, Any], record_2: dict[str, Any]
+        self,
+        record_1: dict[str, Any],
+        record_2: dict[str, Any],
+        include_found_by_blocking_rules: bool = False,
     ) -> SplinkDataFrame:
         """Use the linkage model to compare and score a pairwise record comparison
         based on the two input records provided
@@ -799,6 +803,15 @@
         )
         pipeline.enqueue_list_of_sqls(sqls)

+        if include_found_by_blocking_rules:
+            br_col = _select_found_by_blocking_rules(linker._settings_obj)
+            sql = f"""
+            select *, {br_col}
+            from __splink__df_predict
+            """
+
+            pipeline.enqueue_sql(sql, "__splink__found_by_blocking_rules")
+
         predictions = linker._db_api.sql_pipeline_to_splink_dataframe(
             pipeline, use_cache=False
         )

diff --git a/splink/realtime.py b/splink/realtime.py
index 10faab076..025e25c94 100644
--- a/splink/realtime.py
+++ b/splink/realtime.py
@@ -3,14 +3,15 @@
 from pathlib import Path
 from typing import Any, Dict

-from .internals.database_api import DatabaseAPISubClass
-from .internals.misc import ascii_uid
-from .internals.pipeline import CTEPipeline
-from .internals.predict import (
+from splink.internals.accuracy import _select_found_by_blocking_rules
+from splink.internals.database_api import DatabaseAPISubClass
+from splink.internals.misc import 
ascii_uid +from splink.internals.pipeline import CTEPipeline +from splink.internals.predict import ( predict_from_comparison_vectors_sqls_using_settings, ) -from .internals.settings_creator import SettingsCreator -from .internals.splink_dataframe import SplinkDataFrame +from splink.internals.settings_creator import SettingsCreator +from splink.internals.splink_dataframe import SplinkDataFrame __all__ = [ "compare_records", @@ -26,6 +27,7 @@ def compare_records( settings: SettingsCreator | dict[str, Any] | Path | str, db_api: DatabaseAPISubClass, use_sql_from_cache: bool = True, + include_found_by_blocking_rules: bool = False, ) -> SplinkDataFrame: """Compare two records and compute similarity scores without requiring a Linker. Assumes any required term frequency values are provided in the input records. @@ -83,6 +85,13 @@ def compare_records( settings_obj = settings_creator.get_settings(db_api.sql_dialect.sql_dialect_str) + retain_matching_columns = settings_obj._retain_matching_columns + retain_intermediate_calculation_columns = ( + settings_obj._retain_intermediate_calculation_columns + ) + settings_obj._retain_matching_columns = True + settings_obj._retain_intermediate_calculation_columns = True + pipeline = CTEPipeline([df_records_left, df_records_right]) cols_to_select = settings_obj._columns_to_select_for_blocking @@ -109,9 +118,23 @@ def compare_records( ) pipeline.enqueue_list_of_sqls(sqls) + if include_found_by_blocking_rules: + br_col = _select_found_by_blocking_rules(settings_obj) + sql = f""" + select *, {br_col} + from __splink__df_predict + """ + + pipeline.enqueue_sql(sql, "__splink__found_by_blocking_rules") + predictions = db_api.sql_pipeline_to_splink_dataframe(pipeline) _sql_used_for_compare_records_cache["sql"] = predictions.sql_used_to_create _sql_used_for_compare_records_cache["uid"] = uid + settings_obj._retain_matching_columns = retain_matching_columns + settings_obj._retain_intermediate_calculation_columns = ( + retain_intermediate_calculation_columns + ) + return predictions From f12879068ef8edb468ceca75fdf67317818c5bb6 Mon Sep 17 00:00:00 2001 From: Robin Linacre Date: Mon, 11 Nov 2024 16:51:10 +0000 Subject: [PATCH 16/22] hardcode values so they work across backends --- tests/test_realtime.py | 87 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 81 insertions(+), 6 deletions(-) diff --git a/tests/test_realtime.py b/tests/test_realtime.py index 973197c1f..6556f6e8d 100644 --- a/tests/test_realtime.py +++ b/tests/test_realtime.py @@ -119,13 +119,88 @@ def test_realtime_cache_multiple_records(test_helpers, dialect): helper = test_helpers[dialect] db_api = helper.extra_linker_args()["db_api"] - # Load data from CSV instead of hard-coding - df = helper.load_frame_from_csv("./tests/datasets/fake_1000_from_splink_demos.csv") + df1 = pd.DataFrame( + [ + { + "unique_id": 0, + "first_name": "Julia", + "surname": "Taylor", + "city": "London", + "email": "hannah88@powers.com", + "tf_city": 0.2, + "tf_first_name": 0.1, + }, + { + "unique_id": 1, + "first_name": "John", + "surname": "Smith", + "city": "Manchester", + "email": "john.smith@email.com", + "tf_city": 0.2, + "tf_first_name": 0.1, + }, + ] + ) + + df2 = pd.DataFrame( + [ + { + "unique_id": 2, + "first_name": "Julia", + "surname": "Taylor", + "city": "London", + "email": "hannah88@powers.com", + "cluster": 0, + "tf_city": 0.2, + "tf_first_name": 0.1, + }, + { + "unique_id": 3, + "first_name": "Jane", + "surname": "Wilson", + "city": "Birmingham", + "email": "jane.w@example.com", + "cluster": 1, + "tf_city": 
0.2,
                "tf_first_name": 0.1,
            },
        ]
    )

    df3 = pd.DataFrame(
        [
            {
                "unique_id": 4,
                "first_name": "Noah",
                "surname": "Watson",
                "city": "Bolton",
                "email": "matthew78@ballard-mcdonald.net",
                "cluster": 2,
                "tf_city": 0.2,
                "tf_first_name": 0.1,
            },
            {
                "unique_id": 5,
                "first_name": "Emma",
                "surname": "Brown",
                "city": "Leeds",
                "email": "emma.b@test.com",
                "cluster": 3,
                "tf_city": 0.2,
                "tf_first_name": 0.1,
            },
            {
                "unique_id": 6,
                "first_name": "Oliver",
                "surname": "Davies",
                "city": "Bristol",
                "email": "oliver.d@example.net",
                "cluster": 4,
                "tf_city": 0.2,
                "tf_first_name": 0.1,
            },
        ]
    )

    # Add required columns if they don't exist
    for frame in [df1, df2, df3]:

From 69049a189600c479e853e48abac795b95a73a3a5 Mon Sep 17 00:00:00 2001
From: Robin Linacre
Date: Tue, 12 Nov 2024 08:49:39 +0000
Subject: [PATCH 17/22] fix mypy issues

---
 .../internals/linker_components/inference.py | 22 ++++++++++++++-----
 splink/realtime.py                           | 17 +++++++-------
 2 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/splink/internals/linker_components/inference.py b/splink/internals/linker_components/inference.py
index 1556ede0d..c22629981 100644
--- a/splink/internals/linker_components/inference.py
+++ b/splink/internals/linker_components/inference.py
@@ -641,18 +641,30 @@ def find_matches_to_new_records(

     def compare_two_records(
         self,
-        record_1: dict[str, Any],
-        record_2: dict[str, Any],
+        record_1: dict[str, Any] | AcceptableInputTableType | str,
+        record_2: dict[str, Any] | AcceptableInputTableType | str,
         include_found_by_blocking_rules: bool = False,
     ) -> SplinkDataFrame:
         """Use the linkage model to compare and score a pairwise record comparison
-        based on the two input records provided
+        based on the two input records provided.
+
+        If your inputs contain multiple rows, scores for the cartesian product of
+        the two inputs will be returned.
+
+        If your inputs contain hardcoded term frequency columns (e.g.
+        a tf_first_name column), then these values will be used instead of any
+        provided term frequency lookup tables, or term frequency values derived
+        from the input data.

         Args:
             record_1 (dict): dictionary representing the first record. Column
                 names and data types must be the same as the columns in the
                 settings object
             record_2 (dict): dictionary representing the second record. Column
                 names and data types must be the same as the columns in the
                 settings object
+            include_found_by_blocking_rules (bool, optional): If True, outputs a column
+                indicating whether the record pair would have been found by any of the
+                blocking rules specified in
+                settings.blocking_rules_to_generate_predictions. Defaults to False.
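+
+        For example, multi-row inputs are scored as a cartesian product (an
+        illustrative sketch; df_l and df_r are hypothetical pandas DataFrames
+        whose columns match the settings):
+
+        ```py
+        df_l = pd.DataFrame([{"unique_id": 0, "first_name": "Julia"},
+                             {"unique_id": 1, "first_name": "John"}])
+        df_r = pd.DataFrame([{"unique_id": 2, "first_name": "Julia"}])
+        # 2 x 1 = 2 scored comparisons are returned
+        linker.inference.compare_two_records(df_l, df_r)
+        ```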
        Examples:
            ```py

@@ -702,12 +714,12 @@ def compare_two_records(

         # Wrap a single record dict in a list so it can be registered as a
         # table; other inputs (e.g. DataFrames or DuckDB relations) are
         # registered as-is
         if isinstance(record_1, dict):
-            to_register_left = [record_1]
+            to_register_left: AcceptableInputTableType = [record_1]
         else:
             to_register_left = record_1

         if isinstance(record_2, dict):
-            to_register_right = [record_2]
+            to_register_right: AcceptableInputTableType = [record_2]
         else:
             to_register_right = record_2

diff --git a/splink/realtime.py b/splink/realtime.py
index 025e25c94..1b920a891 100644
--- a/splink/realtime.py
+++ b/splink/realtime.py
@@ -1,10 +1,10 @@
 from __future__ import annotations

 from pathlib import Path
-from typing import Any, Dict
+from typing import Any

 from splink.internals.accuracy import _select_found_by_blocking_rules
-from splink.internals.database_api import DatabaseAPISubClass
+from splink.internals.database_api import AcceptableInputTableType, DatabaseAPISubClass
 from splink.internals.misc import ascii_uid
 from splink.internals.pipeline import CTEPipeline
 from splink.internals.predict import (
@@ -18,12 +18,12 @@
 ]


-_sql_used_for_compare_records_cache = {"sql": None, "uid": None}
+_sql_used_for_compare_records_cache: dict[str, str | None] = {"sql": None, "uid": None}


 def compare_records(
-    record_1: Dict[str, Any],
-    record_2: Dict[str, Any],
+    record_1: dict[str, Any] | AcceptableInputTableType,
+    record_2: dict[str, Any] | AcceptableInputTableType,
     settings: SettingsCreator | dict[str, Any] | Path | str,
     db_api: DatabaseAPISubClass,
     use_sql_from_cache: bool = True,
@@ -45,12 +45,12 @@
     uid = ascii_uid(8)

     if isinstance(record_1, dict):
-        to_register_left = [record_1]
+        to_register_left: AcceptableInputTableType = [record_1]
     else:
         to_register_left = record_1

     if isinstance(record_2, dict):
-        to_register_right = [record_2]
+        to_register_right: AcceptableInputTableType = [record_2]
     else:
         to_register_right = record_2

@@ -71,7 +71,8 @@
     if _sql_used_for_compare_records_cache["sql"] is not None and use_sql_from_cache:
         sql = _sql_used_for_compare_records_cache["sql"]
         uid_in_sql = _sql_used_for_compare_records_cache["uid"]
-        sql = sql.replace(uid_in_sql, uid)
+        if uid_in_sql is not None:
+            sql = sql.replace(uid_in_sql, uid)
         return db_api._sql_to_splink_dataframe(
             sql,
             templated_name="__splink__realtime_compare_records",

From 8553393b90af4bcdba608fb234c8057bd6e2edda Mon Sep 17 00:00:00 2001
From: Robin Linacre
Date: Tue, 12 Nov 2024 09:00:36 +0000
Subject: [PATCH 18/22] make the new realtime functions private

---
 splink/{ => internals}/realtime.py | 5 -----
 tests/test_realtime.py             | 2 +-
 2 files changed, 1 insertion(+), 6 deletions(-)
 rename splink/{ => internals}/realtime.py (99%)

diff --git a/splink/realtime.py b/splink/internals/realtime.py
similarity index 99%
rename from splink/realtime.py
rename to splink/internals/realtime.py
index 1b920a891..016c583e2 100644
--- a/splink/realtime.py
+++ b/splink/internals/realtime.py
@@ -13,11 +13,6 @@
 from splink.internals.settings_creator import SettingsCreator
 from splink.internals.splink_dataframe import SplinkDataFrame

-__all__ = [
-    "compare_records",
-]
-
-
 _sql_used_for_compare_records_cache: dict[str, str | None] = {"sql": None, "uid": None}


diff --git a/tests/test_realtime.py b/tests/test_realtime.py
index 6556f6e8d..6e38bd134 100644
--- a/tests/test_realtime.py
+++ b/tests/test_realtime.py
@@ -5,7 +5,7 @@

 import splink.comparison_library as cl
 from splink import SettingsCreator, block_on
-from splink.realtime import compare_records
+from 
splink.internals.realtime import compare_records from .decorator import mark_with_dialects_excluding From e045354d078bafd989386982028b9a04d3db1815 Mon Sep 17 00:00:00 2001 From: Robin Linacre Date: Tue, 12 Nov 2024 09:41:17 +0000 Subject: [PATCH 19/22] fix mypy issues --- splink/internals/linker_components/inference.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/splink/internals/linker_components/inference.py b/splink/internals/linker_components/inference.py index c22629981..035dfcfe8 100644 --- a/splink/internals/linker_components/inference.py +++ b/splink/internals/linker_components/inference.py @@ -641,8 +641,8 @@ def find_matches_to_new_records( def compare_two_records( self, - record_1: dict[str, Any] | AcceptableInputTableType | str, - record_2: dict[str, Any] | AcceptableInputTableType | str, + record_1: dict[str, Any] | AcceptableInputTableType, + record_2: dict[str, Any] | AcceptableInputTableType, include_found_by_blocking_rules: bool = False, ) -> SplinkDataFrame: """Use the linkage model to compare and score a pairwise record comparison From ef682b162141c9d787f861fce8b8aaaa4d459fa1 Mon Sep 17 00:00:00 2001 From: Robin Linacre Date: Tue, 12 Nov 2024 15:07:51 +0000 Subject: [PATCH 20/22] improve caching implementation --- splink/internals/realtime.py | 44 +++++++++++++++--------- tests/test_realtime.py | 65 ++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 15 deletions(-) diff --git a/splink/internals/realtime.py b/splink/internals/realtime.py index 016c583e2..d520ffdc3 100644 --- a/splink/internals/realtime.py +++ b/splink/internals/realtime.py @@ -13,7 +13,25 @@ from splink.internals.settings_creator import SettingsCreator from splink.internals.splink_dataframe import SplinkDataFrame -_sql_used_for_compare_records_cache: dict[str, str | None] = {"sql": None, "uid": None} + +class SQLCache: + def __init__(self): + self._cache: dict[int, tuple[str, str | None]] = {} + + def get(self, settings_id: int, new_uid: str) -> str | None: + if settings_id not in self._cache: + return None + + sql, cached_uid = self._cache[settings_id] + if cached_uid: + sql = sql.replace(cached_uid, new_uid) + return sql + + def set(self, settings_id: int, sql: str, uid: str | None) -> None: + self._cache[settings_id] = (sql, uid) + + +_sql_cache = SQLCache() def compare_records( @@ -35,7 +53,7 @@ def compare_records( Returns: SplinkDataFrame: Comparison results """ - global _sql_used_for_compare_records_cache + global _sql_cache uid = ascii_uid(8) @@ -63,16 +81,14 @@ def compare_records( ) df_records_right.templated_name = "__splink__compare_records_right" - if _sql_used_for_compare_records_cache["sql"] is not None and use_sql_from_cache: - sql = _sql_used_for_compare_records_cache["sql"] - uid_in_sql = _sql_used_for_compare_records_cache["uid"] - if uid_in_sql is not None: - sql = sql.replace(uid_in_sql, uid) - return db_api._sql_to_splink_dataframe( - sql, - templated_name="__splink__realtime_compare_records", - physical_name=f"__splink__realtime_compare_records_{uid}", - ) + settings_id = id(settings) + if use_sql_from_cache: + if cached_sql := _sql_cache.get(settings_id, uid): + return db_api._sql_to_splink_dataframe( + cached_sql, + templated_name="__splink__realtime_compare_records", + physical_name=f"__splink__realtime_compare_records_{uid}", + ) if not isinstance(settings, SettingsCreator): settings_creator = SettingsCreator.from_path_or_dict(settings) @@ -124,9 +140,7 @@ def compare_records( pipeline.enqueue_sql(sql, 
"__splink__found_by_blocking_rules") predictions = db_api.sql_pipeline_to_splink_dataframe(pipeline) - - _sql_used_for_compare_records_cache["sql"] = predictions.sql_used_to_create - _sql_used_for_compare_records_cache["uid"] = uid + _sql_cache.set(settings_id, predictions.sql_used_to_create, uid) settings_obj._retain_matching_columns = retain_matching_columns settings_obj._retain_intermediate_calculation_columns = ( diff --git a/tests/test_realtime.py b/tests/test_realtime.py index 6e38bd134..c735c0b84 100644 --- a/tests/test_realtime.py +++ b/tests/test_realtime.py @@ -289,3 +289,68 @@ def test_realtime_cache_multiple_records(test_helpers, dialect): merged["match_weight_from_cache"], check_names=False, ) + + +@mark_with_dialects_excluding() +def test_realtime_cache_different_settings(test_helpers, dialect): + helper = test_helpers[dialect] + db_api = helper.extra_linker_args()["db_api"] + + df1 = pd.DataFrame( + [ + { + "unique_id": 0, + "first_name": "Julia", + "surname": "Taylor", + "city": "London", + "email": "julia@email.com", + } + ] + ) + + df2 = pd.DataFrame( + [ + { + "unique_id": 1, + "first_name": "Julia", + "surname": "Taylor", + "city": "London", + "email": "bad@address.com", + } + ] + ) + + settings_1 = SettingsCreator( + link_type="dedupe_only", + comparisons=[ + cl.ExactMatch("first_name"), + cl.ExactMatch("surname"), + cl.ExactMatch("city"), + ], + blocking_rules_to_generate_predictions=[block_on("first_name")], + ) + + settings_2 = SettingsCreator( + link_type="dedupe_only", + comparisons=[ + cl.ExactMatch("first_name"), + cl.ExactMatch("surname"), + cl.ExactMatch("email"), + ], + blocking_rules_to_generate_predictions=[block_on("first_name")], + ) + + res1 = compare_records( + df1, df2, settings_1, db_api, use_sql_from_cache=True + ).as_record_dict()[0]["match_weight"] + + res2 = compare_records( + df1, df2, settings_2, db_api, use_sql_from_cache=True + ).as_record_dict()[0]["match_weight"] + + assert res1 != pytest.approx(res2) + + res1_again = compare_records( + df1, df2, settings_1, db_api, use_sql_from_cache=True + ).as_record_dict()[0]["match_weight"] + assert res1 == pytest.approx(res1_again) From eddbdb62f75b0b2181f2a84281252a40e3239aad Mon Sep 17 00:00:00 2001 From: Robin Linacre Date: Tue, 12 Nov 2024 15:09:40 +0000 Subject: [PATCH 21/22] mypy --- splink/internals/realtime.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/splink/internals/realtime.py b/splink/internals/realtime.py index d520ffdc3..64fba85f4 100644 --- a/splink/internals/realtime.py +++ b/splink/internals/realtime.py @@ -27,8 +27,9 @@ def get(self, settings_id: int, new_uid: str) -> str | None: sql = sql.replace(cached_uid, new_uid) return sql - def set(self, settings_id: int, sql: str, uid: str | None) -> None: - self._cache[settings_id] = (sql, uid) + def set(self, settings_id: int, sql: str | None, uid: str | None) -> None: + if sql is not None: + self._cache[settings_id] = (sql, uid) _sql_cache = SQLCache() From 5e9a69bf116e5aa499f624823a29532f6bf31c43 Mon Sep 17 00:00:00 2001 From: Robin Linacre Date: Wed, 13 Nov 2024 16:11:09 +0000 Subject: [PATCH 22/22] don't need to remember settings since they're not saved --- splink/internals/realtime.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/splink/internals/realtime.py b/splink/internals/realtime.py index 64fba85f4..168b83875 100644 --- a/splink/internals/realtime.py +++ b/splink/internals/realtime.py @@ -98,10 +98,6 @@ def compare_records( settings_obj = 
settings_creator.get_settings(db_api.sql_dialect.sql_dialect_str) - retain_matching_columns = settings_obj._retain_matching_columns - retain_intermediate_calculation_columns = ( - settings_obj._retain_intermediate_calculation_columns - ) settings_obj._retain_matching_columns = True settings_obj._retain_intermediate_calculation_columns = True @@ -143,9 +139,4 @@ def compare_records( predictions = db_api.sql_pipeline_to_splink_dataframe(pipeline) _sql_cache.set(settings_id, predictions.sql_used_to_create, uid) - settings_obj._retain_matching_columns = retain_matching_columns - settings_obj._retain_intermediate_calculation_columns = ( - retain_intermediate_calculation_columns - ) - return predictions
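
Taken together, these patches let a caller score a pair of records without constructing a Linker, and optionally flag whether any blocking rule would have found the pair. A minimal usage sketch of the resulting API (the column names, comparison levels and DuckDB backend below are illustrative assumptions, not taken from the patches):

```py
import splink.comparison_library as cl
from splink import DuckDBAPI, SettingsCreator, block_on
from splink.internals.realtime import compare_records

db_api = DuckDBAPI()

settings = SettingsCreator(
    link_type="dedupe_only",
    comparisons=[
        cl.ExactMatch("first_name"),
        cl.ExactMatch("surname"),
    ],
    blocking_rules_to_generate_predictions=[block_on("first_name")],
)

record_1 = {"unique_id": 0, "first_name": "Julia", "surname": "Taylor"}
record_2 = {"unique_id": 1, "first_name": "Julia", "surname": "Tailor"}

# The first call generates the SQL and caches it against id(settings);
# later calls with the same settings object reuse the cached SQL,
# substituting a fresh uid into the table names.
result = compare_records(
    record_1,
    record_2,
    settings,
    db_api,
    include_found_by_blocking_rules=True,
).as_record_dict()[0]

print(result["match_weight"])
```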