Hello guys! I need some help. We are doing some testing with flyte in the AWS environment with ALB...
l

Leonardo Almeida

over 3 years ago
Hello guys! I need some help. We are doing some testing with flyte in the AWS environment with ALB. But I'm having trouble trying to run the example code:
import typing
import pandas as pd
import numpy as np

from flytekit import task, workflow

@task
def generate_normal_df(n:int, mean: float, sigma: float) -> pd.DataFrame:
    return pd.DataFrame({"numbers": np.random.normal(mean, sigma,size=n)})

@task
def compute_stats(df: pd.DataFrame) -> typing.Tuple[float, float]:
    return float(df["numbers"].mean()), float(df["numbers"].std())

@workflow
def wf(n: int = 200, mean: float = 0.0, sigma: float = 1.0) -> typing.Tuple[float, float]:
    return compute_stats(df=generate_normal_df(n=n, mean=mean, sigma=sigma))
When I run the code using the local cluster using flytectl demo start (k3s) it works. But in production environment using EKS and ALB does not work.
pyflyte run --remote example.py wf --n 500 --mean 42 --sigma 2                                                                                    ξ‚² SIGINT(2) ↡ ξ‚² 10055 ξ‚² 17:06:22 ο€—
{"asctime": "2022-05-06 17:06:31,287", "name": "flytekit.cli", "levelname": "ERROR", "message": "Non-auth RPC error <_InactiveRpcError of RPC that terminated with:\n\tstatus = StatusCode.UNAVAILABLE\n\tdetails = \"failed to connect to all addresses\"\n\tdebug_error_string = \"{\"created\":\"@1651867591.287595583\",\"description\":\"Failed to pick subchannel\",\"file\":\"src/core/ext/filters/client_channel/client_channel.cc\",\"file_line\":3128,\"referenced_errors\":[{\"created\":\"@1651867591.287594125\",\"description\":\"failed to connect to all addresses\",\"file\":\"src/core/lib/transport/error_utils.cc\",\"file_line\":163,\"grpc_status\":14}]}\"\n>, sleeping 200ms and retrying"}
{"asctime": "2022-05-06 17:06:32,274", "name": "flytekit.cli", "levelname": "ERROR", "message": "Non-auth RPC error <_InactiveRpcError of RPC that terminated with:\n\tstatus = StatusCode.UNAVAILABLE\n\tdetails = \"failed to connect to all addresses\"\n\tdebug_error_string = \"{\"created\":\"@1651867592.274550785\",\"description\":\"Failed to pick subchannel\",\"file\":\"src/core/ext/filters/client_channel/client_channel.cc\",\"file_line\":3128,\"referenced_errors\":[{\"created\":\"@1651867592.274550285\",\"description\":\"failed to connect to all addresses\",\"file\":\"src/core/lib/transport/error_utils.cc\",\"file_line\":163,\"grpc_status\":14}]}\"\n>, sleeping 400ms and retrying"}
Traceback (most recent call last):
  File "/usr/local/bin/pyflyte", line 8, in <module>
    sys.exit(main())
  File "/home/lalmeida/.local/lib/python3.8/site-packages/click/core.py", line 1128, in __call__
    return self.main(*args, **kwargs)
  File "/home/lalmeida/.local/lib/python3.8/site-packages/click/core.py", line 1053, in main
    rv = self.invoke(ctx)
  File "/home/lalmeida/.local/lib/python3.8/site-packages/click/core.py", line 1659, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/lalmeida/.local/lib/python3.8/site-packages/click/core.py", line 1659, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/lalmeida/.local/lib/python3.8/site-packages/click/core.py", line 1659, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/lalmeida/.local/lib/python3.8/site-packages/click/core.py", line 1395, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/lalmeida/.local/lib/python3.8/site-packages/click/core.py", line 754, in invoke
    return __callback(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/flytekit/clis/sdk_in_container/run.py", line 471, in _run
    wf = remote.register_script(
  File "/usr/local/lib/python3.8/dist-packages/flytekit/remote/remote.py", line 536, in register_script
    upload_location, md5_bytes = fast_register_single_script(
  File "/usr/local/lib/python3.8/dist-packages/flytekit/tools/script_mode.py", line 117, in fast_register_single_script
    upload_location = create_upload_location_fn(content_md5=md5)
  File "/usr/local/lib/python3.8/dist-packages/flytekit/clients/friendly.py", line 998, in get_upload_signed_url
    return super(SynchronousFlyteClient, self).create_upload_location(
  File "/usr/local/lib/python3.8/dist-packages/flytekit/clients/raw.py", line 40, in handler
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/flytekit/clients/raw.py", line 834, in create_upload_location
    return self._dataproxy_stub.CreateUploadLocation(create_upload_location_request, metadata=self._metadata)
  File "/usr/local/lib/python3.8/dist-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/usr/local/lib/python3.8/dist-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses"
	debug_error_string = "{"created":"@1651867592.675517306","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3128,"referenced_errors":[{"created":"@1651867592.675516568","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":163,"grpc_status":14}]}"
Do you have any ideas? I'm using helm with values-eks.yaml
General question about python function task (and workflow) naming and loading conventions: We’re hop...
y

Yee

over 3 years ago
General question about python function task (and workflow) naming and loading conventions: We’re hoping to clean up how this works a bit. Today, they are always determined basically by the path from where the python process was started, to the location of the .py file. For example, if you have
:~/dev/my_repo $ tree .
.
β”œβ”€β”€ Dockerfile
└── src
    β”œβ”€β”€ __init__.py
    β”œβ”€β”€ __pycache__
    └── parent
        β”œβ”€β”€ __init__.py
        β”œβ”€β”€ __pycache__
        └── child
            β”œβ”€β”€ __init__.py
            β”œβ”€β”€ __pycache__
            └── hello_world.py
the
__module__
attribute is determined by the path:
:~/dev/my_repo $ ipython
In [1]: from src.parent.child.hello_world import my_wf

In [2]: my_wf.__module__
Out[2]: 'src.parent.child.hello_world'


:~/dev/my_repo $ cd src/parent
:~/dev/my_repo/src/parent $ ipython
In [1]: from child.hello_world import my_wf

In [2]: my_wf.__module__
Out[2]: 'child.hello_world'
Note the
__module__
in the second example is shortened. This module name is used for the name of the task, and to load the task at run-time in the container (so your image has to be built correctly of course). We are thinking of changing the behaviour so that, given a task (or launch plan or workflow), we walk up the file system until we find a folder that doesn’t have an
__init__.py
file, and basically assume that the python process was started from there. In the above example, the name in the latter would be the same as in the first example. Obviously this will not work for namespace packages, but we doubt anyone uses those currently for Flyte workflows. Does this sound unreasonable to anyone?