Powered by Linen
ask-the-community
  • Byron Hsu
    12/20/2022, 9:58 PM
    Hi team, we are looking at the rbac.yaml file for propeller. One rule is confusing to us: it seems to allow most verbs on all resources in all apiGroups, which might override the other rules.
    # Allow Access All plugin objects
    - apiGroups:
      - '*'
      resources:
      - '*'
      verbs:
      - get
      - list
      - watch
      - create
      - update
      - delete
      - patch
    Please help take a look. Thanks! @Yubo Wang
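    If the intent is to scope that rule more tightly, a sketch of what a narrower alternative could look like. The apiGroups and resources below are illustrative only, not Flyte's actual requirements; the real set depends on which backend plugins your deployment enables.

    ```yaml
    # Hypothetical narrower rule: enumerate only the plugin API groups
    # propeller actually needs, instead of the '*' wildcard.
    - apiGroups:
      - sparkoperator.k8s.io   # example plugin group, adjust to your deployment
      resources:
      - sparkapplications      # example resource
      verbs:
      - get
      - list
      - watch
      - create
      - update
      - delete
      - patch
    ```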
  • Zhiyi Li
    12/21/2022, 2:18 PM
    Hi team, is there any limit on the number of tasks a given workflow can have scheduled? I first created a workflow with 100 nodes, then created 10 executions of this workflow, yet the number of parallel tasks plateaus at 300. Another question: even when I try to set maxWorkflowNodes in the admin configuration, the setting does not take effect, and it prevents me from registering workflows with more than 100 nodes. Are either of these behaviors expected?
  • David Cupp
    12/21/2022, 4:13 PM
    Hello, I also have a question about scaling, but on a different axis: what is the expected way to scale Flyte servers horizontally? I can see that the workers can be scaled by having multiple K8s clusters, but if I have millions or billions of workflows and the primary database and server get overwhelmed, is there a way to shard them? Or do I simply need multiple Flyte deployments at that point?
  • David Cupp
    12/21/2022, 6:51 PM
    Sorry in advance if this is answered somewhere in the docs: is there any idempotency when launching jobs via the API? I believe this is the request to launch a "Launch Plan": https://docs.flyte.org/projects/flyteidl/en/latest/protos/docs/admin/admin.html#ref-flyteidl-admin-executioncreaterequest ...but I can't tell what the behavior is if, for example, two duplicate requests arrive within minutes of each other. Does the name parameter, or something in the ExecutionSpec, cause them to be deduplicated?
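    For context, ExecutionCreateRequest does carry a caller-supplied name, and executions appear to be identified by the (project, domain, name) tuple, so reusing the same name on a retry should target the same execution rather than start a second one; this is worth verifying against your FlyteAdmin version. A hedged sketch of a request body, with all field values hypothetical:

    ```json
    {
      "project": "flytesnacks",
      "domain": "development",
      "name": "client-supplied-idempotency-key",
      "spec": {
        "launchPlan": {
          "resourceType": "LAUNCH_PLAN",
          "project": "flytesnacks",
          "domain": "development",
          "name": "my_launch_plan",
          "version": "v1"
        }
      }
    }
    ```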
  • Laura Lin
    12/21/2022, 7:48 PM
    For deploying multi-cluster Flyte (https://docs.flyte.org/en/latest/deployment/multicluster.html#data-plane-deployment), can I keep the original cluster as is (running both the control and data planes) and just add a data cluster? And when creating the additional cluster, should I follow https://docs.flyte.org/en/latest/deployment/aws/manual.html (Connect to an EKS Cluster, OIDC Provider for the EKS Cluster, Create an EKS Node Group, and the latter half of Create an RDS Database, where I just connect the new cluster to the existing RDS)? I don't need to repeat "Install an Amazon Loadbalancer Ingress Controller" or the SSL cert steps, right?
  • Klemens Kasseroller
    12/22/2022, 10:53 AM
    Optional FlyteFile in dataclass: in the following code example, test_task_2 returns a dataclass with an Optional[FlyteFile] field. In test_task_1 I directly return an Optional[FlyteFile], which works fine; however, in test_task_2 only the first, non-optional file shows up on the remote. Since the first example works, I would also expect my TaskOutput.file2 to be uploaded correctly. Is this expected behavior, or are optional types in dataclasses not supported? I am using flytekit 1.2.7 locally. Thanks for the help!
    from dataclasses import dataclass
    from typing import Optional
    
    from dataclasses_json import dataclass_json
    from flytekit import workflow, task
    from flytekit.types.file import FlyteFile
    
    
    @dataclass_json
    @dataclass
    class TaskOutput:
        file1: FlyteFile
        file2: Optional[FlyteFile]
    
    
    @task
    def test_task_1() -> Optional[FlyteFile]:
        with open("file11.txt", "w+") as f:
            f.write("")
        return FlyteFile("file11.txt", remote_path="s3://test-bucket/tmp/file11.txt")
    
    
    @task
    def test_task_2() -> TaskOutput:
        with open("file21.txt", "w+") as f:
            f.write("")
        with open("file22.txt", "w+") as f:
            f.write("")
    
        return TaskOutput(
            FlyteFile("file21.txt", remote_path="s3://test-bucket/tmp/file21.txt"),
            FlyteFile("file22.txt", remote_path="s3://test-bucket/tmp/file22.txt")
        )
    
    
    @workflow
    def test_workflow():
        test_task_1()
        test_task_2()
    
    
    if __name__ == '__main__':
        test_workflow()
  • Andreas Fehlner
    12/22/2022, 1:40 PM
    I wonder if you could document the sandbox variant more for use as a production setup. My target system is actually a single Unix server, and from there the Flyte sandbox variant as a single Docker container seems quite attractive. What am I missing in my considerations?
  • honnix
    12/22/2022, 2:42 PM
    Hi. A quick question. I understand a backend plugin has the ability to enrich task execution metadata such as adding additional links to external systems. Can a container based task achieve the same? Thanks.
  • Andrew Korzhuev
    12/22/2022, 3:12 PM
    Does Flyte set any environment variables on the pods it runs which we could use? We're interested in flytekit.current_context().execution_id.domain in particular. It's a bit of a chore that our libraries now have to include 300+ MB of flytekit dependencies just to get the runtime environment.
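    As a hedged sketch: flytekit itself resolves the execution context from FLYTE_INTERNAL_* environment variables injected into task pods, so a lightweight library could read them directly without importing flytekit. The variable name below follows flytekit's internal convention but should be verified against your Flyte version.

    ```python
    import os

    # Flyte injects execution metadata into task pods as environment variables;
    # flytekit reads these internally. Verify the exact names for your version.
    def current_execution_domain(default: str = "development") -> str:
        return os.environ.get("FLYTE_INTERNAL_EXECUTION_DOMAIN", default)

    print(current_execution_domain())  # the default unless running in a Flyte pod
    ```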
  • Evan Sadler
    12/22/2022, 3:42 PM
    I'm working through setting up the Databricks plugin on the demo cluster here: https://github.com/flyteorg/flyte/blob/master/CHANGELOG/CHANGELOG-v1.3.0-b5.md. Could use some debugging help. See 🧵
  • Leiqing
    12/23/2022, 6:01 AM
    Hi team, the API spec shows that there is a tags field in TaskMetadata (https://docs.flyte.org/projects/flyteidl/en/latest/protos/docs/core/core.html#ref-flyteidl-core-taskmetadata), but we can't find such a field in the Python representation: https://github.com/flyteorg/flytekit/blob/master/flytekit/models/task.py#L276-L308 Is there still a way to specify it?
  • Pulkit Mishra
    12/25/2022, 3:54 PM
    Hi, I'm trying to write a workflow that takes a video as input and simply writes it out. The input can either be in the uri variable, which is a downloadable link to the video, or in data, which is a base64 string. It is known that if the input is a URI, then the value of data is going to be "1". So I check whether data is "1"; if it is, I call the download task, which downloads the video from uri and saves its content in data. This data is then passed to the write_video task. I'm new to Flyte, and this code is what I have so far.
    from typing import List
    from flytekit import task, workflow, conditional
    import base64
    import requests
    import cv2
    
    @task
    def _is_base64(data:bytes) -> bool:
        try:
            return base64.b64encode(base64.b64decode(data)) == data
        except Exception:
            return False
    
    @task
    def download(uri:str) -> bytes:
        # code to download from the url (requests imported above)
        data = requests.get(uri).content
        return data
    
    @task
    def write_video(data:bytes) -> None:
        if _is_base64(data=data):
            data = base64.b64decode(data)
        with open("input.mp4", "wb") as out_file:
            out_file.write(data)
    
    @workflow
    def wf(data:str, uri:str, job_id:str) -> None:
        write_video(data=conditional("input").if_(data == "1").then(download(uri=uri)).else_().then(data))
    However, I am running into errors and would really appreciate some help.
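    The base64 round-trip check from the question can be exercised as plain Python, independent of Flyte. A minimal sketch; note validate=True, so non-alphabet characters raise instead of being silently dropped:

    ```python
    import base64

    def is_base64(data: bytes) -> bool:
        # Round-trip check: decode, re-encode, and compare with the original.
        try:
            return base64.b64encode(base64.b64decode(data, validate=True)) == data
        except Exception:
            return False

    encoded = base64.b64encode(b"hello video bytes")
    print(is_base64(encoded))         # True
    print(is_base64(b"not base64!"))  # False
    ```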
  • varsha Parthasarathy
    12/26/2022, 4:47 PM
    Hi team, can we set timeouts only for a sub-workflow that is defined under a dynamic task?
    ground_truth_workflow = flytekit.LaunchPlan.get_or_create(
        name="ground_truth_workflow",
        workflow=GroundTruthOfflinePCPWorkFlow,
    )
    The dynamic task calls the launch plan:
    @flytekit.dynamic
    def run():
        # Figures out the number of iterations
        ground_truth_workflow(...)
    For each iteration of run, is it possible to safely set a timeout?
  • seunggs
    12/27/2022, 5:54 PM
    Hi team, I'm seeing this error with python 3.10 and numpy:
  • seunggs
    12/27/2022, 5:54 PM
    /opt/venv/lib/python3.10/site-packages/flytekit/types/schema/types.py:323: FutureWarning: In the future `np.bool` will be defined as the corresponding NumPy scalar.  (This may have returned Python scalars in past versions.
      _np.bool: SchemaType.SchemaColumn.SchemaColumnType.BOOLEAN,  # type: ignore
    Traceback (most recent call last):
      File "/opt/venv/bin/pyflyte", line 5, in <module>
        from flytekit.clis.sdk_in_container.pyflyte import main
      File "/opt/venv/lib/python3.10/site-packages/flytekit/__init__.py", line 195, in <module>
        from flytekit.types import directory, file, numpy, schema
      File "/opt/venv/lib/python3.10/site-packages/flytekit/types/schema/__init__.py", line 1, in <module>
        from .types import (
      File "/opt/venv/lib/python3.10/site-packages/flytekit/types/schema/types.py", line 313, in <module>
        class FlyteSchemaTransformer(TypeTransformer[FlyteSchema]):
      File "/opt/venv/lib/python3.10/site-packages/flytekit/types/schema/types.py", line 323, in FlyteSchemaTransformer
        _np.bool: SchemaType.SchemaColumn.SchemaColumnType.BOOLEAN,  # type: ignore
      File "/opt/venv/lib/python3.10/site-packages/numpy/__init__.py", line 284, in __getattr__
        raise AttributeError("module {!r} has no attribute "
    AttributeError: module 'numpy' has no attribute 'bool'. Did you mean: 'bool_'?
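    For reference, np.bool was a deprecated alias for the builtin bool and was removed in NumPy 1.24, which is what breaks flytekit's type table in the traceback. The NumPy scalar type np.bool_ (or plain bool) still works:

    ```python
    import numpy as np

    # np.bool (removed in NumPy 1.24) was just an alias for the builtin bool;
    # np.bool_ is the NumPy boolean scalar type and remains available.
    arr = np.array([True, False], dtype=np.bool_)
    assert arr.dtype == np.dtype(bool)
    print(arr.dtype)  # bool
    ```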
  • seunggs
    12/27/2022, 5:58 PM
    Tried numpy>=1.23
  • Rupsha Chaudhuri
    12/28/2022, 5:23 AM
    Hi team, I need some help. I have a Flyte task that needs to read a parquet file stored in S3 (about 2 GB in size) and return 1000 records from it. I'm using awswrangler to read the file into a pandas dataframe and then return the first 1000 records. Each task node has up to 8 GB of memory, but the pod keeps getting OOM-killed with no useful info in the stack trace. Not sure what's going on...
  • Rupsha Chaudhuri
    12/28/2022, 8:12 PM
    Question about caching behavior: If I have a task that’s reused between 2 workflows, does caching still work if the inputs are the same? Or is it tied to the workflow?
  • seunggs
    12/29/2022, 4:11 AM
    So after successfully packaging the project (via pyflyte package), flytectl register fails with a strange error I don't understand:
  • seunggs
    12/29/2022, 4:11 AM
    panic: runtime error: invalid memory address or nil pointer dereference
    [signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x138ddbb]
    
    goroutine 1 [running]:
    github.com/flyteorg/flytectl/cmd/core.CommandContext.AdminClient(...)
    	/home/runner/work/flytectl/flytectl/cmd/core/cmd_ctx.go:57
    github.com/flyteorg/flytectl/cmd/register.register({0x252bf70, 0xc000116000}, {0x251e598?, 0xc0008d91a0?}, {0x0, {0x0, 0x0}, {0x0, 0x0}, {0x0, ...}, ...}, ...)
    	/home/runner/work/flytectl/flytectl/cmd/register/register_util.go:188 +0x37b
    github.com/flyteorg/flytectl/cmd/register.registerFile({0x252bf70, 0xc000116000}, {0xc000166230, _}, {_, _, _}, {0x0, {0x0, 0x0}, ...}, ...)
    	/home/runner/work/flytectl/flytectl/cmd/register/register_util.go:606 +0x95b
    github.com/flyteorg/flytectl/cmd/register.Register({0x252bf70, 0xc000116000}, {0xc0005d9b80?, 0x0?, 0x0?}, 0x36246c0, {0x0, {0x0, 0x0}, {0x0, ...}, ...})
    	/home/runner/work/flytectl/flytectl/cmd/register/files.go:160 +0x6ca
    github.com/flyteorg/flytectl/cmd/register.registerFromFilesFunc({0x252bf70, 0xc000116000}, {0xc0005d9b80, 0x1, 0xa}, {0x0, {0x0, 0x0}, {0x0, 0x0}, ...})
    	/home/runner/work/flytectl/flytectl/cmd/register/files.go:118 +0xcd
    github.com/flyteorg/flytectl/cmd/core.generateCommandFunc.func1(0xc00064f900?, {0xc0005d9b80, 0x1, 0xa})
    	/home/runner/work/flytectl/flytectl/cmd/core/cmd.go:70 +0x93d
    github.com/spf13/cobra.(*Command).execute(0xc00064f900, {0xc0005d9a40, 0xa, 0xa})
    	/home/runner/go/pkg/mod/github.com/spf13/cobra@v1.4.0/command.go:856 +0x67c
    github.com/spf13/cobra.(*Command).ExecuteC(0xc0008fbb80)
    	/home/runner/go/pkg/mod/github.com/spf13/cobra@v1.4.0/command.go:974 +0x3b4
    github.com/spf13/cobra.(*Command).Execute(...)
    	/home/runner/go/pkg/mod/github.com/spf13/cobra@v1.4.0/command.go:902
    github.com/flyteorg/flytectl/cmd.ExecuteCmd()
    	/home/runner/work/flytectl/flytectl/cmd/root.go:137 +0x1e
    main.main()
    	/home/runner/work/flytectl/flytectl/main.go:12 +0x1d
  • seunggs
    12/29/2022, 4:11 AM
    Anyone have any idea what might be causing this? Thanks in advance for your help!
  • Rupsha Chaudhuri
    12/29/2022, 8:13 PM
    Hi team... so flytekit needs pyarrow >= 4.0.0, but pyarrow > 2.0.0 (required by awswrangler) has this issue that I'm currently running into when trying to chunk and read a large parquet file. Has anyone else in the group encountered this, and if so, how did you get around it?
  • Fredrik Lyford
    12/30/2022, 6:55 AM
    Good morning, I have been looking for a way to run backfill jobs. Typically we run daily batch jobs, which works well with a Launch Plan, but being able to reprocess old datasets, each identified by a date, is important (schemas change, things fail, etc.). I have been trying to generate lists of dates to process (something like generate_dates(start_date, num_days_back) -> List[str]). If I hardcode a for loop (for i in range(10)) and create the dates that way, with no particular inputs, the workflow executes correctly, but it would be really nice to take the start date and number of days back as inputs to the workflow (the blocker is that we cannot iterate over a Promise). I see some discussion of backfilling on Slack, so I am probably just missing something obvious. Is this already baked into LaunchPlan? Would love to hear how others have solved this before 🙂 This is the last major hurdle before we are ready to put Flyte into prod; it's such an improvement over Luigi.
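    One common workaround for the iterate-over-a-Promise blocker is to do the iteration inside a @dynamic workflow, where inputs are materialized at run time. The date-generation helper itself is plain Python; the function name and signature below follow the sketch in the question:

    ```python
    from datetime import date, timedelta
    from typing import List

    def generate_dates(start_date: str, num_days_back: int) -> List[str]:
        """ISO date strings for start_date and the preceding days, newest first."""
        start = date.fromisoformat(start_date)
        return [(start - timedelta(days=i)).isoformat() for i in range(num_days_back)]

    print(generate_dates("2022-12-30", 3))
    # ['2022-12-30', '2022-12-29', '2022-12-28']
    ```

    Inside a @dynamic task, num_days_back arrives as a concrete int, so a plain for loop over this list can fan out sub-launches per date.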
  • Mücahit
    12/30/2022, 11:46 AM
    Hi, how can I pass a custom environment variable to my tasks, ideally via pyflyte/flytectl? I can't use a Docker env var because the value is set according to the deployed Flyte cluster.
  • seunggs
    01/01/2023, 4:08 AM
    I have a workflow that takes a dataclass (with variables inside, of course) as an input. I'm trying to execute it via the REST API (POST /api/v1/executions), but if I supply the individual variables of the dataclass, it errors out (predictably, I guess). How can I specify the input correctly in the POST call?
  • seunggs
    01/01/2023, 4:09 AM
    Here’s how I tried specifying individual variables (I based this on FlyteAdmin swagger docs):
  • seunggs
    01/01/2023, 4:09 AM
    {
        "literals": {
            "n_samples": {
                "scalar": {
                    "primitive": {
                        "integer": "1000"
                    }
                }
            },
            ...
            "n_estimator": {
                "scalar": {
                    "primitive": {
                        "integer": "100"
                    }
                }
            }
        }
    }
  • seunggs
    01/01/2023, 4:09 AM
    I get this error back:
  • seunggs
    01/01/2023, 4:09 AM
    invalid input n_estimator
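    A dataclass input is typically sent as a single literal rather than as individual fields. A hedged sketch, assuming the workflow's input is named config (the actual input name comes from the workflow signature), using the scalar generic (protobuf Struct) form that dataclass_json types serialize to:

    ```json
    {
      "literals": {
        "config": {
          "scalar": {
            "generic": {
              "n_samples": 1000,
              "n_estimator": 100
            }
          }
        }
      }
    }
    ```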