Rahul Mehta
09/22/2022, 1:57 AM
[1,2,3], can I pass each item to a map task along with some other fixed inputs that are common across all tasks? All the examples in the docs only include tasks with a single argument.
Relatedly, is there a way to use a workflow as part of a map task (i.e. map a sub-workflow over a sequence of inputs)?
Edit: guess not.
Map tasks only accept python function tasks with 0 or 1 inputs
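A possible workaround for the single-input limit, sketched in plain Python rather than with flytekit: bundle each item with the fixed inputs into one object, so the mapped function still takes a single argument. The names MapInput, process, pack, scale and offset are hypothetical; in a real workflow, process would be the task being mapped, and the list comprehension only stands in for the map task call.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class MapInput:
    """One mapped item bundled with the inputs shared by every task."""
    item: int
    scale: int   # hypothetical fixed input
    offset: int  # hypothetical fixed input


def process(packed: MapInput) -> int:
    # Single-argument function: the shape a map task accepts.
    return packed.item * packed.scale + packed.offset


def pack(items: List[int], scale: int, offset: int) -> List[MapInput]:
    # Repeat the fixed inputs alongside each item.
    return [MapInput(item=i, scale=scale, offset=offset) for i in items]


packed_inputs = pack([1, 2, 3], scale=10, offset=1)
results = [process(p) for p in packed_inputs]  # stand-in for mapping the task
print(results)
```

The cost of this pattern is that the fixed inputs are duplicated once per item, which is usually fine when they are small values.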
Sanjay Chouhan
09/22/2022, 6:38 AM
admin:
  # For GRPC endpoints you might want to use dns:///flyte.myexample.com
  endpoint: dns:///a4ad903c61##################.us-west-1.elb.amazonaws.com:80
  authType: Pkce
  insecure: true
logger:
  show-source: true
  level: 0
storage:
  connection:
    access-key: minio
    auth-type: accesskey
    disable-ssl: true
    endpoint: http://a093eb##############.us-west-1.elb.amazonaws.com:9001/
    region: us-east-1
    secret-key: miniostorage
  type: minio
  container: "my-s3-bucket"
  enable-multicontainer: true
Why is it checking localhost when I have specified the AWS ingress URL?
Sathish kumar Venkatesan
09/22/2022, 10:51 AM
from flytekit import kwtypes, workflow
from flytekitplugins.snowflake import SnowflakeConfig, SnowflakeTask

snowflake_task_no_io = SnowflakeTask(
    name="sql.snowflake.no_io",
    inputs={},
    query_template="select * from TEST_DEV.IDENTITY.TEST_123 ;",
    output_schema_type=None,
    task_config=SnowflakeConfig(
        account="xxxxx.us-east-1",
        database="TEST_DEV",
        schema="IDENTITY",
        warehouse="DEMO_WH",
    ),
)

@workflow
def no_io_wf():
    return snowflake_task_no_io()
KS Tarun
09/22/2022, 12:08 PM
@task(container_image="{{.image.trainer.fqn }}:{{.image.trainer.version}}")
My sandbox.config looks like this:
[images]
trainer = ghcr.io/flyteorg/flytecookbook:core-latest
predictor = ghcr.io/flyteorg/flytecookbook:pima_diabetes-d4838f0f5e39a21f845a93b9e3375a675bd75eaa
I'm getting this error:
raise AssertionError(f"Image Config with name {name} not found in the configuration")
AssertionError: Image Config with name trainer not found in the configuration
Any suggestions about this?
Katrina P
09/22/2022, 9:02 PM
seunggs
09/23/2022, 12:24 AM
flyte-user-role (which has full s3 access) attached as an annotation still gives me PutObject access denied error?
seunggs
09/23/2022, 12:25 AM
seunggs
09/23/2022, 12:25 AM
Robin Kahlow
09/23/2022, 11:23 AM
Robin Kahlow
09/23/2022, 11:37 AM
arg: ""), rather than storing an empty string. That causes it to use the default of the field rather than using an empty string.
Rupsha Chaudhuri
09/23/2022, 4:41 PM
object [my_domainctw5x2vdnkvd-n0-0] terminated in the background, manually
Ankit Goyal
09/23/2022, 8:07 PM
seunggs
09/23/2022, 10:38 PM
error: "size of name exceeded length 20 : my-project.workflows.04b2ed0b-434b-4d7b-9220-90606fdc2afc.test_wf_1". What’s strange is that this seems to run fine from the flyte dashboard ui?
seunggs
09/23/2022, 10:40 PM
seunggs
09/23/2022, 10:43 PM
seunggs
09/23/2022, 10:44 PM
{
  project: "shelly-robotics-bidepal-robot",
  domain: "development",
  name: "my-project.workflows.04b2ed0b-434b-4d7b-9220-90606fdc2afc.test_wf_1",
  spec: {
    launch_plan: {
      project: "shelly-robotics-bidepal-robot",
      domain: "development",
      name: "my-project.workflows.04b2ed0b-434b-4d7b-9220-90606fdc2afc.test_wf_1",
      version: "0.0.1",
    },
    inputs: {
      x: 1,
    },
  },
};
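One reading of the error, offered as an assumption rather than a confirmed diagnosis: the top-level name in the payload is the execution's own name, and that is the field the 20-character limit applies to, while spec.launch_plan.name can stay long. A minimal Python sketch of building the same payload with a short generated name follows; short_execution_name and build_request are hypothetical helpers, not part of any Flyte client.

```python
import uuid
from typing import Any, Dict

MAX_NAME_LENGTH = 20  # limit quoted in the error message above


def short_execution_name() -> str:
    """Return a 20-character name: 'f' followed by 19 hex characters."""
    return "f" + uuid.uuid4().hex[:19]


def build_request(project: str, domain: str, launch_plan_name: str,
                  version: str, inputs: Dict[str, Any]) -> Dict[str, Any]:
    name = short_execution_name()
    if len(name) > MAX_NAME_LENGTH:
        raise ValueError(f"execution name too long: {name}")
    return {
        "project": project,
        "domain": domain,
        # Assumption: this top-level field is the one being length-checked.
        "name": name,
        "spec": {
            "launch_plan": {
                "project": project,
                "domain": domain,
                "name": launch_plan_name,  # the long name stays here
                "version": version,
            },
            "inputs": inputs,
        },
    }


request = build_request(
    "shelly-robotics-bidepal-robot",
    "development",
    "my-project.workflows.04b2ed0b-434b-4d7b-9220-90606fdc2afc.test_wf_1",
    "0.0.1",
    {"x": 1},
)
print(len(request["name"]))
```

If this reading is right, it would also be consistent with the dashboard UI working, since the UI would not be sending the launch plan name as the execution name.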
Maarten T
09/26/2022, 8:12 AM
from typing import List
from flytekit import ContainerTask, kwtypes
from flytekit.types.file import FlyteFile

floodcast_model = ContainerTask(
    name="floodcast-docker",
    input_data_dir="/input",
    output_data_dir="/output",
    inputs=kwtypes(rainfall=List[FlyteFile]),
    outputs=kwtypes(floodmaps=List[FlyteFile]),
    image="<myimage>",
    command=["/env/bin/python", "run.py"],
)
Sebastian
09/26/2022, 3:03 PM
flytectl update launchplan ... --version $GITSHA --activate >> --archive-old <<.
This seems like a basic use case but I have not found any resources on it. Please let me know if I overlooked anything.
Robert Everson
09/26/2022, 7:20 PM
EventSinkError that on some workflows appears to occur occasionally, but on others happens all the time? We moved to a new k8s cluster for our flyteadmin instance (and from 3 instances to 1), and the timing of that seems to mostly line up with this error starting. After checking the flyteadmin logs, I can see propeller sending a CreateWorkflowEvent and it getting successfully registered, but propeller will still throw an
EventRecordingFailed: failed to record node event, caused by: EventSinkError: Error sending event, caused by [rpc error: code = Unavailable desc = upstream connect error or disconnect/reset before headers. reset reason: protocol error]
Sebastian
09/27/2022, 5:42 AM
launch_plan.LaunchPlan.get_or_create(
    workflow=wf,
    name="my_wf_prod",  # prod specific
    schedule=CronSchedule(schedule="1 0 * * *"),
    default_inputs={
        "execution_date": datetime.now(),  # aside: this does not work. How do I provide an execution date?
    },
    fixed_inputs={
        "out_path": consts.OUT_S3_PATH_PROD  # prod specific vars
    },
)
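The launch plan above hard-codes prod values in its name and fixed_inputs. A minimal sketch of one way to derive them from an environment variable at registration time, so a single definition can serve several environments: the variable name DEPLOY_ENV, the ENV_SETTINGS table and the bucket paths are all hypothetical, and the flytekit call is only shown as a comment so the block runs on its own.

```python
import os
from typing import Any, Dict

# Hypothetical per-environment settings; the paths are placeholders.
ENV_SETTINGS: Dict[str, Dict[str, str]] = {
    "prod": {"out_path": "s3://example-bucket/prod"},
    "dev": {"out_path": "s3://example-bucket/dev"},
}


def launch_plan_kwargs(env: str) -> Dict[str, Any]:
    """Build the environment-specific keyword arguments for one launch plan."""
    settings = ENV_SETTINGS[env]  # raises KeyError for an unknown environment
    return {
        "name": f"my_wf_{env}",
        "fixed_inputs": {"out_path": settings["out_path"]},
    }


# DEPLOY_ENV is an assumed variable name, read when the module is registered.
env = os.environ.get("DEPLOY_ENV", "dev")
kwargs = launch_plan_kwargs(env)
# The result would then be passed on, for example:
# launch_plan.LaunchPlan.get_or_create(workflow=wf, schedule=..., **kwargs)
print(kwargs)
```

On the execution date aside: flytekit's CronSchedule accepts a kickoff_time_input_arg parameter naming the workflow input that receives the scheduled time, which may be closer to what is wanted than datetime.now(), since the latter is evaluated once when the launch plan is defined.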
There is also a copy of this launch plan for the dev env, so that I can plug in the dev env vars. But this is silly and requires two copies. How can I parameterize workflows with something like env vars so I can vary them between prod and dev?
fei sun
09/27/2022, 6:51 AM
flytectl demo start finished successfully, and I can open the Flyte web UI at localhost:30080, but pyflyte remote run failed with the following error message:
fei.sun@MacBook-Pro cookbook % pyflyte run --remote core/flyte_basics/hello_world.py my_wf
E0927 14:46:18.763713000 4366484864 ssl_transport_security.cc:1495] Handshake failed with fatal error SSL_ERROR_SSL: error:100000f7:SSL routines:OPENSSL_internal:WRONG_VERSION_NUMBER.
E0927 14:46:18.765990000 4366484864 ssl_transport_security.cc:1495] Handshake failed with fatal error SSL_ERROR_SSL: error:100000f7:SSL routines:OPENSSL_internal:WRONG_VERSION_NUMBER.
{"asctime": "2022-09-27 14:46:18,781", "name": "flytekit.cli", "levelname": "ERROR", "message": "Non-auth RPC error <_InactiveRpcError of RPC that terminated with:\n\tstatus = StatusCode.UNAVAILABLE\n\tdetails = \"failed to connect to all addresses\"\n\tdebug_error_string = \"{\"created\":\"@1664261178.780964000\",\"description\":\"Failed to pick subchannel\",\"file\":\"src/core/ext/filters/client_channel/client_channel.cc\",\"file_line\":3261,\"referenced_errors\":[{\"created\":\"@1664261178.780963000\",\"description\":\"failed to connect to all addresses\",\"file\":\"src/core/lib/transport/error_utils.cc\",\"file_line\":167,\"grpc_status\":14}]}\"\n>, sleeping 200ms and retrying"}
{"asctime": "2022-09-27 14:46:18,986", "name": "flytekit.cli", "levelname": "ERROR", "message": "Non-auth RPC error <_InactiveRpcError of RPC that terminated with:\n\tstatus = StatusCode.UNAVAILABLE\n\tdetails = \"failed to connect to all addresses\"\n\tdebug_error_string = \"{\"created\":\"@1664261178.985906000\",\"description\":\"Failed to pick subchannel\",\"file\":\"src/core/ext/filters/client_channel/client_channel.cc\",\"file_line\":3261,\"referenced_errors\":[{\"created\":\"@1664261178.985905000\",\"description\":\"failed to connect to all addresses\",\"file\":\"src/core/lib/transport/error_utils.cc\",\"file_line\":167,\"grpc_status\":14}]}\"\n>, sleeping 400ms and retrying"}
Traceback (most recent call last):
  File "/Users/fei.sun/Library/Python/3.8/bin//pyflyte", line 8, in <module>
    sys.exit(main())
  File "/Users/fei.sun/Library/Python/3.8/lib/python/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/Users/fei.sun/Library/Python/3.8/lib/python/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/Users/fei.sun/Library/Python/3.8/lib/python/site-packages/click/core.py", line 1657, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/Users/fei.sun/Library/Python/3.8/lib/python/site-packages/click/core.py", line 1657, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/Users/fei.sun/Library/Python/3.8/lib/python/site-packages/click/core.py", line 1657, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/Users/fei.sun/Library/Python/3.8/lib/python/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/fei.sun/Library/Python/3.8/lib/python/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/Users/fei.sun/Library/Python/3.8/lib/python/site-packages/flytekit/clis/sdk_in_container/run.py", line 539, in _run
    remote_entity = remote.register_script(
  File "/Users/fei.sun/Library/Python/3.8/lib/python/site-packages/flytekit/remote/remote.py", line 596, in register_script
    upload_location, md5_bytes = fast_register_single_script(
  File "/Users/fei.sun/Library/Python/3.8/lib/python/site-packages/flytekit/tools/script_mode.py", line 113, in fast_register_single_script
    upload_location = create_upload_location_fn(content_md5=md5)
  File "/Users/fei.sun/Library/Python/3.8/lib/python/site-packages/flytekit/clients/friendly.py", line 998, in get_upload_signed_url
    return super(SynchronousFlyteClient, self).create_upload_location(
  File "/Users/fei.sun/Library/Python/3.8/lib/python/site-packages/flytekit/clients/raw.py", line 41, in handler
    return fn(*args, **kwargs)
  File "/Users/fei.sun/Library/Python/3.8/lib/python/site-packages/flytekit/clients/raw.py", line 854, in create_upload_location
    return self._dataproxy_stub.CreateUploadLocation(create_upload_location_request, metadata=self._metadata)
  File "/Users/fei.sun/Library/Python/3.8/lib/python/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/Users/fei.sun/Library/Python/3.8/lib/python/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "failed to connect to all addresses"
    debug_error_string = "{"created":"@1664261179.392327000","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3261,"referenced_errors":[{"created":"@1664261179.392310000","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"
fei sun
09/27/2022, 6:55 AM
Tamis van der Laan
09/27/2022, 8:39 AM
Tamis van der Laan
09/27/2022, 8:40 AM
Tamis van der Laan
09/27/2022, 8:42 AM
Samhita Alla
Hampus Rosvall
09/28/2022, 6:17 AM
Anthony
09/28/2022, 8:31 AM
Workflow[flyte-anti-fraud-ml:development:app.workflow.main_flow] failed. RuntimeExecutionError: max number of system retry attempts [31/30] exhausted. Last known status message: failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: error file @[s3://my-s3-bucket/metadata/propeller/flyte-anti-fraud-ml-development-f31c365f02c114639b00/n0/data/0/error.pb] is too large [28775519] bytes, max allowed [10485760] bytes
I added the max-output-size-bytes param to the flyte-propeller-config and waited for all changes to apply before re-submitting a new task.
kubectl edit configmap -n flyte flyte-propeller-config
The propeller section of my flyte-propeller-config looks like:
core.yaml: |
  manager:
    pod-application: flytepropeller
    pod-template-container-name: flytepropeller
    pod-template-name: flytepropeller-template
  propeller:
    max-output-size-bytes: 52428800
    downstream-eval-duration: 30s
    enable-admin-launcher: true
    leader-election:
      enabled: true
      lease-duration: 15s
      lock-config-map:
        name: propeller-leader
        namespace: flyte
      renew-deadline: 10s
      retry-period: 2s
    limit-namespace: all
    max-workflow-retries: 3
    metadata-prefix: metadata/propeller
    metrics-prefix: flyte
    prof-port: 10254
Task configuration has been set up via kubectl -n flyte edit cm flyte-admin-base-config
storage.yaml: |
  storage:
    type: minio
    container: "my-s3-bucket"
    stow:
      kind: s3
      config:
        access_key_id: minio
        auth_type: accesskey
        secret_key: miniostorage
        disable_ssl: true
        endpoint: http://minio.flyte.svc.cluster.local:9000
        region: us-east-1
    signedUrl:
      stowConfigOverride:
        endpoint: http://localhost:30084
    enable-multicontainer: false
    limits:
      maxDownloadMBs: 50
task_resource_defaults.yaml: |
  task_resources:
    defaults:
      cpu: 1
      memory: 3000Mi
      storage: 100Mi
    limits:
      cpu: 4
      gpu: 1
      memory: 3Gi
      storage: 500Mi
Changing maxDownloadMBs also didn’t change the situation.
Changing the cache max_size_mbs in flyte-propeller-config from 0 to some custom value is also not working:
cache.yaml: |
  cache:
    max_size_mbs: 100
    target_gc_percent: 70
I tried changing different params at different times, but the error arose on each new execution.
I saw that neither max-output-size-bytes nor max-workflow-retries (changed from 30 --> 3) is passed to the workflow execution:
RuntimeExecutionError: max number of system retry attempts [31/30] exhausted...
error file @[s3://my-s3-bucket/metadata/propeller/flyte-anti-fraud-ml-development-f31c365f02c114639b00/n0/data/0/error.pb] is too large [28775519] bytes, max allowed [10485760] bytes...
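As a quick check of the numbers in the error against the configured value (plain arithmetic in Python, not a Flyte API; the variable names are illustrative): the reported limit is exactly 10 MiB, the configured max-output-size-bytes is exactly 50 MiB, and the error file would fit under the latter but not the former.

```python
MIB = 1024 * 1024

error_file_bytes = 28775519        # size reported in the error
reported_limit_bytes = 10485760    # "max allowed" reported in the error
configured_limit_bytes = 52428800  # max-output-size-bytes in the ConfigMap


def fits(size_bytes: int, limit_bytes: int) -> bool:
    """True when a file of size_bytes is within limit_bytes."""
    return size_bytes <= limit_bytes


print(reported_limit_bytes / MIB)    # limit in MiB as reported
print(configured_limit_bytes / MIB)  # limit in MiB as configured
print(fits(error_file_bytes, reported_limit_bytes))
print(fits(error_file_bytes, configured_limit_bytes))
```

Since the error still reports 10485760, the running propeller does not appear to be using the edited value. One thing that may be worth checking (an assumption, not confirmed in this thread) is whether the flytepropeller pod was restarted after the ConfigMap edit.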
Here are my CLI steps to create a new execution:
- kubectl -n flyte edit cm flyte-admin-base-config
- kubectl edit configmap -n flyte flyte-propeller-config
- flytectl get task-resource-attribute -p flyteexamples -d development
- flytectl update project -p flyte-anti-fraud-ml -d development --storage.cache.max_size_mbs 100
- flytectl get launchplan --project flyte-anti-fraud-ml --domain development app.workflow.main_flow --latest --execFile exec_spec.yaml
- flytectl create execution --project flyte-anti-fraud-ml --domain development --execFile exec_spec.yaml
What additional steps do I have to take to force flytectl to use my propeller changes and solve the problem of the 10 MB maximum size allowed for serialized uploads to Flyte?
Sanjay Chouhan
09/28/2022, 9:33 AM
Padma Priya M
09/28/2022, 9:36 AM
[1/1] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
[ff43e19ed1617402a8c6-n0-0] terminated with exit code (1). Reason [Error]. Message:
ine 286, in _handle_annotated_task
    _dispatch_execute(ctx, task_def, inputs, output_prefix)
  File "/opt/venv/lib/python3.8/site-packages/flytekit/bin/entrypoint.py", line 76, in _dispatch_execute
    logger.debug(f"Starting _dispatch_execute for {task_def.name}")
AttributeError: 'function' object has no attribute 'name'
Traceback (most recent call last):
  File "/opt/venv/bin/pyflyte-fast-execute", line 8, in <module>
    sys.exit(fast_execute_task_cmd())
  File "/opt/venv/lib/python3.8/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/opt/venv/lib/python3.8/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/opt/venv/lib/python3.8/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/opt/venv/lib/python3.8/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/opt/venv/lib/python3.8/site-packages/flytekit/bin/entrypoint.py", line 502, in fast_execute_task_cmd
    subprocess.run(cmd, check=True)
  File "/usr/local/lib/python3.8/subprocess.py", line 516, in run
    raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['pyflyte-execute', '--inputs', 's3://my-s3-bucket/metadata/propeller/flytesnacks-development-ff43e19ed1617402a8c6/n0/data/inputs.pb', '--output-prefix', 's3://my-s3-bucket/metadata/propeller/flytesnacks-development-ff43e19ed1617402a8c6/n0/data/0', '--raw-output-data-prefix', 's3://my-s3-bucket/qf/ff43e19ed1617402a8c6-n0-0', '--checkpoint-path', 's3://my-s3-bucket/qf/ff43e19ed1617402a8c6-n0-0/_flytecheckpoints', '--prev-checkpoint', '""', '--dynamic-addl-distro', 's3://my-s3-bucket/72/flytesnacks/development/LXFFQS6IOSM2OAH57AOZKM2SZA======/scriptmode.tar.gz', '--dynamic-dest-dir', '/root', '--resolver', 'flytekit.core.python_auto_container.default_task_resolver', '--', 'task-module', 'flytekitplugins.papermill.task', 'task-name', '_dummy_task_func']' returned non-zero exit status 1.