ask-the-community

    Louis DiNatale

    12/08/2022, 4:55 PM
Is there a hard limit on the array size allowed for cache lookup? We are running into this error:
    array size > max allowed for cache lookup. requested [15434]. writer allowed [10000] reader allowed [10000]

    Evgeny Romanov

    12/09/2022, 9:21 AM
Hello Flyte community, I have a question related to cloud events. There is an integration with Kafka, and what I found so far is that you can configure brokers, topic name, event types, and Kafka version. As far as I can see, an almost-default config is used here: https://github.com/flyteorg/flyteadmin/blob/master/pkg/async/cloudevent/factory.go#L65-L67 Is there a way to configure the Kafka authentication method? Maybe there are hidden ways to do that, for example using env variables? Sorry, I'm not very familiar with the Sarama library which is used for the integration.

    varsha Parthasarathy

    12/11/2022, 3:51 AM
Hi team, suppose I have a Flyte workflow A and I would like to get the "running" instances of this workflow from our cluster. For example, there are 5 instances of this workflow currently running in production. Is there a Flytekit API to retrieve all running execution IDs of workflow A? Thanks in advance 🙏
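A minimal sketch of one way to query this with flytekit's FlyteRemote, assuming a reachable admin endpoint; the endpoint, project, and domain below are placeholders, and the "phase" filter key is an assumption based on admin's execution filter syntax:
from flytekit.configuration import Config
from flytekit.models import filters
from flytekit.remote import FlyteRemote

# Placeholder endpoint/project/domain -- substitute your own.
remote = FlyteRemote(Config.for_endpoint("flyte.example.com"))
executions, _token = remote.client.list_executions_paginated(
    project="flytesnacks",
    domain="development",
    limit=100,
    filters=[filters.Equal("phase", "RUNNING")],
)
for ex in executions:
    # Narrowing to a specific workflow can be done client-side on the
    # returned execution specs if no server-side filter fits.
    print(ex.id.name)  # execution id of each running instance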

    Adrian Garcia Badaracco

    12/12/2022, 1:45 AM
    There are several good comparison articles between Flyte and Airflow, but I haven't seen many comparisons to other orchestrators. I'm particularly interested in Dagster. Any plans to add something like this to a FAQ or blog post?

    Nischel Kandru (Woven Planet)

    12/12/2022, 3:01 AM
Hello all! Can you please help me with the below questions pertaining to the `task_executions` table in the Flyte `AdminDb`? 1. When are `started_at` and `duration` set to `null`? 2. I am of the impression that {`created_at`, `task_execution_created_at`} and {`updated_at`, `task_execution_updated_at`} should be the same; can you please confirm this? cc @Yee @Alex Pozimenko (FYI)

    Tom Stokes

    12/12/2022, 1:38 PM
    Hey all, I'm having a little trouble running a workflow in the Flyte sandbox on my local machine - in particular, the workflow that I'm attempting to run is failing to pull the image that I've built within the sandbox. Here you can see the containers that I have running on my host:
    $ docker ps
    >>>
    CONTAINER ID   IMAGE                                                                               COMMAND                  CREATED             STATUS             PORTS                                                                                                                 NAMES
    
dbf8f5dcb150   cr.flyte.org/flyteorg/flyte-sandbox:dind-bfa1dd4e6057b6fc16272579d61df7b1832b96a7   "tini flyte-entrypoi…"   About an hour ago   Up About an hour   0.0.0.0:30081-30082->30081-30082/tcp, 0.0.0.0:30084->30084/tcp, 2375-2376/tcp, 0.0.0.0:30086-30088->30086-30088/tcp   flyte-sandbox
From which we can then find the images that exist inside the `dbf8f5dcb150` container:
    $ docker exec -it dbf8f5dcb150 docker image ls
    >>>
    REPOSITORY                                     TAG                       IMAGE ID       CREATED          SIZE
    papermill-exploration                          latest                    3c40c6deb126   23 minutes ago   948MB
    ...
I can see my project in there under the tag `papermill-exploration:latest`. I then serialize and submit my workflow spec as follows:
    pyflyte --pkgs workflows package -f --image "papermill-exploration:latest"
    flytectl register files --project flytesnacks --domain development --archive flyte-package.tgz --version v2
    All of which works:
    $ flytectl get workflows --project flytesnacks --domain development  
    >>>        
     --------- ------------------------------------ ----------------------------- 
    | VERSION | NAME                               | CREATED AT                  |
     --------- ------------------------------------ ----------------------------- 
    | v2      | workflows.workflow.nb_to_python_wf | 2022-12-12T12:41:53.987960Z |
     --------- ------------------------------------ ----------------------------- 
    | v1      | workflows.workflow.nb_to_python_wf | 2022-12-12T12:33:08.295661Z |
     --------- ------------------------------------ ----------------------------- 
    2 rows
    I then attempt to invoke the workflow, but the resulting pod cannot pull the image:
    $ flytectl get execution --project flytesnacks --domain development azlfqvzfsbz4lr8pbmlt
    >>>
     ---------------------- ------------------------------------ ------------- -------- ---------------- -------------------------------- --------------- -------------------- --------------------------------------------------------- 
    | NAME                 | LAUNCH PLAN NAME                   | TYPE        | PHASE  | SCHEDULED TIME | STARTED                        | ELAPSED TIME  | ABORT DATA (TRUNC) | ERROR DATA (TRUNC)                                      |
     ---------------------- ------------------------------------ ------------- -------- ---------------- -------------------------------- --------------- -------------------- --------------------------------------------------------- 
    | azlfqvzfsbz4lr8pbmlt | workflows.workflow.nb_to_python_wf | LAUNCH_PLAN | FAILED |                | 2022-12-12T13:07:23.548693519Z | 23.161600293s |                    | [1/1] currentAttempt done. Last Error: USER::containers |
    |                      |                                    |             |        |                |                                |               |                    | with unready status: [azlfqvzfsbz4lr8pbmlt-n            |
     ---------------------- ------------------------------------ ------------- -------- ---------------- -------------------------------- --------------- -------------------- --------------------------------------------------------- 
    1 rows
    
    $ docker exec -it dbf8f5dcb150 kubectl -n flytesnacks-development describe pod azlfqvzfsbz4lr8pbmlt-n0-0
    >>>
    ...
    Events:
      Type     Reason     Age                    From               Message
      ----     ------     ----                   ----               -------
      Normal   Scheduled  27m                    default-scheduler  Successfully assigned flytesnacks-development/azlfqvzfsbz4lr8pbmlt-n0-0 to dbf8f5dcb150
      Normal   Pulling    25m (x4 over 27m)      kubelet            Pulling image "papermill-exploration:latest"
      Warning  Failed     25m (x4 over 27m)      kubelet            Failed to pull image "papermill-exploration:latest": rpc error: code = Unknown desc = Error response from daemon: pull access denied for papermill-exploration, repository does not exist or may require 'docker login': denied: requested access to the resource is denied
      Warning  Failed     25m (x4 over 27m)      kubelet            Error: ErrImagePull
      Warning  Failed     25m (x6 over 27m)      kubelet            Error: ImagePullBackOff
      Normal   BackOff    2m22s (x106 over 27m)  kubelet            Back-off pulling image "papermill-exploration:latest"
    Have I missed something here? Are the pods not authenticated against the docker repo? Or am I not specifying my images correctly?

    George D. Torres

    12/12/2022, 3:02 PM
This should hopefully be an easy one: how can I get the node-to-task-name mapping from a workflow, given only the workflow Python code? For example, `n0 -> taskA`, `n1 -> taskB`, etc.
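A minimal sketch of one way to inspect this locally, assuming a recent flytekit where the compiled workflow object exposes `.nodes` and each node carries an id (`n0`, `n1`, ...) plus the entity it wraps; the import path is hypothetical:
# Hypothetical module path -- substitute your own.
from my_project.workflows import my_wf

# @workflow functions are compiled at import time, so the node list is
# available without talking to any cluster.
for node in my_wf.nodes:
    entity = node.flyte_entity
    print(node.id, "->", getattr(entity, "name", repr(entity)))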

    honnix

    12/12/2022, 4:08 PM
    We are considering dropping Java 8 support in flytekit-java and moving to Java 11. Please let us know (reply to this thread) if you have any concerns. Thank you.

    Ruksana Kabealo

    12/12/2022, 7:03 PM
Hello everyone! I am setting up Flyte and I am getting a CrashLoopBackOff error for the syncresources pod. When I look at the logs for the pod, I get a "hostname resolving error" when it looks up postgres via 8.8.8.8:53. I'm confused about what this error means exactly. Any ideas?

    Tarmily Wen

    12/12/2022, 11:32 PM
I am currently trying to combine dynamic overrides with Ray to dynamically determine the Ray cluster size when running a task. But when the task under the dynamic runs, it is treated as a plain Python task and never creates a Ray cluster:
    @dynamic
    def request_train_resources_task(
        compressed_dataset: FlyteFile,
        train_parameters: TrainParameters,
    ) -> TrainOutputs:
        task_config = RayJobConfig(
            head_node_config=HeadNodeConfig(
                ray_start_params={
                    "block": "true",
                }
            ),
            worker_node_config=[
                WorkerNodeConfig(
                    group_name="ray-group",
                    replicas=train_parameters.num_nodes - 1,
                )
            ],
        )
        resources_request = Resources(
            cpu=str(train_parameters.cpus_per_gpu * train_parameters.num_gpus),
            mem=f"{train_parameters.memory_per_gpu * train_parameters.num_gpus}G",
            gpu=str(train_parameters.num_gpus),
            ephemeral_storage=f"{train_parameters.ephemeral_storage}G",
        )
        print(f"task_config: {task_config}")
        print(f"resources_request: {resources_request}")
        return pl_mnist_train_task(
            compressed_dataset=compressed_dataset,
            train_parameters=train_parameters,
        ).with_overrides(
            task_config=task_config,
            requests=resources_request,
        )
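For contrast, a minimal sketch of how the Ray plugin's config is normally attached at task definition time (sizes and names below are illustrative, not a fix for the override behavior above):
from flytekit import Resources, task
from flytekit.types.file import FlyteFile
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig

# The Ray plugin reads task_config from the @task decorator, so the
# cluster shape is fixed when the task is defined.
@task(
    task_config=RayJobConfig(
        head_node_config=HeadNodeConfig(ray_start_params={"block": "true"}),
        worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=3)],
    ),
    requests=Resources(cpu="4", mem="16G", gpu="1"),
)
def pl_mnist_train_task_static(compressed_dataset: FlyteFile) -> None:
    ...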

    seunggs

    12/13/2022, 2:29 AM
Hi, is there a way to delete a Flyte project? It looks like it can only be archived (at least via the REST API)?

    Adrian Rumpold

    12/13/2022, 12:07 PM
Hi! Is there an easy way to obtain an instance of a `FlyteRemote` or similar (i.e., `flytekit.clients.friendly`) API client when running in a container? I have seen `Config.auto()`, but it needs a config file to do its magic. Also, there's `FlyteContext.flyte_client`, which looks promising but is set to `None` when I try to look it up on a context obtained via `FlyteContextManager.current_context()` at runtime. Any pointers are highly appreciated! 🙏
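A minimal sketch of building a config without a file, assuming `Config.for_endpoint` is available in your flytekit version; the endpoint and ids are placeholders:
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

# Placeholder endpoint; insecure=True only for plaintext gRPC.
remote = FlyteRemote(
    Config.for_endpoint("flyte.example.com", insecure=False),
    default_project="flytesnacks",
    default_domain="development",
)
execution = remote.fetch_execution(name="some-execution-id")  # placeholder id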

    Jay Ganbat

    12/13/2022, 5:37 PM
Hey Flyte folks, just a quick question about execution times. Is there a way to know how long a task has been in the queue and how long it actively ran? I assume the execution time shown in the console combines the two, right?

    seunggs

    12/13/2022, 11:25 PM
    Is there a way to run flyte validation prior to the build process? Based on the docs, I have to currently build an image before I can attempt to package/register the workflow to flyte. Since build can take a long time, it’d be great to be able to validate the workflow beforehand - is there a best practice for doing this?
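One cheap pre-build check, sketched under the assumption that your workflows live in an importable module: flytekit compiles `@workflow` definitions at import time, so a bare import surfaces most wiring and type errors without building any image:
import importlib

# Hypothetical module path -- substitute your own package.
# An import failure here reveals workflow compilation errors (missing
# inputs, type mismatches between tasks) before any docker build.
importlib.import_module("workflows.workflow")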

    Rupsha Chaudhuri

    12/14/2022, 6:09 PM
Hi team.. I'm exploring Flyte's map task for a workflow I'm working on. If I have a very large data set (say 100 million rows), how efficient is the map task in processing this row by row vs. in chunks of rows? Does Flyte spin up and tear down the task for every row, or is it more like a streaming model?
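On the row-vs-chunk question, one common pattern is to map over chunks so each mapped container amortizes its startup cost; a minimal sketch with illustrative types:
from typing import List

from flytekit import map_task, task, workflow

@task
def process_chunk(rows: List[str]) -> int:
    # One mapped sub-task handles a whole chunk, not a single row.
    return len(rows)

@workflow
def process_all(chunks: List[List[str]]) -> List[int]:
    # Flyte runs one container per list element, so fewer, larger chunks
    # mean fewer pod spin-ups.
    return map_task(process_chunk)(rows=chunks)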

    Tarmily Wen

    12/14/2022, 10:49 PM
Hi, are there event-based triggers in Flyte like Argo's?

    Laura Lin

    12/14/2022, 11:22 PM
    Is there a way from within a flyte job to update some kind of status on the flyte UI? It would be nice to get status messages for a running task without needing to check logs.

    Laura Lin

    12/15/2022, 1:11 AM
When I have the map_tasks executions expanded (e.g. the running and succeeded tabs), it throws this error on update.

    Kyle B

    12/15/2022, 5:21 AM
Hi, I'm trying to use Flyte with MLflow. I have the Flyte sandbox up and running, and an MLflow docker container running in the sandbox. I created a user-defined bridge network [from here] and can connect the MLflow docker container to it. Is there any way to pass the `--net` flag to the Flyte execution so that it can reach other containers by name (e.g. `https://<dockercontainername>:5000`) instead of by IP? Is there a better way of setting up MLflow than having it in its own docker container that is manually started using `docker run`?

    varsha Parthasarathy

    12/15/2022, 12:08 PM
Hi team, I have a dynamic task which launches a launch plan (workflow). If one of the workflows fails, all of the other launches get aborted. How can I tell Flyte not to do this? It's okay if one launched task fails; the rest should continue.
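If the launches come from a (sub)workflow, one relevant knob, sketched assuming flytekit's `WorkflowFailurePolicy` applies to your setup:
from flytekit import WorkflowFailurePolicy, workflow

# With this policy, sibling nodes that can still run are not aborted when
# one node fails; the workflow is only marked failed at the end.
@workflow(
    failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE
)
def launch_many() -> None:
    ...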

    Louis DiNatale

    12/15/2022, 4:34 PM
After a recent update to our Flyte helm chart, recovery has stopped working. We can still relaunch, but we are getting an error: `Failed to check the existence of deck file.`

    Jay Ganbat

    12/15/2022, 10:09 PM
Hi Flyte folks, does Flyte local execution use any parallelization for dynamic tasks?

    Thomas Kobber Panum

    12/16/2022, 10:35 AM
I'm struggling with installing Flyte core on our private Azure K8s (AKS) cluster using the official Helm charts, particularly with specifying `storage.type: custom` and `storage.custom: {}` to make Stow use Azure Blob Storage (which is not S3-compatible). Does anyone have an example of using `storage.type: custom` correctly?

    Muthukrishnan Ramya

    12/16/2022, 2:55 PM
Hi all.. Is it possible to view the Kubernetes logs if we enable the Kubernetes dashboard UI in Flyte? If yes, could you please share a template? If there is a better way, could you please share some examples? @Samhita Alla @Ketan (kumare3)

    Ketan (kumare3)

    12/17/2022, 8:07 AM
Merry Christmas and early wishes for the new year! Flyte is close to 3k GitHub stars. Please help the community: star it and push it over the 3k mark before the new year. Go Flyte community!! https://github.com/flyteorg/flyte

    Bernhard Stadlbauer

    12/19/2022, 12:35 PM
Hi everyone! I saw that the latest version of `flytekit` on `conda-forge` (link) is `1.1.0`. From the looks of it, most of what an update would need is merging the associated PRs in the flytekit-feedstock. Is someone maintaining this at the moment? If not, happy to help here. cc @Sugato Ray

    Katrina P

    12/19/2022, 3:04 PM
Happy Monday! First time seeing this error and it's not super descriptive. Any debugging hints from anyone who has seen this before? This is returned during `pyflyte package` and `flytectl register`:
Successfully serialized 3 flyte objects
  Packaging app_module.workflow.extract_emails -> 0_app_module.workflow.xx_1.pb
  Packaging app_module.workflow.cx -> 1_app_module.workflow.a_2.pb
  Packaging app_module.workflow.cx -> 2_app_module.workflow.a_3.pb
    Successfully packaged 3 flyte objects into /root/flyte-package.tgz
    tar: Removing leading `/' from member names
    drwx------ root/root         0 2022-12-19 14:39 /
    -rw-r--r-- root/root      6109 2022-12-19 14:39 0_app_module.workflow.xx_1.pb
    -rw-r--r-- root/root       217 2022-12-19 14:39 1_app_module.workflow.a_2.pb
    -rw-r--r-- root/root       194 2022-12-19 14:39 2_app_module.workflow.a_3.pb
    Error: input package have some invalid files. try to run pyflyte package again [flyte-package.tgz]
    {"json":{},"level":"error","msg":"input package have some invalid files. try to run pyflyte package again [flyte-package.tgz]","ts":"2022-12-19T14:39:46Z"}

    Louis DiNatale

    12/19/2022, 4:54 PM
Hey Flyte community, I have a question about pod tasks. If I want to add a node_selector to a pod, is it as simple as this?
    @task(
        task_config=Pod(
            pod_spec=V1PodSpec(
                node_selector={'node_group': 'memory'},
            ),
        ),
    )
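For comparison, a fuller sketch assuming the flytekitplugins-pod plugin: `V1PodSpec` requires a `containers` list, and the plugin needs the primary container's name so it can inject the task image and command (names here are illustrative):
from flytekit import task
from flytekitplugins.pod import Pod
from kubernetes.client.models import V1Container, V1PodSpec

@task(
    task_config=Pod(
        pod_spec=V1PodSpec(
            # flytekit fills in the image/command of the primary container
            containers=[V1Container(name="primary")],
            node_selector={"node_group": "memory"},
        ),
        primary_container_name="primary",
    ),
)
def memory_hungry_task() -> None:
    ...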

    David Cupp

    12/19/2022, 10:33 PM
Hello, I have a question regarding Flyte's scheduling capabilities: is it true to say that Flyte cannot do anything more complicated than cron? Based on the docs, it looks like the most advanced scheduling options are: 1. interval scheduling [1] 2. cron scheduling [1] 3. AWS cron scheduling [2] 4. manually, with an SQS queue? [1] I'd like to know if there is any way to create schedules with the same expressive power as rrule [3] or something similar. Something weird like "every 3rd Thursday except for January or Wednesdays", which can't be represented unless you have a list of calendar rules. Also, if Flyte doesn't support advanced scheduling out of the box, is it possible to add support for rrule using a core plugin [4]? (I'm guessing the answer is no, since it seems designed to handle task executions, but I just want to make sure I do my due diligence.) [1] https://docs.flyte.org/projects/cookbook/en/latest/auto/core/scheduled_workflows/lp_schedules.html#sphx-glr-auto-core-scheduled-workflows-lp-schedules-py [2] https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html [3] https://dateutil.readthedocs.io/en/stable/rrule.html [4] https://pkg.go.dev/github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core#Plugin
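For reference, a minimal sketch of the built-in cron scheduling on a launch plan (names are illustrative); anything rrule-shaped would have to be layered on top:
from flytekit import CronSchedule, LaunchPlan, task, workflow

@task
def say_hello() -> str:
    return "hello"

@workflow
def scheduled_wf() -> str:
    return say_hello()

# Cron is the ceiling of built-in expressiveness; calendar rules like
# "every 3rd Thursday except January" would need an external trigger or
# a guard task that exits early on off days.
lp = LaunchPlan.get_or_create(
    workflow=scheduled_wf,
    name="scheduled_wf_every_5m",
    schedule=CronSchedule(schedule="*/5 * * * *"),
)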

    David Cupp

    12/20/2022, 4:24 PM
    Is it possible to construct a "dynamic workflow" using the Flyte "imperative" API? The DAGs I want to execute will be described by some rpc call:
    get_dag( job_input ) ->   DAG of task inputs
In Flyte, I need to read the response, which is a DAG represented as an adjacency list. The directed edges say which tasks need to be executed before which other tasks, and each node in the graph has a payload which is the input to the task. This `get_dag()` call happens at runtime, so I understand from [1] that I can do this:
@dynamic
def start_workflow(...) -> ...:
    dag = get_dag(...)
    ???

@workflow
def my_workflow(...) -> None:
    return start_workflow(...)
but I can't figure out how to use "imperative workflows" [2] at the same time. How do I get a `wf = Workflow(name="my.imperative.workflow.example")`? I did find the dynamic workflow in the API docs: https://docs.flyte.org/projects/flytekit/en/latest/generated/flytekit.dynamic.html But how do I use it? Do I need to create a "sub workflow", or is there a more direct way? [1] https://docs.flyte.org/projects/cookbook/en/stable/auto/core/control_flow/dynamics.html [2] https://docs.flyte.org/projects/cookbook/en/stable/auto/core/flyte_basics/imperative_wf_style.html
If anyone finds this later: I still don't know how to mix the dynamic and "imperative" APIs, but I was able to accomplish my goal using the dynamic API only, by faking outputs from tasks and mapping them to inputs of other tasks.
from flytekit import dynamic

# get_dag() and run_task() are the author's own task / RPC helpers (not shown).

def _run_task(task_id, adj_list, submitted_tasks):
    if task_id in submitted_tasks:
        # we already ran it
        return submitted_tasks[task_id]

    mylist = adj_list.get(int(task_id))
    dep_ids = []
    if mylist is not None:
        dep_ids = list(adj_list[int(task_id)])

    # Submit dependencies first; their outputs ("markers") are wired in as
    # inputs of this task, which is what gives Flyte the ordering edges.
    markers = []
    for dep_id in dep_ids:
        if dep_id in submitted_tasks:
            markers.append(submitted_tasks[dep_id])
        else:
            markers.append(_run_task(dep_id, adj_list, submitted_tasks))

    marker = run_task(task_input="TODO", markers=markers)
    submitted_tasks[task_id] = marker
    return submitted_tasks[task_id]

@dynamic
def wf4(n: int = 5) -> None:
    nodes, adj_list = get_dag()
    submitted_tasks = {}
    for node_id in nodes:
        _run_task(node_id, adj_list, submitted_tasks)

Yee

12/20/2022, 10:03 PM
@David Cupp sorry for the delay - can you describe the use-case a bit more? what you’re doing is probably the right way to go, but wanted to understand a bit more about the ask, don’t think i’ve heard this one before
what does a dag of task inputs look like?
i guess i’d like to understand how a use-case like this can arise. it’s kinda meta… like you’re using a workflow engine, to run generic workflows or something, which is totally okay and kinda cool, but wanted to understand a bit more.

David Cupp

12/20/2022, 10:56 PM
Sorry for the delay; I was in meetings.
> to run generic workflows or something
This is actually exactly what I want to do. My company has a lot of critical jobs that run as DAGs, and our existing system has many flaws, including unreliability and a lack of scalability. These jobs are currently owned by many different teams. The requirement about fully dynamic DAGs came from one of the learnings of that system. Right now the current system uses a plugin model where multiple teams all own code in the same process. This is terrible because: 1. every team needs to understand how the DAG execution system works; 2. no team actually understands how it works; the first time someone sees the code they own is at 3am during an incident. Instead, the team that keeps the system online becomes the de facto owner of everyone's code. So, for the brand new world of scheduling, I have instilled one requirement: no business logic in the scheduling/DAG execution system. ALL business logic must live in (preferably stateless) services outside the DAG execution system: 1. the definition of jobs comes from RPC methods like `getJobs()`; 2. the definition of a task (which we have always been able to calculate at runtime) comes from an RPC method `getDAG()`; 3. the actual business logic to run a task lives in an RPC method `runTask()`. So the requirement to keep all business logic out of the scheduler is something that I am trying to impose. The requirement that DAGs must be calculated at runtime comes from the fact that our jobs already run this way -- we can compromise on this, but we would have to rewrite many jobs completely under the new constraint.
To answer your question about the task inputs: they are actually opaque byte arrays serialized as base64 strings. The scheduling system is meant to have no idea what they are. These inputs are returned by `getDag()` and passed without modification directly to `runTask()`. The serialization mechanism is up to the people who implement `getDag()` and `runTask()` -- I provide a version field to allow people to change serialization mechanisms in a backwards-compatible way.
Oh and I chose base64 strings because I figured every DAG execution system on the planet would probably allow you to pass a UTF-8 string.

Yee

12/20/2022, 11:04 PM
so the goal is to use flyte as the execution/runtime engine, and retain existing systems for workflow creation and specification

David Cupp

12/20/2022, 11:07 PM
Yes. I think they want me to use Flyte for scheduling as well, though we wouldn't do that unless/until we can submit a change to implement support for RRULE schedules.