#ask-the-community

Taeef Najib

03/05/2023, 9:30 PM
Hi guys, I'm using Flyte on a project that predicts glass type using KNeighborsClassifier. It seems Flyte doesn't accept KNeighborsClassifier as the type of my function. What would be the right type hint for this? I get the following error:
[4/4] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
[sd4h34hqd15wyc3wy32t-n0-3] terminated with exit code (137). Reason [OOMKilled]. Message: 

{"asctime": "2023-03-05 20:24:43,219", "name": "flytekit", "levelname": "WARNING", "message": "Unsupported Type <class 'sklearn.neighbors._classification.KNeighborsClassifier'> found, Flyte will default to use PickleFile as the transport. Pickle can only be used to send objects between the exact same version of Python, and we strongly recommend to use python type that flyte support."}
2023/03/05 20:24:43 WARNING mlflow.utils.git_utils: Failed to import Git (the Git executable is probably not on your PATH), so Git SHA is not available. Error: Failed to initialize: Bad git executable.
The git executable must be specified in one of the following ways:
    - be included in your $PATH
    - be set via $GIT_PYTHON_GIT_EXECUTABLE
    - explicitly set via git.refresh()

All git commands will error until this is rectified.

This initial warning can be silenced or aggravated in the future by setting the
$GIT_PYTHON_REFRESH environment variable. Use one of the following values:
    - quiet|q|silence|s|none|n|0: for no warning or exception
    - warn|w|warning|1: for a printed warning
    - error|e|raise|r|2: for a raised exception

Example:
    export GIT_PYTHON_REFRESH=quiet

{"asctime": "2023-03-05 20:24:43,424", "name": "flytekit", "levelname": "WARNING", "message": "Unsupported Type <class 'sklearn.neighbors._classification.KNeighborsClassifier'> found, Flyte will default to use PickleFile as the transport. Pickle can only be used to send objects between the exact same version of Python, and we strongly recommend to use python type that flyte support."}
Can you please point me in the right direction?

Kevin Su

03/05/2023, 10:44 PM
By default, Flyte serializes outputs of supported types (int, str, list, pd.DataFrame) to a Flyte Literal (a protobuf message). If the type is not recognized by Flyte, flytekit serializes it with pickle. You could register a custom type transformer yourself. Here is an example
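To make the fallback concrete, here is a toy sketch in plain Python. It is not flytekit's implementation: the registry, `register`, `serialize`, and the `Model` class are hypothetical names made up for illustration. It only shows the idea that a registered type gets its own serializer while anything else falls back to pickle.

```python
import pickle
from typing import Any, Callable, Dict, Tuple

# Hypothetical registry (not a flytekit API): maps a type to a serializer.
_SERIALIZERS: Dict[type, Callable[[Any], bytes]] = {}


def register(py_type: type, to_bytes: Callable[[Any], bytes]) -> None:
    """Register a custom serializer for py_type."""
    _SERIALIZERS[py_type] = to_bytes


def serialize(value: Any) -> Tuple[str, bytes]:
    """Return (transport, payload); unregistered types fall back to pickle."""
    for py_type, to_bytes in _SERIALIZERS.items():
        if isinstance(value, py_type):
            return py_type.__name__, to_bytes(value)
    return "pickle", pickle.dumps(value)


class Model:
    """Hypothetical stand-in for an estimator with one hyperparameter."""

    def __init__(self, k: int) -> None:
        self.k = k


# A value whose type nobody registered goes through pickle.
transport_before, _ = serialize({1, 2, 3})

# Once a serializer is registered, the custom transport is used instead.
register(Model, lambda m: str(m.k).encode())
transport_after, payload = serialize(Model(5))
print(transport_before, transport_after, payload)
```

In real flytekit the registration step is done through its type transformer mechanism, as described in the custom types guide.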

Taeef Najib

03/06/2023, 1:19 AM
Thanks for your response. Can I create a custom class in my script and use it as a custom type? (As shown here: https://docs.flyte.org/projects/cookbook/en/latest/auto/core/extend_flyte/custom_types.html#advanced-custom-types) I was also wondering if I could simply use ClassifierMixin, BaseEstimator or Any, or maybe None?

Samhita Alla

03/06/2023, 4:22 AM
"Can I create a custom class in my script and use it as a custom type?"
Yes, you can! You'll need to implement the methods specified in the guide.
"Was also wondering if I could simply use ClassifierMixin, BaseEstimator or Any or maybe None?"
You can. The error you're seeing is OOMKilled, right? In that case, please increase the memory you're allocating to your task. The unsupported-type message is only a warning. You can either register a custom type (which is preferred) or leave it as is.

Niels Bantilan

03/06/2023, 6:31 PM
interesting… shouldn’t KNeighborsClassifier be picked up by the SklearnTypeTransformer? https://github.com/flyteorg/flytekit/blob/master/flytekit/extras/sklearn/native.py#L72

Taeef Najib

03/06/2023, 9:47 PM
Thanks guys

Niels Bantilan

03/06/2023, 11:53 PM
which version of flytekit are you using @Taeef Najib?

Taeef Najib

03/07/2023, 5:31 PM
@Niels Bantilan it’s 1.2.7

Niels Bantilan

03/07/2023, 5:39 PM
kk, are you able to update or do you need to stick with 1.2.7?
(1.2.7 doesn’t have support for sklearn estimators)

Taeef Najib

03/07/2023, 5:41 PM
We can upgrade to a later version. Will that support KNeighborsClassifier as a type hint?

Niels Bantilan

03/07/2023, 5:45 PM
yep! any version >= 1.3.2 should support KNeighborsClassifier
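A quick way to check an environment against that threshold is sketched below. It assumes the 1.3.2 minimum quoted in this thread and plain X.Y.Z version strings; the helper names `parse_release`, `meets_minimum`, and `installed_flytekit` are hypothetical.

```python
from importlib.metadata import PackageNotFoundError, version
from itertools import takewhile
from typing import Optional, Tuple

# Threshold quoted in this thread; treat it as an assumption.
MIN_VERSION = (1, 3, 2)


def parse_release(v: str) -> Tuple[int, ...]:
    """Turn '1.4.1' into (1, 4, 1), keeping only leading digits of each part."""
    parts = []
    for piece in v.split(".")[:3]:
        digits = "".join(takewhile(str.isdigit, piece))
        parts.append(int(digits) if digits else 0)
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)


def meets_minimum(v: str) -> bool:
    """True if version string v is at least MIN_VERSION."""
    return parse_release(v) >= MIN_VERSION


def installed_flytekit() -> Optional[str]:
    """Return the installed flytekit version, or None if it is not installed."""
    try:
        return version("flytekit")
    except PackageNotFoundError:
        return None


current = installed_flytekit()
if current is None:
    print("flytekit is not installed in this environment")
else:
    print(current, "ok" if meets_minimum(current) else "too old for sklearn type hints")
```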

Taeef Najib

03/07/2023, 5:46 PM
Thanks. I'll try upgrading then
@Niels Bantilan After upgrading it to 1.4.0, I'm getting this error:
Traceback (most recent call last):
  File "/opt/venv/bin/pyflyte", line 5, in <module>
    from flytekit.clis.sdk_in_container.pyflyte import main
  File "/opt/venv/lib/python3.10/site-packages/flytekit/__init__.py", line 202, in <module>
    from flytekit.core.base_sql_task import SQLTask
  File "/opt/venv/lib/python3.10/site-packages/flytekit/core/base_sql_task.py", line 4, in <module>
    from flytekit.core.base_task import PythonTask, TaskMetadata
  File "/opt/venv/lib/python3.10/site-packages/flytekit/core/base_task.py", line 27, in <module>
    from flytekit.core.context_manager import (
  File "/opt/venv/lib/python3.10/site-packages/flytekit/core/context_manager.py", line 30, in <module>
    from flytekit.clients import friendly as friendly_client  # noqa
  File "/opt/venv/lib/python3.10/site-packages/flytekit/clients/friendly.py", line 4, in <module>
    from flyteidl.admin import common_pb2 as _common_pb2
  File "/opt/venv/lib/python3.10/site-packages/flyteidl/admin/common_pb2.py", line 5, in <module>
    from google.protobuf.internal import builder as _builder
ImportError: cannot import name 'builder' from 'google.protobuf.internal' (/opt/venv/lib/python3.10/site-packages/google/protobuf/internal/__init__.py)

Niels Bantilan

03/08/2023, 1:53 PM
we had to yank 1.4.0… can you try installing 1.4.1?

Taeef Najib

03/08/2023, 2:51 PM
let me try
So we upgraded to 1.4.1 and got this error:
[4/4] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
[snbh2jiispjmvrt5faat-n0-3] terminated with exit code (1). Reason [Error]. Message: 
{"asctime": "2023-03-08 16:51:11,316", "name": "flytekit", "levelname": "WARNING", "message": "FlyteSchema is deprecated, use Structured Dataset instead."}

Traceback (most recent call last):
  File "/opt/venv/bin/pyflyte-execute", line 8, in <module>
    sys.exit(execute_task_cmd())
  File "/opt/venv/lib/python3.10/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/opt/venv/lib/python3.10/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/opt/venv/lib/python3.10/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/opt/venv/lib/python3.10/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/opt/venv/lib/python3.10/site-packages/flytekit/bin/entrypoint.py", line 476, in execute_task_cmd
    _execute_task(
  File "/opt/venv/lib/python3.10/site-packages/flytekit/exceptions/scopes.py", line 160, in system_entry_point
    return wrapped(*args, **kwargs)
  File "/opt/venv/lib/python3.10/site-packages/flytekit/bin/entrypoint.py", line 348, in _execute_task
    _task_def = resolver_obj.load_task(loader_args=resolver_args)
  File "/opt/venv/lib/python3.10/site-packages/flytekit/core/class_based_resolver.py", line 34, in load_task
    return self.mapping[idx]
IndexError: list index out of range
@Niels Bantilan

Niels Bantilan

03/08/2023, 4:59 PM
can you share the code and commands you’re running?

Taeef Najib

03/08/2023, 5:07 PM
Here's the workflow code:
import sklearn
import os
import sys
import typing
from flytekit import Resources, task, workflow

from main import Hyperparameters
from main import generate_dataset
from main import train_model

_wf_outputs=typing.NamedTuple("WfOutputs",train_model_0=sklearn.ensemble._forest.RandomForestClassifier)
@workflow
def test_classification_2(_wf_args:Hyperparameters)->_wf_outputs:
    generate_dataset=task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(generate_dataset)
    train_model=task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(train_model)
    generate_dataset_o0_,generate_dataset_o1_,generate_dataset_o2_,generate_dataset_o3_=generate_dataset(hp=_wf_args)
    train_model_o0_=train_model(hp=_wf_args,X_train=generate_dataset_o0_,y_train=generate_dataset_o2_)
    return _wf_outputs(train_model_o0_)
did it give any clue about the possible problem?
@Niels Bantilan do you need to see the code from the imported modules?

Niels Bantilan

03/09/2023, 8:19 PM
You need to define tasks in the top-level scope of the module, not inside a workflow function body. Also, make sure the output of train_model_task is _wf_outputs directly: you can’t do _wf_outputs(train_model_o0_), because train_model_o0_ is a promise and you can’t operate on it like a regular Python value.
import sklearn
import os
import sys
import typing
from flytekit import Resources, task, workflow

from main import Hyperparameters
from main import generate_dataset
from main import train_model

generate_dataset_task = task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(generate_dataset)
train_model_task = task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(train_model)

_wf_outputs=typing.NamedTuple("WfOutputs",train_model_0=sklearn.ensemble._forest.RandomForestClassifier)

@workflow
def test_classification_2(_wf_args:Hyperparameters)->_wf_outputs:
    generate_dataset_o0_,generate_dataset_o1_,generate_dataset_o2_,generate_dataset_o3_ = generate_dataset_task(hp=_wf_args)
    train_model_o0_=train_model_task(hp=_wf_args,X_train=generate_dataset_o0_,y_train=generate_dataset_o2_)
    return _wf_outputs(train_model_o0_)
Note that the output of generate_dataset needs to be a tuple or NamedTuple for you to be able to unpack the outputs like this:
generate_dataset_o0_,generate_dataset_o1_,generate_dataset_o2_,generate_dataset_o3_= generate_dataset_task(hp=_wf_args)
Actually, I'm not confident whether
generate_dataset_task = task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(generate_dataset)
will work, though it should in theory.
The main thing to remember about Flyte workflows is that the function body of a @workflow-decorated function only supports certain operations. You can read more about it here: https://docs.flyte.org/projects/cookbook/en/latest/getting_started/tasks_and_workflows.html#workflows
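As a rough mental model, here is a toy sketch in plain Python. It is not flytekit code: `Promise`, `toy_task`, and `score` are hypothetical stand-ins. The point is that while the workflow body runs, task calls hand back placeholders rather than values, so ordinary Python logic on them fails.

```python
import functools


class Promise:
    """Toy placeholder for a task output that does not exist yet."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __bool__(self) -> bool:
        # Truth-testing needs a real value, which is only known at run time.
        raise TypeError(f"{self.name} is a placeholder, not a value")

    def __repr__(self) -> str:
        return f"Promise({self.name!r})"


def toy_task(fn):
    """Toy decorator: calling the task returns a Promise instead of running fn."""

    @functools.wraps(fn)
    def call(**kwargs):
        # The body of fn is NOT executed here.
        return Promise(f"{fn.__name__}.o0")

    return call


@toy_task
def score(x: int) -> int:
    return x * 2


def workflow_body():
    result = score(x=1)  # a Promise, not the integer 2
    try:
        if result:  # branching on a placeholder raises
            pass
    except TypeError as exc:
        return result, str(exc)
    return result, ""


result, message = workflow_body()
print(type(result).__name__, "|", message)
```

Passing a placeholder on to another task call is the kind of operation a workflow body is meant for; inspecting or transforming its value is not.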

Taeef Najib

03/10/2023, 1:58 PM
@Niels Bantilan The problem is that deploying with v1.4.1 breaks with an index out of range error, whereas it works fine in v1.2.7. Is something different in v1.4.1 causing the error? The workflow code is exactly the same. To confirm, I tried what you suggested (moving the task definitions outside the @workflow), but it throws this error (in both v1.2.7 and v1.4.1):
Traceback (most recent call last):
  File "/opt/venv/bin/pyflyte-execute", line 8, in <module>
    sys.exit(execute_task_cmd())
  File "/opt/venv/lib/python3.10/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/opt/venv/lib/python3.10/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/opt/venv/lib/python3.10/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/opt/venv/lib/python3.10/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/opt/venv/lib/python3.10/site-packages/flytekit/bin/entrypoint.py", line 476, in execute_task_cmd
    _execute_task(
  File "/opt/venv/lib/python3.10/site-packages/flytekit/exceptions/scopes.py", line 160, in system_entry_point
    return wrapped(*args, **kwargs)
  File "/opt/venv/lib/python3.10/site-packages/flytekit/bin/entrypoint.py", line 354, in _execute_task
    _handle_annotated_task(ctx, _task_def, inputs, output_prefix)
  File "/opt/venv/lib/python3.10/site-packages/flytekit/bin/entrypoint.py", line 297, in _handle_annotated_task
    _dispatch_execute(ctx, task_def, inputs, output_prefix)
  File "/opt/venv/lib/python3.10/site-packages/flytekit/bin/entrypoint.py", line 80, in _dispatch_execute
    logger.debug(f"Starting _dispatch_execute for {task_def.name}")
AttributeError: 'function' object has no attribute 'name'
Another thing I noticed when I made this change - in the flyte console task details I see this:
"pyflyte-execute"
"--inputs"
"{{.input}}"
"--output-prefix"
"{{.outputPrefix}}"
"--raw-output-data-prefix"
"{{.rawOutputDataPrefix}}"
"--checkpoint-path"
"{{.checkpointOutputPrefix}}"
"--prev-checkpoint"
"{{.prevCheckpointPrefix}}"
"--resolver"
"flytekit.core.python_auto_container.default_task_resolver"
"--"
"task-module"
"main"
"task-name"
"generate_dataset"
Instead of:
"pyflyte-execute"
"--inputs"
"{{.input}}"
"--output-prefix"
"{{.outputPrefix}}"
"--raw-output-data-prefix"
"{{.rawOutputDataPrefix}}"
"--checkpoint-path"
"{{.checkpointOutputPrefix}}"
"--prev-checkpoint"
"{{.prevCheckpointPrefix}}"
"--resolver"
"wf.wf_1.my_workflow_1"
"--"
"0"

Niels Bantilan

03/10/2023, 2:38 PM
did you refactor your code? can you share it?
The first task detail looks correct… the second one looks off… are you using a custom task resolver, or custom container interface?

Taeef Najib

03/10/2023, 4:16 PM
import sklearn
import os
import sys
import typing
from flytekit import Resources, task, workflow
from main import Hyperparameters
from main import generate_dataset
from main import train_model

generate_dataset=task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(generate_dataset)
train_model=task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(train_model)

_wf_outputs=typing.NamedTuple("WfOutputs",train_model_0=sklearn.ensemble._forest.RandomForestClassifier)
@workflow
def my_workflow_3(_wf_args:Hyperparameters)->_wf_outputs:
    generate_dataset_o0_,generate_dataset_o1_,generate_dataset_o2_,generate_dataset_o3_=generate_dataset(hp=_wf_args)
    train_model_o0_=train_model(hp=_wf_args,X_train=generate_dataset_o0_,y_train=generate_dataset_o2_)
    return _wf_outputs(train_model_o0_)
No, not using a custom task resolver or custom container interface. The first one is before I took the tasks outside the @workflow. But when I take them out, I get the AttributeError: 'function' object has no attribute 'name' and can’t figure out why.

Niels Bantilan

03/10/2023, 4:35 PM
can you try renaming the tasks? Like:
generate_dataset_task=task(...)(generate_dataset)
train_model_task=task(...)(train_model)
and then using generate_dataset_task and train_model_task in your workflow?

Taeef Najib

03/10/2023, 5:03 PM
import sklearn
import os
import sys
import typing
from flytekit import Resources, task, workflow
from main import Hyperparameters
from main import generate_dataset
from main import train_model

generate_dataset_task=task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(generate_dataset)
train_model_task=task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(train_model)

_wf_outputs=typing.NamedTuple("WfOutputs",train_model_task_0=sklearn.ensemble._forest.RandomForestClassifier)
@workflow
def my_workflow_3(_wf_args:Hyperparameters)->_wf_outputs:
    generate_dataset_task_o0_,generate_dataset_task_o1_,generate_dataset_task_o2_,generate_dataset_task_o3_=generate_dataset_task(hp=_wf_args)
    train_model_task_o0_=train_model_task(hp=_wf_args,X_train=generate_dataset_task_o0_,y_train=generate_dataset_task_o2_)
    return _wf_outputs(train_model_task_o0_)
Same result

Niels Bantilan

03/10/2023, 5:06 PM
the result being AttributeError: 'function' object has no attribute 'name', or
packages/flytekit/core/class_based_resolver.py", line 34, in load_task
    return self.mapping[idx]
IndexError: list index out of range
or both?

Taeef Najib

03/10/2023, 5:14 PM
I think your suggestion may have fixed the list index out of range error, but it still fails with AttributeError: 'function' object has no attribute 'name' during execution.

Niels Bantilan

03/10/2023, 5:15 PM
is this a local execution using pyflyte run? It would also help if you could provide the contents of the main module.
can you go to this line of your flytekit installation:
File "/opt/venv/lib/python3.10/site-packages/flytekit/bin/entrypoint.py", line 80, in _dispatch_execute
    logger.debug(f"Starting _dispatch_execute for {task_def.name}")
and print out task_def and its type before line 80? I don’t think it’s an actual PythonTask object

Taeef Najib

03/10/2023, 5:24 PM
regarding your previous message:
# Import necessary libraries
import typing
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import numpy as np
from dataclasses import dataclass
from dataclasses_json import dataclass_json

@dataclass_json
@dataclass
class Hyperparameters(object):
    n_samples: int = 1000
    n_features: int = 20
    n_informative: int = 15
    n_classes: int = 2
    test_size: float = 0.2
    n_estimator: int = 100


# Generate the dataset
def generate_dataset(hp: Hyperparameters) -> typing.Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    X, y = make_classification(n_samples=hp.n_samples, n_features=hp.n_features, n_informative=hp.n_informative, n_classes=hp.n_classes)
    return train_test_split(X, y, test_size=hp.test_size)


# Train a random forest classifier on the train data
def train_model(hp: Hyperparameters, X_train: np.ndarray, y_train: np.ndarray) -> RandomForestClassifier:
    return RandomForestClassifier(hp.n_estimator).fit(X_train, y_train)
No, this is a remote execution via the Flyte REST API.
Regarding your last message: give me some time. I'll give you an update about the result

Niels Bantilan

03/10/2023, 6:15 PM
so I just got this working on a sandbox cluster: basically flytekit currently doesn’t support defining tasks like this:
generate_dataset_task=task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(generate_dataset)
It basically needs to be used as a decorator. Here’s a one-scripter that works:
# Import necessary libraries
import typing
import sklearn
from flytekit import Resources, task, workflow

from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import numpy as np
from dataclasses import dataclass
from dataclasses_json import dataclass_json

@dataclass_json
@dataclass
class Hyperparameters(object):
    n_samples: int = 1000
    n_features: int = 20
    n_informative: int = 15
    n_classes: int = 2
    test_size: float = 0.2
    n_estimator: int = 100


# Generate the dataset
@task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)
def generate_dataset(hp: Hyperparameters) -> typing.Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    X, y = make_classification(n_samples=hp.n_samples, n_features=hp.n_features, n_informative=hp.n_informative, n_classes=hp.n_classes)
    return train_test_split(X, y, test_size=hp.test_size)


# Train a random forest classifier on the train data
@task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)
def train_model(hp: Hyperparameters, X_train: np.ndarray, y_train: np.ndarray) -> RandomForestClassifier:
    return RandomForestClassifier(hp.n_estimator).fit(X_train, y_train)


_wf_outputs=typing.NamedTuple("WfOutputs",train_model_task_0=sklearn.ensemble._forest.RandomForestClassifier)
@workflow
def my_workflow_3(_wf_args:Hyperparameters)->_wf_outputs:
    generate_dataset_task_o0_,generate_dataset_task_o1_,generate_dataset_task_o2_,generate_dataset_task_o3_=generate_dataset(hp=_wf_args)
    train_model_task_o0_=train_model(hp=_wf_args,X_train=generate_dataset_task_o0_,y_train=generate_dataset_task_o2_)
    return _wf_outputs(train_model_task_o0_)

if __name__ == "__main__":
    print(my_workflow_3(_wf_args=Hyperparameters()))
you can do the @task decoration directly where the functions are defined in main

Taeef Najib

03/10/2023, 6:19 PM
That’s strange because it was working for 1.2.7. Decorators are just functions, and isn’t Flyte just plain Python outside of @workflow? The reason we need to do this is that we’re using Flyte in an automated setup and we need to be able to apply the decorator to the imported functions. How would we do that?
Also, using pyflyte run works just fine with the code we currently have. Doesn’t that mean Flyte should work in a remote execution as well?

Niels Bantilan

03/10/2023, 6:21 PM
@Yee @Kevin Su is this behavior supported ^^? this might be a bug
ah! I think I misled you a little… you’ll need to overwrite the function name after all, as you originally had it:
generate_dataset=task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(generate_dataset)
train_model=task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(train_model)
I got this to work on my end, can you try it?
basically flytekit got confused: it was looking for generate_dataset as the task name, but it was a function in the module’s namespace, so re-defining it as a task makes it work. I’m not sure why this wasn’t working earlier; perhaps we’ll get another error on your side

Taeef Najib

03/10/2023, 7:55 PM
OK so it’s the same error when we change the task function name to overwrite the imported function name

Niels Bantilan

03/10/2023, 8:14 PM
can you say how exactly you’re:
1. packaging the workflow
2. registering it
3. running it (UI, FlyteRemote, or pyflyte run?)
@Yee @Eduardo Apolinario (eapolinario) the above use case seems to be supported in earlier flytekit versions but doesn’t work in 1.4.1. Basically, we import some functions that we then use as tasks in a separate module:
from other_module import fn
from flytekit import task, workflow

fn = task(...)(fn)

@workflow
def wf():
    fn()
This will raise an error:
[4/4] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
[apdmwk4bc9m9nq27gfth-n0-3] terminated with exit code (1). Reason [Error]. Message: 
bin/entrypoint.py", line 80, in _dispatch_execute
    logger.debug(f"Starting _dispatch_execute for {task_def.name}")
AttributeError: 'function' object has no attribute 'name'
Traceback (most recent call last):
  File "/usr/local/bin/pyflyte-fast-execute", line 8, in <module>
    sys.exit(fast_execute_task_cmd())
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/flytekit/bin/entrypoint.py", line 513, in fast_execute_task_cmd
    subprocess.run(cmd, check=True)
  File "/usr/local/lib/python3.8/subprocess.py", line 516, in run
    raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['pyflyte-execute', '--inputs', '<s3://flyte-development-data/metadata/propeller/flytesnacks-development-apdmwk4bc9m9nq27gfth/n0/data/inputs.pb>', '--output-prefix', '<s3://flyte-development-data/metadata/propeller/flytesnacks-development-apdmwk4bc9m9nq27gfth/n0/data/3>', '--raw-output-data-prefix', '<s3://flyte-development-data/data/bh/apdmwk4bc9m9nq27gfth-n0-3>', '--checkpoint-path', '<s3://flyte-development-data/data/bh/apdmwk4bc9m9nq27gfth-n0-3/_flytecheckpoints>', '--prev-checkpoint', '<s3://flyte-development-data/data/mi/apdmwk4bc9m9nq27gfth-n0-2/_flytecheckpoints>', '--dynamic-addl-distro', '<s3://flyte-development-data/flytesnacks/development/VB6EEUDVKPK3GOU5HSHIETCXVU======/fasta42e5e6df1c146b6ec0cf0dd67ed937b.tar.gz>', '--dynamic-dest-dir', '/root', '--resolver', 'flytekit.core.python_auto_container.default_task_resolver', '--', 'task-module', 'other_module', 'task-name', 'fn']' returned non-zero exit status 1.
it’s failing because the Flyte task resolver thinks the task is defined in other_module.fn, so it tries to grab the task from there, but it’s not a task… it’s a function
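The lookup can be imitated with a toy model in plain Python. This is a sketch under assumptions, not flytekit's resolver: `ToyTask`, `toy_task`, `loader_args`, and `load_task` are hypothetical, and the only thing borrowed from the log above is that the task is addressed by module name and task name (`other_module`, `fn`).

```python
import importlib
import sys
import types

# Build a throwaway module named "other_module" that holds a plain function.
other_module = types.ModuleType("other_module")
exec("def fn():\n    return 42\n", other_module.__dict__)
sys.modules["other_module"] = other_module


class ToyTask:
    """Hypothetical stand-in for a task object; it has a `name`, functions do not."""

    def __init__(self, function):
        self.task_function = function
        self.name = f"{function.__module__}.{function.__name__}"


def toy_task(function):
    return ToyTask(function)


def loader_args(task_obj):
    """Toy 'registration': record where the wrapped function lives."""
    f = task_obj.task_function
    return f.__module__, f.__name__


def load_task(module_name, attr):
    """Toy 'execution': import that module and look the name up again."""
    return getattr(importlib.import_module(module_name), attr)


# "Workflow module": wrap the imported function after the fact.
# Only this module's name `fn` is rebound; other_module.fn is untouched.
fn = toy_task(importlib.import_module("other_module").fn)

module_name, attr = loader_args(fn)
loaded = load_task(module_name, attr)
print(module_name, attr, type(loaded).__name__, hasattr(loaded, "name"))
```

In this toy model the lookup returns the undecorated function, which has no `name` attribute, matching the shape of the AttributeError in the traceback.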
@Taeef Najib so I tried running this on flytekit 1.2.7 and it doesn’t work, it gives me the same error message.
logger.debug(f"Starting _dispatch_execute for {task_def.name}")

Yee

03/10/2023, 8:23 PM
is it really called fn in multiple places? can we rename?
shouldn’t matter

Taeef Najib

03/11/2023, 1:25 AM
Inside a CI/CD pipeline:
pyflyte --pkgs wf package --output flyte-workflow-package.tgz --image gcr.io/blah --force
flytectl register files --project valor-unicorn --domain development --archive flyte-workflow-package.tgz --config /root/flyte_config.yaml --version 0.0.16
Yes, I can confirm that this code fails with the same error on 1.2.7 as well. 1.2.7 succeeds if I put the task defs inside the @workflow. But the same code runs fine locally via pyflyte run on 1.4.1, so I’d say either the way we’re registering/running the workflow is somehow deprecated or this is a bug. I think it’s fair to assume that code that runs fine via pyflyte run should also run without issues remotely? What would you suggest is the right way to write the Flyte workflow if I had to import functions from other modules like this? How can I get this to work?
@Niels Bantilan @Yee It must be weekend in your time zone! Could you please take a look at it when you return to work? I'm extremely excited about making it work! Thank you for what you all are doing for me 🙏 enjoy your weekends

Eduardo Matus

03/12/2023, 9:46 PM
have the same error:
[1/1] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
[feaac56274c144d7f8a6-n0-0] terminated with exit code (1). Reason [Error]. Message: 
ted_task
    _dispatch_execute(ctx, task_def, inputs, output_prefix)
  File "/opt/.venv/lib/python3.9/site-packages/flytekit/bin/entrypoint.py", line 80, in _dispatch_execute
    logger.debug(f"Starting _dispatch_execute for {task_def.name}")
AttributeError: 'function' object has no attribute 'name'

Niels Bantilan

03/13/2023, 2:43 PM
hi @Taeef Najib would you mind filing a bug report for this issue? We’ve been discussing on this thread for some time and I think it’s worth creating an issue for this, esp. with @Eduardo Matus’s error as well. Basically it should contain:
1. a minimal reproducible code snippet
2. instructions on how to reproduce the error
3. a description of the expected behavior
[flyte-bug] 👇

Samhita Alla

03/13/2023, 3:58 PM
@Niels Bantilan, @Eduardo Matus was able to resolve this issue by downgrading Flyte to 1.3.0
@Taeef Najib, can you test and confirm?

Taeef Najib

03/13/2023, 5:32 PM
@Niels Bantilan I will. Thanks! @Samhita Alla Of course, I can test.
@Samhita Alla OK, on v1.3.0 the packaging step fails with this error:
[12:44 AM] ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
grpcio-status 1.51.3 requires protobuf>=4.21.6, but you have protobuf 3.20.3 which is incompatible.
flytekit 1.3.0 requires pyarrow<11.0.0,>=4.0.0, but you have pyarrow 11.0.0 which is incompatible.
flyteidl 1.3.10 requires protobuf<5.0.0,>=4.21.1, but you have protobuf 3.20.3 which is incompatible.
awscli 1.27.85 requires botocore==1.29.85, but you have botocore 1.29.89 which is incompatible.
awscli 1.27.85 requires PyYAML<5.5,>=3.10, but you have pyyaml 6.0 which is incompatible.
Successfully installed aiohttp-3.8.4 aiosignal-1.3.1 alembic-1.10.2 anyio-3.6.2 appdirs-1.4.4 asgiref-3.6.0 async-timeout-4.0.2 attrs-22.2.0 backoff-2.2.1 bentoml-1.0.15 boto3-1.26.89 botocore-1.29.89 cattrs-22.2.0 circus-0.18.0 click-option-group-0.5.5 contextlib2-21.6.0 contourpy-1.0.7 cycler-0.11.0 databricks-cli-0.17.5 deepmerge-1.1.0 entrypoints-0.4 exceptiongroup-1.1.1 flask-2.2.3 fonttools-4.39.0 frozenlist-1.3.3 fs-2.4.16 greenlet-2.0.2 gunicorn-20.1.0 h11-0.14.0 itsdangerous-2.1.2 kiwisolver-1.4.4 llvmlite-0.39.1 mako-1.2.4 markdown-3.4.1 markdown-it-py-2.2.0 matplotlib-3.7.1 mdurl-0.1.2 mlflow-2.2.1 multidict-6.0.4 numba-0.56.4 oauthlib-3.2.2 opentelemetry-api-1.14.0 opentelemetry-exporter-otlp-proto-http-1.14.0 opentelemetry-instrumentation-0.35b0 opentelemetry-instrumentation-aiohttp-client-0.35b0 opentelemetry-instrumentation-asgi-0.35b0 opentelemetry-proto-1.14.0 opentelemetry-sdk-1.14.0 opentelemetry-semantic-conventions-0.35b0 opentelemetry-util-http-0.35b0 pathspec-0.11.0 pillow-9.4.0 pip-requirements-parser-32.0.1 prometheus-client-0.16.0 protobuf-3.20.3 psutil-5.9.4 pyarrow-11.0.0 pygments-2.14.0 pyjwt-2.6.0 pynvml-11.5.0 pyparsing-3.0.9 python-multipart-0.0.6 pyzmq-25.0.1 querystring-parser-1.2.4 rich-13.3.2 schema-0.7.5 scikit-learn-1.2.0 scipy-1.10.1 setuptools-67.6.0 shap-0.41.0 sidetrekutils-0.0.14 simple-di-0.1.5 slicer-0.0.7 sniffio-1.3.0 sqlalchemy-2.0.6 sqlparse-0.4.3 starlette-0.26.1 tabulate-0.9.0 threadpoolctl-3.1.0 tornado-6.2 tqdm-4.65.0 urllib3-1.26.15 uvicorn-0.21.0 watchfiles-0.18.1 werkzeug-2.2.3 yarl-1.8.2
Traceback (most recent call last):
  File "/opt/venv/lib/python3.10/site-packages/git/__init__.py", line 89, in <module>
    refresh()
  File "/opt/venv/lib/python3.10/site-packages/git/__init__.py", line 76, in refresh
    if not Git.refresh(path=path):
  File "/opt/venv/lib/python3.10/site-packages/git/cmd.py", line 392, in refresh
    raise ImportError(err)
ImportError: Bad git executable.
The git executable must be specified in one of the following ways:
    - be included in your $PATH
    - be set via $GIT_PYTHON_GIT_EXECUTABLE
    - explicitly set via git.refresh()

All git commands will error until this is rectified.

This initial warning can be silenced or aggravated in the future by setting the
$GIT_PYTHON_REFRESH environment variable. Use one of the following values:
    - quiet|q|silence|s|none|n|0: for no warning or exception
    - warn|w|warning|1: for a printed warning
    - error|e|raise|r|2: for a raised exception

Example:
    export GIT_PYTHON_REFRESH=quiet


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/venv/bin/pyflyte", line 5, in <module>
    from flytekit.clis.sdk_in_container.pyflyte import main
  File "/opt/venv/lib/python3.10/site-packages/flytekit/clis/sdk_in_container/pyflyte.py", line 7, in <module>
    from flytekit.clis.sdk_in_container.package import package
  File "/opt/venv/lib/python3.10/site-packages/flytekit/clis/sdk_in_container/package.py", line 13, in <module>
    from flytekit.tools.repo import NoSerializableEntitiesError, serialize_and_package
  File "/opt/venv/lib/python3.10/site-packages/flytekit/tools/repo.py", line 14, in <module>
    from flytekit.remote import FlyteRemote
  File "/opt/venv/lib/python3.10/site-packages/flytekit/remote/__init__.py", line 98, in <module>
    from flytekit.remote.remote import FlyteRemote
  File "/opt/venv/lib/python3.10/site-packages/flytekit/remote/remote.py", line 22, in <module>
    from git import Repo
  File "/opt/venv/lib/python3.10/site-packages/git/__init__.py", line 91, in <module>
    raise ImportError("Failed to initialize: {0}".format(exc)) from exc
ImportError: Failed to initialize: Bad git executable.
The git executable must be specified in one of the following ways:
    - be included in your $PATH
    - be set via $GIT_PYTHON_GIT_EXECUTABLE
    - explicitly set via git.refresh()

All git commands will error until this is rectified.

This initial warning can be silenced or aggravated in the future by setting the
$GIT_PYTHON_REFRESH environment variable. Use one of the following values:
    - quiet|q|silence|s|none|n|0: for no warning or exception
    - warn|w|warning|1: for a printed warning
    - error|e|raise|r|2: for a raised exception

Example:
    export GIT_PYTHON_REFRESH=quiet
So it won’t even deploy - can’t get to the execute step

Yee

03/13/2023, 8:27 PM
try 1.3.4

Eduardo Matus

03/13/2023, 11:54 PM
@Taeef Najib Create a new environment using Python 3.8 or 3.9 with Flyte, and force the installation of the version that Yee mentioned above

Taeef Najib

03/14/2023, 3:44 AM
@Eduardo Matus @Yee Able to register/package, but same error on python 3.9.16 with flyte 1.3.4

Yee

03/14/2023, 4:00 AM
sorry what error?
git?
or the other one?

Taeef Najib

03/14/2023, 4:07 AM
This one:
AttributeError: 'function' object has no attribute 'name'

Yee

03/14/2023, 4:14 AM
can i ask what the goal is?
i get that the code is how it is…
but why the desire to separate the task decorator call from the function?

Taeef Najib

03/14/2023, 4:16 AM
That way we can import functions from other files - otherwise, you have to put all your task and workflow code in the same file

Yee

03/14/2023, 4:16 AM
i would have to check tomorrow but I believe it’s done that way to support the other use-case… where the decoration happens together, but then that task is imported as a whole unit into other python modules
but why not decorate first, and then import?
if you truly need access to the underlying function, it is still there in the .task_function property
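A toy sketch of "decorate first, then import", in plain Python rather than flytekit: `ToyTask`, `toy_task`, and the `toy_main` module are hypothetical stand-ins, and only the `task_function` attribute name is taken from the message above. It assumes that tasks are found again by module and name, as the task-module/task-name arguments earlier in this thread suggest.

```python
import importlib
import sys
import types


class ToyTask:
    """Hypothetical stand-in for a task object that keeps the wrapped function."""

    def __init__(self, function):
        self.task_function = function
        self.name = f"{function.__module__}.{function.__name__}"


def toy_task(function):
    return ToyTask(function)


# Throwaway module "toy_main" where the decorator is applied at definition time.
toy_main = types.ModuleType("toy_main")
toy_main.toy_task = toy_task
exec(
    "@toy_task\n"
    "def generate_dataset(n):\n"
    "    return list(range(n))\n",
    toy_main.__dict__,
)
sys.modules["toy_main"] = toy_main

# "Workflow module": import the already-decorated task as a whole unit.
generate_dataset = importlib.import_module("toy_main").generate_dataset

# A lookup by module and name now lands on the task object itself.
looked_up = getattr(importlib.import_module("toy_main"), "generate_dataset")
print(looked_up is generate_dataset, looked_up.name)

# The undecorated function is still reachable when it is really needed.
print(generate_dataset.task_function(3))
```

Because the name in the defining module already refers to the task object, importing it elsewhere and looking it up by name give the same object.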

Taeef Najib

03/14/2023, 4:21 AM
OK so the problem is using the decorator this way, then, right?

Yee

03/14/2023, 4:25 AM
i think so.
at least for now
python module loading is complicated and almost always a rabbit hole when i venture down it
and we do some possibly esoteric things already that i think might be challenging to increase the flexibility of

Taeef Najib

03/14/2023, 4:28 AM
Lol - so true. OK, thank you so much for looking into this - we’ll try one of the other ways you mentioned. I really appreciate your patience with all the back and forth!

Yee

03/14/2023, 4:28 AM
but leave the issue open, we will look at it when we have a bit more time. there’s some other module loading clean-up things we need to investigate as well
thank you!

Taeef Najib

03/14/2023, 4:29 AM
Will do - thanks again!