  • Yee
    5 months ago
    General question about Python function task (and workflow) naming and loading conventions: we’re hoping to clean up how this works a bit. Today, names are basically determined by the path from wherever the Python process was started to the location of the .py file. For example, if you have
    :~/dev/my_repo $ tree .
    .
    ├── Dockerfile
    └── src
        ├── __init__.py
        ├── __pycache__
        └── parent
            ├── __init__.py
            ├── __pycache__
            └── child
                ├── __init__.py
                ├── __pycache__
                └── hello_world.py
    the __module__ attribute is determined by the path:
    :~/dev/my_repo $ ipython
    In [1]: from src.parent.child.hello_world import my_wf
    
    In [2]: my_wf.__module__
    Out[2]: 'src.parent.child.hello_world'
    
    
    :~/dev/my_repo $ cd src/parent
    :~/dev/my_repo/src/parent $ ipython
    In [1]: from child.hello_world import my_wf
    
    In [2]: my_wf.__module__
    Out[2]: 'child.hello_world'
    Note that the __module__ in the second example is shortened. This module name is used both as the name of the task and to load the task at run time in the container (so your image has to be built correctly, of course). We are thinking of changing the behaviour so that, given a task (or launch plan or workflow), we walk up the file system until we find a folder that does not contain an __init__.py file, and assume that the python process was started from there. In the example above, the name in the second case would then be the same as in the first. Obviously this will not work for namespace packages, but we doubt anyone currently uses those for Flyte workflows. Does this sound unreasonable to anyone?
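    A minimal sketch of the proposed lookup, assuming a plain package layout with __init__.py files (not namespace packages); the function name here is hypothetical:
    from pathlib import Path

    def guess_project_root(module_file: str) -> Path:
        # Walk up from the file defining the task until we reach a directory
        # that has no __init__.py, and treat that directory as the place the
        # python process is assumed to have been started from.
        current = Path(module_file).resolve().parent
        while (current / "__init__.py").exists():
            current = current.parent
        return current

    # For ~/dev/my_repo/src/parent/child/hello_world.py this returns ~/dev/my_repo,
    # so the module name resolves to src.parent.child.hello_world in both cases above.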
    Yee
    4 replies
  • Stephan Gref
    5 months ago
    Hey Flyte folks! 🙂 I am currently trying to get Flyte to work with Azure. I successfully configured the "stow" config for admin and propeller and also managed to do an upload (fast-register) using an abfs:// prefixed URL. However, propeller produces a URL like this for the pyflyte command:
    pyflyte-execute --inputs <afs://flyte-data/metadata/propeller/test-azure-development-boygblti4g/n3/data/0/n0/inputs.pb>
    which leads to the following error in flytekit:
    Original exception: No plugin found for matching protocol of path <afs://flyte-data/metadata/propeller/test-azure-development-boygblti4g/n2/data/0/n0/0>
    I believe the value afs is taken from https://github.com/graymeta/stow, but I have not yet been able to identify where in propeller it is set. Did I miss how to configure this properly?
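    For illustration only, the error above is the kind produced by scheme-based plugin lookup: the URL's protocol is matched against a registry of persistence handlers, so afs:// fails when only abfs:// (and s3://, gs://, ...) handlers are registered. A generic sketch, not flytekit's actual implementation:
    from urllib.parse import urlparse

    # Hypothetical registry keyed by URL scheme.
    PLUGINS = {"abfs": "AzureBlobPersistence", "s3": "S3Persistence"}

    def find_plugin(path: str) -> str:
        scheme = urlparse(path).scheme
        if scheme not in PLUGINS:
            raise ValueError(f"No plugin found for matching protocol of path {path}")
        return PLUGINS[scheme]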
    Stephan Gref
    Ketan (kumare3)
    28 replies
  • Jay Ganbat
    5 months ago
    Hi all, I am trying to use FlyteRemote to aggregate the failed task executions of a given workflow execution. It looks like this:
    remote = FlyteRemote(
        default_project=project,
        default_domain=domain,
        flyte_admin_url=flyte_admin_url,
        insecure=True,
    )
    wf_exec = remote.fetch_workflow_execution(name=execution_id)
    synced_wf_exec = remote.sync(wf_exec, sync_nodes=True)
    However, it fails to sync nodes because of this error:
    File ~/.virtualenvs/balrog/lib/python3.9/site-packages/flytekit/remote/executions.py:114, in FlyteWorkflowExecution.outputs(self)
        110     raise _user_exceptions.FlyteAssertion(
        111         "Please wait until the node execution has completed before requesting the outputs."
        112     )
        113 if self.error:
    --> 114     raise _user_exceptions.FlyteAssertion("Outputs could not be found because the execution ended in failure.")
        115 return self._outputs
    
    FlyteAssertion: Outputs could not be found because the execution ended in failure.
    Is there any way to get all the failed node/task executions? It looks like it only syncs if all the node executions were successful; it is failing here in flytekit/remote/remote.py:
    if launched_exec.is_complete:
                    # The synced underlying execution should've had these populated.
                    execution._inputs = launched_exec.inputs
                    execution._outputs = launched_exec.outputs
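    For context, the aggregation being attempted would look roughly like this once the sync succeeds (a sketch; the node_executions and error attributes follow the flytekit.remote objects used above, but exact fields may differ between flytekit versions):
    synced_wf_exec = remote.sync(wf_exec, sync_nodes=True)
    # Collect only the node executions that ended with an error.
    failed = {
        node_id: node_exec
        for node_id, node_exec in synced_wf_exec.node_executions.items()
        if node_exec.error is not None
    }
    for node_id, node_exec in failed.items():
        print(node_id, node_exec.error.message)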
    Jay Ganbat
    Yee
    4 replies
  • Matheus Moreno
    5 months ago
    Hi, everyone! I was wondering: is there a simple way to persist files in Flyte? My use case: I have a workflow that generates a dataset. I want to save that dataset somewhere so I can use it in my training workflow without regenerating it. Is that possible? When I return a FlyteFile, the path is not really obvious/human-friendly.
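    If your flytekit version supports FlyteFile's remote_path argument, the upload location can be pinned to something human-friendly. A minimal sketch (the bucket and paths are hypothetical):
    from flytekit import task
    from flytekit.types.file import FlyteFile

    @task
    def generate_dataset() -> FlyteFile:
        local_path = "/tmp/dataset.csv"
        with open(local_path, "w") as f:
            f.write("a,b\n1,2\n")
        # Flyte uploads the local file to the given remote path instead of an
        # auto-generated one, so a downstream workflow can reference it directly.
        return FlyteFile(path=local_path, remote_path="s3://my-bucket/datasets/dataset.csv")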
    Matheus Moreno
    Jay Ganbat
    +1
    17 replies
  • Jay Ganbat
    5 months ago
    Hi flyte community, happy monday 😆 I am trying to get some clarification on a task-related question: I have a regular python task (T1) that calls another regular python task (T2) without it being dynamic; T2 is a very simple function that runs quickly. I did not want to make T1 dynamic since I didn't want the overhead of launching a pod for T2. I am also using T2 directly in a workflow as a regular python task. My question is: is this a correct way to use it? Is it fine to call a regular task from another regular task? I could create a regular python function and call that from T1, but then I would need another wrapper task for T2 that just calls that function, and I wanted to avoid that extra layer. From this page https://docs.flyte.org/projects/cookbook/en/latest/auto/core/flyte_basics/task.html#sphx-glr-auto-core-flyte-basics-task-py I read that "You can execute a Flyte task as any normal function." Maybe I took this statement a bit too liberally.
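    A sketch of the pattern described above (toy task names; T2 is both called inside T1 and used as its own workflow node):
    from flytekit import task, workflow

    @task
    def t2(x: int) -> int:
        # Small, fast helper task, also usable directly in workflows.
        return x + 1

    @task
    def t1(x: int) -> int:
        # Inside a task, calling t2 is expected to run it as a plain Python
        # function in T1's container rather than as a separate pod/node.
        return t2(x=x) * 10

    @workflow
    def wf(x: int) -> int:
        # Used as a workflow node, t2 runs as its own task.
        return t1(x=t2(x=x))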
    Jay Ganbat
    Yee
    12 replies
  • Jay Ganbat
    5 months ago
    Are Mapped Tasks supported in local executions? I have a mapped task in one of my workflows and tried to run it locally, but I'm getting a Promise issue like the one below:
    E                   TypeError: Failed to convert return value for var o0 for function balrog_pipeline.workflows.primer_qc.process_expected_primer_qc_workflow.mapper_calculate_spike_in_metrics_in_fastq_dir_2 with error <class 'TypeError'>: No automatic conversion found from type <class 'flytekit.core.promise.Promise'> to FlyteFile.Supported (os.PathLike, str, Flytefile)
    The mapper task looks like this:
    out_spike_in_metrics_list = map_task(
        calculate_spike_in_metrics_in_fastq_dir,
        metadata=TaskMetadata(retries=DEFAULT_DYNAMIC_TASK_RETRY),
    )(task_params=task_params_list)
    and calculate_spike_in_metrics_in_fastq_dir is the task with the following signature:
    @BASE_TASK_WORKER.run_task_with_args()
    def calculate_spike_in_metrics_in_fastq_dir(
        task_params: dict[str, FlyteFile]
    ) -> FlyteFile:
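    For comparison, here is a minimal map_task that does run locally (toy task and names, assuming a reasonably recent flytekit):
    from typing import List
    from flytekit import map_task, task, workflow

    @task
    def square(x: int) -> int:
        return x * x

    @workflow
    def wf(xs: List[int]) -> List[int]:
        return map_task(square)(x=xs)

    if __name__ == "__main__":
        # Local execution: the map runs in-process over the list.
        print(wf(xs=[1, 2, 3]))  # [1, 4, 9]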
    Jay Ganbat
    Yee
    35 replies
  • Tim Bauer
    5 months ago
    Anyone else having dependency conflicts when pip installing flytekit from master? I get:
    ERROR: Could not find a version that satisfies the requirement grpcio<2.0,>=1.45.0 (from flytekit) (from versions: 0.4.0a0, 0.4.0a1, 0.4.0a2, 0.4.0a3, 0.4.0a4, 0.4.0a5, 0.4.0a6, 0.4.0a7, 0.4.0a8, 0.4.0a13, 0.4.0a14, 0.5.0a0, 0.5.0a1, 0.5.0a2, 0.9.0a0, 0.9.0a1, 0.10.0a0, 0.11.0b0, 0.11.0b1, 0.12.0b0, 0.13.0, 0.13.1rc1, 0.13.1, 0.14.0rc1, 0.14.0, 0.15.0, 1.0.0rc1, 1.0.0rc2, 1.0.0, 1.0.1rc1, 1.0.1, 1.0.2, 1.0.3, 1.0.4, 1.1.0, 1.1.3, 1.2.0, 1.2.1, 1.3.0, 1.3.5, 1.4.0, 1.6.0, 1.6.3, 1.7.0, 1.7.3, 1.8.1, 1.8.2, 1.8.3, 1.8.4, 1.8.6, 1.9.0rc1, 1.9.0rc2, 1.9.0rc3, 1.9.0, 1.9.1, 1.10.0rc1, 1.10.0rc2, 1.10.0, 1.10.1rc1, 1.10.1rc2, 1.10.1, 1.11.0rc1, 1.11.0rc2, 1.11.0, 1.11.1rc1, 1.11.1, 1.12.0rc1, 1.12.0, 1.12.1, 1.13.0rc1, 1.13.0rc2, 1.13.0rc3, 1.13.0, 1.14.0rc1, 1.14.0rc2, 1.14.0, 1.14.1, 1.14.2rc1, 1.14.2, 1.15.0rc1, 1.15.0, 1.16.0rc1, 1.16.0, 1.16.1, 1.17.0, 1.17.1, 1.18.0, 1.19.0, 1.20.0rc1, 1.20.0rc2, 1.20.0rc3, 1.20.0, 1.20.1, 1.21.0rc1, 1.21.1rc1, 1.21.1, 1.22.0rc1, 1.22.0, 1.22.1, 1.23.0rc1, 1.23.0, 1.23.1, 1.24.0rc1, 1.24.0, 1.24.1, 1.24.3, 1.25.0rc1, 1.25.0, 1.26.0rc1, 1.26.0, 1.27.0rc1, 1.27.0rc2, 1.27.1, 1.27.2, 1.28.0.dev0, 1.28.0rc1, 1.28.0rc2, 1.28.0rc3, 1.28.1, 1.29.0, 1.30.0rc1, 1.30.0, 1.31.0rc1, 1.31.0rc2, 1.31.0, 1.32.0rc1, 1.32.0, 1.33.0rc1, 1.33.0rc2, 1.33.1, 1.33.2, 1.34.0rc1, 1.34.0, 1.34.1, 1.35.0rc1, 1.35.0, 1.36.0rc1, 1.36.0, 1.36.1, 1.37.0rc1, 1.37.0, 1.37.1, 1.38.0rc1, 1.38.0, 1.38.1, 1.39.0rc1, 1.39.0, 1.40.0rc1, 1.40.0, 1.41.0rc2, 1.41.0, 1.41.1, 1.42.0rc1, 1.42.0, 1.43.0rc1, 1.43.0, 1.44.0rc1, 1.44.0rc2, 1.44.0, 1.45.0rc1, 1.45.0)
    ERROR: No matching distribution found for grpcio<2.0,>=1.45.0
    The bump to >=1.45.0 happened four days ago. The irony is that 1.45.0 is listed as available 😄
    Tim Bauer
    Ketan (kumare3)
    +2
    8 replies
  • Matheus Moreno
    5 months ago
    Hey, everyone! Quick question: is it possible to "lazily evaluate" tasks when registering them? My use case: I have some tasks that connect to Google Cloud and a remote MLflow server. To do so, we need to define some environment variables, such as GOOGLE_APPLICATION_CREDENTIALS and MLFLOW_TRACKING_USERNAME. To make things easier for the team, I defined a decorator called @task_setup that receives strings specifying which integrations we want to use. For instance:
    @task(secret_requests=MLFLOW_SECRETS, ...)
    @task_setup(integration_requests=['mlflow'])
    def hello_mlflow() -> str:
        ...
    In the above case, task_setup is responsible for setting up the environment variables for MLflow. The problem is that these variables are defined as secrets on the Kubernetes cluster, so when I try to register the tasks, I receive errors such as
    ValueError: Unable to find secret for key MLFLOW_TRACKING_URI in group mlflow-credentials in Env Var:_FSEC_MLFLOW-CREDENTIALS_MLFLOW_TRACKING_URI and FilePath: /etc/secrets/mlflow-credentials/mlflow_tracking_uri
    Which is (kinda) expected, since I don't have the secrets locally. But my whole point is that these tasks won't be running locally. Is there no way to avoid this? The only workaround I can think of is transforming the decorator into a normal function and calling it inside the task. Is this the only way?
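    A sketch of that workaround, with the setup resolved lazily inside the task body so nothing is read until the pod actually runs (setup_mlflow, MLFLOW_SECRETS, and the group/key names are hypothetical):
    import os
    from flytekit import Secret, current_context, task

    MLFLOW_SECRETS = [Secret(group="mlflow-credentials", key="mlflow_tracking_username")]

    def setup_mlflow() -> None:
        # Runs at task execution time, inside the pod where the secret is mounted.
        ctx = current_context()
        os.environ["MLFLOW_TRACKING_USERNAME"] = ctx.secrets.get(
            "mlflow-credentials", "mlflow_tracking_username"
        )

    @task(secret_requests=MLFLOW_SECRETS)
    def hello_mlflow() -> str:
        setup_mlflow()  # lazy: no secret lookup at registration time
        return "hello"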
  • Matheus Moreno
    5 months ago
    Hi, everyone. While using Secrets in Flytekit, I stumbled upon a bug. When I use flytekit.current_context().secrets.get() with a Secret mounted as a file, the get_secrets_file() method searches for it using
    return os.path.join(self._base_dir, group.lower(), f"{self._file_prefix}{key.lower()}")
    The thing is, Kubernetes mounts secret files without changing the case of the letters. Therefore, if I call flytekit.current_context().secrets.get('mlflow-credentials', 'MLFLOW_TRACKING_URI'), the Secrets Manager is unable to find the secret, even though it is present at /etc/flyte/secrets/mlflow-credentials/MLFLOW_TRACKING_URI. I think this is a problem on the Kubernetes side, right? Because I just checked that secrets requested as environment variables are passed in all upper case, even though the key is lower case. I even started opening a PR on flyteorg/flytekit removing the .upper() and .lower() calls in the Secrets Manager, but, again, I'm not sure whether this is a problem on the SDK side or on the server side.
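    For reference, a minimal example of the access pattern being described, with the key case preserved exactly as it appears in the mounted file (the group/key names are hypothetical):
    from flytekit import Secret, current_context, task

    @task(secret_requests=[Secret(group="mlflow-credentials", key="MLFLOW_TRACKING_URI")])
    def read_tracking_uri() -> str:
        # The file is mounted as .../mlflow-credentials/MLFLOW_TRACKING_URI,
        # but the SDK lower-cases the key when building the lookup path,
        # which is the mismatch described above.
        return current_context().secrets.get("mlflow-credentials", "MLFLOW_TRACKING_URI")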
    Matheus Moreno
    Ketan (kumare3)
    3 replies