Is it bad practice to modify the serialized protob...
# flyte-support
b
Is it bad practice to modify the serialized protobufs from pyflyte after they're generated? Context: we want to register Flyte workflows through our build systems, but the build systems inherently don't have knowledge of the pushed container images. Would it be frowned upon to generate the container image and protobufs (with placeholder URLs) at build time, then load and rewrite the task protobufs with the appropriate image URL at registration time?
t
are you saying you don’t know what the image will be at wf compile time?
i wouldn’t say it’s bad practice, but that is not supported today. the compiled flyte protos are meant to be portable (like no project/domain) but not that portable. that said you don’t need the image to exist before you register.
you can always create the image after. so the situation you describe is only relevant when you don’t know the container image name itself.
are you using image digests or image tag out of curiosity?
b
yeah unfortunately building images has to be done as a separate job. To answer the other complications:
• We won't know the image that will be pushed because, yes, we're using the docker digest, which makes it rather difficult to predict
• The build team is removing the ability to tag images, so we also can't rely on tagging
in terms of "not supported", this is just prebuilt functionality in flyte that's not there right? Nothing seems to be stopping us from reading in the proto, modifying, and then saving it back out
for more context: the build team is not trying to be difficult, but the ~50 tag limit per repo is limiting as they try to assign jobs to use a small number of repos to improve caching. So digests are their preferred way of referencing images
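For illustration, a digest reference has the form name@sha256:<hex>, so the part after "@" cannot be known until the image is built. A minimal sketch of splitting such a reference, assuming a hypothetical helper `split_image_reference` (not part of flytekit) and made-up image names:

```python
from typing import Tuple


def split_image_reference(ref: str) -> Tuple[str, str]:
    """Split an image reference into (name, tag_or_digest).

    Hypothetical helper for illustration only. Returns an empty string
    as the second element when the reference has neither tag nor digest.
    """
    # A digest is introduced by "@", e.g. name@sha256:<hex>.
    if "@" in ref:
        name, digest = ref.split("@", 1)
        return name, digest
    # A tag colon can only appear after the last "/"; an earlier colon
    # belongs to the registry port (e.g. registry.example.com:5000).
    last_slash = ref.rfind("/")
    last_colon = ref.rfind(":")
    if last_colon > last_slash:
        return ref[:last_colon], ref[last_colon + 1:]
    return ref, ""
```

Splitting on "@" first keeps a registry port such as `:5000` from being mistaken for a tag separator.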
h
Hi @thankful-minister-83577 I was able to replace the image uri for a few workflows and they ran successfully. I identified:
1. Only the task pb files need to be changed.
2. There are two places to replace in the TaskSpec:
   a. container.image
   b. if it is a dynamic task, container.env[0].value
      i. We need to deserialize it to the serialization settings and replace the image + image_config
May I know what else we should replace too?
t
i am a bit impressed you found 2b. 🙂
but no i think that’s it
working on something else rn. let me think about this a bit more and get back to you later but i think that’s it
h
Thanks Yee!
It took me a while to find the 2b
Hi @thankful-minister-83577 May I have your eyes on our implementation please? We have tested it and it works, but we still want an extra confirmation from your side!
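import base64
import gzip
import json
import logging
from io import BytesIO

from flyteidl.admin.task_pb2 import TaskSpec

log = logging.getLogger(__name__)

# Assumed value, taken from the example pb file in the docstring below.
IMAGE_PLACEHOLDER = "placeholder.image/docker:latest"


def replace_image_in_pb_files(path: str, image_uri: str):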
    """
    Replace the image uri for the task pb files.
    Example of pb file:
    template {
    ...
        container {
            image: "placeholder.image/docker:latest"
            ...
            args: "task-module"
            args: "src.python.ml.airail.hpo"
            args: "task-name"
            args: "start_sweep_agents"
            resources {
            }
            env { # This is an image config for dynamic tasks.
              key: "_F_SS_C"
              value: "H4sIAAAAAAAC/+VTy27bMBD8lUDnSrTkmLJ66j+0tyAQ+FjK21CkSlLOC/73LKkYbtBP6EXYndkd7Qyk9wpnMcGovDM4Vd/v3isNRqw2jYUoiBNzLq5M9e2uMn9cRmgtCXQQYjNZL4VtluB1E1PABRr0bKtqY18TMKFPXrEYVLO8ppN3TYGb2Y7wslgfREICBQaBtonPAEskRsyLhSasbizQj3gS3YHnI5LIF1edUe2e973iw/1+x1vDe3nodqD1rm13XX9QLXS8A9V3x34YuOGgpBlEC+29PBxNdSGt4jaS3MN/4PcxO6bLf4NKpOhWawnQfiZvt/5MNunAGwDufGsmTGOAxd+QzeSILkFYAtAzX8v8ktiZVplEx7aZfYk0h/FEKmcMaRWWRsbgfSpLckWrWZRMijewtV8TezrWJFXDC6j6569a9b3hmvPjfuiLNOX8Kc+2+L/mzLac2decWYSAwuJbSdyghUgio84jOQ12vfLfongQMY1XifIa6lJCN8Xy54AT0oKm2ggbIYcMkehtVGP4K33M345cC2O9KiOf9OXyATTgdGSoAwAA"
            }
    }
    :param path: pb file path
    :param image_uri: real image uri
    :return:
    """

    # Initialize a protobuf object instance based on TaskSpec
    pb_object = TaskSpec()
    # Read and parse the protobuf message from file
    with open(path, "rb") as f:
        data = f.read()
    pb_object.ParseFromString(data)

    # Replace image uri in container.image
    if pb_object.template.container.image != IMAGE_PLACEHOLDER:
        raise ValueError(
            f"Image uri is not the placeholder in {path}, this file is not generated by workflow_serializer.py"
        )
    pb_object.template.container.image = image_uri

    # Replace image uri in the serialized settings env var, if present.
    # Look it up by key ("_F_SS_C", as in the example above) rather than
    # assuming it is always the first env entry.
    for env_var in pb_object.template.container.env:
        if env_var.key == "_F_SS_C":
            env_var.value = replace_image_in_container_env_value(env_var.value, image_uri)

    log.debug(f"New pb file of {path}: {pb_object.template.container}")
    with open(path, "wb") as f:
        f.write(pb_object.SerializeToString())


def replace_image_in_container_env_value(value: str, real_image: str) -> str:
    """
    Original implementation of serialized_context in flytekit: <https://github.com/flyteorg/flytekit/blob/v1.9.1/flytekit/configuration/__init__.py#L818-L828>
    Example container_env_value:
    env {
      key: "_F_SS_C"
      value: "H4sIAAAAAAAC/+VTQY7bMAz8ysLn2mw22TTpN9pbUQi0RDtsZEmV6DTOIn+vpDTIAttbj71Y5AwxFEf0a8MTjqS0dwOPzeen18bQgLMVVYmKOJxKcGeaD0/N8NMV5LxcOvZgvD5SfC6EYFFpLAolaa4ZqTopg9/+Sel70QrR/yAtGXeztRkwfkJ2j/xEMbF/A5A7PZKRRUUK/oGERQ7eKXZCMUTK39ITAi5w8BPByLanKNBp1AeCHi9kQdVD3bmX/YAfPxHisFvpXuvtM24320Hjar3ebzYvQGfS0XsB7SeVJHIgdfHeojMUb5qtnwWOu9YHaUt5++Vru1qbtTFo9qv9Dnp2kKKG24VhsIsQTFbROVgfUfLQgByRLaRfRCFlBqdgCRJFRssX6uKcH9lS+vs9ug7eu1H73uD6WKXtMbt44igz2uyuKpP9D6aFRZlSUhYO7ka8D6pNmETdJWqbnImwG1P9ochhb8nkeECbqOxx3nF2t1LD8c2Cc5m9nytjva4lf+jr9Td99Ag7vwMAAA=="
    }
    In order to decode it, we need to do the thing in reverse:
        1. Decode the base64 encoded string
        2. Decompress the data with gzip
        3. Decode the bytes to string
    Example of the decoded value:
        {'image_config': {'default_image': {'name': 'default', 'fqn': 'xyz.io/docker2', 'tag': 'latest'}, 'images': [{'name': 'default', 'fqn': 'xyz.io/docker2', 'tag': 'latest'}]}
    Afterward, we will replace the image uri in the decoded value and encode it back to the base64 encoded string.
        {'image_config': {'default_image': {'name': 'default', 'fqn': 'containers.global.prod.stripe.io/stripe-flyte/adhoc/src.python.flyte.ml_exploration.airail.sweeps_example.run_sweep@sha256', 'tag': '2fc13677c6943061f67b520edd0110275c1e262ec7287996f6ecbf9a1e14b58f'}, 'images': [{'name': 'default', 'fqn': 'containers.global.prod.stripe.io/stripe-flyte/adhoc/src.python.flyte.ml_exploration.airail.sweeps_example.run_sweep@sha256', 'tag': '2fc13677c6943061f67b520edd0110275c1e262ec7287996f6ecbf9a1e14b58f'}]}
    :param value: The value of the container.env
    :param real_image: The real image uri
    :return: The new encoded value of the container.env
    """
    if not value:
        raise ValueError("Cannot decode an empty value")
    # Start with base64 decoding
    b64_decoded_bytes = base64.b64decode(value)

    # Decompress the data with gzip
    buf = BytesIO(b64_decoded_bytes)
    with gzip.GzipFile(mode="rb", fileobj=buf) as f:
        json_bytes = f.read()
    decoded_json = json_bytes.decode("utf-8")
    image_config = json.loads(decoded_json)

    try:
        # Split on the last ":" only, so a registry port (host:5000/...) does not break unpacking
        image_repo, image_sha = real_image.rsplit(":", 1)
        # Update 'fqn' and 'tag' in 'default_image'
        image_config["image_config"]["default_image"]["fqn"] = image_repo
        image_config["image_config"]["default_image"]["tag"] = image_sha

        # Update 'fqn' and 'tag' in 'images'
        for image in image_config["image_config"]["images"]:
            image["fqn"] = image_repo
            image["tag"] = image_sha

        log.info(f"new image config: {image_config}")
    except Exception as e:
        raise ValueError(f"Cannot update image uri: {real_image}. Error: {e}")
    # Convert the image_config to a JSON string
    decoded_json_replaced = json.dumps(image_config)

    # Compress the JSON string with gzip
    buf = BytesIO()
    with gzip.GzipFile(mode="wb", fileobj=buf, mtime=0) as f:
        f.write(decoded_json_replaced.encode("utf-8"))

    # Encode the compressed data with base64
    return base64.b64encode(buf.getvalue()).decode("utf-8")
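The encode/decode steps above can be checked in isolation with a round trip. This is a minimal sketch using only the standard library; `encode_settings`, `decode_settings`, and the sample payload are hypothetical and mimic only the image_config part of the real settings:

```python
import base64
import gzip
import json
from io import BytesIO


def encode_settings(settings: dict) -> str:
    """JSON -> gzip -> base64 text. mtime=0 keeps the output deterministic."""
    buf = BytesIO()
    with gzip.GzipFile(mode="wb", fileobj=buf, mtime=0) as f:
        f.write(json.dumps(settings).encode("utf-8"))
    return base64.b64encode(buf.getvalue()).decode("utf-8")


def decode_settings(value: str) -> dict:
    """base64 text -> gunzip -> JSON."""
    raw = gzip.decompress(base64.b64decode(value))
    return json.loads(raw.decode("utf-8"))


# Made-up payload; the real decoded value may carry additional fields.
sample = {
    "image_config": {
        "default_image": {"name": "default", "fqn": "placeholder.image/docker", "tag": "latest"},
        "images": [{"name": "default", "fqn": "placeholder.image/docker", "tag": "latest"}],
    }
}

encoded = encode_settings(sample)
assert decode_settings(encoded) == sample      # round trip preserves the data
assert encode_settings(sample) == encoded      # same input, same bytes
```

Because the gzip header timestamp is pinned with mtime=0, rewriting the same settings twice yields identical pb files, which keeps build outputs reproducible.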
t
yeah looks fine…
thanks for posting
the decoded thing is not an image config though, it’s a serialization settings object
so the name of the variable should maybe be changed.
but otherwise yeah it’s fine.
why are you doing this again? is it just for the sha? or is the name itself unknown?
if you’re doing this for the sha… then i think maybe we need to think of a more platform level change.
h
Thanks Yee!
why are you doing this again? is it just for the sha? or is the name itself unknown?
It is a long story. But in short, we cannot know the image uri when we serialize the workflow. Thus we have to replace the image uri in another service when we register the workflows
if you’re doing this for the sha
It is for the sha. I was thinking the same, since the code above just happens to work because Flytekit doesn't validate the image again when registering.
t
so you know the image name, but your company requires that images are run with the sha specified…
can you create an issue with this please (maybe add in the code you have above as a work around)? i want to just have a ticket out in the open.
if there’s enough desire within the community maybe something can be done more sustainably about it
could you include some redacted examples too please of image names as you want them? i think that would help
h
so you know the image name, but your company requires that images are run with the sha specified…
We won't know the image name. When we serialize the workflows, we pass a dummy image placeholder.