but getting an error ```containers with unready st...
# ask-the-community
r
but getting an error
Copy code
containers with unready status: [f7592492fe4da4f14b2f-n0-0]|Failed to apply default image tag "<http://cr.flyte.org/flyteorg/flytekit:py3.10-1.3.2:image|cr.flyte.org/flyteorg/flytekit:py3.10-1.3.2:image>": couldn't parse image reference "<http://cr.flyte.org/flyteorg/flytekit:py3.10-1.3.2:image|cr.flyte.org/flyteorg/flytekit:py3.10-1.3.2:image>": invalid reference format
n
I think it should be
<http://ghcr.flyte.org/|ghcr.flyte.org/>…
not
<http://cr.flyte.org|cr.flyte.org>
r
kk, trying now
Copy code
containers with unready status: [fba6b54cf32154cd6a01-n0-0]|Failed to apply default image tag "<http://ghcr.flyte.org/flyteorg/flytekit:py3.10-1.3.2:image|ghcr.flyte.org/flyteorg/flytekit:py3.10-1.3.2:image>": couldn't parse image reference "<http://ghcr.flyte.org/flyteorg/flytekit:py3.10-1.3.2:image|ghcr.flyte.org/flyteorg/flytekit:py3.10-1.3.2:image>": invalid reference format
a
shouldn't it be:
Copy code
fqn="<http://cr.flyte.org/flyteorg/flytekit|cr.flyte.org/flyteorg/flytekit>",
tag="py3.10-1.3.2",
r
let me try
that tag worked! But now I’m getting
Copy code
[1/1] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
[f5743f26504394989994-n0-0] terminated with exit code (1). Reason [Error]. Message: 
ages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/flytekit/bin/entrypoint.py", line 476, in execute_task_cmd
    _execute_task(
  File "/usr/local/lib/python3.10/site-packages/flytekit/exceptions/scopes.py", line 160, in system_entry_point
    return wrapped(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/flytekit/bin/entrypoint.py", line 348, in _execute_task
    _task_def = resolver_obj.load_task(loader_args=resolver_args)
  File "/usr/local/lib/python3.10/site-packages/flytekit/core/python_auto_container.py", line 280, in load_task
    task_module = importlib.import_module(task_module)
  File "/usr/local/lib/python3.10/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
  File "<frozen importlib._bootstrap>", line 992, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
  File "<frozen importlib._bootstrap>", line 992, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1004, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'app'
.
e
@Rezwan Abir, this means that the image doesn't contain your code. Were you fast registering?
r
ya basically I don’t want the image to contain my code, as I’m trying remotely to inject using api calls to install my wf, tasks, and launch_plan
so like this…
Copy code
self.remote.register_task(
                self.sum_test_2,
                version=VERSION,
                serialization_settings=serialization_settings,
            )
Copy code
self.remote.register_workflow(
                    process_data_wf_3,
                    version=VERSION,
                    serialization_settings=inner_serialization_settings,
                )
and
Copy code
launch_plan = LaunchPlan.get_or_create(
            process_data_wf_3,
            name="wf_launchplan_3",
            schedule=FixedRate(duration=timedelta(minutes=1)),
        )
my code can be dynamic based on stuff on my database, I have to dynamically generate launch plans, let me know if this is doable
I mean I see all my wf, tasks and launch plan already in the demo flyte app (so the remote thing worked), still can’t figure out why it won’t execute though
a
one thing i notice is that you need to set fast_serialization_settings in your
SerializationSettings
r
Great point, adding and testing now
so this is my new serializationsettings (added fast with dest dir )
Copy code
serialization_settings = SerializationSettings(
                project="flytesnacks",
                domain=self.settings.flyte_domain,
                env=None,
                image_config=ImageConfig(
                    default_image=Image(
                        name="custom_container_task",
                        fqn="<http://cr.flyte.org/flyteorg/flytekit|cr.flyte.org/flyteorg/flytekit>",
                        tag="py3.10-1.3.2",
                    )
                ),
                fast_serialization_settings=FastSerializationSettings(
                    enabled=True,
                    destination_dir='/Users/rezwanabir/Desktop'
                )
            )
But oddly getting this error
Copy code
[1/1] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
[ff6805959e365479e99a-n0-0] terminated with exit code (1). Reason [Error]. Message: 
Traceback (most recent call last):
  File "/usr/local/bin/pyflyte-fast-execute", line 8, in <module>
    sys.exit(fast_execute_task_cmd())
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/flytekit/bin/entrypoint.py", line 502, in fast_execute_task_cmd
    _download_distribution(additional_distribution, dest_dir)
  File "/usr/local/lib/python3.10/site-packages/flytekit/tools/fast_registration.py", line 107, in download_distribution
    raise ValueError("Destination path is required to download distribution and it should be a directory")
ValueError: Destination path is required to download distribution and it should be a directory
.
and if I use ‘.’ for directory
Copy code
destination_dir='.'
then I get a different error
Copy code
need fast serialization or else will fail 

[1/1] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
[f7ee0d9e0f0204756b5d-n0-0] terminated with exit code (1). Reason [Error]. Message: 
 oe
  File "/usr/local/lib/python3.10/site-packages/flytekitplugins/fsspec/persist.py", line 99, in get
    return fs.get(from_path, to_path, recursive=recursive)
  File "/usr/local/lib/python3.10/site-packages/fsspec/spec.py", line 891, in get
    self.get_file(rpath, lpath, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/fsspec/implementations/local.py", line 135, in get_file
    return self.cp_file(path1, path2, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/fsspec/implementations/local.py", line 128, in cp_file
    raise FileNotFoundError(path1)
FileNotFoundError: /root/{{ .remote_package_path }}

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/bin/pyflyte-fast-execute", line 8, in <module>
    sys.exit(fast_execute_task_cmd())
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/flytekit/bin/entrypoint.py", line 502, in fast_execute_task_cmd
    _download_distribution(additional_distribution, dest_dir)
  File "/usr/local/lib/python3.10/site-packages/flytekit/tools/fast_registration.py", line 111, in download_distribution
    FlyteContextManager.current_context().file_access.get_data(additional_distribution, os.path.join(destination, ""))
  File "/usr/local/lib/python3.10/site-packages/flytekit/core/data_persistence.py", line 456, in get_data
    raise FlyteAssertion(
flytekit.exceptions.user.FlyteAssertion: Failed to get data from {{ .remote_package_path }} to ./ (recursive=False).

Original exception: /root/{{ .remote_package_path }}
.
a
fast_serialization also expects a native_url, you need to first upload the fast archive, get the native URL, put that in the fast_serailzation_settins and the register
You can take a look pyflyte register code for reference
r
does that mean I have to register my local files (wf, tasks and launch_plans and upload it? My local files wf, tasks, are part of a much bigger fast api repo, I don’t think registering them is what you’re talking about right? I was gonna using remote push those tasks, and wf, to the local demo flyte. But sounds like flyte does not work like that, or am I missing something?
n
if you provide a
fast_serialization
config Flyte will automatically zip up your local files, dump it in the configured blob store, then overlay your code on top of the specified docker image. Not sure if that answers your question.
r
ya, so If I’m not mistaken, sounds like launch plan can’t be created dynamically (with dynamic time scheduling), correct?
my task is to create dynamic launch plan based on different cron expression that run at different times, sounds like this is not something that can be done on flyte?
n
can you expand on what you mean by “dynamic launch plan based on different cron expression that run at different times”?
r
In our backend db we are storing different cron configuration for user so let’s say user 1 has cron ‘5 4 * * *’ and user 2 has cron ‘1 * * * *’ and users are able to update this cron expression, on demand. Our goals is to run tasks based on these expressions, on flyte, so when a user changes a cron expression, we should be able to update the launch plan cron accordingly on flyte. I know that flyte does support cron scheduling on launch plan (https://docs.flyte.org/projects/cookbook/en/latest/getting_started/run_schedule.html)
n
so you want to run tasks to re-configure the launchplans based on a user-provided expression right? Is there a reason you want to use Flyte Tasks to do this instead of application you application using
FlyteRemote
to do it? @Eduardo Apolinario (eapolinario) @Yee is there a way to update the launchplan schedule in-place? Or do folks have to deactivate the old version, register a new version, and active the new version?
r
yes to your question 1. and I have been trying to use flyteremote
flyteremote still required all the tasks to be registered ahead of time seems like
n
you’ll need to update the launchplan version with the new cron expression to update the launchplans dynamically
r
but that’s only one launch plan, in my case, I’ll technically need 100s of launch plans 1 per user
how do I accomplish that?
n
if the launchplans are named based on user id, you can probably iterate through the launch plans in your application code and create new versions for each user based on the cron schedule so something like:
Copy code
from workflows import wf

remote = FlyteRemote(...)

users, cron_schedules = get_cron_schedules_per_user()

for user, cron_schedule in zip(users, cron_schedules):
    # by default, this gets the latest version, but you can pass in
    # a `version="..."` argument
    old_flyte_lp = remote.fetch_launch_plan(name=f"launchplan_{user}")

    # deactivate old launchplan
    remote.client.update_launch_plan(old_flyte_lp.id, "INACTIVE")

    # create new launchplan
    new_lp = LaunchPlan.get_or_create(
        wf,
        name=f"launchplan_{user}",
        schedule=CronSchedule(schedule=cron_schedule, ...),
    )
    new_flyte_lp = remote.register_launch_plan(new_lp, version="new_version")
    remote.client.update_launch_plan(new_flyte_lp, "ACTIVE")
you can either implement this in your application code, or in a
@dynamic
workflow (see docs), but you’ll need to configure it such that the docker image running the dynamic workflow pod can access your Flyte cluster.
r
ya this is exactly what i’m trying to do. One thing i ran into was fast register error, I just realized my repo needs to be zipped up and uploaded for fast registration, that’s where I kinda got stuck
n
oh,
FlyteRemote
can handle this, you need to provide FastSerializationSettings to your SerializationSettings config. We’ll need to document this, but here’s an example:
Copy code
from flytekit.tools import fast_registration, repo

detected_root = repo.find_common_root(["."])
zip_file = fast_registration.fast_package(detected_root, output_dir=None, deref_symlinks=False)

# Upload zip file to Admin using FlyteRemote.
_, native_url = remote.fast_package(detected_root)

fast_serialization_settings = FastSerializationSettings(
    enabled=True,
    distribution_location=native_url,
)

serialization_settings = SerializationSettings(
   ...
   fast_serialization_settings=fast_serialization_settings,
)

# now you can register the launch plan as in the code snippet about
remote.register_launch_plan(lp, serialization_settings, version)
Would you mind opening up a [flyte-docs] issue for documentating this better? 👇
r
thanks @Niels Bantilan! btw, I managed to successfully run a launch plan from my api layer!! here is my launch plan code
Copy code
launch_plan = LaunchPlan.get_or_create(
                process_data_wf_3,
                name="wf_launchplan_3",
                schedule=FixedRate(duration=timedelta(minutes=1)),
                default_inputs={"x": 4, "y": 5},
            )

            execution = self.remote.execute(
                launch_plan,
                inputs={},
                version="kbPtdOiwwodI6FUmBip_wA==",
                project="flytesnacks",
                domain=self.settings.flyte_domain,
            )
however it only ran once, if I understood the code above correctly, shouldn’t it keep running it once every minute due to fixed rate schedule?
n
nice! so yes, when you execute a launch plan it will actually just run… for it to be activated as a schedule you need to do
Copy code
remote.client.update_launch_plan(new_flyte_lp, "ACTIVE")
calling
remote.execute
will just run the launch plan outside of the schedule. Activating the schedule will make it run on the configured schedule. Now if you want to update the launch plan schedule, you’ll have to do the steps I mentioned before to deactivate the old launchplan version, register the new one, and activate the new one. Does that make sense?
r
@Niels Bantilan Thanks so much again! I have the whole thing working now 🕺 !!
n
btw I just learned you don’t have to manually deactivate old launch plans… you can just
ACTIVE
ate the new version of the launchplan and that will automatically deactivate the old ones
r
yes, that’s exactly what I ended up doing, much less code 🙂
I’m getting some cached error, this happens when I run it a second time, or any subsequent number of times… and cron expression for the launch plan does not update
Copy code
CACHED OUT PUT.... {'_name': 'launch_plan_4a6bdc85-5652-4791-9347-bed36a86b4ee', '_workflow': WorkflowBase - app.flyte.tasks.process_data_wf_3 && Inputs (2): {'x': <class 'int'>, 'y': <class 'int'>} && Outputs (0): {} && Output bindings: [] && , '_parameters': parameters {
  key: "x"
  value {
    var {
      type {
        simple: INTEGER
      }
      description: "x"
    }
    default {
      scalar {
        primitive {
          integer: 4
        }
      }
    }
  }
}
parameters {
  key: "y"
  value {
    var {
      type {
        simple: INTEGER
      }
      description: "y"
    }
    default {
      scalar {
        primitive {
          integer: 5
        }
      }
    }
  }
}
, '_fixed_inputs': , '_saved_inputs': {'x': 4, 'y': 5}, '_schedule': cron_schedule {
  schedule: "*/1 * * * *"
}
, '_notifications': [], '_labels': None, '_annotations': None, '_raw_output_data_config': None, '_max_parallelism': None, '_security_context': None}
ERRORR>>>>>>>>>>>>
Error at execute_custom_dashboard_workflow..... The cached values aren't the same as the current call arguments
this is the code
Copy code
try:
            cron_expression = request.get("cron")

            print("cron expression", cron_expression)

            detected_root = repo.find_common_root(["."])
            _, fast_url = self.fast_package(detected_root, deref_symlinks=False)

            fast_serialization_settings = FastSerializationSettings(
                enabled=True, distribution_location=fast_url, destination_dir="/root"
            )

            serialization_settings = SerializationSettings(
                project="flytesnacks",
                domain=self.settings.flyte_domain,
                env=None,
                image_config=ImageConfig(
                    default_image=Image(
                        name="custom_container_task",
                        fqn="<http://cr.flyte.org/flyteorg/flytekit|cr.flyte.org/flyteorg/flytekit>",
                        tag="py3.10-1.3.2",
                    )
                ),
                fast_serialization_settings=fast_serialization_settings,
            )

            launch_plan_name = f"launch_plan_{str(report_id)}"

            self.register_task(serialization_settings)
            self.register_workflow(serialization_settings)
            # self.disable_old_launch_plan(launch_plan_name)

            launch_plan = LaunchPlan.get_or_create(
                process_data_wf_3,
                name=launch_plan_name,
                schedule=CronSchedule(schedule=cron_expression),
                default_inputs={"x": 4, "y": 5},
            )

            flyte_lp_id = self.register_launch_plan(launch_plan)

            self.remote.client.update_launch_plan(flyte_lp_id.id, "ACTIVE")

            print("Success!!")
        except Exception as e:
            print("Error at execute_custom_dashboard_workflow.....", e)

        return launch_plan
do i need a unique version everytime?
n
yep, Flyte versions are immutable so you can’t override them. A random id or timestamp would be okay, we typically recommend the gitsha or your repo, but you can do anything as long as it’s unique
r
so if I have a launchplan, V1 running every 1 minute, and V2, every 3 minutes, when i update the launchplan, the old version will not be running , am i right?
Screen Shot 2023-02-15 at 6.26.12 PM.png
in this case only the one that starts with b0d will run? and the old one will stop being scheduled right?
s
Apologies for the delayed response. The old one gets deactivated if the schedule is the same. If not, you'll need to manually deactivate it I believe.
164 Views