ripe-battery-86055
09/02/2024, 10:58 AMfrom kubernetes import client as k8s_client, config
from flytekit import workflow
from flytekit.configuration import SerializationSettings
from flytekit.exceptions import user as _user_exceptions
from flytekitplugins.job.task import Job, JobFunctionTask  # Import from the plugin file
# Step 1: Define the Job Specification
job_spec = k8s_client.V1Job(
    api_version="batch/v1",
    kind="Job",
    metadata=k8s_client.V1ObjectMeta(name="example-job"),
    spec=k8s_client.V1JobSpec(
        template=k8s_client.V1PodTemplateSpec(
            spec=k8s_client.V1PodSpec(
                containers=[
                    k8s_client.V1Container(
                        name="example",
                        image="busybox",
                        command=["sleep", "100s"]
                    )
                ],
                restart_policy="Never"
            )
        )
    )
)
# Step 2: Define Metadata
metadata = k8s_client.V1ObjectMeta(
    name="example-job",
    labels={"example-label": "example-value"},
    annotations={"example-annotation": "example-value"}
)
# Step 3: Create Job Configuration
job_config = Job(job_spec=job_spec, metadata=metadata)
# Step 4: Create Job Function Task
def sample_task_function():
    pass
job_task = JobFunctionTask(task_config=job_config, task_function=sample_task_function)
# Step 5: Get Custom Job Dictionary
serialization_settings = SerializationSettings()
job_dict = job_task.get_custom(serialization_settings)
print(job_dict)