Skip to content

Commit

Permalink
Merge pull request #156 from microsoft/v1.8.3
Browse files Browse the repository at this point in the history
Added Fabric Relation
  • Loading branch information
prdpsvs authored Apr 25, 2024
2 parents 0b3a906 + 8a4dd41 commit adf9f8d
Show file tree
Hide file tree
Showing 16 changed files with 288 additions and 86 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/fabric/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "1.8.2"
version = "1.8.3"
18 changes: 2 additions & 16 deletions dbt/adapters/fabric/fabric_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
from dbt.adapters.fabric.fabric_column import FabricColumn
from dbt.adapters.fabric.fabric_configs import FabricConfigs
from dbt.adapters.fabric.fabric_connection_manager import FabricConnectionManager
from dbt.adapters.fabric.fabric_relation import FabricRelation


class FabricAdapter(SQLAdapter):
ConnectionManager = FabricConnectionManager
Column = FabricColumn
AdapterSpecificConfigs = FabricConfigs
Relation = FabricRelation

_capabilities: CapabilityDict = CapabilityDict(
{
Expand Down Expand Up @@ -183,22 +185,6 @@ def run_sql_for_tests(self, sql, fetch, conn):
finally:
conn.transaction_open = False

def render_limited(self) -> str:
rendered = self.render()
if self.limit is None:
return rendered
elif self.limit == 0:
return f"(select * from {rendered} where 1=0) _dbt_top_subq"
else:
return f"(select TOP {self.limit} * from {rendered}) _dbt_top_subq"

# TODO: Standardizing quote characters
# def quoted(self, identifier):
# return "[{identifier}]".format(
# quote_char=self.quote_character,
# identifier=identifier,
# )

@available
@classmethod
def render_column_constraint(cls, constraint: ColumnLevelConstraint) -> Optional[str]:
Expand Down
27 changes: 27 additions & 0 deletions dbt/adapters/fabric/fabric_relation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from dataclasses import dataclass, field
from typing import Optional, Type

from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.utils import classproperty

from dbt.adapters.fabric.relation_configs import FabricQuotePolicy, FabricRelationType


@dataclass(frozen=True, eq=False, repr=False)
class FabricRelation(BaseRelation):
type: Optional[FabricRelationType] = None # type: ignore
quote_policy: FabricQuotePolicy = field(default_factory=lambda: FabricQuotePolicy())

@classproperty
def get_relation_type(cls) -> Type[FabricRelationType]:
return FabricRelationType

@classmethod
def render_limited(self) -> str:
rendered = self.render(self=self)
if self.limit is None:
return rendered
elif self.limit == 0:
return f"(select * from {rendered} where 1=0) _dbt_top_subq"
else:
return f"(select TOP {self.limit} * from {rendered}) _dbt_top_subq"
5 changes: 5 additions & 0 deletions dbt/adapters/fabric/relation_configs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from dbt.adapters.fabric.relation_configs.policies import (
FabricIncludePolicy,
FabricQuotePolicy,
FabricRelationType,
)
63 changes: 63 additions & 0 deletions dbt/adapters/fabric/relation_configs/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from dataclasses import dataclass
from typing import Any, Dict

import agate
from dbt.adapters.base.relation import Policy
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.relation_configs import RelationConfigBase, RelationResults

from dbt.adapters.fabric.relation_configs.policies import FabricIncludePolicy, FabricQuotePolicy


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class FabricRelationConfigBase(RelationConfigBase):
"""
This base class implements a few boilerplate methods and provides some light structure for Fabric relations.
"""

@classmethod
def include_policy(cls) -> Policy:
return FabricIncludePolicy()

@classmethod
def quote_policy(cls) -> Policy:
return FabricQuotePolicy()

@classmethod
def from_relation_config(cls, relation_config: RelationConfig):
relation_config_dict = cls.parse_relation_config(relation_config)
relation = cls.from_dict(relation_config_dict)
return relation

@classmethod
def parse_relation_config(cls, relation_config: RelationConfig) -> Dict:
raise NotImplementedError(
"`parse_relation_config()` needs to be implemented on this RelationConfigBase instance"
)

@classmethod
def from_relation_results(cls, relation_results: RelationResults):
relation_config = cls.parse_relation_results(relation_results)
relation = cls.from_dict(relation_config)
return relation # type: ignore

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
raise NotImplementedError(
"`parse_relation_results()` needs to be implemented on this RelationConfigBase instance"
)

# @classmethod
# def _render_part(cls, component: ComponentName, value: Optional[str]) -> Optional[str]:
# if cls.include_policy().get_part(component) and value:
# if cls.quote_policy().get_part(component):
# return f"[{value}]"
# return value.lower()
# return None

@classmethod
def _get_first_row(cls, results: agate.Table) -> agate.Row:
try:
return results.rows[0]
except IndexError:
return agate.Row(values=set())
23 changes: 23 additions & 0 deletions dbt/adapters/fabric/relation_configs/policies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from dataclasses import dataclass

from dbt.adapters.base.relation import Policy
from dbt_common.dataclass_schema import StrEnum


class FabricRelationType(StrEnum):
Table = "table"
View = "view"
CTE = "cte"


class FabricIncludePolicy(Policy):
database: bool = True
schema: bool = True
identifier: bool = True


@dataclass
class FabricQuotePolicy(Policy):
database: bool = True
schema: bool = True
identifier: bool = True
3 changes: 2 additions & 1 deletion dbt/include/fabric/macros/adapters/metadata.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

{% macro fabric__list_schemas(database) %}
{% call statement('list_schemas', fetch_result=True, auto_begin=False) -%}

select name as [schema]
from sys.schemas {{ information_schema_hints() }}
{% endcall %}
Expand All @@ -28,6 +27,7 @@

{% macro fabric__list_relations_without_caching(schema_relation) -%}
{% call statement('list_relations_without_caching', fetch_result=True) -%}
USE [{{ schema_relation.database }}];
with base as (
select
DB_NAME() as [database],
Expand All @@ -51,6 +51,7 @@

{% macro fabric__get_relation_without_caching(schema_relation) -%}
{% call statement('get_relation_without_caching', fetch_result=True) -%}
USE [{{ schema_relation.database }}];
with base as (
select
DB_NAME() as [database],
Expand Down
30 changes: 0 additions & 30 deletions dbt/include/fabric/macros/adapters/relation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -39,36 +39,6 @@
path={"schema": reference[0], "identifier": reference[1]})) }}
{% endfor %}
{% elif relation.type == 'table'%}
{# {% call statement('find_references', fetch_result=true) %}
USE [{{ relation.database }}];
SELECT obj.name AS FK_NAME,
sch.name AS [schema_name],
tab1.name AS [table],
col1.name AS [column],
tab2.name AS [referenced_table],
col2.name AS [referenced_column]
FROM sys.foreign_key_columns fkc
INNER JOIN sys.objects obj
ON obj.object_id = fkc.constraint_object_id
INNER JOIN sys.tables tab1
ON tab1.object_id = fkc.parent_object_id
INNER JOIN sys.schemas sch
ON tab1.schema_id = sch.schema_id
INNER JOIN sys.columns col1
ON col1.column_id = parent_column_id AND col1.object_id = tab1.object_id
INNER JOIN sys.tables tab2
ON tab2.object_id = fkc.referenced_object_id
INNER JOIN sys.columns col2
ON col2.column_id = referenced_column_id AND col2.object_id = tab2.object_id
WHERE sch.name = '{{ relation.schema }}' and tab2.name = '{{ relation.identifier }}'
{% endcall %}
{% set references = load_result('find_references')['data'] %}
{% for reference in references -%}
-- dropping referenced table {{ reference[0] }}.{{ reference[1] }}
{{ fabric__drop_relation_script(relation.incorporate(
type="table",
path={"schema": reference[1], "identifier": reference[2]})) }}
{% endfor %} #}
{% set object_id_type = 'U' %}

{%- else -%}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@

{% materialization incremental, adapter='fabric' -%}

{%- set full_refresh_mode = (should_full_refresh()) -%}
{% set target_relation = this.incorporate(type='table') %}
{%- set relations_list = fabric__get_relation_without_caching(target_relation) -%}

{%- set existing_relation = none %}
{% if (relations_list|length == 1) and (relations_list[0][2] == target_relation.schema)
and (relations_list[0][1] == target_relation.identifier) and (relations_list[0][3] == target_relation.type)%}
{% set existing_relation = target_relation %}
{% elif (relations_list|length == 1) and (relations_list[0][2] == target_relation.schema)
and (relations_list[0][1] == target_relation.identifier) and (relations_list[0][3] != target_relation.type) %}
{% set existing_relation = get_or_create_relation(relations_list[0][0], relations_list[0][2] , relations_list[0][1] , relations_list[0][3])[1] %}
{% endif %}

{{ log("Full refresh mode" ~ full_refresh_mode)}}
{{ log("existing relation : "~existing_relation ~ " type "~ existing_relation.type ~ " is view? "~existing_relation.is_view) }}
{{ log("target relation: " ~target_relation ~ " type "~ target_relation.type ~ " is view? "~target_relation.is_view) }}

-- configs
{%- set unique_key = config.get('unique_key') -%}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
{%- set temp_relation = make_temp_relation(target_relation)-%}

{% set grant_config = config.get('grants') %}
{%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}

{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% if existing_relation is none %}

{%- call statement('main') -%}
{{ fabric__create_table_as(False, target_relation, sql)}}
{%- endcall -%}

{% elif existing_relation.is_view %}

{#-- Can't overwrite a view with a table - we must drop --#}
{{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }}
{{ drop_relation_if_exists(existing_relation) }}
{%- call statement('main') -%}
{{ fabric__create_table_as(False, target_relation, sql)}}
{%- endcall -%}

{% elif full_refresh_mode %}

{%- call statement('main') -%}
{{ fabric__create_table_as(False, target_relation, sql)}}
{%- endcall -%}

{% else %}

{%- call statement('create_tmp_relation') -%}
{{ fabric__create_table_as(True, temp_relation, sql)}}
{%- endcall -%}
{% do adapter.expand_target_column_types(
from_relation=temp_relation,
to_relation=target_relation) %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
{% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %}
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}

{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}
{%- call statement('main') -%}
{{ strategy_sql_macro_func(strategy_arg_dict) }}
{%- endcall -%}
{% endif %}

{% do drop_relation_if_exists(temp_relation) %}
{{ run_hooks(post_hooks, inside_transaction=True) }}

{% set target_relation = target_relation.incorporate(type='table') %}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}
{% do adapter.commit() %}
{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
{% if arg_dict["unique_key"] %}
{% do return(get_incremental_delete_insert_sql(arg_dict)) %}
{% else %}
-- Incremental Append will insert data into target table.
{% do return(get_incremental_append_sql(arg_dict)) %}
{% endif %}

Expand Down
43 changes: 30 additions & 13 deletions dbt/include/fabric/macros/materializations/models/table/table.sql
Original file line number Diff line number Diff line change
@@ -1,17 +1,32 @@
{% materialization table, adapter='fabric' %}

-- Create target relation
-- Load target relation
{%- set target_relation = this.incorporate(type='table') %}
-- Load existing relation
{%- set relation = fabric__get_relation_without_caching(this) %}

{% set existing_relation = none %}
{% if (relation|length == 1) %}
{% set existing_relation = get_or_create_relation(relation[0][0], relation[0][2] , relation[0][1] , relation[0][3])[1] %}
{% endif %}

{%- set backup_relation = none %}
{{log("Existing Relation type is "~ existing_relation.type)}}
{% if (existing_relation != none and existing_relation.type == "table") %}
{%- set backup_relation = make_backup_relation(target_relation, 'table') -%}
{% elif (existing_relation != none and existing_relation.type == "view") %}
{%- set backup_relation = make_backup_relation(target_relation, 'view') -%}
{% endif %}

{% if (existing_relation != none) %}
-- drop the temp relations if they exist already in the database
{{ drop_relation_if_exists(backup_relation) }}
-- Rename target relation as backup relation
{{ adapter.rename_relation(existing_relation, backup_relation) }}
{% endif %}

-- grab current tables grants config for comparision later on
{% set grant_config = config.get('grants') %}
{%- set backup_relation = make_backup_relation(target_relation, 'table') -%}
-- drop the temp relations if they exist already in the database
{{ drop_relation_if_exists(backup_relation) }}
-- Rename target relation as backup relation
{%- set relation = fabric__get_relation_without_caching(target_relation) %}
{% if relation|length > 0 %}
{{ adapter.rename_relation(target_relation, backup_relation) }}
{% endif %}

{{ run_hooks(pre_hooks, inside_transaction=False) }}
-- `BEGIN` happens here:
Expand All @@ -29,10 +44,12 @@
-- `COMMIT` happens here
{{ adapter.commit() }}

-- finally, drop the foreign key references if exists
{{ drop_fk_indexes_on_table(backup_relation) }}
-- drop existing/backup relation after the commit
{{ drop_relation_if_exists(backup_relation) }}
{% if (backup_relation != none) %}
-- finally, drop the foreign key references if exists
{{ drop_fk_indexes_on_table(backup_relation) }}
-- drop existing/backup relation after the commit
{{ drop_relation_if_exists(backup_relation) }}
{% endif %}
-- Add constraints including FK relation.
{{ fabric__build_model_constraints(target_relation) }}
{{ run_hooks(post_hooks, inside_transaction=False) }}
Expand Down
Loading

0 comments on commit adf9f8d

Please sign in to comment.