# ask-ai
@ancient-wolf-19325, I have the following task and workflow:

```python
@task(container_image=CUSTOM_IMAGE, retries=2)
@track_datadog_metrics
def load(doc_ids: List[List[str]]) -> Tuple[List[List[Dict[str, Any]]], List[List[str]]]:
    ...

@workflow
def xyz(doc_ids: List[List[str]], max_batch_size: int) -> None:
    enrich_node = create_node(enrich, doc_ids=doc_ids)
    store_enrichment_node = create_node(
        store_enrichment,
        timestamp=enrich_node.o0,
        doc_ids=doc_ids,
        max_batch_size=max_batch_size,
    )
    load_table_node = create_node(load, doc_ids=doc_ids)
    chunk_table_nested_records = map_task(chunk_document, concurrency=10)(rows=load_table_node.o0[0])
    write_chunk_table = map_task(write_table_chunks, concurrency=8)(doc_records=chunk_table_nested_records)
    doc_upload_status = map_task(chunk_embed_store, concurrency=3)(
        doc_ids=load_table_node.o0[1], dummy=write_chunk_table
    )
    # Archive files from output dir
    archive_output_folder_node = create_node(
        archive_folder, timestamp=enrich_node.o0, doc_upload_status=doc_upload_status
    )
    # Pipeline
    enrich_node >> store_enrichment_node >> archive_output_folder_node
```

Registration fails with:

```
Request rejected by the API, due to Invalid input. RPC Failed, with Status: StatusCode.INVALID_ARGUMENT
details: failed to compile workflow for [resource_type:WORKFLOW project:"kaas-pipeline" domain:"development" name:"workflows.pdfragpipeline.pdfEnrichmentPipeline" version:"v15.0.1"] with err failed to compile workflow with err
Collected Errors: 4
Error 0: Code: MismatchingTypes, Node Id: n3, Description: Variable [o0] (type [collection_type:{map_value_type:{blob:{format:"PythonPickle"} metadata:{fields:{key:"python_class_name" value:{string_value:"typing.Any"}}}}}]) doesn't match expected type [collection_type:{collection_type:{map_value_type:{blob:{format:"PythonPickle"} metadata:{fields:{key:"python_class_name" value:{string_value:"typing.Any"}}}}}}].
Error 1: Code: MismatchingTypes, Node Id: n5, Description: Variable [o0] (type [collection_type:{map_value_type:{blob:{format:"PythonPickle"} metadata:{fields:{key:"python_class_name" value:{string_value:"typing.Any"}}}}}]) doesn't match expected type [collection_type:{collection_type:{simple:STRING}}].
Error 2: Code: ParameterNotBound, Node Id: n3, Description: Parameter not bound [rows].
Error 3: Code: ParameterNotBound, Node Id: n5, Description: Parameter not bound [doc_ids].
```
Sure thing! I'm looking through the available data to answer your question. Hold on a second...
@ancient-wolf-19325 no, that is wrong. n3 is:

```python
chunk_table_nested_records = map_task(chunk_document, concurrency=10)(rows=load_table_node.o0[0])
```

and n5 is:

```python
doc_upload_status = map_task(chunk_embed_store, concurrency=3)(doc_ids=load_table_node.o0[1], dummy=write_chunk_table)
```
The errors in your Flyte workflow are due to type mismatches and unbound parameters:

1. MismatchingTypes errors:
   - Error 0 (n3): The `rows` parameter in `chunk_table_nested_records` expects a collection of collections of maps, but `load_table_node.o0[0]` provides a collection of maps.
   - Error 1 (n5): The `doc_ids` parameter in `doc_upload_status` expects a collection of collections of strings, but `load_table_node.o0[1]` provides a collection of maps.
2. ParameterNotBound errors:
   - Error 2 (n3): The `rows` parameter in `chunk_table_nested_records` is not correctly bound.
   - Error 3 (n5): The `doc_ids` parameter in `doc_upload_status` is not correctly bound.

Would you like more detail on how to resolve these issues?

Sources:
- https://github.com/flyteorg/flyte/issues/5684
- https://github.com/flyteorg/flyte/issues/5359
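A minimal sketch of one likely fix, assuming `load` is registered with two separate outputs (`o0` and `o1`): bind each `map_task` to the corresponding node output directly instead of indexing into `o0`, which is what produced the mismatched collection types above.

```python
# Sketch, not a confirmed fix: a task returning a two-element tuple surfaces
# its results on the node as o0 and o1, so bind those directly rather than
# indexing o0 (indexing yields the element type, one nesting level too shallow).
load_table_node = create_node(load, doc_ids=doc_ids)

chunk_table_nested_records = map_task(chunk_document, concurrency=10)(
    rows=load_table_node.o0  # o0: List[List[Dict[str, Any]]]; each map element is a List[Dict[str, Any]]
)
doc_upload_status = map_task(chunk_embed_store, concurrency=3)(
    doc_ids=load_table_node.o1,  # o1: List[List[str]]; each map element is a List[str]
    dummy=write_chunk_table,
)
```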
@ancient-wolf-19325, let's say I have a task like below:

```python
@task(container_image=CUSTOM_IMAGE, retries=2)
@track_datadog_metrics
def load(doc_ids: List[List[str]]) -> Tuple[List[List[Dict[str, Any]]], List[List[str]]]:
    ...

@workflow
def xyz(doc_ids: List[List[str]], max_batch_size: int) -> None:
    load_table_node = create_node(load, doc_ids=doc_ids)
```

Do `load_table_node.o0[0]` and `load_table_node.o0[1]` give `List[List[Dict[str, Any]]]` and `List[List[str]]` correctly?
Yes, `load_table_node.o0[0]` and `load_table_node.o0[1]` should give you `List[List[Dict[str, Any]]]` and `List[List[str]]` respectively, as per the return type of the `load` task. Would you like more detail on how this works?

Sources:
- Flyte GitHub Issue #4841
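Given the compile errors earlier in the thread, that answer deserves hedging: with `create_node`, a task returning a two-element tuple exposes its outputs as `node.o0` and `node.o1`, and indexing `node.o0` is what triggered the MismatchingTypes errors. A minimal runnable sketch of tuple outputs with a plain task call (hypothetical task body, for illustration only):

```python
from typing import Any, Dict, List, Tuple
from flytekit import task, workflow

@task
def load(doc_ids: List[List[str]]) -> Tuple[List[List[Dict[str, Any]]], List[List[str]]]:
    # Hypothetical body, just to make the example runnable
    records = [[{"id": d} for d in batch] for batch in doc_ids]
    return records, doc_ids

@workflow
def xyz(doc_ids: List[List[str]]) -> Tuple[List[List[Dict[str, Any]]], List[List[str]]]:
    # A plain call returns a multi-output promise that can be unpacked;
    # the two elements correspond to the node outputs o0 and o1.
    records, ids = load(doc_ids=doc_ids)
    return records, ids
```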
@ancient-wolf-19325, can I assign a custom pod name instead of `<execution-name>-<node-id>`?
Custom pod names cannot be directly assigned in Flyte. Pod names are automatically generated using the format `<execution-name>-<node-id>`. However, you can use pod labels and annotations for identification purposes. Would you like more detail on using labels and annotations?

Sources:
- Flyte Community Discussion
- Flyte GitHub Issue #5380
@ancient-wolf-19325 yes
To identify pods in Flyte, you can use labels and annotations instead of custom pod names. Here's how you can do it:

1. Using Labels and Annotations: You can add labels and annotations to your executions, which will be applied to the pods. This can help in identifying and filtering pods based on this metadata.

2. Setting Labels and Annotations: You can set labels and annotations using the `Labels` and `Annotations` classes from `flytekit`; these attach at the launch-plan level and are propagated to the pods of its executions. Here's an example:

```python
from flytekit import task, workflow, LaunchPlan, Labels, Annotations

@task
def my_task():
    pass

@workflow
def my_workflow():
    my_task()

# Labels and annotations set on the launch plan are applied to the pods
# created for its executions.
my_workflow_lp = LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="my_workflow_lp",
    labels=Labels({"custom_label": "value"}),
    annotations=Annotations({"custom_annotation": "value"}),
)
```

3. Using Pod Templates: You can also use Kubernetes Pod Templates to apply default labels and annotations to all pods created by Flyte. This is done by defining a Pod Template in your Kubernetes cluster and referencing it in your Flyte configuration.

These methods allow you to add metadata to pods, which can be used for identification and management purposes.

Sources:
- Flyte Community Discussion
- Flyte GitHub Issue #5380
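Once a label such as `custom_label=value` is on the pods, standard Kubernetes selectors can find them, for example `kubectl get pods -n <namespace> -l custom_label=value` (assuming your executions run in the usual per-project/domain namespace; the namespace name depends on your deployment).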
@ancient-wolf-19325, what is the Flyte task sweep logic? I mean, after how many hours or days are successful tasks cleaned up? Is there any configuration for this?
Flyte does not have a built-in task sweep logic for automatically cleaning up successful tasks after a certain period. However, you can manage task cleanup using Kubernetes TTL (Time To Live) configurations for resources like pods. This can be configured in the Flyte Propeller configuration. Would you like more detail on configuring Kubernetes TTL for Flyte tasks?

Sources:
- Flyte GitHub Issue #2837
- Flyte Community Discussion
@ancient-wolf-19325 yes
To manage the cleanup of successful tasks in Flyte, you can configure the Kubernetes TTL settings. Here's how you can do it:

1. TTL Configuration: You can set the `ttlSecondsAfterFinished` parameter in the Flyte Propeller configuration. This parameter determines how long Kubernetes should retain the pods after they have completed execution.

2. Configuring TTL: You can update the Flyte Propeller configuration map to set the desired TTL. For example, to set the TTL to 1 hour (3600 seconds), you can modify the configuration as follows:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flyte-propeller-config
  namespace: flyte
data:
  config.yaml: |
    propeller:
      plugins:
        k8s:
          ttlSecondsAfterFinished: 3600
```

3. Applying the Configuration: After updating the configuration, apply it to your Kubernetes cluster using `kubectl apply -f <config-file>`. This will ensure that the pods are cleaned up after the specified TTL period, freeing up resources in your cluster.

Sources:
- https://discuss.flyte.org/t/8198142/hi-is-there-a-way-to-shorten-ttlsecondsafterfinished-by-defa/
- https://discuss.flyte.org/t/14163367/how-can-we-increase-the-pod-retention-for-executed-task-in-f/
@ancient-wolf-19325, ttlSecondsAfterFinished is for the Ray plugin. I think max-ttl-hours is the correct config: https://github.com/flyteorg/flyte/blob/25cfe16940f10f9bbef02e288c823db16eb37609/flytepropeller/pkg/controller/config/config.go#L161
Yes, you are correct. The `max-ttl-hours` configuration in Flyte Propeller specifies the maximum number of hours a completed workflow should be retained. This setting applies to general Flyte workflows, not just to specific plugins like Ray. To configure `max-ttl-hours`, update the Flyte Propeller configuration file. Here's an example:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flyte-propeller-config
  namespace: flyte
data:
  config.yaml: |
    propeller:
      max-ttl-hours: 48
```

This configuration will retain completed workflows for 48 hours before they are cleaned up. Apply the configuration using `kubectl apply -f <config-file>`. Would you like more detail on this configuration?

Sources:
- Flyte Community Discussion
@ancient-wolf-19325, a `map_task` can return `List[List[str]]`, correct?
Yes, a `map_task` in Flyte can return `List[List[str]]`. The `map_task` function is designed to handle tasks that return lists, and it can be used to process and return nested lists like `List[List[str]]`. Would you like more detail on how to implement this?

Sources:
- Flyte Community Discussion
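A minimal runnable sketch (hypothetical task and workflow names): mapping a task that returns `List[str]` over a list of inputs collects one list per element, which is exactly `List[List[str]]`.

```python
from typing import List
from flytekit import map_task, task, workflow

@task
def split_words(line: str) -> List[str]:
    # Each mapped instance returns a List[str]
    return line.split()

@workflow
def wf(lines: List[str]) -> List[List[str]]:
    # map_task collects one List[str] per input line, i.e. List[List[str]]
    return map_task(split_words)(line=lines)
```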
@ancient-wolf-19325, how do I pass multiple arguments to a workflow with the `pyflyte run` command, and also have the run command include all the source folders?
To pass multiple arguments to a workflow using the `pyflyte run` command, you can specify each argument with its corresponding value. Additionally, to include all source folders, you can use the `--copy-all` flag. Here's how you can do it:

1. Passing Multiple Arguments: Specify each argument with its value on the command line. For example, if your workflow takes two arguments, `arg1` and `arg2`, you can pass them as follows:

```
pyflyte run my_workflow.py my_workflow --arg1 value1 --arg2 value2
```

2. Including All Source Folders: Use the `--copy-all` flag to include all source files and folders in the execution environment. This ensures that all necessary code is available during execution:

```
pyflyte run --copy-all my_workflow.py my_workflow --arg1 value1 --arg2 value2
```

This command will execute the workflow with the specified arguments and include all source files. Would you like more detail on any specific part of this process?

Sources:
- Flyte GitHub Issue #5343
- Flyte Community Discussion
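For completeness, a minimal `my_workflow.py` that the commands above would run (hypothetical names and types, just to show how `--arg1`/`--arg2` bind to workflow parameters):

```python
# my_workflow.py -- hypothetical example matching the commands above
from flytekit import task, workflow

@task
def combine(arg1: str, arg2: str) -> str:
    return f"{arg1}-{arg2}"

@workflow
def my_workflow(arg1: str, arg2: str) -> str:
    # --arg1/--arg2 on the pyflyte run command line bind to these parameters
    return combine(arg1=arg1, arg2=arg2)
```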
@ancient-wolf-19325, is there a way, using authentication or another means, to block some users from registering on a particular project/domain?
Currently, Flyte does not have built-in support for restricting user access to specific projects or domains based on authentication. However, you can implement custom access control using external tools or configurations:

1. Custom Authentication Gateway: You can set up an authentication/authorization gateway in front of Flyte Admin to handle access control. This gateway can check user credentials and restrict access to specific projects or domains based on your organization's policies.

2. Kubernetes RBAC: Integrate Flyte with Kubernetes Role-Based Access Control (RBAC) to manage permissions at the Kubernetes level. This can help restrict access to resources based on user roles.

3. External Authorization Tools: Use external tools like Open Policy Agent (OPA) to define and enforce access policies. OPA can be integrated with Flyte to manage fine-grained access control.

Would you like more detail on implementing any of these solutions?

Sources:
- Flyte GitHub Issue #3909
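To make option 1 concrete, here is a toy sketch of the kind of check such a gateway could apply before forwarding registration calls to Flyte Admin. Everything here (the allowlist shape, function names) is hypothetical and not a Flyte API:

```python
# Hypothetical gateway-side check, not a Flyte API: decide whether an
# authenticated user may register entities in a given project/domain.
ALLOWLIST: dict[tuple[str, str], set[str]] = {
    # (project, domain) -> users allowed to register there
    ("kaas-pipeline", "development"): {"alice", "bob"},
}

def may_register(user: str, project: str, domain: str) -> bool:
    return user in ALLOWLIST.get((project, domain), set())

# The gateway would reject registration RPCs (e.g. CreateTask/CreateWorkflow)
# whenever may_register(...) returns False.
if __name__ == "__main__":
    print(may_register("alice", "kaas-pipeline", "development"))    # True
    print(may_register("mallory", "kaas-pipeline", "development"))  # False
```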
@ancient-wolf-19325, how does Flyte check whether a user has access to list workflows on a particular project/domain?
Flyte does not have built-in role-based access control (RBAC) for checking user access to list workflows on a particular project/domain in the open-source version. Access control is typically managed through external authentication and authorization mechanisms.

1. Authentication: Flyte uses OAuth2-based authentication, which can be integrated with identity providers like Keycloak. This ensures that only authenticated users can access the Flyte console or API.

2. Authorization: While Flyte does not natively support RBAC, you can implement custom authorization using external tools like Open Policy Agent (OPA) or by setting up an authentication gateway that enforces access policies.

For more granular access control, you would need to implement a custom solution using these external tools. Would you like more detail on integrating Flyte with external authorization tools?

Sources:
- Flyte GitHub Issue #3909