Skip to content

Commit

Permalink
feat: append variable activity
Browse files Browse the repository at this point in the history
  • Loading branch information
arjendev committed Nov 24, 2023
1 parent d8c0b2b commit 60f6d1b
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import List

from azure_data_factory_testing_framework.models.activities.activity import Activity
from azure_data_factory_testing_framework.models.activities.append_variable_activity import AppendVariableActivity
from azure_data_factory_testing_framework.models.activities.execute_pipeline_activity import ExecutePipelineActivity
from azure_data_factory_testing_framework.models.activities.filter_activity import FilterActivity
from azure_data_factory_testing_framework.models.activities.for_each_activity import ForEachActivity
Expand All @@ -13,6 +14,8 @@
def _get_activity_from_activity_data(activity_data: dict) -> Activity:
if activity_data["type"] == "SetVariable":
return SetVariableActivity(**activity_data)
if activity_data["type"] == "AppendVariable":
return AppendVariableActivity(**activity_data)
elif activity_data["type"] == "Until":
activities = _get_activity_from_activities_data(activity_data["activities"])
return UntilActivity(activities=activities, **activity_data)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from typing import Any

from azure_data_factory_testing_framework.models.activities.control_activity import ControlActivity
from azure_data_factory_testing_framework.models.data_factory_element import DataFactoryElement
from azure_data_factory_testing_framework.state import PipelineRunState


class AppendVariableActivity(ControlActivity):
def __init__(self, **kwargs: Any) -> None: # noqa: ANN401
"""This is the class that represents the Append Variable activity in the pipeline.
Args:
**kwargs: AppendVariableActivity properties coming directly from the json representation of the activity.
"""
kwargs["type"] = "AppendVariable"

super(ControlActivity, self).__init__(**kwargs)

self.variable_name: str = self.type_properties["variableName"]
self.value: DataFactoryElement = self.type_properties["value"]

def evaluate(self, state: PipelineRunState) -> "AppendVariableActivity":
super(ControlActivity, self).evaluate(state)

if isinstance(self.value, DataFactoryElement):
evaluated_value = self.value.evaluate(state)
else:
evaluated_value = self.value

state.append_variable(self.type_properties["variableName"], evaluated_value)

return self
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,23 @@ def set_variable(self, variable_name: str, value: Union[str, int, bool, float])

raise VariableBeingEvaluatedDoesNotExistError(variable_name)

def append_variable(self, variable_name: str, value: Union[str, int, bool, float]) -> None:
"""Appends a value to a variable if it exists and is an array. Otherwise, throws an exception.
Args:
variable_name: Name of the variable.
value: Appended value of the variable.
"""
for variable in self.variables:
if variable.name == variable_name:
if not isinstance(variable.value, list):
raise ValueError(f"Variable {variable_name} is not an array.")

variable.value.append(value)
return

raise VariableBeingEvaluatedDoesNotExistError(variable_name)

