brash-london-45337
05/02/2024, 4:06 PMthankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
brash-london-45337
05/02/2024, 11:27 PMbrash-london-45337
05/02/2024, 11:27 PMbrash-london-45337
05/02/2024, 11:32 PMhelpful-van-10149
05/03/2024, 4:47 PMthankful-minister-83577
thankful-minister-83577
thankful-minister-83577
helpful-van-10149
05/03/2024, 4:56 PMhelpful-van-10149
05/03/2024, 5:03 PMhelpful-van-10149
# 06/18/2024, 7:39 PM (chat timestamp that was fused onto the def line in the paste)
def replace_image_in_pb_files(path: str, image_uri: str):
    """
    Replace the placeholder image uri in a task pb file, rewriting the file in place.

    Two places are updated:

    1. ``template.container.image``, which must currently equal ``IMAGE_PLACEHOLDER``.
    2. Every ``template.container.env`` entry whose key is ``_F_SS_C``; its value is
       rewritten by ``replace_image_in_container_env_value``.

    Example of pb file (the env value is shortened here)::

        template {
          ...
          container {
            image: "placeholder.image/docker:latest"
            ...
            args: "task-module"
            args: "src.python.ml.airail.hpo"
            args: "task-name"
            args: "start_sweep_agents"
            resources {
            }
            env {  # This is an image config for dynamic tasks.
              key: "_F_SS_C"
              value: "H4sIAAAAAAAC/+VTy27bMBD8lUDnSrTk...AAA"
            }
          }

    :param path: pb file path
    :param image_uri: real image uri, written as-is into ``container.image``
    :raises ValueError: if ``container.image`` is not the placeholder, i.e. the file
        was not generated by workflow_serializer.py
    :return: None
    """
    # Key of the env var holding the serialized context (see the example above).
    # Matching on the key instead of assuming position 0 keeps unrelated env vars
    # from being gzip-decoded and overwritten.
    serialized_context_key = "_F_SS_C"

    # Read and parse the protobuf message from file
    pb_object = TaskSpec()
    with open(path, "rb") as f:
        pb_object.ParseFromString(f.read())

    container = pb_object.template.container

    # Replace image uri in container.image
    if container.image != IMAGE_PLACEHOLDER:
        raise ValueError(
            f"Image uri is not the placeholder in {path}, this file is not generated by workflow_serializer.py"
        )
    container.image = image_uri

    # Replace image uri in container.env
    for env_var in container.env:
        if env_var.key == serialized_context_key:
            env_var.value = replace_image_in_container_env_value(env_var.value, image_uri)

    log.debug("New pb file of %s: %s", path, container)
    with open(path, "wb") as f:
        f.write(pb_object.SerializeToString())
def replace_image_in_container_env_value(value: str, real_image: str) -> str:
    """
    Rewrite the image references inside an encoded container env value.

    Original implementation of serialized_context in flytekit:
    https://github.com/flyteorg/flytekit/blob/v1.9.1/flytekit/configuration/__init__.py#L818-L828

    Example container env (value shortened here)::

        env {
          key: "_F_SS_C"
          value: "H4sIAAAAAAAC/+VTQY7bMAz8ysLn2mw22TTpN9pb...AAA=="
        }

    In order to decode it, we do the encoding steps in reverse:

    1. Decode the base64 encoded string
    2. Decompress the data with gzip
    3. Decode the bytes to a string and parse it as JSON

    Example of the decoded value::

        {'image_config': {'default_image': {'name': 'default', 'fqn': 'xyz.io/docker2', 'tag': 'latest'},
                          'images': [{'name': 'default', 'fqn': 'xyz.io/docker2', 'tag': 'latest'}]}}

    Afterward, 'fqn' and 'tag' of the default image and of every entry in 'images'
    are replaced, and the result is encoded back to a base64 string. For a
    real_image of ``<repo>@sha256:<digest>`` the replaced entries look like::

        {'name': 'default',
         'fqn': 'containers.global.prod.stripe.io/stripe-flyte/adhoc/src.python.flyte.ml_exploration.airail.sweeps_example.run_sweep@sha256',
         'tag': '2fc13677c6943061f67b520edd0110275c1e262ec7287996f6ecbf9a1e14b58f'}

    :param value: The value of the container.env
    :param real_image: The real image uri, ``<repository>:<tag>``. It is split on the
        last colon, so a repository containing a registry port still works.
    :raises ValueError: if value is empty, if real_image has no non-empty
        repository/tag pair, or if the decoded JSON lacks the expected image_config layout
    :return: The new encoded value of the container.env
    """
    if not value:
        raise ValueError("Cannot decode an empty value")

    # Start with base64 decoding, then decompress with gzip and parse the JSON
    with gzip.GzipFile(mode="rb", fileobj=BytesIO(base64.b64decode(value))) as f:
        image_config = json.loads(f.read().decode("utf-8"))

    # Split on the LAST colon only: "host:5000/img:tag" and "repo@sha256:digest"
    # both contain the separator we want at the end. A "/" in the tag part means
    # the colon found was a registry port and the uri carries no tag at all.
    image_repo, separator, image_sha = real_image.rpartition(":")
    if not separator or not image_repo or not image_sha or "/" in image_sha:
        raise ValueError(
            f"Cannot update image uri: {real_image}. Error: expected '<repository>:<tag>'"
        )

    try:
        images_section = image_config["image_config"]
        # Update 'fqn' and 'tag' in 'default_image'
        images_section["default_image"]["fqn"] = image_repo
        images_section["default_image"]["tag"] = image_sha
        # Update 'fqn' and 'tag' in 'images'
        for image in images_section["images"]:
            image["fqn"] = image_repo
            image["tag"] = image_sha
    except (KeyError, TypeError) as e:
        raise ValueError(f"Cannot update image uri: {real_image}. Error: {e}") from e
    log.info("new image config: %s", image_config)

    # Compress the JSON with gzip; mtime=0 keeps the output identical across runs
    buf = BytesIO()
    with gzip.GzipFile(mode="wb", fileobj=buf, mtime=0) as f:
        f.write(json.dumps(image_config).encode("utf-8"))

    # Encode the compressed data with base64
    return base64.b64encode(buf.getvalue()).decode("utf-8")
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
helpful-van-10149
06/19/2024, 8:44 PMhelpful-van-10149
06/19/2024, 8:46 PM why are you doing this again? is it just for the sha? or is the name itself unknown? It is a long story. But in short, we cannot know the image uri when we serialize the workflow. Thus we have to replace the image uri in another service when we register the workflows.
helpful-van-10149
06/19/2024, 8:47 PM if you’re doing this for the sha It is for the sha. I was thinking the same, since the code above just happens to work because Flytekit won't validate the image again when registering.
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
helpful-van-10149
06/20/2024, 1:49 AM so you know the image name, but your company requires that images are run with the sha specified… We won't know the image name. When we serialize the workflows, we pass a dummy image placeholder.