big-notebook-82371
08/16/2024, 3:19 PM
Here is the relevant error and context
AssertionError: Logical (and/or/is/not) operations are not supported. Expressions Comparison (<,<=,>,>=,==,!=) or Conjunction (&/|) are supported.Received an evaluated expression with val True in process_query_split.if_
The above exception was the direct cause of the following exception:
AssertionError: Error encountered while executing 'create_crop_dataset':
Logical (and/or/is/not) operations are not supported. Expressions Comparison (<,<=,>,>=,==,!=) or Conjunction (&/|) are supported.Received an evaluated expression with val True in process_query_split.if_
# failing conditional inside a workflow
query_process_tasks = (
    conditional("process_query_split")
    .if_(pipeline_settings.num_query_splits > 0)
    .then(
        process_split_in_parallel(
            dataset_bucket_dir=dataset_bucket_dir,
            split_name="query",
            num_splits=pipeline_settings.num_query_splits,
            pipeline_status_id=pipeline_status_id,
        )
    )
    .else_(return_empty_list())
)
# using pydantic
class PipelineSettings(CommonSettings):
    num_train_splits: int = 70
    num_test_splits: int = 15
    num_query_splits: int = 5
    num_partitions_for_vec_preprocess: int = 50
    limit: int = 0
ancient-wolf-19325
08/16/2024, 3:19 PM
ComparisonExpression or ConjunctionExpression types.
To resolve this, you need to ensure that the expression passed to the if_ method is a ComparisonExpression. Here's how you can modify your code:
1. Ensure the expression is a ComparisonExpression: Use a comparison operator (such as == or >) on a Promise to create a ComparisonExpression.
2. Convert the boolean value to a Promise: If the comparison uses a plain Python value, it is evaluated immediately to a boolean. Pass the value in as a workflow input or a task output so that it becomes a Promise.
Here's an example of how you can modify your workflow:
from flytekit import task, workflow, conditional
# Defined before the workflow so the type annotation below resolves
class PipelineSettings:
    num_query_splits: int = 5
@task
def return_empty_list() -> list:
    return []
@task
def process_split_in_parallel(dataset_bucket_dir: str, split_name: str, num_splits: int, pipeline_status_id: str) -> list:
    # Your processing logic here
    return []
@workflow
def create_crop_dataset(pipeline_settings: PipelineSettings, dataset_bucket_dir: str, pipeline_status_id: str) -> list:
    query_process_tasks = (
        conditional("process_query_split")
        .if_(pipeline_settings.num_query_splits > 0)
        .then(
            process_split_in_parallel(
                dataset_bucket_dir=dataset_bucket_dir,
                split_name="query",
                num_splits=pipeline_settings.num_query_splits,
                pipeline_status_id=pipeline_status_id,
            )
        )
        # else_() takes no arguments; the fallback goes in the chained then()
        .else_()
        .then(return_empty_list())
    )
    return query_process_tasks
# Example usage
pipeline_settings = PipelineSettings()
create_crop_dataset(pipeline_settings=pipeline_settings, dataset_bucket_dir="some_dir", pipeline_status_id="some_id")
In this example, pipeline_settings is a workflow input, so pipeline_settings.num_query_splits > 0 is a ComparisonExpression that Flyte can handle.
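Whether if_ receives an expression object or an already-evaluated True depends on the type of the left-hand operand. The toy model below is a rough sketch of that mechanism in plain Python, using operator overloading. It does not import flytekit, and ToyPromise, ToyComparison, and toy_if are hypothetical names made up for illustration, not Flyte APIs:

```python
from dataclasses import dataclass
from typing import Any, Union


@dataclass(frozen=True)
class ToyComparison:
    """Unevaluated comparison; stands in for a ComparisonExpression."""
    lhs: Any
    op: str
    rhs: Any


class ToyPromise:
    """Hypothetical placeholder for a value that is only known at run time."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __gt__(self, other: Any) -> ToyComparison:
        # Record the comparison instead of evaluating it.
        return ToyComparison(self.name, ">", other)


def toy_if(expr: Union[ToyComparison, bool]) -> str:
    """Mimic the check named in the error message: reject plain booleans."""
    if isinstance(expr, bool):
        raise AssertionError(f"Received an evaluated expression with val {expr}")
    return f"branch on {expr.lhs} {expr.op} {expr.rhs}"


# A plain Python int is compared immediately, so toy_if only ever sees True.
num_query_splits = 5
try:
    toy_if(num_query_splits > 0)
except AssertionError as err:
    rejected = str(err)

# A promise-like operand defers the comparison, so toy_if sees an expression.
accepted = toy_if(ToyPromise("num_query_splits") > 0)
```

If the settings object in the failing snippet is an ordinary Python object created outside the workflow rather than a workflow input, its comparison would behave like the first case here.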
For more details, refer to the Flyte documentation on conditionals.
Would you like more details on any specific part of this solution?
Sources:
- Flyte documentation on conditionals
- Flytekit core condition.py