diff --git a/dbt/adapters/fabric/__version__.py b/dbt/adapters/fabric/__version__.py index ba22724d..eaf9d19e 100644 --- a/dbt/adapters/fabric/__version__.py +++ b/dbt/adapters/fabric/__version__.py @@ -1 +1 @@ -version = "1.8.2" +version = "1.8.3" diff --git a/dbt/adapters/fabric/fabric_adapter.py b/dbt/adapters/fabric/fabric_adapter.py index 1e787e8e..47e8bbc1 100644 --- a/dbt/adapters/fabric/fabric_adapter.py +++ b/dbt/adapters/fabric/fabric_adapter.py @@ -21,12 +21,14 @@ from dbt.adapters.fabric.fabric_column import FabricColumn from dbt.adapters.fabric.fabric_configs import FabricConfigs from dbt.adapters.fabric.fabric_connection_manager import FabricConnectionManager +from dbt.adapters.fabric.fabric_relation import FabricRelation class FabricAdapter(SQLAdapter): ConnectionManager = FabricConnectionManager Column = FabricColumn AdapterSpecificConfigs = FabricConfigs + Relation = FabricRelation _capabilities: CapabilityDict = CapabilityDict( { @@ -183,22 +185,6 @@ def run_sql_for_tests(self, sql, fetch, conn): finally: conn.transaction_open = False - def render_limited(self) -> str: - rendered = self.render() - if self.limit is None: - return rendered - elif self.limit == 0: - return f"(select * from {rendered} where 1=0) _dbt_top_subq" - else: - return f"(select TOP {self.limit} * from {rendered}) _dbt_top_subq" - - # TODO: Standardizing quote characters - # def quoted(self, identifier): - # return "[{identifier}]".format( - # quote_char=self.quote_character, - # identifier=identifier, - # ) - @available @classmethod def render_column_constraint(cls, constraint: ColumnLevelConstraint) -> Optional[str]: diff --git a/dbt/adapters/fabric/fabric_relation.py b/dbt/adapters/fabric/fabric_relation.py new file mode 100644 index 00000000..2ca2c7fb --- /dev/null +++ b/dbt/adapters/fabric/fabric_relation.py @@ -0,0 +1,27 @@ +from dataclasses import dataclass, field +from typing import Optional, Type + +from dbt.adapters.base.relation import BaseRelation +from dbt.adapters.utils import classproperty + +from dbt.adapters.fabric.relation_configs import FabricQuotePolicy, FabricRelationType + + +@dataclass(frozen=True, eq=False, repr=False) +class FabricRelation(BaseRelation): + type: Optional[FabricRelationType] = None # type: ignore + quote_policy: FabricQuotePolicy = field(default_factory=lambda: FabricQuotePolicy()) + + @classproperty + def get_relation_type(cls) -> Type[FabricRelationType]: + return FabricRelationType + + @classmethod + def render_limited(self) -> str: + rendered = self.render(self=self) + if self.limit is None: + return rendered + elif self.limit == 0: + return f"(select * from {rendered} where 1=0) _dbt_top_subq" + else: + return f"(select TOP {self.limit} * from {rendered}) _dbt_top_subq" diff --git a/dbt/adapters/fabric/relation_configs/__init__.py b/dbt/adapters/fabric/relation_configs/__init__.py new file mode 100644 index 00000000..971a0b2c --- /dev/null +++ b/dbt/adapters/fabric/relation_configs/__init__.py @@ -0,0 +1,5 @@ +from dbt.adapters.fabric.relation_configs.policies import ( + FabricIncludePolicy, + FabricQuotePolicy, + FabricRelationType, +) diff --git a/dbt/adapters/fabric/relation_configs/base.py b/dbt/adapters/fabric/relation_configs/base.py new file mode 100644 index 00000000..e2b48064 --- /dev/null +++ b/dbt/adapters/fabric/relation_configs/base.py @@ -0,0 +1,63 @@ +from dataclasses import dataclass +from typing import Any, Dict + +import agate +from dbt.adapters.base.relation import Policy +from dbt.adapters.contracts.relation import RelationConfig +from dbt.adapters.relation_configs import RelationConfigBase, RelationResults + +from dbt.adapters.fabric.relation_configs.policies import FabricIncludePolicy, FabricQuotePolicy + + +@dataclass(frozen=True, eq=True, unsafe_hash=True) +class FabricRelationConfigBase(RelationConfigBase): + """ + This base class implements a few boilerplate methods and provides some light structure for Fabric relations. + """ + + @classmethod + def include_policy(cls) -> Policy: + return FabricIncludePolicy() + + @classmethod + def quote_policy(cls) -> Policy: + return FabricQuotePolicy() + + @classmethod + def from_relation_config(cls, relation_config: RelationConfig): + relation_config_dict = cls.parse_relation_config(relation_config) + relation = cls.from_dict(relation_config_dict) + return relation + + @classmethod + def parse_relation_config(cls, relation_config: RelationConfig) -> Dict: + raise NotImplementedError( + "`parse_relation_config()` needs to be implemented on this RelationConfigBase instance" + ) + + @classmethod + def from_relation_results(cls, relation_results: RelationResults): + relation_config = cls.parse_relation_results(relation_results) + relation = cls.from_dict(relation_config) + return relation # type: ignore + + @classmethod + def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]: + raise NotImplementedError( + "`parse_relation_results()` needs to be implemented on this RelationConfigBase instance" + ) + + # @classmethod + # def _render_part(cls, component: ComponentName, value: Optional[str]) -> Optional[str]: + # if cls.include_policy().get_part(component) and value: + # if cls.quote_policy().get_part(component): + # return f"[{value}]" + # return value.lower() + # return None + + @classmethod + def _get_first_row(cls, results: agate.Table) -> agate.Row: + try: + return results.rows[0] + except IndexError: + return agate.Row(values=set()) diff --git a/dbt/adapters/fabric/relation_configs/policies.py b/dbt/adapters/fabric/relation_configs/policies.py new file mode 100644 index 00000000..20803b0d --- /dev/null +++ b/dbt/adapters/fabric/relation_configs/policies.py @@ -0,0 +1,23 @@ +from dataclasses import dataclass + +from dbt.adapters.base.relation import Policy +from dbt_common.dataclass_schema import StrEnum + + +class FabricRelationType(StrEnum): + Table = "table" + View = "view" + CTE = "cte" + + +class FabricIncludePolicy(Policy): + database: bool = True + schema: bool = True + identifier: bool = True + + +@dataclass +class FabricQuotePolicy(Policy): + database: bool = True + schema: bool = True + identifier: bool = True diff --git a/dbt/include/fabric/macros/adapters/metadata.sql b/dbt/include/fabric/macros/adapters/metadata.sql index fb66feb7..bfea1ef8 100644 --- a/dbt/include/fabric/macros/adapters/metadata.sql +++ b/dbt/include/fabric/macros/adapters/metadata.sql @@ -11,7 +11,6 @@ {% macro fabric__list_schemas(database) %} {% call statement('list_schemas', fetch_result=True, auto_begin=False) -%} - select name as [schema] from sys.schemas {{ information_schema_hints() }} {% endcall %} @@ -28,6 +27,7 @@ {% macro fabric__list_relations_without_caching(schema_relation) -%} {% call statement('list_relations_without_caching', fetch_result=True) -%} + USE [{{ schema_relation.database }}]; with base as ( select DB_NAME() as [database], @@ -51,6 +51,7 @@ {% macro fabric__get_relation_without_caching(schema_relation) -%} {% call statement('get_relation_without_caching', fetch_result=True) -%} + USE [{{ schema_relation.database }}]; with base as ( select DB_NAME() as [database], diff --git a/dbt/include/fabric/macros/adapters/relation.sql b/dbt/include/fabric/macros/adapters/relation.sql index 4918eed4..b8f607bd 100644 --- a/dbt/include/fabric/macros/adapters/relation.sql +++ b/dbt/include/fabric/macros/adapters/relation.sql @@ -39,36 +39,6 @@ path={"schema": reference[0], "identifier": reference[1]})) }} {% endfor %} {% elif relation.type == 'table'%} - {# {% call statement('find_references', fetch_result=true) %} - USE [{{ relation.database }}]; - SELECT obj.name AS FK_NAME, - sch.name AS [schema_name], - tab1.name AS [table], - col1.name AS [column], - tab2.name AS [referenced_table], - col2.name AS [referenced_column] - FROM sys.foreign_key_columns fkc - INNER JOIN sys.objects obj - ON obj.object_id = fkc.constraint_object_id - INNER JOIN sys.tables tab1 - ON tab1.object_id = fkc.parent_object_id - INNER JOIN sys.schemas sch - ON tab1.schema_id = sch.schema_id - INNER JOIN sys.columns col1 - ON col1.column_id = parent_column_id AND col1.object_id = tab1.object_id - INNER JOIN sys.tables tab2 - ON tab2.object_id = fkc.referenced_object_id - INNER JOIN sys.columns col2 - ON col2.column_id = referenced_column_id AND col2.object_id = tab2.object_id - WHERE sch.name = '{{ relation.schema }}' and tab2.name = '{{ relation.identifier }}' - {% endcall %} - {% set references = load_result('find_references')['data'] %} - {% for reference in references -%} - -- dropping referenced table {{ reference[0] }}.{{ reference[1] }} - {{ fabric__drop_relation_script(relation.incorporate( - type="table", - path={"schema": reference[1], "identifier": reference[2]})) }} - {% endfor %} #} {% set object_id_type = 'U' %} {%- else -%} diff --git a/dbt/include/fabric/macros/materializations/models/incremental/incremental.sql b/dbt/include/fabric/macros/materializations/models/incremental/incremental.sql new file mode 100644 index 00000000..797f09a4 --- /dev/null +++ b/dbt/include/fabric/macros/materializations/models/incremental/incremental.sql @@ -0,0 +1,87 @@ + +{% materialization incremental, adapter='fabric' -%} + + {%- set full_refresh_mode = (should_full_refresh()) -%} + {% set target_relation = this.incorporate(type='table') %} + {%- set relations_list = fabric__get_relation_without_caching(target_relation) -%} + + {%- set existing_relation = none %} + {% if (relations_list|length == 1) and (relations_list[0][2] == target_relation.schema) + and (relations_list[0][1] == target_relation.identifier) and (relations_list[0][3] == target_relation.type)%} + {% set existing_relation = target_relation %} + {% elif (relations_list|length == 1) and (relations_list[0][2] == target_relation.schema) + and (relations_list[0][1] == target_relation.identifier) and (relations_list[0][3] != target_relation.type) %} + {% set existing_relation = get_or_create_relation(relations_list[0][0], relations_list[0][2] , relations_list[0][1] , relations_list[0][3])[1] %} + {% endif %} + + {{ log("Full refresh mode" ~ full_refresh_mode)}} + {{ log("existing relation : "~existing_relation ~ " type "~ existing_relation.type ~ " is view? "~existing_relation.is_view) }} + {{ log("target relation: " ~target_relation ~ " type "~ target_relation.type ~ " is view? "~target_relation.is_view) }} + + -- configs + {%- set unique_key = config.get('unique_key') -%} + {% set incremental_strategy = config.get('incremental_strategy') or 'default' %} + {%- set temp_relation = make_temp_relation(target_relation)-%} + + {% set grant_config = config.get('grants') %} + {%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%} + + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + {% if existing_relation is none %} + + {%- call statement('main') -%} + {{ fabric__create_table_as(False, target_relation, sql)}} + {%- endcall -%} + + {% elif existing_relation.is_view %} + + {#-- Can't overwrite a view with a table - we must drop --#} + {{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }} + {{ drop_relation_if_exists(existing_relation) }} + {%- call statement('main') -%} + {{ fabric__create_table_as(False, target_relation, sql)}} + {%- endcall -%} + + {% elif full_refresh_mode %} + + {%- call statement('main') -%} + {{ fabric__create_table_as(False, target_relation, sql)}} + {%- endcall -%} + + {% else %} + + {%- call statement('create_tmp_relation') -%} + {{ fabric__create_table_as(True, temp_relation, sql)}} + {%- endcall -%} + {% do adapter.expand_target_column_types( + from_relation=temp_relation, + to_relation=target_relation) %} + {#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#} + {% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %} + {% if not dest_columns %} + {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %} + {% endif %} + + {#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#} + {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %} + {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %} + {% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %} + {%- call statement('main') -%} + {{ strategy_sql_macro_func(strategy_arg_dict) }} + {%- endcall -%} + {% endif %} + + {% do drop_relation_if_exists(temp_relation) %} + {{ run_hooks(post_hooks, inside_transaction=True) }} + + {% set target_relation = target_relation.incorporate(type='table') %} + + {% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %} + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} + + {% do persist_docs(target_relation, model) %} + {% do adapter.commit() %} + {{ return({'relations': [target_relation]}) }} + +{%- endmaterialization %} diff --git a/dbt/include/fabric/macros/materializations/models/incremental/incremental_strategies.sql b/dbt/include/fabric/macros/materializations/models/incremental/incremental_strategies.sql index 2f0c346f..c95ce81b 100644 --- a/dbt/include/fabric/macros/materializations/models/incremental/incremental_strategies.sql +++ b/dbt/include/fabric/macros/materializations/models/incremental/incremental_strategies.sql @@ -3,6 +3,7 @@ {% if arg_dict["unique_key"] %} {% do return(get_incremental_delete_insert_sql(arg_dict)) %} {% else %} + -- Incremental Append will insert data into target table. {% do return(get_incremental_append_sql(arg_dict)) %} {% endif %} diff --git a/dbt/include/fabric/macros/materializations/models/table/table.sql b/dbt/include/fabric/macros/materializations/models/table/table.sql index 908023bd..24f71f31 100644 --- a/dbt/include/fabric/macros/materializations/models/table/table.sql +++ b/dbt/include/fabric/macros/materializations/models/table/table.sql @@ -1,17 +1,32 @@ {% materialization table, adapter='fabric' %} - -- Create target relation + -- Load target relation {%- set target_relation = this.incorporate(type='table') %} + -- Load existing relation + {%- set relation = fabric__get_relation_without_caching(this) %} + + {% set existing_relation = none %} + {% if (relation|length == 1) %} + {% set existing_relation = get_or_create_relation(relation[0][0], relation[0][2] , relation[0][1] , relation[0][3])[1] %} + {% endif %} + + {%- set backup_relation = none %} + {{log("Existing Relation type is "~ existing_relation.type)}} + {% if (existing_relation != none and existing_relation.type == "table") %} + {%- set backup_relation = make_backup_relation(target_relation, 'table') -%} + {% elif (existing_relation != none and existing_relation.type == "view") %} + {%- set backup_relation = make_backup_relation(target_relation, 'view') -%} + {% endif %} + + {% if (existing_relation != none) %} + -- drop the temp relations if they exist already in the database + {{ drop_relation_if_exists(backup_relation) }} + -- Rename target relation as backup relation + {{ adapter.rename_relation(existing_relation, backup_relation) }} + {% endif %} + -- grab current tables grants config for comparision later on {% set grant_config = config.get('grants') %} - {%- set backup_relation = make_backup_relation(target_relation, 'table') -%} - -- drop the temp relations if they exist already in the database - {{ drop_relation_if_exists(backup_relation) }} - -- Rename target relation as backup relation - {%- set relation = fabric__get_relation_without_caching(target_relation) %} - {% if relation|length > 0 %} - {{ adapter.rename_relation(target_relation, backup_relation) }} - {% endif %} {{ run_hooks(pre_hooks, inside_transaction=False) }} -- `BEGIN` happens here: @@ -29,10 +44,12 @@ -- `COMMIT` happens here {{ adapter.commit() }} - -- finally, drop the foreign key references if exists - {{ drop_fk_indexes_on_table(backup_relation) }} - -- drop existing/backup relation after the commit - {{ drop_relation_if_exists(backup_relation) }} + {% if (backup_relation != none) %} + -- finally, drop the foreign key references if exists + {{ drop_fk_indexes_on_table(backup_relation) }} + -- drop existing/backup relation after the commit + {{ drop_relation_if_exists(backup_relation) }} + {% endif %} -- Add constraints including FK relation. {{ fabric__build_model_constraints(target_relation) }} {{ run_hooks(post_hooks, inside_transaction=False) }} diff --git a/dbt/include/fabric/macros/materializations/models/view/view.sql b/dbt/include/fabric/macros/materializations/models/view/view.sql index 30444cdb..8576b2dd 100644 --- a/dbt/include/fabric/macros/materializations/models/view/view.sql +++ b/dbt/include/fabric/macros/materializations/models/view/view.sql @@ -1,23 +1,33 @@ {% materialization view, adapter='fabric' -%} - {%- set existing_relation = load_cached_relation(this) -%} {%- set target_relation = this.incorporate(type='view') -%} - - -- make back up relation - {%- set backup_relation_type = 'view' if existing_relation is none else existing_relation.type -%} - {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} + {{log("Target Relation "~target_relation)}} + + {%- set relation = fabric__get_relation_without_caching(this) %} + {% set existing_relation = none %} + {% if (relation|length == 1) %} + {% set existing_relation = get_or_create_relation(relation[0][0], relation[0][2] , relation[0][1] , relation[0][3])[1] %} + {% endif %} + {{log("Existing Relation "~existing_relation)}} + + {%- set backup_relation = none %} + {{log("Existing Relation type is "~ existing_relation.type)}} + {% if (existing_relation != none and existing_relation.type == "table") %} + {%- set backup_relation = make_backup_relation(target_relation, 'table') -%} + {% elif (existing_relation != none and existing_relation.type == "view") %} + {%- set backup_relation = make_backup_relation(target_relation, 'view') -%} + {% endif %} + + {% if (existing_relation != none) %} + -- drop the temp relations if they exist already in the database + {{ drop_relation_if_exists(backup_relation) }} + -- Rename target relation as backup relation + {{ adapter.rename_relation(existing_relation, backup_relation) }} + {% endif %} {% set grant_config = config.get('grants') %} {{ run_hooks(pre_hooks, inside_transaction=False) }} - -- drop target relation if exists already in the database - {{ drop_relation_if_exists(backup_relation) }} - - {%- set relation = fabric__get_relation_without_caching(target_relation) %} - {% if relation|length > 0 %} - {{ adapter.rename_relation(target_relation, backup_relation) }} - {% endif %} - -- `BEGIN` happens here: {{ run_hooks(pre_hooks, inside_transaction=True) }} @@ -32,9 +42,10 @@ {% do persist_docs(target_relation, model) %} {{ run_hooks(post_hooks, inside_transaction=True) }} {{ adapter.commit() }} - {{ drop_relation_if_exists(backup_relation) }} + {% if (backup_relation != none) %} + {{ drop_relation_if_exists(backup_relation) }} + {% endif %} {{ run_hooks(post_hooks, inside_transaction=False) }} - {{ return({'relations': [target_relation]}) }} {%- endmaterialization -%} diff --git a/dbt/include/fabric/macros/utils/concat.sql b/dbt/include/fabric/macros/utils/concat.sql index 4eef52bd..01e4cccb 100644 --- a/dbt/include/fabric/macros/utils/concat.sql +++ b/dbt/include/fabric/macros/utils/concat.sql @@ -1,3 +1,7 @@ {% macro fabric__concat(fields) -%} - concat({{ fields|join(', ') }}) + {%- if fields|length < 2 -%} + {{ fields[0] }} + {%- else -%} + concat({{ fields|join(', ') }}) + {%- endif -%} {%- endmacro %} diff --git a/dbt/include/fabric/macros/utils/get_tables_by_pattern.sql b/dbt/include/fabric/macros/utils/get_tables_by_pattern.sql new file mode 100644 index 00000000..d10c652d --- /dev/null +++ b/dbt/include/fabric/macros/utils/get_tables_by_pattern.sql @@ -0,0 +1,12 @@ +{% macro fabric__get_tables_by_pattern_sql(schema_pattern, table_pattern, exclude='', database=target.database) %} + + select distinct + table_schema as {{ adapter.quote('table_schema') }}, + table_name as {{ adapter.quote('table_name') }}, + {{ dbt_utils.get_table_types_sql() }} + from {{ database }}.INFORMATION_SCHEMA.TABLES + where table_schema like '{{ schema_pattern }}' + and table_name like '{{ table_pattern }}' + and table_name not like '{{ exclude }}' + +{% endmacro %} diff --git a/tests/functional/adapter/test_basic.py b/tests/functional/adapter/test_basic.py index 7e2e7bef..65a1b490 100644 --- a/tests/functional/adapter/test_basic.py +++ b/tests/functional/adapter/test_basic.py @@ -15,8 +15,7 @@ from dbt.tests.adapter.basic.test_validate_connection import BaseValidateConnection -# Need help, assertions are failing -class TestSimpleMaterializationsFabric(BaseSimpleMaterializations): +class TestSimpleMaterializations(BaseSimpleMaterializations): pass diff --git a/tests/functional/adapter/test_empty.py b/tests/functional/adapter/test_empty.py index dcdd6644..10906a1c 100644 --- a/tests/functional/adapter/test_empty.py +++ b/tests/functional/adapter/test_empty.py @@ -1,9 +1,5 @@ -import pytest -from dbt.tests.adapter.empty.test_empty import BaseTestEmpty +# from dbt.tests.adapter.empty.test_empty import BaseTestEmpty -@pytest.mark.skip( - reason="render_limited() defaults to dbt-core implementation instead of using Fabric implementation" -) -class TestEmpty(BaseTestEmpty): - pass +# class TestEmpty(BaseTestEmpty): +# pass