# ask-the-community
t
Hi guys, I'm using Flyte on a project that predicts glass type using
KNeighborsClassifier
It seems Flyte doesn't accept
KNeighborsClassifier
as the type of my function. What would be the right type hint for this? I get the following error:
[4/4] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
[sd4h34hqd15wyc3wy32t-n0-3] terminated with exit code (137). Reason [OOMKilled]. Message: 

{"asctime": "2023-03-05 20:24:43,219", "name": "flytekit", "levelname": "WARNING", "message": "Unsupported Type <class 'sklearn.neighbors._classification.KNeighborsClassifier'> found, Flyte will default to use PickleFile as the transport. Pickle can only be used to send objects between the exact same version of Python, and we strongly recommend to use python type that flyte support."}
2023/03/05 20:24:43 WARNING mlflow.utils.git_utils: Failed to import Git (the Git executable is probably not on your PATH), so Git SHA is not available. Error: Failed to initialize: Bad git executable.
The git executable must be specified in one of the following ways:
    - be included in your $PATH
    - be set via $GIT_PYTHON_GIT_EXECUTABLE
    - explicitly set via git.refresh()

All git commands will error until this is rectified.

This initial warning can be silenced or aggravated in the future by setting the
$GIT_PYTHON_REFRESH environment variable. Use one of the following values:
    - quiet|q|silence|s|none|n|0: for no warning or exception
    - warn|w|warning|1: for a printed warning
    - error|e|raise|r|2: for a raised exception

Example:
    export GIT_PYTHON_REFRESH=quiet

{"asctime": "2023-03-05 20:24:43,424", "name": "flytekit", "levelname": "WARNING", "message": "Unsupported Type <class 'sklearn.neighbors._classification.KNeighborsClassifier'> found, Flyte will default to use PickleFile as the transport. Pickle can only be used to send objects between the exact same version of Python, and we strongly recommend to use python type that flyte support."}
Can you please point me to the right direction?
k
By default, Flyte serializes outputs of supported types (int, str, list, pd.DataFrame) to a
Flyte Literal
(a protobuf message). If the type is not recognized by Flyte, flytekit falls back to serializing it with pickle. You could register a custom type transformer yourself. Here is an example.
t
Thanks for your response. Can I create a custom class in my script and use it as a custom type? (As shown here: https://docs.flyte.org/projects/cookbook/en/latest/auto/core/extend_flyte/custom_types.html#advanced-custom-types) Was also wondering if I could simply use
ClassifierMixin
,
BaseEstimator
or
Any
or maybe
None
?
s
Can I create a custom class in my script and use it as a custom type?
Yes, you can! You'll need to have the methods specified in the guide.
Was also wondering if I could simply use
ClassifierMixin
,
BaseEstimator
or
Any
or maybe
None
?
You can. The error you're seeing is OOMKilled, right? In that case, please increase the memory you're allocating to your task. The unsupported one is a warning. You can either register a custom type (which is preferred) or leave it as is.
n
interesting… shouldn’t
KNeighborsClassifier
be picked up by the
SklearnTypeTransformer
? https://github.com/flyteorg/flytekit/blob/master/flytekit/extras/sklearn/native.py#L72
t
Thanks guys
n
which version of flytekit are you using @Taeef Najib?
t
@Niels Bantilan it’s 1.2.7
n
kk, are you able to update or do you need to stick with 1.2.7?
(1.2.7) won’t have the support for sklearn estimators
t
We can upgrade to a later version. Will that support KNeighborsClassifier as a type hint?
n
yep! any version >= 1.3.2 should support
KNeighborsClassifier
t
Thanks. I'll try upgrading then
@Niels Bantilan After upgrading it to 1.4.0, I'm getting this error:
Traceback (most recent call last):
  File "/opt/venv/bin/pyflyte", line 5, in <module>
    from flytekit.clis.sdk_in_container.pyflyte import main
  File "/opt/venv/lib/python3.10/site-packages/flytekit/__init__.py", line 202, in <module>
    from flytekit.core.base_sql_task import SQLTask
  File "/opt/venv/lib/python3.10/site-packages/flytekit/core/base_sql_task.py", line 4, in <module>
    from flytekit.core.base_task import PythonTask, TaskMetadata
  File "/opt/venv/lib/python3.10/site-packages/flytekit/core/base_task.py", line 27, in <module>
    from flytekit.core.context_manager import (
  File "/opt/venv/lib/python3.10/site-packages/flytekit/core/context_manager.py", line 30, in <module>
    from flytekit.clients import friendly as friendly_client  # noqa
  File "/opt/venv/lib/python3.10/site-packages/flytekit/clients/friendly.py", line 4, in <module>
    from flyteidl.admin import common_pb2 as _common_pb2
  File "/opt/venv/lib/python3.10/site-packages/flyteidl/admin/common_pb2.py", line 5, in <module>
    from google.protobuf.internal import builder as _builder
ImportError: cannot import name 'builder' from 'google.protobuf.internal' (/opt/venv/lib/python3.10/site-packages/google/protobuf/internal/__init__.py)
n
we had to yank 1.4.0… can you try installing 1.4.1?
t
let me try
So we upgraded to 1.4.1 and got this error:
[4/4] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
[snbh2jiispjmvrt5faat-n0-3] terminated with exit code (1). Reason [Error]. Message: 
{"asctime": "2023-03-08 16:51:11,316", "name": "flytekit", "levelname": "WARNING", "message": "FlyteSchema is deprecated, use Structured Dataset instead."}

Traceback (most recent call last):
  File "/opt/venv/bin/pyflyte-execute", line 8, in <module>
    sys.exit(execute_task_cmd())
  File "/opt/venv/lib/python3.10/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/opt/venv/lib/python3.10/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/opt/venv/lib/python3.10/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/opt/venv/lib/python3.10/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/opt/venv/lib/python3.10/site-packages/flytekit/bin/entrypoint.py", line 476, in execute_task_cmd
    _execute_task(
  File "/opt/venv/lib/python3.10/site-packages/flytekit/exceptions/scopes.py", line 160, in system_entry_point
    return wrapped(*args, **kwargs)
  File "/opt/venv/lib/python3.10/site-packages/flytekit/bin/entrypoint.py", line 348, in _execute_task
    _task_def = resolver_obj.load_task(loader_args=resolver_args)
  File "/opt/venv/lib/python3.10/site-packages/flytekit/core/class_based_resolver.py", line 34, in load_task
    return self.mapping[idx]
IndexError: list index out of range
@Niels Bantilan
n
can you share the code and commands you’re running?
t
Here's the workflow code:
import sklearn
import os
import sys
import typing
from flytekit import Resources, task, workflow

from main import Hyperparameters
from main import generate_dataset
from main import train_model

_wf_outputs=typing.NamedTuple("WfOutputs",train_model_0=sklearn.ensemble._forest.RandomForestClassifier)
@workflow
def test_classification_2(_wf_args:Hyperparameters)->_wf_outputs:
    generate_dataset=task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(generate_dataset)
    train_model=task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(train_model)
    generate_dataset_o0_,generate_dataset_o1_,generate_dataset_o2_,generate_dataset_o3_=generate_dataset(hp=_wf_args)
    train_model_o0_=train_model(hp=_wf_args,X_train=generate_dataset_o0_,y_train=generate_dataset_o2_)
    return _wf_outputs(train_model_o0_)
did it give any clue about the possible problem?
@Niels Bantilan do you need to see the code from the imported modules?
n
you need to define tasks in the top-level scope of the module, not inside a workflow function body. You also need to make sure the output of the
train_model_task
is
_wf_outputs
directly: you can’t do
_wf_outputs(train_model_o0_)
because
train_model_o0_
is a promise: you can’t operate on it like a regular python value
import sklearn
import os
import sys
import typing
from flytekit import Resources, task, workflow

from main import Hyperparameters
from main import generate_dataset
from main import train_model

generate_dataset_task = task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(generate_dataset)
train_model_task = task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(train_model)

_wf_outputs=typing.NamedTuple("WfOutputs",train_model_0=sklearn.ensemble._forest.RandomForestClassifier)

@workflow
def test_classification_2(_wf_args:Hyperparameters)->_wf_outputs:
    generate_dataset_o0_,generate_dataset_o1_,generate_dataset_o2_,generate_dataset_o3_ = generate_dataset_task(hp=_wf_args)
    train_model_o0_=train_model_task(hp=_wf_args,X_train=generate_dataset_o0_,y_train=generate_dataset_o2_)
    return _wf_outputs(train_model_o0_)
Note that the output of
generate_dataset
needs to be a
tuple
or
NamedTuple
for you to be able to unpack the outputs like
generate_dataset_o0_,generate_dataset_o1_,generate_dataset_o2_,generate_dataset_o3_= generate_dataset_task(hp=_wf_args)
actually not confident whether
generate_dataset_task = task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(generate_dataset)
will work, though it should in theory.
the main thing to remember about Flyte workflows is that the function body within
@workflow
-decorated functions only supports certain operations. You can read more about it here: https://docs.flyte.org/projects/cookbook/en/latest/getting_started/tasks_and_workflows.html#workflows
t
@Niels Bantilan The problem is deploying with v1.4.1 breaks with
index out of range
error whereas it works fine in v1.2.7. Is something different in v1.4.1 causing the error? The workflow code is exactly the same. To confirm, I tried what you suggested (moving the task definitions outside the @workflow), but it throws this error (in both v1.2.7 and v1.4.1):
Traceback (most recent call last):
  File "/opt/venv/bin/pyflyte-execute", line 8, in <module>
    sys.exit(execute_task_cmd())
  File "/opt/venv/lib/python3.10/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/opt/venv/lib/python3.10/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/opt/venv/lib/python3.10/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/opt/venv/lib/python3.10/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/opt/venv/lib/python3.10/site-packages/flytekit/bin/entrypoint.py", line 476, in execute_task_cmd
    _execute_task(
  File "/opt/venv/lib/python3.10/site-packages/flytekit/exceptions/scopes.py", line 160, in system_entry_point
    return wrapped(*args, **kwargs)
  File "/opt/venv/lib/python3.10/site-packages/flytekit/bin/entrypoint.py", line 354, in _execute_task
    _handle_annotated_task(ctx, _task_def, inputs, output_prefix)
  File "/opt/venv/lib/python3.10/site-packages/flytekit/bin/entrypoint.py", line 297, in _handle_annotated_task
    _dispatch_execute(ctx, task_def, inputs, output_prefix)
  File "/opt/venv/lib/python3.10/site-packages/flytekit/bin/entrypoint.py", line 80, in _dispatch_execute
    logger.debug(f"Starting _dispatch_execute for {task_def.name}")
AttributeError: 'function' object has no attribute 'name'
Another thing I noticed when I made this change - in the flyte console task details I see this:
"pyflyte-execute"
"--inputs"
"{{.input}}"
"--output-prefix"
"{{.outputPrefix}}"
"--raw-output-data-prefix"
"{{.rawOutputDataPrefix}}"
"--checkpoint-path"
"{{.checkpointOutputPrefix}}"
"--prev-checkpoint"
"{{.prevCheckpointPrefix}}"
"--resolver"
"flytekit.core.python_auto_container.default_task_resolver"
"--"
"task-module"
"main"
"task-name"
"generate_dataset"
Instead of:
"pyflyte-execute"
"--inputs"
"{{.input}}"
"--output-prefix"
"{{.outputPrefix}}"
"--raw-output-data-prefix"
"{{.rawOutputDataPrefix}}"
"--checkpoint-path"
"{{.checkpointOutputPrefix}}"
"--prev-checkpoint"
"{{.prevCheckpointPrefix}}"
"--resolver"
"wf.wf_1.my_workflow_1"
"--"
"0"
n
did you refactor your code? can you share it?
The first task detail looks correct… the second one looks off… are you using a custom task resolver, or custom container interface?
t
import sklearn
import os
import sys
import typing
from flytekit import Resources, task, workflow
from main import Hyperparameters
from main import generate_dataset
from main import train_model

generate_dataset=task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(generate_dataset)
train_model=task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(train_model)

_wf_outputs=typing.NamedTuple("WfOutputs",train_model_0=sklearn.ensemble._forest.RandomForestClassifier)
@workflow
def my_workflow_3(_wf_args:Hyperparameters)->_wf_outputs:
    generate_dataset_o0_,generate_dataset_o1_,generate_dataset_o2_,generate_dataset_o3_=generate_dataset(hp=_wf_args)
    train_model_o0_=train_model(hp=_wf_args,X_train=generate_dataset_o0_,y_train=generate_dataset_o2_)
    return _wf_outputs(train_model_o0_)
No, not using a custom task resolver or custom container interface. The first one is before I took the tasks outside the @workflow. But when I take them out, I get the
AttributeError: 'function' object has no attribute 'name'
and can’t figure out why.
n
can you try renaming the tasks? Like:
generate_dataset_task=task(...)(generate_dataset)
train_model_task=task(...)(train_model)
and then using
generate_dataset_task
and
train_model_task
in your workflow?
t
import sklearn
import os
import sys
import typing
from flytekit import Resources, task, workflow
from main import Hyperparameters
from main import generate_dataset
from main import train_model

generate_dataset_task=task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(generate_dataset)
train_model_task=task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(train_model)

_wf_outputs=typing.NamedTuple("WfOutputs",train_model_task_0=sklearn.ensemble._forest.RandomForestClassifier)
@workflow
def my_workflow_3(_wf_args:Hyperparameters)->_wf_outputs:
    generate_dataset_task_o0_,generate_dataset_task_o1_,generate_dataset_task_o2_,generate_dataset_task_o3_=generate_dataset_task(hp=_wf_args)
    train_model_task_o0_=train_model_task(hp=_wf_args,X_train=generate_dataset_task_o0_,y_train=generate_dataset_task_o2_)
    return _wf_outputs(train_model_task_o0_)
Same result
n
the result being
AttributeError: 'function' object has no attribute 'name'
or
packages/flytekit/core/class_based_resolver.py", line 34, in load_task
    return self.mapping[idx]
IndexError: list index out of range
or both?
t
I think your suggestion may have fixed the
list index out of range
error, but it still fails with
AttributeError: 'function' object has no attribute 'name'
error during execution
n
is this a local execution using
pyflyte run
? It would also be helpful if you could provide the contents of the
main
module, if you can.
can you go to this line of your flytekit installation:
File "/opt/venv/lib/python3.10/site-packages/flytekit/bin/entrypoint.py", line 80, in _dispatch_execute
    logger.debug(f"Starting _dispatch_execute for {task_def.name}")
And print out
task_def
and its
type
before line 80? I don’t think it’s an actual
PythonTask
object
t
regarding your previous message:
# Import necessary libraries
import typing
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import numpy as np
from dataclasses import dataclass
from dataclasses_json import dataclass_json

@dataclass_json
@dataclass
class Hyperparameters(object):
    n_samples: int = 1000
    n_features: int = 20
    n_informative: int = 15
    n_classes: int = 2
    test_size: float = 0.2
    n_estimator: int = 100


# Generate the dataset
def generate_dataset(hp: Hyperparameters) -> typing.Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    X, y = make_classification(n_samples=hp.n_samples, n_features=hp.n_features, n_informative=hp.n_informative, n_classes=hp.n_classes)
    return train_test_split(X, y, test_size=hp.test_size)


# Train a random forest classifier on the train data
def train_model(hp: Hyperparameters, X_train: np.ndarray, y_train: np.ndarray) -> RandomForestClassifier:
    return RandomForestClassifier(hp.n_estimator).fit(X_train, y_train)
No this is a remote execution via flyte REST API
Regarding your last message: give me some time. I'll give you an update about the result
n
so I just got this working on a sandbox cluster: basically flytekit currently doesn’t support defining tasks like this:
generate_dataset_task=task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(generate_dataset)
It basically needs to be used as a decorator. Here’s a one-scripter that works:
# Import necessary libraries
import typing
import sklearn
from flytekit import Resources, task, workflow

from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import numpy as np
from dataclasses import dataclass
from dataclasses_json import dataclass_json

@dataclass_json
@dataclass
class Hyperparameters(object):
    n_samples: int = 1000
    n_features: int = 20
    n_informative: int = 15
    n_classes: int = 2
    test_size: float = 0.2
    n_estimator: int = 100


# Generate the dataset
@task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)
def generate_dataset(hp: Hyperparameters) -> typing.Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    X, y = make_classification(n_samples=hp.n_samples, n_features=hp.n_features, n_informative=hp.n_informative, n_classes=hp.n_classes)
    return train_test_split(X, y, test_size=hp.test_size)


# Train a random forest classifier on the train data
@task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)
def train_model(hp: Hyperparameters, X_train: np.ndarray, y_train: np.ndarray) -> RandomForestClassifier:
    return RandomForestClassifier(hp.n_estimator).fit(X_train, y_train)


_wf_outputs=typing.NamedTuple("WfOutputs",train_model_task_0=sklearn.ensemble._forest.RandomForestClassifier)
@workflow
def my_workflow_3(_wf_args:Hyperparameters)->_wf_outputs:
    generate_dataset_task_o0_,generate_dataset_task_o1_,generate_dataset_task_o2_,generate_dataset_task_o3_=generate_dataset(hp=_wf_args)
    train_model_task_o0_=train_model(hp=_wf_args,X_train=generate_dataset_task_o0_,y_train=generate_dataset_task_o2_)
    return _wf_outputs(train_model_task_o0_)

if __name__ == "__main__":
    print(my_workflow_3(_wf_args=Hyperparameters()))
you can do the
@task
decoration directly where the functions are defined in
main
t
That’s strange because it was working for 1.2.7. Decorators are just functions and isn’t flyte just plain python outside of
@workflow
? The reason we need to do this is that we're using flyte in an automated setup and we need to be able to apply the decorator to the imported functions. How would we do that?
Also, using
pyflyte run
works just fine with the code we currently have - doesn’t that mean flyte should work in a remote execution as well?
n
@Yee @Kevin Su is this behavior supported ^^? this might be a bug
ah! I think I misled you a little… you’ll need to overwrite the function name after all, as you originally had it:
generate_dataset=task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(generate_dataset)
train_model=task(requests=Resources(cpu="1"),limits=Resources(cpu="1"),retries=3)(train_model)
I got this to work on my end, can you try it?
basically flytekit got confused: it was looking for
generate_dataset
as the task name, but it was a function in the module’s namespace, so re-defining it as a task makes it work. I’m not sure why this wasn’t working earlier, perhaps we’ll get another error on your side
t
OK so it’s the same error when we change the task function name to overwrite the imported function name
n
can you say how exactly you’re: 1. packaging the workflow 2. registering it 3. running it (UI or
FlyteRemote
? pyflyte run?)
@Yee @Eduardo Apolinario (eapolinario) above use case seems to be supported in earlier flytekit versions, but doesn’t work in 1.4.1, basically we import some functions that we then use as tasks in a separate module:
from other_module import fn
from flytekit import task, workflow

fn = task(...)(fn)

@workflow
def wf():
    fn()
This will raise an error:
[4/4] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
[apdmwk4bc9m9nq27gfth-n0-3] terminated with exit code (1). Reason [Error]. Message: 
bin/entrypoint.py", line 80, in _dispatch_execute
    logger.debug(f"Starting _dispatch_execute for {task_def.name}")
AttributeError: 'function' object has no attribute 'name'
Traceback (most recent call last):
  File "/usr/local/bin/pyflyte-fast-execute", line 8, in <module>
    sys.exit(fast_execute_task_cmd())
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/flytekit/bin/entrypoint.py", line 513, in fast_execute_task_cmd
    subprocess.run(cmd, check=True)
  File "/usr/local/lib/python3.8/subprocess.py", line 516, in run
    raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['pyflyte-execute', '--inputs', '<s3://flyte-development-data/metadata/propeller/flytesnacks-development-apdmwk4bc9m9nq27gfth/n0/data/inputs.pb>', '--output-prefix', '<s3://flyte-development-data/metadata/propeller/flytesnacks-development-apdmwk4bc9m9nq27gfth/n0/data/3>', '--raw-output-data-prefix', '<s3://flyte-development-data/data/bh/apdmwk4bc9m9nq27gfth-n0-3>', '--checkpoint-path', '<s3://flyte-development-data/data/bh/apdmwk4bc9m9nq27gfth-n0-3/_flytecheckpoints>', '--prev-checkpoint', '<s3://flyte-development-data/data/mi/apdmwk4bc9m9nq27gfth-n0-2/_flytecheckpoints>', '--dynamic-addl-distro', '<s3://flyte-development-data/flytesnacks/development/VB6EEUDVKPK3GOU5HSHIETCXVU======/fasta42e5e6df1c146b6ec0cf0dd67ed937b.tar.gz>', '--dynamic-dest-dir', '/root', '--resolver', 'flytekit.core.python_auto_container.default_task_resolver', '--', 'task-module', 'other_module', 'task-name', 'fn']' returned non-zero exit status 1.
it’s failing because the flyte task resolver thinks the task is defined in
other_module.fn
so it tries to grab the task from there, but it’s not a task… it’s a function
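The failure described above can be reproduced without flytekit. The following is a minimal sketch, assuming a resolver that records only a module name and a function name and later re-imports that module to find the task. `ToyTask` and `load_task` are hypothetical stand-ins for illustration, not flytekit's actual classes or functions.

```python
import importlib
import sys
import types

# Stand-in for the module that holds the plain, undecorated function.
other_module = types.ModuleType("other_module")
exec("def fn():\n    return 42\n", other_module.__dict__)
sys.modules["other_module"] = other_module


class ToyTask:
    """Hypothetical stand-in for a task object; not the real flytekit class."""

    def __init__(self, func):
        self.task_function = func
        self.name = f"{func.__module__}.{func.__name__}"
        # Location recorded so that a remote container can look the task up later.
        self.loader_args = (func.__module__, func.__name__)


def load_task(module_name, task_name):
    """Re-import the module and fetch the attribute, as a name-based resolver would."""
    return getattr(importlib.import_module(module_name), task_name)


# Wrapping the function elsewhere does not change where the function says it lives.
fn = ToyTask(other_module.fn)
loaded = load_task(*fn.loader_args)

print(fn.loader_args)           # ('other_module', 'fn')
print(type(loaded).__name__)    # function
print(hasattr(loaded, "name"))  # False
```

The lookup returns the undecorated function from `other_module`, so any later access to `.name` on it raises the same kind of `AttributeError` shown in the traceback.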
@Taeef Najib so I tried running this on flytekit 1.2.7 and it doesn’t work, it gives me the same error message.
logger.debug(f"Starting _dispatch_execute for {task_def.name}")
y
is it really called
fn
in multiple places? can we rename?
shouldn’t matter
t
Inside a CI/CD pipeline:
pyflyte --pkgs wf package --output flyte-workflow-package.tgz --image gcr.io/blah --force
flytectl register files --project valor-unicorn --domain development --archive flyte-workflow-package.tgz --config /root/flyte_config.yaml --version 0.0.16
Yes, I can confirm that this code fails with the same error on 1.2.7 as well. 1.2.7 succeeds if I put the task defs inside the @workflow. But the same code runs fine locally via
pyflyte run
on 1.4.1 so I’d say either the way we’re registering/running the workflow is somehow deprecated or this seems like a bug. I think it’s fair to assume that the code that runs fine via
pyflyte run
should also run without issues remotely? What would you suggest is the right way to write the flyte workflow if I had to import functions from other modules like this? How can I get this to work?
@Niels Bantilan @Yee It must be weekend in your time zone! Could you please take a look at it when you return to work? I'm extremely excited about making it work! Thank you for what you all are doing for me 🙏 enjoy your weekends
e
have the same error:
[1/1] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
[feaac56274c144d7f8a6-n0-0] terminated with exit code (1). Reason [Error]. Message: 
ted_task
    _dispatch_execute(ctx, task_def, inputs, output_prefix)
  File "/opt/.venv/lib/python3.9/site-packages/flytekit/bin/entrypoint.py", line 80, in _dispatch_execute
    logger.debug(f"Starting _dispatch_execute for {task_def.name}")
AttributeError: 'function' object has no attribute 'name'
n
hi @Taeef Najib would you mind filing a bug report for this issue? We've been discussing on this thread for some time and I think it's worth creating an issue for this, esp. with @Eduardo Matus's error as well. Basically it should contain: 1. a minimally reproducible code snippet 2. instructions on how to reproduce the error 3. description of expected behavior [flyte-bug] 👇
s
@Niels Bantilan, @Eduardo Matus was able to resolve this issue by downgrading Flyte to 1.3.0
@Taeef Najib, can you test and confirm?
t
@Niels Bantilan I will. Thanks @Samhita Alla Of course, I can test
@Samhita Alla OK on v1.3.0, packaging step fails with this error:
[12:44 AM] ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
grpcio-status 1.51.3 requires protobuf>=4.21.6, but you have protobuf 3.20.3 which is incompatible.
flytekit 1.3.0 requires pyarrow<11.0.0,>=4.0.0, but you have pyarrow 11.0.0 which is incompatible.
flyteidl 1.3.10 requires protobuf<5.0.0,>=4.21.1, but you have protobuf 3.20.3 which is incompatible.
awscli 1.27.85 requires botocore==1.29.85, but you have botocore 1.29.89 which is incompatible.
awscli 1.27.85 requires PyYAML<5.5,>=3.10, but you have pyyaml 6.0 which is incompatible.
Successfully installed aiohttp-3.8.4 aiosignal-1.3.1 alembic-1.10.2 anyio-3.6.2 appdirs-1.4.4 asgiref-3.6.0 async-timeout-4.0.2 attrs-22.2.0 backoff-2.2.1 bentoml-1.0.15 boto3-1.26.89 botocore-1.29.89 cattrs-22.2.0 circus-0.18.0 click-option-group-0.5.5 contextlib2-21.6.0 contourpy-1.0.7 cycler-0.11.0 databricks-cli-0.17.5 deepmerge-1.1.0 entrypoints-0.4 exceptiongroup-1.1.1 flask-2.2.3 fonttools-4.39.0 frozenlist-1.3.3 fs-2.4.16 greenlet-2.0.2 gunicorn-20.1.0 h11-0.14.0 itsdangerous-2.1.2 kiwisolver-1.4.4 llvmlite-0.39.1 mako-1.2.4 markdown-3.4.1 markdown-it-py-2.2.0 matplotlib-3.7.1 mdurl-0.1.2 mlflow-2.2.1 multidict-6.0.4 numba-0.56.4 oauthlib-3.2.2 opentelemetry-api-1.14.0 opentelemetry-exporter-otlp-proto-http-1.14.0 opentelemetry-instrumentation-0.35b0 opentelemetry-instrumentation-aiohttp-client-0.35b0 opentelemetry-instrumentation-asgi-0.35b0 opentelemetry-proto-1.14.0 opentelemetry-sdk-1.14.0 opentelemetry-semantic-conventions-0.35b0 opentelemetry-util-http-0.35b0 pathspec-0.11.0 pillow-9.4.0 pip-requirements-parser-32.0.1 prometheus-client-0.16.0 protobuf-3.20.3 psutil-5.9.4 pyarrow-11.0.0 pygments-2.14.0 pyjwt-2.6.0 pynvml-11.5.0 pyparsing-3.0.9 python-multipart-0.0.6 pyzmq-25.0.1 querystring-parser-1.2.4 rich-13.3.2 schema-0.7.5 scikit-learn-1.2.0 scipy-1.10.1 setuptools-67.6.0 shap-0.41.0 sidetrekutils-0.0.14 simple-di-0.1.5 slicer-0.0.7 sniffio-1.3.0 sqlalchemy-2.0.6 sqlparse-0.4.3 starlette-0.26.1 tabulate-0.9.0 threadpoolctl-3.1.0 tornado-6.2 tqdm-4.65.0 urllib3-1.26.15 uvicorn-0.21.0 watchfiles-0.18.1 werkzeug-2.2.3 yarl-1.8.2
Traceback (most recent call last):
  File "/opt/venv/lib/python3.10/site-packages/git/__init__.py", line 89, in <module>
    refresh()
  File "/opt/venv/lib/python3.10/site-packages/git/__init__.py", line 76, in refresh
    if not Git.refresh(path=path):
  File "/opt/venv/lib/python3.10/site-packages/git/cmd.py", line 392, in refresh
    raise ImportError(err)
ImportError: Bad git executable.
The git executable must be specified in one of the following ways:
    - be included in your $PATH
    - be set via $GIT_PYTHON_GIT_EXECUTABLE
    - explicitly set via git.refresh()

All git commands will error until this is rectified.

This initial warning can be silenced or aggravated in the future by setting the
$GIT_PYTHON_REFRESH environment variable. Use one of the following values:
    - quiet|q|silence|s|none|n|0: for no warning or exception
    - warn|w|warning|1: for a printed warning
    - error|e|raise|r|2: for a raised exception

Example:
    export GIT_PYTHON_REFRESH=quiet


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/venv/bin/pyflyte", line 5, in <module>
    from flytekit.clis.sdk_in_container.pyflyte import main
  File "/opt/venv/lib/python3.10/site-packages/flytekit/clis/sdk_in_container/pyflyte.py", line 7, in <module>
    from flytekit.clis.sdk_in_container.package import package
  File "/opt/venv/lib/python3.10/site-packages/flytekit/clis/sdk_in_container/package.py", line 13, in <module>
    from flytekit.tools.repo import NoSerializableEntitiesError, serialize_and_package
  File "/opt/venv/lib/python3.10/site-packages/flytekit/tools/repo.py", line 14, in <module>
    from flytekit.remote import FlyteRemote
  File "/opt/venv/lib/python3.10/site-packages/flytekit/remote/__init__.py", line 98, in <module>
    from flytekit.remote.remote import FlyteRemote
  File "/opt/venv/lib/python3.10/site-packages/flytekit/remote/remote.py", line 22, in <module>
    from git import Repo
  File "/opt/venv/lib/python3.10/site-packages/git/__init__.py", line 91, in <module>
    raise ImportError("Failed to initialize: {0}".format(exc)) from exc
ImportError: Failed to initialize: Bad git executable.
The git executable must be specified in one of the following ways:
    - be included in your $PATH
    - be set via $GIT_PYTHON_GIT_EXECUTABLE
    - explicitly set via git.refresh()

All git commands will error until this is rectified.

This initial warning can be silenced or aggravated in the future by setting the
$GIT_PYTHON_REFRESH environment variable. Use one of the following values:
    - quiet|q|silence|s|none|n|0: for no warning or exception
    - warn|w|warning|1: for a printed warning
    - error|e|raise|r|2: for a raised exception

Example:
    export GIT_PYTHON_REFRESH=quiet
So it won’t even deploy - can’t get to the execute step
y
try
1.3.4
e
@Taeef Najib Create a new environment using Python 3.8 or 3.9. With Flyte, force the installation of the version that Yee mentioned above.
t
@Eduardo Matus @Yee Able to register/package, but same error on python 3.9.16 with flyte 1.3.4
y
sorry what error?
git?
or the other one?
t
This one:
AttributeError: 'function' object has no attribute 'name'
y
can i ask what the goal is?
i get that the code is how it is…
but why the desire to separate the task decorator call from the function?
t
That way we can import functions from other files - otherwise, you have to put all your task and workflow code in the same file
y
i would have to check tomorrow but I believe it’s done that way to support the other use-case… where the decoration happens together, but then that task is imported as a whole unit into other python modules
but why not decorate first, and then import?
if you truly need access to the underlying function, it is still there in the
.task_function
property
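As a rough illustration of "decorate first, then import", here is a flytekit-free sketch. `ToyTask`, `toy_task`, and the `toy_main` module are hypothetical stand-ins (for a task object, `task(...)`, and `main` respectively); only the `task_function` attribute name is taken from the message above. It assumes tasks are later found by module name and attribute name.

```python
import importlib
import sys
import types


class ToyTask:
    """Hypothetical stand-in for a task; only mimics `.name` and `.task_function`."""

    def __init__(self, func):
        self.task_function = func
        self.name = f"{func.__module__}.{func.__name__}"

    def __call__(self, *args, **kwargs):
        return self.task_function(*args, **kwargs)


def toy_task(**_options):
    """Hypothetical stand-in for `task(...)`: returns a decorator wrapping the function."""
    return ToyTask


# Simulated module whose function is decorated at the place where it is defined.
toy_main = types.ModuleType("toy_main")
toy_main.toy_task = toy_task
exec(
    "@toy_task(retries=3)\n"
    "def train_model(n_estimator):\n"
    "    return f'model with {n_estimator} trees'\n",
    toy_main.__dict__,
)
sys.modules["toy_main"] = toy_main

# A name-based lookup of ("toy_main", "train_model") now finds the task object itself.
loaded = getattr(importlib.import_module("toy_main"), "train_model")

print(type(loaded).__name__)      # ToyTask
print(loaded.name)                # toy_main.train_model
print(loaded.task_function(100))  # model with 100 trees
```

Because the module attribute is the task itself, importing it into a workflow module keeps the lookup consistent, and the undecorated function stays reachable through `task_function`.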
t
OK so the problem is using the decorator this way, then, right?
y
i think so.
at least for now
python module loading is complicated and almost always a rabbit hole when i venture down it
and we do some possibly esoteric things already that i think might be challenging to increase the flexibility of
t
Lol - so true. OK, thank you so much for looking into this - we’ll try one of the other ways you mentioned. I really appreciate your patience with all the back and forth!
y
but leave the issue open, we will look at it when we have a bit more time. there’s some other module loading clean-up things we need to investigate as well
thank you!
t
Will do - thanks again!