Skip to content

Commit

Permalink
chore: add examples tests for synapse
Browse files Browse the repository at this point in the history
  • Loading branch information
jayachithra committed Aug 30, 2024
1 parent 36b20ca commit 425d017
Show file tree
Hide file tree
Showing 6 changed files with 390 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from data_factory_testing_framework.models.activities import Activity, ForEachActivity
from data_factory_testing_framework.state import (
PipelineRunState,
PipelineRunVariable,
RunParameter,
RunParameterType,
)
Expand All @@ -27,9 +26,6 @@ def test_list_blobs(pipeline: Pipeline) -> None:
# Arrange
activity = pipeline.get_activity_by_name("List Folders")
state = PipelineRunState(
variables=[
PipelineRunVariable(name="SourceContainerName", default_value="source"),
],
parameters=[
RunParameter(RunParameterType.Global, "SourceStorageAccountName", "sourcestorage"),
RunParameter(
Expand All @@ -55,9 +51,6 @@ def test_for_each(pipeline: Pipeline) -> None:
# Arrange
activity = pipeline.get_activity_by_name("For Each SourceFolder")
state = PipelineRunState(
variables=[
PipelineRunVariable(name="SourceContainerName", default_value="source"),
],
parameters=[
RunParameter(RunParameterType.Global, "SourceStorageAccountName", "sourcestorage"),
RunParameter(
Expand Down
12 changes: 12 additions & 0 deletions examples/synapse/copy_blobs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Copy Blobs

This example pipeline lists all the blobs in a given container and copies them to another container.

![image](copy_blobs.png)

The pipeline has two activities:

1. **List Folders**: Web activity that lists all blobs in a container whose names start with a given prefix.
2. **For Each SourceFolder**: ForEach activity that iterates over each item in the list returned above and executes the sub-activity on each item.

2.1. **Copy files to Destination**: Copy activity that copies the blobs to a given destination.
Binary file added examples/synapse/copy_blobs/copy_blobs.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
172 changes: 172 additions & 0 deletions examples/synapse/copy_blobs/pipeline/copy_blobs.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
{
"name": "copy_blobs",
"properties": {
"activities": [
{
"name": "List Folders",
"type": "WebActivity",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"method": "GET",
"headers": {
"x-ms-version": "2023-01-03"
},
"url": {
"value": "@concat('https://',pipeline().parameters.SourceStorageAccountName,'.blob.core.windows.net/',pipeline().parameters.SourceContainerName,'?restype=container&comp=list&prefix=',pipeline().parameters.SourceFolderPrefix,'&delimiter=$SourceBlobDelimiter')",
"type": "Expression"
},
"connectVia": {
"referenceName": "AutoResolveIntegrationRuntime",
"type": "IntegrationRuntimeReference"
},
"authentication": {
"type": "MSI",
"resource": "https://storage.azure.com"
}
}
},
{
"name": "For Each SourceFolder",
"type": "ForEach",
"dependsOn": [
{
"activity": "List Folders",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@xpath(xml(activity('List Folders').output.Response),'/EnumerationResults/Blobs/BlobPrefix/Name/text()')",
"type": "Expression"
},
"activities": [
{
"name": "Copy files to Destination",
"type": "Copy",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "ParquetSource",
"storeSettings": {
"type": "AzureBlobStorageReadSettings",
"recursive": true,
"wildcardFolderPath": {
"value": "@item()",
"type": "Expression"
},
"wildcardFileName": "*.parquet"
},
"formatSettings": {
"type": "ParquetReadSettings"
}
},
"sink": {
"type": "ParquetSink",
"storeSettings": {
"type": "AzureBlobStorageWriteSettings",
"copyBehavior": "FlattenHierarchy"
},
"formatSettings": {
"type": "ParquetWriteSettings"
}
},
"enableStaging": false,
"translator": {
"type": "TabularTranslator",
"typeConversion": true,
"typeConversionSettings": {
"allowDataTruncation": true,
"treatBooleanAsNumber": false
}
}
},
"inputs": [
{
"referenceName": "Binary",
"type": "DatasetReference",
"parameters": {
"ServiceURI": {
"value": "@concat('https://',pipeline().parameters.SourceStorageAccountName,'.blob.core.windows.net')",
"type": "Expression"
},
"ContainerName": {
"value": "@pipeline().parameters.SourceContainerName",
"type": "Expression"
},
"FolderName": {
"value": "@pipeline().parameters.SourceFolderPrefix",
"type": "Expression"
}
}
}
],
"outputs": [
{
"referenceName": "Binary",
"type": "DatasetReference",
"parameters": {
"ServiceURI": {
"value": "@concat('https://',pipeline().parameters.SinkStorageAccountName,'.blob.core.windows.net')",
"type": "Expression"
},
"ContainerName": {
"value": "@pipeline().parameters.SinkContainerName",
"type": "Expression"
},
"FolderName": {
"value": "@pipeline().parameters.SinkFolderName",
"type": "Expression"
}
}
}
]
}
]
}
}
],
"parameters": {
"SourceContainerName": {
"type": "string"
},
"SourceFolderPrefix": {
"type": "string"
},
"SinkStorageAccountName": {
"type": "string"
},
"SinkContainerName": {
"type": "string"
},
"SinkFolderName": {
"type": "string"
},
"SourceStorageAccountName": {
"type": "string"
}
},
"folder": {
"name": "batch"
},
"annotations": []
}
}
75 changes: 75 additions & 0 deletions examples/synapse/copy_blobs/test_synapse_copy_blobs_functional.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import pytest
from data_factory_testing_framework import TestFramework, TestFrameworkType
from data_factory_testing_framework.state import (
DependencyCondition,
RunParameter,
RunParameterType,
)


def test_copy_blobs_pipeline(request: pytest.FixtureRequest) -> None:
    """Walk the ``copy_blobs`` pipeline end to end and check each evaluated activity.

    The pipeline is loaded from the directory containing this test module and
    evaluated with a fixed set of pipeline parameters. The test then steps
    through the yielded activities: first the "List Folders" web activity
    (whose result is stubbed with an XML listing of two blob prefixes), then
    one "Copy files to Destination" activity per listed prefix, and finally
    verifies that no further activities are produced.
    """
    # Arrange
    # NOTE(review): this example lives under examples/synapse but loads the
    # pipeline with TestFrameworkType.DataFactory -- confirm this is intended.
    framework = TestFramework(
        framework_type=TestFrameworkType.DataFactory,
        root_folder_path=request.fspath.dirname,
    )
    pipeline = framework.get_pipeline_by_name("copy_blobs")

    pipeline_values = {
        "SourceStorageAccountName": "sourcestorageaccount",
        "SourceContainerName": "sourcecontainer",
        "SourceFolderPrefix": "sourcefolder",
        "SinkStorageAccountName": "sinkstorageaccount",
        "SinkContainerName": "sinkcontainer",
        "SinkFolderName": "sinkfolder",
    }
    run_parameters = [
        RunParameter(RunParameterType.Pipeline, name, value)
        for name, value in pipeline_values.items()
    ]

    # Act
    activities = framework.evaluate_pipeline(pipeline=pipeline, parameters=run_parameters)

    # Assert
    list_folders = next(activities)
    expected_url = (
        "https://sourcestorageaccount.blob.core.windows.net/sourcecontainer"
        "?restype=container&comp=list&prefix=sourcefolder&delimiter=$SourceBlobDelimiter"
    )
    assert list_folders.name == "List Folders"
    assert list_folders.type_properties["url"].result == expected_url
    assert list_folders.type_properties["method"] == "GET"

    # Stub the web activity's output; the ForEach "items" expression in the
    # pipeline reads the blob prefix names out of this XML response.
    # The payload is kept flush-left so the string content stays unchanged.
    listing_xml = """
<EnumerationResults ServiceEndpoint="http://myaccount.blob.core.windows.net/" ContainerName="mycontainer">
<Prefix>testfolder</Prefix>
<Delimiter>$SourceBlobDelimiter</Delimiter>
<Blobs>
<BlobPrefix>
<Name>testfolder_1/$SourceBlobDelimiter</Name>
</BlobPrefix>
<BlobPrefix>
<Name>testfolder_2/$SourceBlobDelimiter</Name>
</BlobPrefix>
</Blobs>
</EnumerationResults>
"""
    list_folders.set_result(
        result=DependencyCondition.SUCCEEDED,
        output={"Response": listing_xml},
    )

    # One copy activity is expected per blob prefix, in listing order.
    expected_folders = (
        "testfolder_1/$SourceBlobDelimiter",
        "testfolder_2/$SourceBlobDelimiter",
    )
    for expected_folder in expected_folders:
        copy_activity = next(activities)
        assert copy_activity.name == "Copy files to Destination"
        assert copy_activity.type == "Copy"
        store_settings = copy_activity.type_properties["source"]["storeSettings"]
        assert store_settings["wildcardFolderPath"].result == expected_folder

    # The pipeline must not yield anything beyond the two copy activities.
    with pytest.raises(StopIteration):
        next(activities)
Loading

0 comments on commit 425d017

Please sign in to comment.