i created a PythonInstanceTask with the name `onem...
# flytekit
d
i created a PythonInstanceTask with the name
onemodel.get_assumptions.base_prices
but it registered as `onemodel.models.annotations.wf.onemodel.models.annotations.onemodel.get_assumptions.base_prices`. Any ideas? i'm guessing something around the tracked instance is injecting this?
y
how did you assign the name?
oh name constructor
for context, this class is meant to maintain the correct schema'd type signature but still work as a general utility for fetching any schema type
(side note: lmk if this is insane to do)
y
this gist is self-contained right?
i can run it? it looks self contained
d
I think so
y
and where are you seeing the long name?
d
console
for the instance task
y
so if you comment out the workflow, and you just write a unit test, I think you get what you want.
there was a desire to support tasks that were declared within a workflow a while back. don’t remember the justification, maybe ketan does. the problem with that comes at execution time… how do you load the task?
d
This won’t execute?
y
normally, a task is declared at the .py file layer.
d
because the task loader can’t find it yea
y
if it’s embedded within a workflow, you have to know to look at the workflow class first, to get the task
so our solution was to make the workflow object itself, a task resolver.
how does a task know if it needs to use a workflow to resolve? if it’s created during the compilation step of a workflow
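(A minimal sketch of the idea described above, in plain Python rather than flytekit. Every class and function name here is hypothetical; it only illustrates why a task created inside a workflow needs the workflow itself to act as the resolver, and it is not flytekit's implementation.)

```python
from typing import Callable, Dict, List


class ModuleLevelResolver:
    """Toy resolver for the normal case: the task lives at the .py file layer."""

    def __init__(self, namespace: Dict[str, Callable]):
        self.namespace = namespace

    def loader_args(self, task: Callable) -> List[str]:
        # A module-level task can be found again by its name.
        return ["task-name", task.__name__]

    def load_task(self, loader_args: List[str]) -> Callable:
        _, name = loader_args
        return self.namespace[name]


class WorkflowAsResolver:
    """Toy workflow that remembers tasks created while its body is compiled."""

    def __init__(self, name: str):
        self.name = name
        self.tasks: List[Callable] = []

    def add(self, task: Callable) -> Callable:
        self.tasks.append(task)
        return task

    def loader_args(self, task: Callable) -> List[str]:
        # The nested task has no module-level name, so record its position.
        return [str(self.tasks.index(task))]

    def load_task(self, loader_args: List[str]) -> Callable:
        return self.tasks[int(loader_args[0])]


def top_level(x: int) -> int:
    return x + 1


wf = WorkflowAsResolver("wf")
nested = wf.add(lambda x: x * 2)  # exists only inside the workflow

module_resolver = ModuleLevelResolver({"top_level": top_level})
```

In this toy model the nested task can only be loaded by going through `wf` first, which is the extra lookup step the thread is describing.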
d
interesting. So it does work?
but has a weird resolution path?
y
so what’s happening is that the name is getting overwritten by https://github.com/flyteorg/flytekit/blob/master/flytekit/core/python_auto_container.py#L95
the normal case works yeah, that is tested i’m pretty sure, though we don’t advertise it much.
i’m not sure how to make this case work
there’s an extra layer of indirection here, the assumptions class
would have to trace through the workflow task loading logic, etc.
d
Do you have an example of what does work?
maybe I can refactor this
actually. i just need to define a task resolver that can reconstitute this task right?
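(A rough sketch of what "a task resolver that can reconstitute this task" could look like, written in plain Python so it runs without flytekit. `AssumptionTask` and `AssumptionResolver` are hypothetical stand-ins for the classes in the gist, and the `loader_args` / `load_task` pair only mirrors the general shape of a resolver; the real flytekit interface may differ.)

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class AssumptionTask:
    """Hypothetical stand-in for the instance task in the thread."""

    assumption_name: str

    @property
    def name(self) -> str:
        return f"onemodel.get_assumptions.{self.assumption_name}"


class AssumptionResolver:
    """Rebuilds an AssumptionTask from the arguments stored at registration."""

    def loader_args(self, task: AssumptionTask) -> List[str]:
        # Store only what is needed to construct the task again at runtime.
        return ["assumption", task.assumption_name]

    def load_task(self, loader_args: List[str]) -> AssumptionTask:
        key, value = loader_args
        if key != "assumption":
            raise ValueError(f"unexpected loader args: {loader_args}")
        return AssumptionTask(value)


resolver = AssumptionResolver()
original = AssumptionTask("base_prices")
rebuilt = resolver.load_task(resolver.loader_args(original))
```

The point of the sketch is that the resolver never needs to find the task object anywhere; it only needs enough arguments to construct an equivalent one.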
looks like it's being overridden in the init func
@Ketan (kumare3) I was told you might be the expert here. I was able to get the above to work by creating a custom task resolver and manually overriding the global context like so
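from flytekit.core.context_manager import FlyteContextManager

# Swap in a compilation state that carries the custom resolver, then build the task inside it.
ctx = FlyteContextManager.current_context()
state = ctx.compilation_state.with_params("", resolver=_resolver)
with FlyteContextManager.with_context(ctx.with_compilation_state(state)):
    task = cls._Task(assumption_name)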
But i'm unclear what the intended behavior is, i.e. why does the task constructor ignore the passed resolver in favor of something in the global context?
k
I guess it’s because the compilation context can provide a better resolver
I will have to understand the usecase. May very well be defensive
d
probably easiest to chat quickly if you have a min
the above thread is a lot to parse 😄
y
ketan’s out today, but this is the logic we were talking about. the task defined in a workflow.
the passed in task resolver is always respected unless you’re in a workflow.
which you are right?
d
i suppose yes, so the workflow compilation context overrides the task resolver?
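(A toy model of the selection logic as described in this thread, not flytekit's actual code. The `Resolver` class, the `pick` function, and the `prefix.module.name` naming rule are assumptions chosen because they reproduce the shape of the long name seen in the console.)

```python
from typing import Optional, Tuple


class Resolver:
    """Hypothetical resolver that may or may not rename the tasks it owns."""

    def __init__(self, label: str, prefix: Optional[str] = None):
        self.label = label
        self.prefix = prefix

    def task_name(self, module: str, name: str) -> Optional[str]:
        # Returning None means "keep the name the user passed in".
        if self.prefix is None:
            return None
        return f"{self.prefix}.{module}.{name}"


DEFAULT = Resolver("default")


def pick(
    passed: Optional[Resolver],
    in_workflow: Optional[Resolver],
    module: str,
    name: str,
) -> Tuple[Resolver, str]:
    """Return (resolver, final_name) the way the thread describes it."""
    if in_workflow is not None:
        # Inside a workflow body the compilation context wins,
        # even over an explicitly passed resolver.
        resolver = in_workflow
        final_name = resolver.task_name(module, name) or name
    else:
        resolver = passed or DEFAULT
        final_name = name
    return resolver, final_name


custom = Resolver("custom")
workflow = Resolver("workflow", prefix="onemodel.models.annotations.wf")
module = "onemodel.models.annotations"
name = "onemodel.get_assumptions.base_prices"

outside = pick(custom, None, module, name)      # e.g. called from a unit test
inside = pick(custom, workflow, module, name)   # e.g. called inside @workflow
```

Under these assumptions, `outside` keeps the custom resolver and the short name, while `inside` ends up with the workflow's resolver and the workflow-prefixed name.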
y
this was the confusing part about the code yesterday and why it took so long to hunt down.
d
i might still expect that if a user specifically passes a resolver they know what they're doing
y
yeah and i think we’re open to changing that logic… but i still think it’s confusing
def test_dwild():
    x = BasePricesSchema.name
    print(x)
    Assumptions.get(version_id="abc", assumption=x)


@task
def t2(pdf: BasePricesSchema) -> int:
    return len(pdf)


@workflow
def wf(version_id: str) -> int:
    bp = Assumptions.get(version_id=version_id, assumption=BasePricesSchema.name)
    return t2(pdf=bp)
this test produces the wrong name.
however this test produces the right name
def test_dwild():
    x = BasePricesSchema.name
    print(x)
    Assumptions.get(version_id="abc", assumption=x)


# @task
# def t2(pdf: BasePricesSchema) -> int:
#     return len(pdf)
# 
# 
# @workflow
# def wf(version_id: str) -> int:
#     bp = Assumptions.get(version_id=version_id, assumption=BasePricesSchema.name)
#     return t2(pdf=bp)
d
flattered to have my own task named after me 🤩
y
hahah
d
i don't care too much about the name, but need the custom resolver to resolve at runtime
y
you can always just set it after the instantiation too
not that that looks any cleaner
d
i'm ok with this for now, but am wary of doing stuff that might break later 😄