-
Notifications
You must be signed in to change notification settings - Fork 6
/
caching.py
47 lines (39 loc) · 1.13 KB
/
caching.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import kfp
from kfp import dsl
from kfp.components import func_to_container_op
# Minimal Kubernetes Pod manifest (a single redis container) that the
# pipeline's ResourceOp submits to the cluster.
pod = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {
        # Kubernetes labels are a string-to-string map, not a list of maps.
        "labels": {"app": "redis"},
        "name": "redis",
    },
    "spec": {
        "containers": [
            {
                "image": "redis",
                "name": "redis",
            },
        ],
    },
}
# Component used by the sample pipeline: prints the given message.
# NOTE(review): func_to_container_op presumably derives the component's
# description from the docstring, so the docstring text is left as-is — confirm
# before rewording it.
@func_to_container_op
def print_op(msg: str):
    """Print message"""
    print(msg)
# Sample pipeline showing how to set the cache staleness on a ContainerOp
# (via execution options) and on a ResourceOp (via a pod annotation).
@dsl.pipeline(
    name="sample pipeline",
    description="Shows how to use func_to_container_op",
)
def pipeline():
    # ContainerOp: allow cached results up to 30 days old ("P30D").
    # Setting "P0D" here instead would disable caching for this task.
    hello_task = print_op("Hello world!")
    hello_task.execution_options.caching_strategy.max_cache_staleness = "P30D"

    # ResourceOp: disable caching by annotating the pod with a zero-length
    # staleness window ("P0D").
    create_pod_op = dsl.ResourceOp(
        name="Create pos",
        action="create",
        k8s_resource=pod,
        success_condition="status.phase=Running",
        failure_condition="status.phase=Failed",
    )
    create_pod_op.add_pod_annotation(
        "pipelines.kubeflow.org/max_cache_staleness",
        "P0D",
    )
if __name__ == "__main__":
    # Compile the pipeline into pipeline.yaml.
    # Compiler.compile is an instance method, so a Compiler object must be
    # created first; calling it on the class would bind `pipeline` to `self`
    # and fail with a TypeError for the missing package path.
    kfp.compiler.Compiler().compile(pipeline, "pipeline.yaml")