Skip to content

Commit

Permalink
feat: fail activity
Browse files Browse the repository at this point in the history
  • Loading branch information
arjendev committed Nov 24, 2023
1 parent 60f6d1b commit 6ee41d3
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,6 @@ def _evaluate_expressions(

self._evaluate_expressions(item, state, visited, types_to_ignore)

def set_result(self, result: DependencyCondition, output: Any) -> None: # noqa: ANN401
def set_result(self, result: DependencyCondition, output: Any = None) -> None: # noqa: ANN401
self.status = result
self.output = output
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from typing import Any

from azure_data_factory_testing_framework.models.activities.control_activity import ControlActivity
from azure_data_factory_testing_framework.state import PipelineRunState
from azure_data_factory_testing_framework.state.dependency_condition import DependencyCondition


class FailActivity(ControlActivity):
def __init__(self, **kwargs: Any) -> None: # noqa: ANN401
"""This is the class that represents the Fail activity in the pipeline.
Args:
**kwargs: FailActivity properties coming directly from the json representation of the activity.
"""
kwargs["type"] = "Fail"

super(ControlActivity, self).__init__(**kwargs)

def evaluate(self, state: PipelineRunState) -> "FailActivity":
super(ControlActivity, self).evaluate(state)

self.set_result(DependencyCondition.FAILED)

return self
31 changes: 22 additions & 9 deletions src/azure_data_factory_testing_framework/test_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from azure_data_factory_testing_framework.models.activities.execute_pipeline_activity import (
ExecutePipelineActivity,
)
from azure_data_factory_testing_framework.models.activities.fail_activity import FailActivity
from azure_data_factory_testing_framework.models.activities.for_each_activity import ForEachActivity
from azure_data_factory_testing_framework.models.activities.if_condition_activity import (
IfConditionActivity,
Expand Down Expand Up @@ -112,6 +113,7 @@ def evaluate_activities(self, activities: List[Activity], state: PipelineRunStat
Returns:
A list of evaluated pipelines, which can be more than 1 due to possible child activities.
"""
fail_activity_evaluated = False
while len(state.scoped_pipeline_activity_results) != len(activities):
any_activity_evaluated = False
for activity in filter(
Expand All @@ -125,30 +127,41 @@ def evaluate_activities(self, activities: List[Activity], state: PipelineRunStat
):
yield evaluated_activity

if isinstance(activity, FailActivity):
fail_activity_evaluated = True
break

any_activity_evaluated = True
state.add_activity_result(activity.name, activity.status, activity.output)

# Check if there are any child activities to evaluate
if self._is_iteration_activity(activity):
activities_iterator = []
if isinstance(activity, ExecutePipelineActivity) and self.should_evaluate_child_pipelines:
execute_pipeline_activity: ExecutePipelineActivity = activity
pipeline = self.repository.get_pipeline_by_name(
execute_pipeline_activity.type_properties["pipeline"]["referenceName"],
)

# Evaluate the pipeline with its own scope
for child_activity in self.evaluate_pipeline(
activities_iterator = self.evaluate_pipeline(
pipeline,
activity.get_child_run_parameters(state),
):
yield child_activity
)

if isinstance(activity, ControlActivity):
if not isinstance(activity, ExecutePipelineActivity) and isinstance(activity, ControlActivity):
control_activity: ControlActivity = activity
for child_activity in control_activity.evaluate_control_activities(
activities_iterator = control_activity.evaluate_control_activities(
state,
self.evaluate_activities,
):
yield child_activity
)

for child_activity in activities_iterator:
yield child_activity
if isinstance(child_activity, FailActivity):
fail_activity_evaluated = True
break

if fail_activity_evaluated:
break

if not any_activity_evaluated:
raise PipelineActivitiesCircularDependencyError()
Expand Down
54 changes: 54 additions & 0 deletions tests/test_test_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from azure_data_factory_testing_framework.exceptions.pipeline_activities_circular_dependency_error import (
PipelineActivitiesCircularDependencyError,
)
from azure_data_factory_testing_framework.models.activities.fail_activity import FailActivity
from azure_data_factory_testing_framework.models.activities.set_variable_activity import SetVariableActivity
from azure_data_factory_testing_framework.models.data_factory_element import DataFactoryElement
from azure_data_factory_testing_framework.models.pipeline import Pipeline
Expand Down Expand Up @@ -55,3 +56,56 @@ def test_circular_dependency_between_activities_should_throw_error() -> None:
# Act & Assert
with pytest.raises(PipelineActivitiesCircularDependencyError):
next(test_framework.evaluate_pipeline(pipeline, []))


def test_fail_activity_halts_further_evaluation() -> None:
# Arrange
test_framework = TestFramework(TestFrameworkType.Fabric)
pipeline = Pipeline(
name="main",
parameters={},
variables={},
activities=[
SetVariableActivity(
name="setVariable1",
variable_name="variable",
typeProperties={
"variableName": "variable",
"value": DataFactoryElement("'1'"),
},
dependsOn=[
{
"activity": "failActivity",
"dependencyConditions": [
"Succeeded",
],
}
],
),
FailActivity(
name="failActivity",
typeProperties={
"message": DataFactoryElement("concat('Error code: ', '500')"),
"errorCode": "500",
},
dependsOn=[],
),
],
)
test_framework.repository.pipelines.append(pipeline)

# Act
activities = test_framework.evaluate_pipeline(pipeline, [])

# Assert
activity = next(activities)
assert activity is not None
assert activity.name == "failActivity"
assert activity.type == "Fail"
assert activity.status == "Failed"
assert activity.type_properties["message"].value == "Error code: 500"
assert activity.type_properties["errorCode"] == "500"

# Assert that there are no more activities
with pytest.raises(StopIteration):
next(activities)
28 changes: 28 additions & 0 deletions tests/unit/models/activities/test_fail_activity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from azure_data_factory_testing_framework.models.activities.fail_activity import FailActivity
from azure_data_factory_testing_framework.models.data_factory_element import DataFactoryElement
from azure_data_factory_testing_framework.state import PipelineRunState
from azure_data_factory_testing_framework.state.dependency_condition import DependencyCondition


def test_fail_activity_evaluates_to_failed_result() -> None:
# Arrange
fail_activity = FailActivity(
name="FailActivity",
typeProperties={
"message": DataFactoryElement("concat('Error code: ', '500')"),
"errorCode": "500",
},
depends_on=[],
)

state = PipelineRunState()

# Act
activity = fail_activity.evaluate(state)

# Assert
assert activity is not None
assert activity.name == "FailActivity"
assert activity.status == DependencyCondition.FAILED
assert activity.type_properties["message"].value == "Error code: 500"
assert activity.type_properties["errorCode"] == "500"

0 comments on commit 6ee41d3

Please sign in to comment.