ask-the-community
  • s

    seunggs

    11/19/2022, 3:39 AM
    Hi, trying out flytekit.extras types, but it doesn't seem to exist in flytekit (v1.2.3) - do I need to install a separate module for this?
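    A minimal sketch of what that import looks like (assuming flytekit v1.2.x; the extras live inside the flytekit package itself rather than in a separate distribution, though some of them need optional third-party libraries installed):
    # Sketch only: no separate pip package should be needed for flytekit.extras itself.
    from flytekit.extras.tasks.shell import ShellTask  # pure-Python extra

    # Extras that wrap third-party libraries (e.g. flytekit.extras.pytorch) additionally
    # require that library (torch, sklearn, ...) to be present in the image.
    echo_task = ShellTask(
        name="echo",
        script="echo hello from a ShellTask",
    )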
  • f

    Felix Ruess

    11/19/2022, 7:46 PM
    @Ketan (kumare3) How is the S3 config supposed to work? If I add the AWS_ACCESS_KEY_ID env var to my container/pod, the FLYTE_AWS_ACCESS_KEY_ID is ignored... So this means I can not set these env vars in my container for use in my user code... Shouldn't the Flyte "internal" things always use the FLYTE_AWS_ACCESS_ vars?
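    For context, a minimal sketch (a hypothetical container spec, not the actual manifest from this thread) of the two credential sets being discussed:
    # FLYTE_AWS_* is what flytekit's own S3 I/O is expected to read, while plain AWS_*
    # is what user code / boto3 picks up by default. Secret names here are hypothetical.
    env:
      - name: FLYTE_AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef: {name: flyte-s3-creds, key: access-key}
      - name: FLYTE_AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef: {name: flyte-s3-creds, key: secret-key}
      - name: AWS_ACCESS_KEY_ID          # intended for user code only
        valueFrom:
          secretKeyRef: {name: user-s3-creds, key: access-key}
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef: {name: user-s3-creds, key: secret-key}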
  • f

    Felix Ruess

    11/19/2022, 7:51 PM
    And shouldn't it read it from the storage configmap instead of env vars? Not even talking about having the secret key in the configmap instead of a secret...
  • b

    Bernardo Galvao

    11/21/2022, 8:46 AM
    Hey, hello. I have a dataframe with 150ish columns, whose schema was inferred with pandera.infer_schema. Now I want to build a model inference server with FastAPI. The thing is, I have no way of converting DataFrameSchema to SchemaModel so that it can be used with FastAPI and have nicely generated Swagger UI docs. Can anyone help me convert DataFrameSchema to SchemaModel? Or point me to a solution that works with FastAPI?
  • c

    Calvin Leather

    11/21/2022, 8:16 PM
    Hey all, does anyone have experience using a dynamic with_overrides to set resource requests/limits at scale? We've been testing the following example in our dev cluster and it's been working okay; we wanted to start using this at scale and wanted to see if people are also doing this in larger workflows (e.g., 100s of dynamic nodes). I can't see any K8s or Flyte reason why this would have issues in the future (maybe funky interactions between heterogeneous pod requests and K8s/EKS cluster autoscaling?), but it's so nice and simple that it seems too good to be true.
    import time
    from typing import List

    from flytekit import Resources, dynamic, task


    @task
    def test_task(input: int) -> int:
        time.sleep(600)
        return input + 1


    @dynamic
    def test_dynamic(list_of_ints: List[int]) -> List[int]:
        results = []
        for integer in list_of_ints:
            # Override the per-node resource request based on the input value
            result = test_task(input=integer).with_overrides(requests=Resources(cpu=f"{integer}", mem=f"{integer}Gi"))
            results.append(result)
        return results
  • l

    Louis DiNatale

    11/22/2022, 3:08 PM
    I have a failed workflow that, for some reason, had an error that just said “unknown”. I stopped the workflow so I could restart it, and now it doesn't even have the relaunch/recover options.
  • t

    Tarmily Wen

    11/22/2022, 5:30 PM
    Is there a way to automatically taint a few node pools to allow GPU access upon creation? Like this, but through the values.yaml when helm installing? Also, what should the key and value be for the resource tolerations?
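    For the tolerations half of the question, a minimal sketch of the values.yaml shape (keys assumed from the flyte-core chart; the same section appears in fuller form in a later message in this channel):
    k8s:
      plugins:
        k8s:
          resource-tolerations:
            - nvidia.com/gpu:
              - key: "num-gpus"
                operator: "Equal"
                value: "1"
                effect: "NoSchedule"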
  • l

    Louis DiNatale

    11/22/2022, 6:19 PM
    When you use the clone execution, will this use the cache from the previous execution?
  • f

    Frank Shen

    11/22/2022, 6:19 PM
    I am using Ray with Modin to process large datasets in my workflow. Therefore I use modin.pandas.DataFrame & modin.pandas.Series instead of the pandas versions of DataFrame & Series in my task's input params and return values. However, the data serialization error messages I got below suggest that modin.pandas.DataFrame & modin.pandas.Series are not supported by Flyte yet. Am I correct? I think Ray with Modin is a high-impact feature since the Flyte team wants to support Ray. What would be the process for submitting a change request? Thanks. CC: @Kevin Su @Eduardo Apolinario (eapolinario)
    flytekit.exceptions.scopes.FlyteScopedUserException: Could not find a renderer for <class 'modin.pandas.dataframe.DataFrame'>
    ...
      File ".../flytekit/types/structured/structured_dataset.py", line 699, in to_html
        raise NotImplementedError(f"Could not find a renderer for {type(df)} in {self.Renderers}")
    NotImplementedError: Could not find a renderer for <class 'modin.pandas.dataframe.DataFrame'> in {<class 'pandas.core.frame.DataFrame'>: <flytekit.deck.renderer.TopFrameRenderer object at 0x
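    One possible interim workaround (an assumption on my part, not something confirmed in this thread): keep task signatures as plain pandas and convert at the task boundaries, since the renderer in the error only knows pandas DataFrames.
    import modin.pandas as mpd
    import pandas as pd
    from flytekit import task


    @task
    def process_chunk(df: pd.DataFrame) -> pd.DataFrame:
        # Work in Modin inside the task, but accept/return plain pandas so Flyte's
        # existing pandas support handles serialization and deck rendering.
        mdf = mpd.DataFrame(df)
        result = mdf.groupby("key").sum()  # "key" is a hypothetical column
        return result._to_pandas()         # Modin's conversion back to pandas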
  • a

    Ailin Yu

    11/22/2022, 6:31 PM
    Hello! I have a workflow which chains together several sub-workflows, including two reference_launch_plans and a dynamic workflow. When I try to view this workflow in the Flyte console, everything shows up as unknown (node IDs, type, workflow/task name, etc.), and the graph doesn't render. When I try to get execution details using flytectl get execution --details, I get this error:
    {"json":{},"level":"info","msg":"Retrieved 1 executions","ts":"2022-11-22T10:25:18-08:00"}
    Error: unsupported literal scalar type *core.Scalar_Union
    {"json":{},"level":"error","msg":"unsupported literal scalar type *core.Scalar_Union","ts":"2022-11-22T10:27:16-08:00"}
    Is what I’m trying to do here something that isn’t supported?
  • a

    Arshak Ulubabyan

    11/23/2022, 11:34 AM
    Hi! Is there a way to change the default launch plan for a workflow to be a different launch plan than the one created automatically?
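    For reference, a sketch of activating a specific launch plan version with flytectl (the flag is assumed from the flytectl docs; project, domain, and launch plan names are hypothetical):
    flytectl update launchplan -p flytesnacks -d development my_custom_lp --version v2 --activate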
  • a

    Andrew Korzhuev

    11/23/2022, 12:23 PM
    I was trying to make the correct template URI for CloudWatch logs, but it appears that {{.hostname}} is not propagated from the pods correctly. Has anybody looked into that issue? https://github.com/flyteorg/flyte/issues/2635 https://github.com/flyteorg/flyteplugins/pull/293
  • b

    Babis Kiosidis

    11/23/2022, 2:09 PM
    Hey friends, we use single task executions as integration tests for some platform tasks we built as a team. We've been seeing "flakiness" in these tests and I looked a bit more into it today. It looks like the TaskIdentifier is used to create the single task execution, which means that there will be a conflict if 2 executions with the same task are launched: https://github.com/flyteorg/flyteadmin/blob/master/pkg/manager/impl/execution_manager.go#L572 Does that sound right? We usually launch one task that will succeed and one that will fail, and collect the executions to validate the behaviour.
  • s

    seunggs

    11/23/2022, 2:14 PM
    Using a named tuple as a task output seems to be erroring out:
    AssertionError: Tuples are not a supported type for individual values in Flyte - got a tuple - Output(model_state=Promise(node:n0.model_state)). If using named tuple in an inner task, please, de-reference theactual attribute that you want to use. For example, in NamedTuple('OP', x=int) thenreturn v.x, instead of v, even if this has a single element
  • s

    seunggs

    11/23/2022, 2:15 PM
    Even though in the docs (in “Named Outputs”), it shows
    hello_output = typing.NamedTuple("OP", greet=str)
    @task
    def say_hello() -> hello_output:
        return hello_output("hello world")
  • s

    seunggs

    11/23/2022, 2:15 PM
    My code:
  • s

    seunggs

    11/23/2022, 2:16 PM
    TrainingOutputs = typing.NamedTuple(
        "TrainingOutputs",
        model_state=FlyteFile,
    )
    
    def train_task(hp: Hyperparameters) -> TrainingOutputs:
        ...
        return TrainingOutputs(model_state=FlyteFile(MODEL_PATH))
  • s

    seunggs

    11/23/2022, 2:16 PM
    Does anyone have any idea what I’m doing wrong here?
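    For what it's worth, the error text itself suggests de-referencing the named attribute rather than passing the tuple around; a minimal sketch of that pattern (names hypothetical, not the code from this thread):
    import typing
    from flytekit import task, workflow

    TrainingOutputs = typing.NamedTuple("TrainingOutputs", model_accuracy=float)


    @task
    def train() -> TrainingOutputs:
        return TrainingOutputs(model_accuracy=0.9)


    @workflow
    def wf() -> float:
        out = train()
        # De-reference the named attribute instead of returning the tuple itself,
        # as the error message suggests.
        return out.model_accuracy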
  • k

    Katrina P

    11/23/2022, 6:26 PM
    Trying to debug some odd behavior. I have two separate environments with the same propeller configmap deployed, but the Console UI is showing different logging links. One is showing what I configured in the config map, but the other is showing the default settings. I pulled the configmaps using kubectl but they're the same. Is this the correct configuration for updating log links in propeller? Do I need to restart anything other than propeller to update the log links?
    task_logs.yaml: | 
        plugins:
          logs:
            cloudwatch-enabled: false
            kubernetes-enabled: false
            templates:
              - displayName: "Data Dog"
                templateUris:
                  - "<https://app.datadoghq.com/logs?query=pod_name%3A{{> .podName }}%20kube_namespace%3A{{ .namespace }}%20&cols=&index=&messageDisplay=inline&stream_sort=desc&viz=stream&from_ts={{ .podUnixStartTime }}000&to_ts={{ .podUnixFinishTime }}000&live=false"
  • k

    Kilian Tep

    11/24/2022, 4:27 AM
    Hello Flyte community, I’m facing the following bug. I’ve defined a task function:
    @task
    def to_training_config(cfg: DictConfig) -> TrainingConfig:
        return TrainingConfig(**cfg)
    TrainingConfig is simply a custom dataclass_json object. This function simply aims to convert a DictConfig object to the dataclass so I can reuse it later within my workflow. I am getting the following error:
    ---------------------------------------------------------------------------
    AttributeError                            Traceback (most recent call last)
    /opt/venv/lib/python3.7/site-packages/flytekit/core/base_task.py in dispatch_execute(self, ctx, input_literal_map)
        521                 try:
    --> 522                     literals[k] = TypeEngine.to_literal(exec_ctx, v, py_type, literal_type)
        523                 except Exception as e:
    
    /opt/venv/lib/python3.7/site-packages/flytekit/core/type_engine.py in to_literal(cls, ctx, python_val, python_type, expected)
        751 
    --> 752         lv = transformer.to_literal(ctx, python_val, python_type, expected)
        753 
    
    /opt/venv/lib/python3.7/site-packages/flytekit/core/type_engine.py in to_literal(self, ctx, python_val, python_type, expected)
        349             )
    --> 350         self._serialize_flyte_type(python_val, python_type)
        351         return Literal(
    
    /opt/venv/lib/python3.7/site-packages/flytekit/core/type_engine.py in _serialize_flyte_type(self, python_val, python_type)
        397                 field_type = v.type
    --> 398                 python_val.__setattr__(v.name, self._serialize_flyte_type(val, field_type))
        399             return python_val
    
    /opt/venv/lib/python3.7/site-packages/flytekit/core/type_engine.py in _serialize_flyte_type(self, python_val, python_type)
        395             for v in dataclasses.fields(python_type):
    --> 396                 val = python_val.__getattribute__(v.name)
        397                 field_type = v.type
    
    AttributeError: 'DictConfig' object has no attribute 'name'
    Note: this works perfectly fine when I remove the @task decorator. Any idea what could be the cause?
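    One workaround to try (an assumption on my part, not confirmed in this thread): materialize the DictConfig into plain Python containers before building the dataclass, so flytekit only ever serializes native types.
    from dataclasses import dataclass

    from dataclasses_json import dataclass_json
    from flytekit import task
    from omegaconf import DictConfig, OmegaConf


    @dataclass_json
    @dataclass
    class TrainingConfig:
        lr: float = 1e-3   # hypothetical fields
        epochs: int = 10


    @task
    def to_training_config(cfg: DictConfig) -> TrainingConfig:
        # Convert the omegaconf container to plain dicts/values first, so the
        # dataclass fields hold native Python types rather than omegaconf nodes.
        plain = OmegaConf.to_container(cfg, resolve=True)
        return TrainingConfig(**plain)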
  • s

    SeungTaeKim

    11/24/2022, 4:49 AM
    Hello, Flyte team! Is it possible to share the CRD between 2 or more Flyte applications (differentiated only by storage, DB, and hostname, with the other settings the same)? My team would like to deploy 2 or more Flyte applications in a k8s cluster, but I guess Flyte does not share the CRD between them. (The CRD is cluster-scoped, across namespaces.)
  • c

    Chandramoulee K V

    11/24/2022, 8:05 AM
    Hello, I was exploring the Kubernetes Spark job and tried to implement it by following this documentation. This was done in an EKS setup. I created a custom docker image for Spark as specified in the documentation (the only thing I did was comment out the following in the Dockerfile:
    # Copy the makefile targets to expose on the container. This makes it easier to register.
    # Delete this after we update CI to not serialize inside the container
    # COPY k8s_spark/sandbox.config /root
    # Copy the actual code
    # COPY k8s_spark/ /root/k8s_spark
    # This tag is supplied by the build script and will be used to determine the version
    # when registering tasks, workflows, and launch plans
    # ARG tag
    # ENV FLYTE_INTERNAL_IMAGE $tag
    # Copy over the helper script that the SDK relies on
    # RUN cp ${VENV}/bin/flytekit_venv /usr/local/bin/
    # RUN chmod a+x /usr/local/bin/flytekit_venv
    ) I registered the sample pyspark workflow with the image and I am facing this issue:
    failed
    SYSTEM ERROR! Contact platform administrators.
    When looking at the logs in AWS, I found a warning that it was unable to load the native-hadoop library - could this be the cause of the issue? Any idea?
    {"log":"22/11/24 07:03:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    ","stream":"stderr","docker":{"container_id":"XXX"},"kubernetes":{"container_name":"YYY","namespace_name":"flytesnacks-development","pod_name":"ZZZ","pod_id":"AAA","namespace_id":"BBB","namespace_labels":{"kubernetes_io/metadata_name":"flytesnacks-development"}}}
  • l

    Liliana Mamani Sanchez

    11/24/2022, 11:02 AM
    Good morning! I've been diving into the documentation, and also searching here. Still, I have some questions regarding the Flyte framework/tools:
    • Is it possible to set post-processing steps after error recovery?
    • Is it possible to search in workflow logs?
    • Is there the possibility of restricting workflow operations for certain users/groups?
    • Any planned integration with Clickhouse? Or is it possible to do it currently?
    • Are there any options for official paid support?
    Also, please, if you have any experience with any of these points, I'd be glad to hear about it. Thanks
  • z

    Zhiyi Li

    11/25/2022, 2:18 PM
    Hey, I am trying to configure Kafka to consume cloudevents. It seems like I am getting errors at Kafka initialization:
    {"json":{"src":"base.go:73"},"level":"fatal","msg":"caught panic: kafka: client has run out of available brokers to talk to (Is your cluster reachable?) [goroutine 1 [running]:\nruntime/debug.Stack()\n\t/usr/local/go/src/runtime/debug/stack.go:24 +0x65\<http://ngithub.com/flyteorg/flyteadmin/pkg/rpc/adminservice.NewAdminServer.func1()\n\t/go/src/github.com/flyteorg/flyteadmin/pkg/rpc/adminservice/base.go:73|ngithub.com/flyteorg/flyteadmin/pkg/rpc/adminservice.NewAdminServer.func1()\n\t/go/src/github.com/flyteorg/flyteadmin/pkg/rpc/adminservice/base.go:73> +0x88\npanic({0x224d580, 0xc0009c8300})\n\t/usr/local/go/src/runtime/panic.go:838 +0x207\<http://ngithub.com/flyteorg/flyteadmin/pkg/async/cloudevent.NewCloudEventsPublisher(|ngithub.com/flyteorg/flyteadmin/pkg/async/cloudevent.NewCloudEventsPublisher(>{0x2bd10a0, 0xc00005a018}, {0x1, {0xc000f2e460, 0x5}, {{0x0, 0x0}},
    The weird thing is that we tried to use the same version of the sarama client to connect to the Kafka instance in the same cluster, and it works. (Also, I was able to succeed at this step before, yet after I redeployed with eventTypes set to a single type, it failed and could never get back to normal.) The cloudevent config I used is the following:
    cloudevents:
        enable: true
        kafka:
          brokers: kafkaip
          version:
            version:
              - 2
              - 2
              - 0
              - 0
        eventsPublisher:
          eventTypes:
          - all
          topicName: workflow-engine-test
        type: Kafka
    Any clue what could be the problem?
  • a

    Andreas Fehlner

    11/27/2022, 8:02 PM
    I'm wondering what one has to configure to use Flyte (or flytectl) behind a company proxy? Trying to start the flytectl sandbox gives me a timeout...
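    For reference, a sketch of the standard proxy environment variables that Go- and Python-based CLIs generally honor (whether every flytectl/sandbox code path respects them is an assumption; the proxy host is hypothetical):
    export HTTP_PROXY=http://proxy.example.corp:3128
    export HTTPS_PROXY=http://proxy.example.corp:3128
    # Keep local and in-cluster traffic off the proxy, e.g. the sandbox's local endpoints:
    export NO_PROXY=localhost,127.0.0.1,.svc,.cluster.local
    # Note: the sandbox also pulls images through Docker, whose daemon has its own proxy settings.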
  • e

    Evan Sadler

    11/28/2022, 6:14 PM
    Hello! I am working through setting up a CI/CD process to register and activate launch plans. I am new to CI/CD, so I am curious if folks here have a good solution. I like how pyflyte can register a file or folder of launch plans, but I am not exactly sure how to activate them all. I don't want to have to activate specific launch plans inside the GitHub Actions workflow. My best idea is to throw everything in a Makefile. Any advice is much appreciated!
    # only pseudocode implemented
    .SILENT: deploy
    .PHONY: deploy
    deploy:
    	pyflyte register [flags] ./recsys/deploy/launchplans.py;
    	# I could list them here?
    	flytectl update [flags] launchplan prod_ease_train;
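    Building on that Makefile idea, a sketch of activating every launch plan found in a project/domain instead of listing them one by one (the JSON output shape of flytectl get launchplan and the --activate flag are assumptions here; jq is used for parsing):
    PROJECT=recsys
    DOMAIN=production

    flytectl get launchplan -p "$PROJECT" -d "$DOMAIN" -o json \
      | jq -r '.[] | "\(.id.name) \(.id.version)"' \
      | while read -r name version; do
          flytectl update launchplan -p "$PROJECT" -d "$DOMAIN" "$name" --version "$version" --activate
        done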
  • k

    Katrina P

    11/28/2022, 7:20 PM
    Just to be sure: when we run flytectl update task-resource-attribute, that setting still has to be within whatever limits we set in the Admin config map, right? We've tried to up the task-resource-attribute, but when registering the workflow we still get:
    Requested MEMORY limit [1Ti] is greater than current limit set in the platform configuration [512Gi]. Please contact Flyte Admins to change these limits or consult the configuration
    Because if I call flytectl get task-resource-attribute it does say:
    {"project":"<>","domain":"<>","defaults":{"cpu":"500m","memory":"1Gi"},"limits":{"cpu":"2","memory":"1024Gi"}}
  • t

    Tarmily Wen

    11/28/2022, 10:29 PM
    Has anyone been able to have multiple resource-tolerations when requesting GPUs for a job? I have 2 node pools, for single-GPU and multi-GPU nodes, and I want to use the different GPU machine types only when needed, e.g., when a job wants 4 GPUs on the same instance. But I noticed that when requesting 1 GPU, Flyte defaults to the last toleration when requesting resources. Furthermore, when launching a task I don't see any place to determine the tolerations associated with that task.
    k8s:
        plugins:
          # -- Configuration section for all K8s specific plugins [Configuration structure](https://pkg.go.dev/github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s/config)
          k8s:
            default-env-vars: []
            #  DEFAULT_ENV_VAR: VALUE
            default-cpus: 100m
            default-memory: 100Mi
            resource-tolerations:
              - nvidia.com/gpu:
                - key: "num-gpus"
                  operator: "Equal"
                  value: 1
                  effect: "NoSchedule"
              - nvidia.com/gpu:
                - key: "num-gpus"
                  operator: "Equal"
                  value: 4
                  effect: "NoSchedule"
  • j

    Jay Ganbat

    11/28/2022, 11:52 PM
    Hi all, does Recover on a dynamic task work properly? We have a dynamic task that returns a Dict of sample_name: sample_workflow_results. When some sample fails and we recover, unfortunately the output mapped dict is all shuffled up and no longer matches the sample_name key. For example, we expect a certain dynamic task to return a filename -> filename-suffix map like below:
    {
      "counter_1": 1,
      "counter_2": 2,
      "counter_3": 3,
      "counter_4": 4,
      "counter_5": 5,
      "counter_6": 6
    }
    and I forcefully fail some tasks while some complete successfully. Then when I recover, I get this instead:
    {
      "counter_1": 5,
      "counter_2": 6,
      "counter_3": 3,
      "counter_4": 4,
      "counter_5": 5,
      "counter_6": 4
    }
    The example dynamic task is below:
    @dynamic(cache=True, cache_version=get_workflow_version())
    def compute_average_for_chunks(
        chunks: Dict[str, FlyteFile], infile: FlyteFile
    ) -> Dict[str, int]:
        suffix_dict = {}
    
        for chunk_name, chunk_path in chunks.items():
            infile_suffix = get_file_suffix_task(chunk_path=chunk_path, infile=infile)
            suffix_dict[chunk_name] = infile_suffix
    
        return suffix_dict
  • k

    Ketan (kumare3)

    11/29/2022, 1:28 AM
    Folks, as a reminder, we have the community sync tomorrow and Pachama will be sharing their Flyte journey. If you have not already added yourself to the calendar, you can do so here - https://www.addevent.com/calendar/kE355955