# ask-the-community
t
Hi - I'm exploring the use of spot-instances with the `interruptible=True` arg to the `@task` decorator. Is there a dynamic way to control this, as can be done with resources using `with_overrides()`? Otherwise it seems I need to statically declare it for all my tasks (and create a parallel version of them if I want the option to run the tasks on non-spot instances). We also generate our own pod templates with various settings, like mounting volumes and publishing env vars -- I suppose I could skip the `interruptible` flag and use our own taints/tolerations to land on a spot instance, but using built-in Flyte functionality is preferable. Thanks!
g
Have you considered using `functools.partial` to create a partial function to replace `@task`? E.g.:

```python
from functools import partial

import flytekit

# Bind interruptible=True once, so every task defined with this
# alias targets spot instances by default.
task = partial(flytekit.task, interruptible=True)

@task
def my_task(): ...


@task
def my_other_task(): ...
```

This would allow for a single definition. Integrate it with a "debug mode". Applying kwargs to the new partial function also works as expected.
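A minimal, runnable sketch of that "debug mode" idea. Note this uses a dummy decorator as a stand-in for `flytekit.task` so it runs without Flyte installed; the `MY_DEBUG` env var and `fake_task` helper are hypothetical names, not Flyte API:

```python
import os
from functools import partial

def fake_task(fn=None, *, interruptible=False):
    """Stand-in for flytekit.task: just records the interruptible setting."""
    if fn is None:
        # Called with keyword args only, e.g. @task(interruptible=False).
        return partial(fake_task, interruptible=interruptible)
    fn.interruptible = interruptible
    return fn

# Hypothetical "debug mode": run on regular (non-spot) nodes while debugging.
DEBUG = os.environ.get("MY_DEBUG", "0") == "1"
task = partial(fake_task, interruptible=not DEBUG)

@task
def my_task():
    return "ok"
```

Flipping `MY_DEBUG=1` at process start would switch every task defined through the alias, which is still a start-time (not per-execution) decision.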
I do this with `cache` and `cache_version` to ensure that all of my caching gets updated together (as a function of my project's version, defined within `pyproject.toml`):

```python
from functools import partial
import importlib.metadata  # note: importing bare `importlib` is not enough

import flytekit

# Derive the cache version from the installed package's version.
version = str(importlib.metadata.version('my_module'))

task = partial(flytekit.task, cache=True, cache_version=version)
```
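One wrinkle with deriving `cache_version` this way: `importlib.metadata.version` raises `PackageNotFoundError` if the distribution isn't installed (e.g. a fresh checkout run from source). A small variant with a fallback; the `my-module` name and `0.0.0` default are placeholders:

```python
import importlib.metadata

def project_version(dist_name: str, default: str = "0.0.0") -> str:
    """Return the installed distribution's version (as declared in
    pyproject.toml), or a default when the package isn't installed."""
    try:
        return importlib.metadata.version(dist_name)
    except importlib.metadata.PackageNotFoundError:
        return default

cache_version = project_version("my-module")  # placeholder distribution name
```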
t
Hey @Grantham Taylor, thanks for your suggestion. I have not used partials before, so thanks for this tip. It looks to me like it's a way to augment existing function calls with new args. I'm looking for a way to programmatically (dynamically) alter this `interruptible` argument at runtime, based e.g. on a user config. We do this, for example, with resource overrides: if I'm training a particularly large classifier and know I'll need more memory, I can specify this in a config (or it can be computed in a dynamic task), and this value gets used in `with_overrides()` instead of static values specified in the `@task` definition. I'd also like to be able to do this with caching! My solution for that is to use the `overwrite_cache` arg when executing via a FlyteRemote instance. It seems that your solution to caching, and your suggestion for my interruptible case, means you need to apply the same value to ALL tasks at "compile time" -- whereas I'm looking to apply, at runtime, a user-configured (or programmatically computed) value on a per-task basis. Does this make sense, or am I misunderstanding your suggestion?
g
Oh, I see. You are correct, I misunderstood your intent. I apologize.
t
No prob, I appreciate your suggestion, and I learned something! 🙂
yeah i think this will work for your needs.
but also yes thank you grantham… partials are widely used, esp for map tasks.
t
@Yee - thanks - the source comments indicate this applies at the workflow level - presumably this means all tasks in that workflow are considered interruptible if this value is set. The code you reference is the `ExecutionSpec` message - my understanding is this defines the protobuf message Flyte passes around. This IDL presumably corresponds to some Python type or arguments passed to Flyte functions. How do I use this? My guess would be I can pass an `interruptible` argument to `FlyteRemote.execute()` (which notably also takes an `overwrite_cache` flag, which I also see in that protobuf definition) -- but I don't see the `interruptible` flag in the docs or source... Am I misunderstanding?
@Yee - see message above. I hunted around in the flytekit source and found that interruptible for workflows is stored in the `metadata_defaults` attribute of `WorkflowTemplate`, so I tried setting this on a workflow (`wf.template.metadata_defaults._interruptible=True`) after retrieving the FlyteWorkflow from a FlyteRemote, but this didn't have an effect (and it doesn't seem like it's meant to be set this way, since it's a property with a getter and no setter!). Maybe this would work for a dynamic workflow - that seems more plausible, but I didn't test this yet. Not sure this is the right track? Maybe this isn't really supported? I can statically make a workflow/tasks interruptible by declaring this in the `@workflow`/`@dynamic`/`@task` decorators, but was hoping to set this at runtime.
y
all tasks in that workflow…
is this what you want?
haven’t checked yet that the interruptible flag is piped through all the right places… but it does exist at execution time.
meaning this is configurable per execution
actually could you submit an issue for this? it turns out the per-execution interruptible isn’t being piped through on the flytekit side at all.
we’ll need to work on that.
t
@Yee - I created this issue per your request - I wasn't sure whether to call this a bug or a Core Feature request, so I opted for the latter. I note that @Dan Rammer (hamersaw) did related work in 2022 based on a similar feature request -- this resulted in the `interruptible` flag being added to the form UI available when launching via Flyte Console (where you can specify both `interruptible` and `overwrite_cache` for the execution) -- but via `flytekit`, you can only specify `overwrite_cache`.
Something that I wondered about while researching this -- should `retries` also be exposed at the `workflow` level, or is it exposed only for tasks by design? My goal with all of the above is to make it easy to set interruptible at the workflow level (and have it apply to all tasks), and it would be similarly handy to apply retries to all tasks -- otherwise I'll need to annotate each potential task with `retries` so that the task is automatically retried if the pod is preempted. This is based on my reading of these docs:

> If you set retries=n, for instance, and the task gets preempted repeatedly, Flyte will retry on a preemptible/spot instance n-1 times and for the last attempt will retry your task on a non-spot (regular) instance. Please note that tasks will only be retried if at least one retry is allowed using the retries parameter in the task decorator.