# ask-ai
@ancient-wolf-19325, FlytePropeller failed to start. {"json":{},"level":"fatal","msg":"Failed to start controller. Error: failed to start FlytePropeller: Failed to create Metadata storage: initContainer is required even with `enable-multicontainer`","ts":"2024-12-17T04:58:51Z"}
It seems likely that the error you're encountering is related to the configuration of FlytePropeller, specifically the `enable-multicontainer` setting. According to this discussion, enabling multicontainer requires specific configuration in the FlytePropeller setup. The error message indicates that an `initContainer` is required even when `enable-multicontainer` is set to true.
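Given the `Failed to create Metadata storage` prefix, the missing piece is most likely the `container` field of FlytePropeller's storage configuration, which the error message refers to as `initContainer`: a default bucket must be named even when `enable-multicontainer` is true. A minimal sketch of that storage block, with the backend, bucket name, endpoint, and credentials as placeholders:

```yaml
storage:
  type: stow
  stow:
    kind: s3                     # placeholder: use your object-store backend
    config:
      region: us-east-1
      endpoint: http://minio.flyte.svc.cluster.local:9000   # placeholder endpoint
      auth_type: accesskey
      access_key_id: <access-key>
      secret_key: <secret-key>
      disable_ssl: true
  container: my-flyte-bucket     # the "initContainer": a default metadata bucket must be set
  enable-multicontainer: true
```

If you deploy via Helm, these values are usually rendered from the chart's storage settings rather than edited in the configmap directly.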
Would you like more details on how to configure FlytePropeller or troubleshoot this issue?

Sources:
- Flyte Discussion
@ancient-wolf-19325, I have a map task def xyz(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]: which returns List[Dict[str, Any]], but I want to return batches of 200 entries, so that my next map task def abc(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]: receives the first 200 in one pod, the next 200 in a second pod, and so on. How should I do it? Workflow: rec = map_task(xyz, concurrency=1)(rows=load_table_node.o0) ent = map_task(abc, concurrency=10)(rows=rec)
It seems likely that you want to batch the output of your `xyz` task into chunks of 200 entries before passing them to the `abc` map task. You can achieve this by modifying `xyz` to return a list of lists, where each sublist contains up to 200 entries; `abc` can then process each sublist independently. Here's a conceptual approach:

1. Modify the `xyz` task: change its return type to `List[List[Dict[str, Any]]]` and split the rows into batches of 200.
2. Adjust the workflow: call `xyz` once to produce the batches, then map `abc` over them so each mapped pod receives one batch.

Here's a code snippet to illustrate this:
```python
from typing import List, Dict, Any
from flytekit import task, workflow, map_task


@task
def xyz(rows: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]:
    # Split rows into batches of 200
    batch_size = 200
    return [rows[i:i + batch_size] for i in range(0, len(rows), batch_size)]


@task
def abc(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # Process one batch of up to 200 rows (placeholder processing)
    return rows


@workflow
def my_workflow(rows: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]:
    # Run xyz once to produce the batches; in your workflow, feed it the
    # output of your load_table task (e.g. load_table_node.o0).
    batches = xyz(rows=rows)
    # Map abc over the batches so each mapped pod receives one batch.
    return map_task(abc, concurrency=10)(rows=batches)
```
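As a quick local sanity check of the split logic (plain Python, with a made-up row count of 450), the batches come out as 200, 200, and 50:

```python
# Local check of the batch-splitting logic used in xyz above.
rows = [{"id": i} for i in range(450)]  # made-up input
batch_size = 200
batches = [rows[i:i + batch_size] for i in range(0, len(rows), batch_size)]
print([len(b) for b in batches])  # [200, 200, 50]
```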
Would you like more details on this approach or any other aspect of Flyte?

Sources:
- https://github.com/flyteorg/flyte/blob/master/docs/user_guide/advanced_composition/nested_parallelization.md
@ancient-wolf-19325, pyflyte run where the workflow takes doc_ids as input List[List[str]]
It seems likely that you want to execute a Flyte workflow using `pyflyte run` where the workflow takes a parameter `doc_ids` of type `List[List[str]]`. To achieve this, you need to ensure that the input is correctly formatted as a JSON or YAML file that `pyflyte run` can parse. Here's a step-by-step approach:

1. Define the workflow: ensure your workflow accepts `doc_ids` as `List[List[str]]` (a minimal example is sketched after the command below).
2. Prepare an input file: create a JSON or YAML file with `doc_ids` structured as a list of lists.
3. Run the workflow: use `pyflyte run` with the `--flyte-inputs` option to specify the input file.

Example JSON input file (`input.json`):
```json
{
  "doc_ids": [
    ["id1", "id2"],
    ["id3", "id4"]
  ]
}
```
Command to run the workflow:
```shell
pyflyte run <file_name>.py <workflow_name> --flyte-inputs input.json
```
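For reference, a minimal workflow that such a command could target might look like the following; the task and workflow names here are illustrative, not taken from your code:

```python
from typing import List

from flytekit import task, workflow


@task
def count_docs(doc_ids: List[List[str]]) -> int:
    # Count all document ids across the nested batches.
    return sum(len(batch) for batch in doc_ids)


@workflow
def my_workflow(doc_ids: List[List[str]]) -> int:
    return count_docs(doc_ids=doc_ids)
```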
Would you like more details on this process or any other aspect of Flyte?

Sources:
- https://github.com/flyteorg/flyte/issues/5473
- https://github.com/flyteorg/flyte/issues/4581
@ancient-wolf-19325, No such option: --flyte-inputs
It seems likely that the `--flyte-inputs` option is not available in your version of `pyflyte run`; it may not be supported in the version you are using, or it may not be implemented at all. To work around this, you can pass the input directly as a command-line argument in JSON format:

1. Prepare the JSON input: format your `doc_ids` as a JSON string.
2. Run the workflow: use `pyflyte run` and pass the JSON string directly as an argument.

Example command:
```shell
pyflyte run <file_name>.py <workflow_name> --doc_ids '[["id1", "id2"], ["id3", "id4"]]'
```
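If you're unsure which option names `pyflyte run` generated for your workflow inputs, running `pyflyte run <file_name>.py <workflow_name> --help` should list them.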
Make sure to replace `<file_name>.py` and `<workflow_name>` with your actual script and workflow names. Would you like more details on this approach or any other aspect of Flyte?

Sources:
- https://github.com/flyteorg/flyte/issues/5365
@ancient-wolf-19325, create a Flyte workflow with two tasks. 1. The first task is xyz, which doesn't take any input but returns List[List[Dict[str, Any]]]. 2. The second task is abc, a map_task, which takes its input from xyz's output and returns None.
It seems likely that you want to create a Flyte workflow with two tasks: `xyz`, which returns a nested list of dictionaries, and `abc`, which processes each sublist via `map_task`. Here's a basic example of how you can set this up:

1. Define task `xyz`: it takes no input and returns a `List[List[Dict[str, Any]]]`.
2. Define task `abc`: it processes one sublist and returns `None`.
3. Create the workflow: use `map_task` to apply `abc` to each sublist produced by `xyz`.

Here's a code snippet to illustrate this:
```python
from typing import List, Dict, Any
from flytekit import task, workflow, map_task

@task
def xyz() -> List[List[Dict[str, Any]]]:
    # Example data
    return [[{"key1": "value1"}, {"key2": "value2"}], [{"key3": "value3"}]]

@task
def abc(sublist: List[Dict[str, Any]]) -> None:
    # Process each sublist
    for item in sublist:
        print(item)

@workflow
def my_workflow():
    nested_list = xyz()
    map_task(abc)(sublist=nested_list)

# To run the workflow
if __name__ == "__main__":
    my_workflow()
```
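To execute this on a Flyte cluster instead of locally, `pyflyte run --remote <file_name>.py my_workflow` should work; no input flags are needed since the workflow takes no inputs (`<file_name>.py` is a placeholder for your script).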
Would you like more details on this setup or any other aspect of Flyte?

Sources:
- https://discuss.flyte.org/t/19363/hi-all-can-you-please-advise-me-an-example-with-hello-world-/