Skip to content

Commit

Permalink
fix: if parameter of child_pipeline is not supplied, use defaultValue…
Browse files Browse the repository at this point in the history
… is available
  • Loading branch information
arjendev committed Dec 1, 2023
1 parent 36b5592 commit 6c78027
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 12 deletions.
10 changes: 10 additions & 0 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,37 @@


def _get_activity_from_activity_data(activity_data: dict) -> Activity:
type_properties = activity_data["typeProperties"]
if activity_data["type"] == "SetVariable":
return SetVariableActivity(**activity_data)
if activity_data["type"] == "AppendVariable":
return AppendVariableActivity(**activity_data)
elif activity_data["type"] == "Until":
activities = _get_activity_from_activities_data(activity_data["typeProperties"]["activities"])
activities = _get_activity_from_activities_data(type_properties["activities"])
return UntilActivity(activities=activities, **activity_data)
elif activity_data["type"] == "ExecutePipeline":
return ExecutePipelineActivity(**activity_data)
elif activity_data["type"] == "IfCondition":
if_true_activities = _get_activity_from_activities_data(activity_data["typeProperties"]["ifTrueActivities"])
if_false_activities = _get_activity_from_activities_data(activity_data["typeProperties"]["ifFalseActivities"])
if_true_activities = (
_get_activity_from_activities_data(type_properties["ifTrueActivities"])
if "ifTrueActivities" in type_properties
else []
)
if_false_activities = (
_get_activity_from_activities_data(type_properties["ifFalseActivities"])
if "ifFalseActivities" in type_properties
else []
)
return IfConditionActivity(
if_true_activities=if_true_activities, if_false_activities=if_false_activities, **activity_data
)
elif activity_data["type"] == "ForEach":
child_activities = _get_activity_from_activities_data(activity_data["typeProperties"]["activities"])
child_activities = _get_activity_from_activities_data(type_properties["activities"])
return ForEachActivity(activities=child_activities, **activity_data)
elif activity_data["type"] == "Switch":
default_activities = _get_activity_from_activities_data(activity_data["typeProperties"]["defaultActivities"])
default_activities = _get_activity_from_activities_data(type_properties["defaultActivities"])
cases_activities = {}
for case in activity_data["typeProperties"]["cases"]:
for case in type_properties["cases"]:
case_value = case["value"]
activities = case["activities"]
cases_activities[case_value] = _get_activity_from_activities_data(activities)
Expand Down
15 changes: 13 additions & 2 deletions src/azure_data_factory_testing_framework/models/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,30 @@ def get_activity_by_name(self, name: str) -> Activity:

raise ActivityNotFoundError(f"Activity with name {name} not found")

def validate_parameters(self, parameters: List[RunParameter]) -> None:
def validate_and_append_default_parameters(self, parameters: List[RunParameter]) -> List[RunParameter]:
"""Validate that all parameters are provided and that no duplicate parameters are provided.
Args:
parameters: List of parameters.
"""
# Check if all parameters are provided
for pipeline_parameter_name, _ in self.parameters.items():
run_parameters = parameters
for pipeline_parameter_name, pipeline_parameter in self.parameters.items():
found = False
for parameter in parameters:
if pipeline_parameter_name == parameter.name and parameter.type == RunParameterType.Pipeline:
found = True
break

if not found:
if "defaultValue" in pipeline_parameter:
run_parameters.append(
RunParameter(
RunParameterType.Pipeline, pipeline_parameter_name, pipeline_parameter["defaultValue"]
)
)
continue

raise ValueError(
f"Parameter with name '{pipeline_parameter_name}' and type 'RunParameterType.Pipeline' not found in pipeline '{self.name}'",
)
Expand All @@ -63,6 +72,8 @@ def validate_parameters(self, parameters: List[RunParameter]) -> None:
f"Duplicate parameter with name '{parameter.name}' and type '{parameter.type}' found in pipeline '{self.name}'",
)

return parameters

def get_run_variables(self) -> List[PipelineRunVariable]:
"""Get the run variables for the pipeline. This can be used to generate the instance variables for a pipeline run."""
run_variables = []
Expand Down
2 changes: 1 addition & 1 deletion src/azure_data_factory_testing_framework/test_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def evaluate_pipeline(self, pipeline: Pipeline, parameters: List[RunParameter])
Returns:
A list of evaluated pipelines, which can be more than 1 due to possible child activities.
"""
pipeline.validate_parameters(parameters)
parameters = pipeline.validate_and_append_default_parameters(parameters)
state = PipelineRunState(parameters, pipeline.get_run_variables())
return self.evaluate_activities(pipeline.activities, state)

Expand Down
19 changes: 16 additions & 3 deletions tests/unit/models/pipelines/test_pipeline_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,30 @@ def test_when_validate_parameters_is_accurate_should_pass() -> None:
"pipelineParameterName2": {
"type": "String",
},
"pipelineParameterName3": {
"type": "String",
"defaultValue": "pipelineParameterValue3",
},
},
)

# Act
pipeline.validate_parameters(
parameters = pipeline.validate_and_append_default_parameters(
[
RunParameter(RunParameterType.Pipeline, "pipelineParameterName", "pipelineParameterValue"),
RunParameter(RunParameterType.Pipeline, "pipelineParameterName2", "pipelineParameterValue2"),
],
)

# Assert
assert len(parameters) == 3
assert parameters[0].name == "pipelineParameterName"
assert parameters[0].value == "pipelineParameterValue"
assert parameters[1].name == "pipelineParameterName2"
assert parameters[1].value == "pipelineParameterValue2"
assert parameters[2].name == "pipelineParameterName3"
assert parameters[2].value == "pipelineParameterValue3"


def test_when_validate_parameters_is_missing_run_parameter_should_throw_error() -> None:
# Arrange
Expand All @@ -46,7 +59,7 @@ def test_when_validate_parameters_is_missing_run_parameter_should_throw_error()

# Act
with pytest.raises(ValueError) as exception_info:
pipeline.validate_parameters(
pipeline.validate_and_append_default_parameters(
[
RunParameter(RunParameterType.Pipeline, "pipelineParameterName", "pipelineParameterValue"),
],
Expand Down Expand Up @@ -77,7 +90,7 @@ def test_when_duplicate_parameters_supplied_should_throw_error() -> None:

# Act
with pytest.raises(ValueError) as exception_info:
pipeline.validate_parameters(
pipeline.validate_and_append_default_parameters(
[
RunParameter(RunParameterType.Pipeline, "pipelineParameterName", "pipelineParameterValue"),
RunParameter(RunParameterType.Pipeline, "pipelineParameterName", "pipelineParameterValue"),
Expand Down

0 comments on commit 6c78027

Please sign in to comment.