victorious-jelly-28729
12/17/2024, 5:00 AM
jolly-florist-66728
12/17/2024, 5:00 AM
enable-multicontainer setting. According to this discussion, enabling multicontainer requires specific configurations in the FlytePropeller setup. The error message suggests that an initContainer is required even when enable-multicontainer is set to true.
Would you like more details on how to configure FlytePropeller or troubleshoot this issue?
Sources:
- Flyte Discussion
victorious-jelly-28729
12/17/2024, 10:17 AM
jolly-florist-66728
12/17/2024, 10:17 AM
xyz map task into chunks of 200 entries before passing them to the abc map task. You can achieve this by modifying the xyz task to return a list of lists, where each sublist contains up to 200 entries. Then, the abc task can process each sublist independently.
Here's a conceptual approach:
1. Modify xyz Task: Change the return type to List[List[Dict[str, Any]]] and implement logic to split the output into batches of 200.
2. Adjust Workflow: Ensure that the abc task is called with each batch.
Here's a code snippet to illustrate this:
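from typing import Any, Dict, List

from flytekit import map_task, task, workflow

@task
def xyz(rows: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]:
    # Split rows into batches of at most 200 entries
    batch_size = 200
    return [rows[i:i + batch_size] for i in range(0, len(rows), batch_size)]

@task
def abc(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # Process one batch (placeholder: returns the batch unchanged)
    return rows

@workflow
def my_workflow(rows: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]:
    # rows stands in for the upstream output that the original snippet
    # read from load_table_node.o0, which is not defined in this thread
    # xyz is called once, since it already returns every batch
    batches = xyz(rows=rows)
    # abc is mapped over the batches, at most 10 running at a time
    return map_task(abc, concurrency=10)(rows=batches)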
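The batching step can be checked on its own, without Flyte. The following is a minimal plain-Python sketch of the same slicing logic; the helper name split_into_batches and the 450 placeholder rows are assumptions made for illustration.

```python
from typing import Any, Dict, List


def split_into_batches(
    rows: List[Dict[str, Any]], batch_size: int = 200
) -> List[List[Dict[str, Any]]]:
    """Split rows into consecutive batches of at most batch_size entries."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [rows[i:i + batch_size] for i in range(0, len(rows), batch_size)]


# Hypothetical input: 450 placeholder rows.
rows = [{"id": i} for i in range(450)]
batches = split_into_batches(rows)
batch_sizes = [len(batch) for batch in batches]
print(batch_sizes)  # [200, 200, 50]
```

The last batch holds the remainder, so the downstream task has to accept batches shorter than 200 entries.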
Would you like more details on this approach or any other aspect of Flyte?
Sources:
- https://github.com/flyteorg/flyte/blob/master/docs/user_guide/advanced_composition/nested_parallelization.md
victorious-jelly-28729
12/17/2024, 10:30 AM
victorious-jelly-28729
12/17/2024, 10:30 AM
jolly-florist-66728
12/17/2024, 10:30 AM
pyflyte run where the workflow takes a parameter doc_ids of type List[List[str]]. To achieve this, you need to ensure that the input is correctly formatted as a JSON or YAML file, which pyflyte run can parse.
Here's a step-by-step approach:
1. Define the Workflow: Ensure your workflow is defined to accept doc_ids as List[List[str]].
2. Prepare Input File: Create a JSON or YAML file with the doc_ids structured as a list of lists.
3. Run the Workflow: Use pyflyte run with the --flyte-inputs option to specify the input file.
Example JSON input file (input.json):
{
  "doc_ids": [
    ["id1", "id2"],
    ["id3", "id4"]
  ]
}
Command to run the workflow:
pyflyte run <file_name>.py <workflow_name> --flyte-inputs input.json
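If the IDs already live in Python, the input file can be generated rather than written by hand. This is a minimal sketch using only the standard library; the write_inputs helper name and the sample IDs are illustrative assumptions and are not part of flytekit.

```python
import json
import tempfile
from pathlib import Path
from typing import List


def write_inputs(path: Path, doc_ids: List[List[str]]) -> None:
    """Write doc_ids to path as a JSON object keyed by the workflow parameter name."""
    path.write_text(json.dumps({"doc_ids": doc_ids}, indent=2))


# Hypothetical IDs; a temporary directory keeps the example free of side effects.
with tempfile.TemporaryDirectory() as tmp_dir:
    input_path = Path(tmp_dir) / "input.json"
    write_inputs(input_path, [["id1", "id2"], ["id3", "id4"]])
    loaded = json.loads(input_path.read_text())

print(loaded)
```

Reading the file back with json.loads is a quick way to confirm the nesting matches List[List[str]] before handing it to the CLI.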
Would you like more details on this process or any other aspect of Flyte?
Sources:
- https://github.com/flyteorg/flyte/issues/5473
- https://github.com/flyteorg/flyte/issues/4581
victorious-jelly-28729
12/17/2024, 10:33 AM
jolly-florist-66728
12/17/2024, 10:33 AM
--flyte-inputs option is not available in your version of pyflyte run. This option might not be supported in the version you are using, or it might be a feature that is not yet implemented.
To work around this, you can pass the input directly as a command-line argument in JSON format. Here's how you can do it:
1. Prepare JSON Input: Format your doc_ids as a JSON string.
2. Run the Workflow: Use pyflyte run and pass the JSON string directly as an argument.
Example command:
pyflyte run <file_name>.py <workflow_name> --doc_ids '[["id1", "id2"], ["id3", "id4"]]'
Make sure to replace <file_name>.py and <workflow_name> with your actual script and workflow names.
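Hand-quoting nested JSON on a shell command line is easy to get wrong. As a sketch, the command above can be assembled with the standard library's json and shlex modules; build_run_command, example.py, and my_workflow are hypothetical names chosen for illustration.

```python
import json
import shlex
from typing import List


def build_run_command(file_name: str, workflow_name: str, doc_ids: List[List[str]]) -> str:
    """Return a shell-safe pyflyte run command with doc_ids encoded as JSON."""
    parts = ["pyflyte", "run", file_name, workflow_name, "--doc_ids", json.dumps(doc_ids)]
    # shlex.quote wraps any argument containing shell-special characters in single quotes
    return " ".join(shlex.quote(part) for part in parts)


# Hypothetical script and workflow names.
command = build_run_command("example.py", "my_workflow", [["id1", "id2"], ["id3", "id4"]])
print(command)
```

Splitting the result with shlex.split gives back the original argument list, which is a simple way to confirm that the quoting survives the shell.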
Would you like more details on this approach or any other aspect of Flyte?
Sources:
- https://github.com/flyteorg/flyte/issues/5365
victorious-jelly-28729
12/17/2024, 11:56 AM
jolly-florist-66728
12/17/2024, 11:56 AM
xyz, which returns a nested list of dictionaries, and abc, which processes each sublist using map_task. Here's a basic example of how you can set this up:
1. Define Task xyz: This task returns a List[List[Dict[str, Any]]].
2. Define Task abc: This task processes each sublist and returns None.
3. Create Workflow: Use map_task to apply abc to each sublist produced by xyz.
Here's a code snippet to illustrate this:
from typing import List, Dict, Any

from flytekit import task, workflow, map_task

@task
def xyz() -> List[List[Dict[str, Any]]]:
    # Example data
    return [[{"key1": "value1"}, {"key2": "value2"}], [{"key3": "value3"}]]

@task
def abc(sublist: List[Dict[str, Any]]) -> None:
    # Process each sublist
    for item in sublist:
        print(item)

@workflow
def my_workflow():
    nested_list = xyz()
    map_task(abc)(sublist=nested_list)

# To run the workflow
if __name__ == "__main__":
    my_workflow()
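For intuition about the fan-out, here is a plain-Python analogue that applies a function to each sublist in parallel. It is only a sketch: a thread pool stands in for map_task, it says nothing about how Flyte schedules the work, and process_sublist returns a count (unlike abc, which returns None) so the result can be inspected.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List


def make_nested_list() -> List[List[Dict[str, Any]]]:
    """Stand-in for xyz: returns the same example data."""
    return [[{"key1": "value1"}, {"key2": "value2"}], [{"key3": "value3"}]]


def process_sublist(sublist: List[Dict[str, Any]]) -> int:
    """Stand-in for abc: prints each item and returns how many were processed."""
    for item in sublist:
        print(item)
    return len(sublist)


nested_list = make_nested_list()
# One call per sublist; executor.map returns results in input order.
with ThreadPoolExecutor(max_workers=2) as executor:
    counts = list(executor.map(process_sublist, nested_list))

print(counts)  # [2, 1]
```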
Would you like more details on this setup or any other aspect of Flyte?
Sources:
- https://discuss.flyte.org/t/19363/hi-all-can-you-please-advise-me-an-example-with-hello-world-/