# ask-the-community
An interesting problem has been bugging me. I am curious whether there is already a solution to it, or whether it is even worth trying to resolve. In order to take advantage of caching and mapped tasks, I find myself creating a lot of small tasks that do nothing more than extract an attribute of an object or create an object from a list of outputs from a map-task. For example, suppose I have a set of model training hyperparameters in a `pydantic` class `Hyperparameters`. To optimize the caching of my pipeline, I sometimes need to extract individual hyperparameter values so that a task does not receive ones it does not need. Doing so improves the chances of a "cache hit". In practice, however, it means I have dozens of small tasks that are nothing more than the following:
```python
@task
def get_batch_size(params: Hyperparameters) -> int:
    return params.batch_size
```
There are other types of so-called lambda-tasks that I use for other operations, such as collecting the outputs from a map-task or creating a clean `list` in order to use a map-task. I guess my real question is this: is there a way to create a task from a lambda function? Would such functionality even be possible to implement in the DSL? I suppose I would like to be able to create a workflow with something like this:
```python
@workflow
def workflow(params: Hyperparameters):
    batch_size = task(lambda params: params.batch_size)

    do_something_with_batch_size(batch_size=batch_size)
```
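One way to cut down on the boilerplate without lambda support might be a small factory that generates typed getter functions. The sketch below is only an illustration under stated assumptions: `make_getter` is a hypothetical helper (not part of flytekit), a standard-library dataclass stands in for the pydantic model, and the field names and defaults are made up. A lambda carries no type hints, so the factory attaches them explicitly.

```python
from dataclasses import dataclass
from typing import Any, Callable, get_type_hints


@dataclass
class Hyperparameters:
    # Hypothetical fields, for illustration only.
    batch_size: int = 32
    learning_rate: float = 1e-3


def make_getter(cls: type, attr: str) -> Callable[[Any], Any]:
    """Build a typed `get_<attr>` function for one annotated field of `cls`."""
    hints = get_type_hints(cls)
    if attr not in hints:
        raise AttributeError(f"{cls.__name__} has no annotated field {attr!r}")

    def getter(params):
        return getattr(params, attr)

    # Give the generated function a stable name and an explicit signature,
    # since a bare lambda would have neither.
    getter.__name__ = f"get_{attr}"
    getter.__qualname__ = getter.__name__
    getter.__annotations__ = {"params": cls, "return": hints[attr]}
    return getter


# Bind each getter to a module-level name that matches its __name__.
get_batch_size = make_getter(Hyperparameters, "batch_size")
get_learning_rate = make_getter(Hyperparameters, "learning_rate")
```

Whether flytekit's `task` accepts functions generated this way has not been verified here. Since tasks are typically located by module and name at runtime, each getter would probably need to live at module level under the same name as its `__name__`, as in the last two lines.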
We use something like this: https://github.com/flyteorg/flytekit/pull/2275 It's nice because it's overridable via the `with_overrides` method, so we can specify portions of the task signature to ignore at runtime. This may not work out of the box with your pydantic class though...
That is interesting. It doesn’t necessarily solve the problem though. I want to ignore some attributes of a pydantic class, but not all of them. Ignoring the input as a whole would produce false-positive cache hits, and with them unexpected behavior in my pipeline. However, it is very good to know that this functionality exists. I have often wondered how I could do that for other reasons.
yeah, you’d have to serialize the pydantic model to a dict, or you could potentially try to work with `Annotated` and write your own `HashMethod`: https://github.com/flyteorg/flytekit/blob/8d258c48b64bce26c66d79dc7e0c1c9a000d73b9/flytekit/core/hash.py#L11
The docs have an example showing a custom hash method for a dataframe. But I’m not sure if this will solve what you need either. We do runtime cache ignores and have a pretty flexible task signature (a bunch of optionals) to accomplish what you’ve described.
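To make the custom-hash idea concrete, here is a minimal sketch of a hash function that looks at only some fields of a parameters object. It uses just the standard library, with a dataclass standing in for the pydantic model. The field names, `hash_fields`, and `hash_training_params` are hypothetical and chosen for illustration. The trailing comment shows roughly how such a function might be attached with `Annotated` and `HashMethod`, which is an assumption that has not been tested against a pydantic input.

```python
import hashlib
import json
from dataclasses import asdict, dataclass
from typing import Iterable


@dataclass
class Hyperparameters:
    # Hypothetical fields, for illustration only.
    batch_size: int = 32
    learning_rate: float = 1e-3
    log_every: int = 100  # assumed to be irrelevant to the cached result


def hash_fields(params: Hyperparameters, fields: Iterable[str]) -> str:
    """Return a SHA-256 hex digest over the selected fields only."""
    data = asdict(params)
    subset = {name: data[name] for name in fields}
    # sort_keys makes the digest independent of the order of `fields`.
    payload = json.dumps(subset, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def hash_training_params(params: Hyperparameters) -> str:
    """Hash only the fields that should influence a cache hit."""
    return hash_fields(params, ("batch_size", "learning_rate"))


# Assumption: with flytekit installed, this could be attached roughly as
#   from typing import Annotated
#   from flytekit import HashMethod
#   CachedParams = Annotated[Hyperparameters, HashMethod(hash_training_params)]
# and used as the return type of the task that produces the parameters.
```

One limitation to keep in mind: the annotation yields a single hash per object, so downstream tasks that depend on different subsets of fields would likely each need their own annotated type.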
Hi Greg! Thank you for sharing this. It might very well be the appropriate way to go. I do think that including specialized hash methods for each task is cleaner than creating dozens of small tasks to extract attributes. At the same time, being able to quickly define tasks via lambda functions would be quite pleasant.