I am trying to backfill a cron scheduled workflow:...
# ask-the-community
m
I am trying to backfill a cron scheduled workflow:
Copy code
pyflyte backfill -p cronjob-example -d development --from-date "2023-04-14 13:20:00" --to-date "now" --do-not-execute example_schedule v10
On this workflow interface:
Copy code
def cronjob_example_workflow(kickoff_time: datetime) -> str:
and launch plan:
Copy code
example_launch_plan = LaunchPlan.get_or_create(
    name="example_schedule",
    workflow=example_workflow,
    schedule=CronSchedule(
        schedule="*/5 * * * *",  # Following schedule runs every min
        kickoff_time_input_arg="kickoff_time", # macro magic
    ),
)
And I get this error:
Copy code
Missing input `kickoff_time` type `simple: DATETIME
I would expect kickoff time to be passed by backfill, given the schedule and time window. Calling @Ketan (kumare3) I think you are the master mind ๐Ÿ˜‰ Full trace:
Copy code
Traceback (most recent call last):
  File "/home/andersen/.local/share/virtualenvs/flyte-cronjob-example-ig5KVMXO/bin/pyflyte", line 8, in <module>
    sys.exit(main())
  File "/home/andersen/.local/share/virtualenvs/flyte-cronjob-example-ig5KVMXO/lib/python3.8/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/home/andersen/.local/share/virtualenvs/flyte-cronjob-example-ig5KVMXO/lib/python3.8/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/home/andersen/.local/share/virtualenvs/flyte-cronjob-example-ig5KVMXO/lib/python3.8/site-packages/flytekit/clis/sdk_in_container/pyflyte.py", line 82, in invoke
    raise e
  File "/home/andersen/.local/share/virtualenvs/flyte-cronjob-example-ig5KVMXO/lib/python3.8/site-packages/flytekit/clis/sdk_in_container/pyflyte.py", line 78, in invoke
    return super().invoke(ctx)
  File "/home/andersen/.local/share/virtualenvs/flyte-cronjob-example-ig5KVMXO/lib/python3.8/site-packages/click/core.py", line 1657, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/andersen/.local/share/virtualenvs/flyte-cronjob-example-ig5KVMXO/lib/python3.8/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/andersen/.local/share/virtualenvs/flyte-cronjob-example-ig5KVMXO/lib/python3.8/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/home/andersen/.local/share/virtualenvs/flyte-cronjob-example-ig5KVMXO/lib/python3.8/site-packages/click/decorators.py", line 26, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/home/andersen/.local/share/virtualenvs/flyte-cronjob-example-ig5KVMXO/lib/python3.8/site-packages/flytekit/clis/sdk_in_container/backfill.py", line 158, in backfill
    entity = remote.launch_backfill(
  File "/home/andersen/.local/share/virtualenvs/flyte-cronjob-example-ig5KVMXO/lib/python3.8/site-packages/flytekit/remote/remote.py", line 1781, in launch_backfill
    wf, start, end = create_backfill_workflow(start_date=from_date, end_date=to_date, for_lp=lp, parallel=parallel)
  File "/home/andersen/.local/share/virtualenvs/flyte-cronjob-example-ig5KVMXO/lib/python3.8/site-packages/flytekit/remote/backfill.py", line 82, in create_backfill_workflow
    next_node = wf.add_launch_plan(for_lp, t=next_start_date)
  File "/home/andersen/.local/share/virtualenvs/flyte-cronjob-example-ig5KVMXO/lib/python3.8/site-packages/flytekit/core/workflow.py", line 569, in add_launch_plan
    return self.add_entity(launch_plan, **kwargs)
  File "/home/andersen/.local/share/virtualenvs/flyte-cronjob-example-ig5KVMXO/lib/python3.8/site-packages/flytekit/core/workflow.py", line 496, in add_entity
    n = create_node(entity=entity, **kwargs)
  File "/home/andersen/.local/share/virtualenvs/flyte-cronjob-example-ig5KVMXO/lib/python3.8/site-packages/flytekit/core/node_creation.py", line 93, in create_node
    outputs = entity(**kwargs)
  File "/home/andersen/.local/share/virtualenvs/flyte-cronjob-example-ig5KVMXO/lib/python3.8/site-packages/flytekit/remote/remote_callable.py", line 54, in __call__
    return self.compile(ctx, *args, **kwargs)
  File "/home/andersen/.local/share/virtualenvs/flyte-cronjob-example-ig5KVMXO/lib/python3.8/site-packages/flytekit/remote/entities.py", line 806, in compile
    return create_and_link_node_from_remote(
  File "/home/andersen/.local/share/virtualenvs/flyte-cronjob-example-ig5KVMXO/lib/python3.8/site-packages/flytekit/core/promise.py", line 860, in create_and_link_node_from_remote
    raise _user_exceptions.FlyteAssertion("Missing input `{}` type `{}`".format(k, var.type))
flytekit.exceptions.user.FlyteAssertion: Missing input `kickoff_time` type `simple: DATETIME
k
Ohh no, can I take a look later today
You are right it should be automatic
Is this at runtime
m
It is the output of the command + arguments I used at the top
I run the flytekti command locally configured towards a remote flyte-core backend
And I am not in a hurry, so feel free to review whenever you feel like it.. I am just happy that you build a nice feature ๐Ÿ˜„
k
I am not happy that it is not working out of the box. I am sorry as it was. I hope no regression. But I wonโ€™t be able to try till later today. Cc @Samhita Alla can try maybe?
s
I'm able to reproduce. Looks like a bug to me.
k
ohh
ok will take a look
[flyte-bug]
k
@Mathias Andersen this is a bug - https://github.com/flyteorg/flyte/issues/3599 and Here is a fix for this https://github.com/flyteorg/flytekit/pull/1593 cc @Samhita Alla can you try and approve my PR please - https://github.com/flyteorg/flytekit/pull/1593
s
@Ketan (kumare3), I just tested your change and it's working. It isn't working, however, when the cron schedule is set to say, run every minute. It's erroring out stating invalid input. Is it due to lots of runs getting scheduled (nearly 4000 in my case)?
k
ya, this is expected as the number of nodes is restricted. We can raise this limit, in sandbox we limit to 100 nodes only
129 Views