def get_variable_by_name(self, variable_name: str) -> PipelineRunVariable:
"""Gets a variable by name. Throws an exception if the variable is not found.
Expand Down
4 changes: 4 additions & 0 deletions tests/functional/append_variable_pipeline/item.metadata.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "Pipeline",
"displayName": "append-variable-test"
}
58 changes: 58 additions & 0 deletions tests/functional/append_variable_pipeline/pipeline-content.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
{
"name": "append-variable-test",
"properties": {
"activities": [
{
"name": "Append variable1",
"type": "AppendVariable",
"dependsOn": [
{
"activity": "Set variable1",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"variableName": "values",
"value": {
"value": "@pipeline().parameters.appended_value",
"type": "Expression"
}
}
},
{
"name": "Set variable1",
"type": "SetVariable",
"dependsOn": [],
"policy": {
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"variableName": "values",
"value": {
"value": "@pipeline().parameters.initial_value",
"type": "Expression"
}
}
}
],
"parameters": {
"initial_value": {
"type": "array",
"defaultValue": []
},
"appended_value": {
"type": "int"
}
},
"variables": {
"values": {
"type": "Array",
"defaultValue": []
}
},
"annotations": []
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from typing import List

import pytest
from azure_data_factory_testing_framework.models.activities.append_variable_activity import AppendVariableActivity
from azure_data_factory_testing_framework.models.activities.set_variable_activity import SetVariableActivity
from azure_data_factory_testing_framework.state import RunParameter, RunParameterType
from azure_data_factory_testing_framework.test_framework import TestFramework, TestFrameworkType


@pytest.mark.parametrize(
"initial_value,appended_value,expected_value",
[
([1, 2], 3, [1, 2, 3]),
([], 1, [1]),
([4], 5, [4, 5]),
],
)
def test_append_variable_activity(
initial_value: List[int], appended_value: int, expected_value: List[int], request: pytest.FixtureRequest
) -> None:
# Arrange
test_framework = TestFramework(
framework_type=TestFrameworkType.Fabric,
root_folder_path=request.fspath.dirname,
should_evaluate_child_pipelines=True,
)
pipeline = test_framework.repository.get_pipeline_by_name("append-variable-test")

# Act
activities = test_framework.evaluate_pipeline(
pipeline,
[
RunParameter(RunParameterType.Pipeline, "initial_value", initial_value),
RunParameter(RunParameterType.Pipeline, "appended_value", appended_value),
],
)

# Assert
activity: SetVariableActivity = next(activities)
assert activity.type == "SetVariable"
assert activity.value.value == initial_value

activity: AppendVariableActivity = next(activities)
assert activity.type == "AppendVariable"
assert activity.value.value == appended_value
66 changes: 66 additions & 0 deletions tests/unit/models/activities/test_append_variable_activity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from typing import List

import pytest
from azure_data_factory_testing_framework.exceptions.variable_being_evaluated_does_not_exist_error import (
VariableBeingEvaluatedDoesNotExistError,
)
from azure_data_factory_testing_framework.models.activities.append_variable_activity import AppendVariableActivity
from azure_data_factory_testing_framework.models.data_factory_element import DataFactoryElement
from azure_data_factory_testing_framework.state import PipelineRunState, PipelineRunVariable
from azure_data_factory_testing_framework.test_framework import TestFramework, TestFrameworkType


@pytest.mark.parametrize(
"initial_value,appended_value,expected_value",
[
([1, 2], 3, [1, 2, 3]),
([], 1, [1]),
([4], 5, [4, 5]),
],
)
def test_when_int_variable_appended_then_state_variable_should_be_set(
initial_value: List[int], appended_value: int, expected_value: List[int]
) -> None:
# Arrange
TestFramework(framework_type=TestFrameworkType.Fabric)
variable_name = "TestVariable"
set_variable_activity = AppendVariableActivity(
name="AppendVariableActivity",
typeProperties={
"variableName": variable_name,
"value": DataFactoryElement(str(appended_value)),
},
)
state = PipelineRunState(
variables=[
PipelineRunVariable(name=variable_name, default_value=initial_value),
],
)

# Act
set_variable_activity.evaluate(state)

# Assert
variable = state.get_variable_by_name(variable_name)
assert variable.value == expected_value


def test_when_unknown_variable_evaluated_then_should_raise_exception() -> None:
# Arrange
TestFramework(framework_type=TestFrameworkType.Fabric)
variable_name = "TestVariable"
set_variable_activity = AppendVariableActivity(
name="AppendVariableActivity",
typeProperties={
"variableName": variable_name,
"value": DataFactoryElement("TestValue"),
},
)
state = PipelineRunState()

# Act
with pytest.raises(VariableBeingEvaluatedDoesNotExistError) as exception_info:
set_variable_activity.evaluate(state)

# Assert
assert exception_info.value.args[0] == "Variable being evaluated does not exist: TestVariable"

0 comments on commit 60f6d1b

Please sign in to comment.