Hi all -- we've been using the Flyte <caching capa...
# ask-the-community
j
Hi all -- we've been using the Flyte caching capabilities in one of our workflows to make sure that we don't unnecessarily run a task on the same input repeatedly. We caught a bug in the task that caused the cached output for a single input (out of hundreds) to be erroneous. Is there any way to "uncache" that single input, or is our only option to clear out the cache for that task entirely (causing it to run on all of our inputs again)? Thanks!
j
Does your task a dynamic task?
if its just a regular task i dont think thats possible
if your task is a dynamic task that spawn bunch of regular tasks and one the regular task that takes your erroneous input then you can launch that individual task with the corrected input right
j
I didn't realize I could override the cached outputs with a manual rerun of the task itself, but that works perfectly. Thank you!
@Jay Ganbat sorry I'm just getting back to this after I was out on Friday -- I don't think this is functioning properly, I am asking it to overwrite the cached outputs but it's still reading the output from the existing cache?
j
hmmm that is very odd, could be a bug πŸ€”
@Yee do you have any insight here, does cache overriding not work in map task?
y
i think that is a bug yes - could you try the new map_task?
we’re still working on it
but
from flytekit.experimental import map_task
and that will pick up a different implementation on the backend
it uses a new node type on the backend (should be transparent to users)
j
i see, thats good to know, in that case looks like your map_task spawns only 1 task you can retry the task itself with the same input
j
I'm trying to re-register w/ that suggested change (
from flytekit.experimental import map_task
instead of
from flytekit import map_task
), but I'm getting the following error when calling
pyflyte register ...
Copy code
RPC Failed, with Status: StatusCode.INVALID_ARGUMENT
	details: task with different structure already exists with id resource_type:TASK project:"flyte-[redacted]" domain:"development" name:"common.task.download_file" version:"jXPiVekbE8I0fwQpxmfwXQ=="
j
you cant register already regisrtered versions, you should change the version, how are the version string is getting created?
j
I'm not explicitly setting that as far as I can tell, I thought Flyte was autogenerating that? For context, if I leave it as
from flytekit import map_task
I can run the
pyflyte register ...
just fine (and registers it w/ a different version string), so it is explicitly that import change that causes this failure for me
I was able to get around that error by making some other irrelevant changes to the code (which assumedly had Flyte generate a new version), but now my workflow is failing on registration (again, when I use the non-experimental
map_task
it registers just fine:
Copy code
> pyflyte register -p flyte-[redacted] -d development common
...
Successfully serialized 5 flyte objects
[βœ”] Registration common.task.download_file type TASK successful with version L78nnSc8FhZDvHgn0jiQEQ==
[βœ”] Registration common.task.list_files type TASK successful with version L78nnSc8FhZDvHgn0jiQEQ==
[βœ”] Registration common.task.map_download_file_f543a5f02bd6f3ed1ad99b14a311e9d9-arraynode type TASK successful with version L78nnSc8FhZDvHgn0jiQEQ==
[ ] Registration common.workflow.ingest type WORKFLOW NoneFailed with Exception Code: SYSTEM:Unknown
RPC Failed, with Status: StatusCode.INTERNAL
	details: failed to compile workflow for [resource_type:WORKFLOW project:"flyte-[redacted]" domain:"development" name:"common.workflow.ingest" version:"L78nnSc8FhZDvHgn0jiQEQ==" ] with err failed to compile workflow with err Collected Errors: 3
	Error 0: Code: ParameterNotBound, Node Id: end-node, Description: Parameter not bound [o0].
	Error 1: Code: ValueRequired, Node Id: n1, Description: Value required [Target].
	Error 2: Code: VariableNameNotFound, Node Id: n1, Description: Variable [o0] not found on node [n1].

	Debug string UNKNOWN:Error received from peer ipv4:3.212.214.154:443 {grpc_message:"failed to compile workflow for [resource_type:WORKFLOW project:\"flyte-[redacted]\" domain:\"development\" name:\"common.workflow.ingest\" version:\"L78nnSc8FhZDvHgn0jiQEQ==\" ] with err failed to compile workflow with err Collected Errors: 3\n\tError 0: Code: ParameterNotBound, Node Id: end-node, Description: Parameter not bound [o0].\n\tError 1: Code: ValueRequired, Node Id: n1, Description: Value required [Target].\n\tError 2: Code: VariableNameNotFound, Node Id: n1, Description: Variable [o0] not found on node [n1].\n", grpc_status:13, created_time:"2023-09-18T10:56:21.191069-07:00"}
The
ingest()
workflow that it is failing on calls
map_task(download_file)
j
looks like mismatched types for outputs πŸ€”
just to unblock the workflow i still think it might be just easier for you to run the underlying task πŸ˜… and figure out the experimental work in different run
j
Yes, that did unblock me (just doing a
pyflyte run --remote --overwrite-cache
on the input that I wanted to return different results for), thank you