dazzling-apple-56709
02/14/2025, 1:46 AM
The task pod needs the AZURE_STORAGE_CONNECTION_STRING environment variable to work; otherwise it throws this error:
flytekit.exceptions.system.FlyteDownloadDataException: SYSTEM:DownloadDataError: error=Failed to get data from <abfs://data-pipeline-alpha/flytesnacks/development/BLYABWOHQLKRP6B5JW2Y7XMYCQ======/fast31e63d3e22ecaf0c9d9eea81c81f9d59.tar.gz> to ./ (recursive=False).
I made it work by defining a pod template:
apiVersion: v1
kind: PodTemplate
metadata:
  name: default-template
template:
  spec:
    tolerations:
      - key: "exclusive"
        operator: "Equal"
        value: "only"
        effect: "NoSchedule"
    nodeSelector:
      exclusive: flyte
    containers:
      - name: default
        image: python:3.10-slim
        env:
          - name: AZURE_STORAGE_CONNECTION_STRING
            valueFrom:
              secretKeyRef:
                name: flyte-pod-secret
                key: AZURE_STORAGE_CONNECTION_STRING
    hostNetwork: false
---
apiVersion: v1
kind: Secret
metadata:
  name: flyte-pod-secret
type: Opaque
data:
  AZURE_STORAGE_CONNECTION_STRING: __AZURE_STORAGE_CONNECTION_STRING__ # this will be substituted in CI/CD
and deployed it to the domain namespace with:
kubectl apply -f template.yaml -n flytesnacks-development
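One detail about the Secret above: values under a Kubernetes Secret's `data` field must be base64-encoded, so whatever CI/CD substitutes for the placeholder has to be the encoded form (the `stringData` field accepts plain text instead). A minimal sketch of the encoding step, assuming the substitution is done from Python; the function name and the sample connection string are made up for illustration:

```python
import base64


def encode_secret_value(plain: str) -> str:
    """Return the base64 text that a Secret's `data` field expects."""
    return base64.b64encode(plain.encode("utf-8")).decode("ascii")


# Hypothetical placeholder value, not a real connection string.
example = "DefaultEndpointsProtocol=https;AccountName=example;AccountKey=placeholder"
encoded = encode_secret_value(example)

# This is the text that would replace __AZURE_STORAGE_CONNECTION_STRING__.
print(encoded)
```

Kubernetes decodes the value again before exposing it through `secretKeyRef`, so the container sees the original plain string.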
It works with a simple workflow:
from flytekit import task, workflow

@task(
    pod_template_name="default-template"
)
def hello_name(name: str) -> str:
    return f"Hello, {name}!"

@workflow
def say_hello(name: str = 'nguyen'):
    say = hello_name(name)

if __name__ == "__main__":
    print(f"Running wf() {say_hello()}")
But when I use a map task, it doesn't create the pod with the template above, which makes hello_name fail (get_names is fine though):
import functools
from typing import List

from flytekit import TaskMetadata, map_task, task, workflow

@task(
    pod_template_name="default-template"  # <== good
)
def get_names(name: str) -> List[str]:
    results = [name + str(i) for i in range(5)]
    return results

@task(
    pod_template_name="default-template"  # <== isn't applied to pod
)
def hello_name(name: str) -> str:
    return f"Hello, {name}!"

@workflow
def say_hello(name: str = 'nguyen'):
    names = get_names(name=name)
    hello_name_map = functools.partial(hello_name)
    map_task(
        hello_name_map,
        metadata=TaskMetadata(cache=True, cache_version="0.1", retries=5),
        concurrency=2,
    )(name=names)

if __name__ == "__main__":
    print(f"Running wf() {say_hello()}")
Any advice?
dazzling-apple-56709
02/14/2025, 1:59 AM
metadata=TaskMetadata(cache=True, cache_version="0.1", retries=5),
dazzling-apple-56709
02/14/2025, 2:00 AM
freezing-airport-6809
freezing-airport-6809
dazzling-apple-56709
02/14/2025, 5:31 PM
dazzling-apple-56709
02/14/2025, 5:32 PM