microscopic-furniture-57275
04/17/2024, 3:02 PM
interruptible=True arg to the @task decorator.
Is there a dynamic way to control this, as can be done with resources using with_overrides()? Otherwise it seems I need to declare it statically for all my tasks (and create a parallel version of each one if I want the option to run them on non-spot instances).
We also generate our own pod templates with various settings like mounting volumes and publishing env vars -- I suppose I could skip the interruptible flag and set our own taints/tolerations to land on a spot instance, but using built-in Flyte functionality is preferable.
Thanks!
icy-tent-98067
04/17/2024, 4:53 PM
functools.partial to create a partial function to replace @task?
icy-tent-98067
04/17/2024, 4:54 PM
from functools import partial

import flytekit

# Bind interruptible=True once; use `task` in place of flytekit.task.
task = partial(flytekit.task, interruptible=True)

@task
def my_task(): ...

@task
def my_other_task(): ...
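One property of functools.partial that may matter here: keyword arguments passed at call time override the ones frozen into the partial, so a project-wide default does not have to be all-or-nothing. The sketch below shows this with a stand-in decorator, fake_task, which is hypothetical and not part of flytekit. It accepts both the bare and the parameterized form, as task decorators commonly do. Whether flytekit.task behaves the same way under partial is an assumption to verify.

```python
from functools import partial


def fake_task(fn=None, *, interruptible=False, retries=0):
    """Stand-in for a task decorator (hypothetical, NOT flytekit.task).

    Supports both `@fake_task` and `@fake_task(...)` and records the
    settings on the function so they can be inspected.
    """
    def wrap(f):
        f.settings = {"interruptible": interruptible, "retries": retries}
        return f

    # Bare use passes the function directly; parameterized use returns a decorator.
    return wrap(fn) if fn is not None else wrap


# Project-wide default: every task is interruptible unless it says otherwise.
task = partial(fake_task, interruptible=True)


@task
def spot_ok():
    return "runs anywhere"


@task(interruptible=False, retries=2)  # call-time keywords override the partial's
def needs_on_demand():
    return "must not be preempted"
```

Note that the value is still fixed when the module is imported, not chosen per execution.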
icy-tent-98067
04/17/2024, 4:57 PM
icy-tent-98067
04/17/2024, 5:00 PM
cache and cache_version to ensure that all of my caching gets updated together (as a function of my project's version, defined within pyproject.toml).
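from functools import partial
import importlib.metadata  # a bare "import importlib" does not reliably load the metadata submodule

import flytekit

# Tie cache_version to the installed project version so all caches roll over together.
version = importlib.metadata.version("my_module")
task = partial(flytekit.task, cache=True, cache_version=version)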
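One caveat with this pattern: importlib.metadata.version raises PackageNotFoundError when the distribution is not installed, for example when running from a source checkout. A minimal sketch of a fallback follows. The helper name resolve_cache_version and the "dev" default are illustrative choices, not anything from flytekit.

```python
from importlib.metadata import PackageNotFoundError, version


def resolve_cache_version(dist_name: str, default: str = "dev") -> str:
    """Return the installed distribution's version, or `default` if absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        # Happens when the project is run from source without being installed.
        return default


# "my_module" mirrors the name used in the thread; substitute your distribution name.
cache_version = resolve_cache_version("my_module")
# The result would then be passed on, e.g.
#   partial(flytekit.task, cache=True, cache_version=cache_version)
```

A fixed fallback such as "dev" means uninstalled runs all share one cache version, which may or may not be what you want.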
microscopic-furniture-57275
04/17/2024, 5:04 PM
interruptible argument at runtime, based e.g. on a user config. We do this, for example, with resource overrides, so that if I'm training a particularly large classifier and know I'll need more memory, I can specify this in a config (or it can be computed in a dynamic task), and this value gets used in with_overrides() instead of static values being specified in the @task definition.
I'd also like to be able to do this with caching! My solution for that is to use the overwrite_cache arg when executing via a FlyteRemote instance.
It seems that your solution for caching, and your suggestion for my interruptible question, mean you need to apply the same value to ALL tasks at "compile time" -- and I'm looking to apply, at runtime, a user-configured (or programmatically computed) value on a per-task basis.
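To make the per-task idea concrete, here is a minimal sketch of turning a user config into keyword arguments for with_overrides(). Only resource overrides are confirmed in this thread. Treating interruptible and retries as valid with_overrides() keys is an assumption to check against your flytekit version. The names overrides_for, ALLOWED_KEYS, and the config layout are hypothetical.

```python
from typing import Any, Dict

# Keys the sketch is willing to forward. Only resource overrides are confirmed
# by the thread; "interruptible" and "retries" are ASSUMPTIONS to verify
# against the with_overrides() of your flytekit version.
ALLOWED_KEYS = {"requests", "limits", "interruptible", "retries"}


def overrides_for(task_name: str, config: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Merge the "default" section of config with the task-specific section."""
    merged = {**config.get("default", {}), **config.get(task_name, {})}
    unknown = set(merged) - ALLOWED_KEYS
    if unknown:
        raise ValueError(f"unsupported override keys for {task_name!r}: {sorted(unknown)}")
    return merged


# Hypothetical user config: everything on spot, except one large training task.
user_config = {
    "default": {"interruptible": True},
    "train_classifier": {"interruptible": False, "retries": 3},
}

# Inside a workflow body this might be used as:
#   train_classifier(data=data).with_overrides(
#       **overrides_for("train_classifier", user_config))
```

Rejecting unknown keys up front keeps a typo in the config from being silently ignored.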
Does this make sense, or am I misunderstanding your suggestion?
icy-tent-98067
04/17/2024, 5:09 PM
microscopic-furniture-57275
04/17/2024, 5:10 PM
thankful-minister-83577
microscopic-furniture-57275
04/17/2024, 10:41 PM
ExecutionSpec message - my understanding is this defines the protobuf message Flyte passes around. This IDL presumably corresponds to some Python type or arguments passed to Flyte functions.
How do I use this?
My guess would be I can pass an interruptible argument to FlyteRemote.execute() (which notably also takes an overwrite_cache flag which I also see in that protobuf definition) -- but I don't see the interruptible flag in the docs or source... Am I misunderstanding?
microscopic-furniture-57275
04/19/2024, 8:10 PM
metadata_defaults attribute of WorkflowTemplate, so I tried setting this on a workflow (wf.template.metadata_defaults._interruptible=True) after retrieving the FlyteWorkflow from a FlyteRemote, but this didn't have an effect (and it doesn't seem like it's meant to be set this way, since it's a property with a getter and no setter!). Maybe this would work for a dynamic workflow, which seems more plausible, but I haven't tested this yet. Not sure this is the right track?
Maybe this isn't really supported? I can statically make a workflow/tasks interruptible by declaring this in the @workflow/@dynamic/@task decorators, but was hoping to set this at runtime.
thankful-minister-83577
all tasks in that workflow…is this what you want?
microscopic-furniture-57275
04/23/2024, 8:23 PM
interruptible flag being added to the form UI available when launching via Flyte Console (where you can specify both interruptible and overwrite_cache for the execution) -- but via flytekit, you can only specify overwrite_cache.
microscopic-furniture-57275
04/23/2024, 8:28 PM
retries also be exposed at the workflow level, or is it exposed only for tasks by design? My goal with all of the above is to make it easy to set interruptible at the workflow level (and have it apply to all tasks), and it would be similarly handy to apply retries to all tasks -- otherwise I'll need to annotate each potential task with retries so that the task is automatically retried if the pod is pre-empted. This is based on my reading of these docs:
If you set retries=n, for instance, and the task gets preempted repeatedly, Flyte will retry on a preemptible/spot instance n-1 times and for the last attempt will retry your task on a non-spot (regular) instance. Please note that tasks will only be retried if at least one retry is allowed using the retries parameter in the task decorator.
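To spell out one reading of that passage, the sketch below lists which kind of instance each attempt would land on in the worst case, where every spot attempt is preempted. It only models the quoted sentence and is not Flyte's scheduling logic. The function name attempt_plan is hypothetical.

```python
from typing import List


def attempt_plan(retries: int, interruptible: bool = True) -> List[str]:
    """List the instance type for each attempt, per the quoted docs passage.

    The first entry is the initial run; it is followed by up to `retries` retries.
    """
    if retries < 0:
        raise ValueError("retries must be non-negative")
    if not interruptible:
        return ["regular"] * (retries + 1)
    if retries == 0:
        # No retry allowed: a preempted task is not retried at all.
        return ["spot"]
    # Initial run plus n-1 retries on spot, then one final retry on a regular instance.
    return ["spot"] * retries + ["regular"]


plan = attempt_plan(3)
```

Under this reading, retries=1 is the smallest setting that guarantees a fallback to a regular instance after a preemption.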