Rupsha Chaudhuri
12/28/2022, 5:23 AM
Kevin Su
12/28/2022, 6:15 AM
kubectl describe? Just want to make sure Flyte set the resource correctly.
Rupsha Chaudhuri
12/28/2022, 6:17 AM
# -- Task default resources configuration
task_resource_defaults:
  task_resources:
    defaults:
      cpu: 100m
      memory: 200Mi
      storage: 200Mi
      ephemeral-storage: 1Gi
      gpu: 0
    limits:
      cpu: 9
      memory: 18Gi
      storage: 20Gi
      ephemeral-storage: 28Gi
      gpu: 0
Kevin Su
12/28/2022, 6:19 AM
Rupsha Chaudhuri
12/28/2022, 6:27 AM
{"asctime": "2022-12-28 05:20:19,825", "name": "flytekit", "levelname": "INFO", "message": "Setting protocol to file"}
{"asctime": "2022-12-28 05:20:25,924", "name": "flytekit", "levelname": "INFO", "message": "We won't register PyTorchCheckpointTransformer, PyTorchTensorTransformer, and PyTorchModuleTransformer because torch is not installed."}
{"asctime": "2022-12-28 05:20:26,124", "name": "flytekit", "levelname": "INFO", "message": "Setting protocol to file"}
{"asctime": "2022-12-28 05:20:26,124", "name": "flytekit", "levelname": "INFO", "message": "Setting protocol to file"}
{"asctime": "2022-12-28 05:20:26,125", "name": "flytekit", "levelname": "INFO", "message": "Setting protocol to file"}
{"asctime": "2022-12-28 05:20:26,125", "name": "flytekit", "levelname": "INFO", "message": "Setting protocol to file"}
{"asctime": "2022-12-28 05:20:27,733", "name": "flytekit.entrypoint", "levelname": "INFO", "message": "Welcome to Flyte! Version: 1.2.1"}
{"asctime": "2022-12-28 05:20:27,734", "name": "flytekit.entrypoint", "levelname": "INFO", "message": "Using user directory /tmp/flyte-i8059ewt/sandbox/local_flytekit/9f57ef630f6df792206a67f0b4e593c5"}
{"asctime": "2022-12-28 05:20:32,830", "name": "flytekit", "levelname": "INFO", "message": "Entering timed context: Copying (<s3://my-flyte/flyte/metadata/propeller/project-development-agbd42n8ph94v8rwcbgv/n0/data/inputs.pb> -> /tmp/flyte-i8059ewt/sandbox/local_flytekit/inputs.pb)"}
{"asctime": "2022-12-28 05:20:41,531", "name": "flytekit", "levelname": "INFO", "message": "Output of command '['aws', 's3', 'cp', '<s3://my-flyte/flyte/metadata/propeller/project-development-agbd42n8ph94v8rwcbgv/n0/data/inputs.pb>', '/tmp/flyte-i8059ewt/sandbox/local_flytekit/inputs.pb']':\nb'Completed 130 Bytes/130 Bytes (697 Bytes/s) with 1 file(s) remaining\\rdownload: <s3://my-flyte/flyte/metadata/propeller/project-development-agbd42n8ph94v8rwcbgv/n0/data/inputs.pb> to ../tmp/flyte-i8059ewt/sandbox/local_flytekit/inputs.pb\\n'\n"}
{"asctime": "2022-12-28 05:20:41,532", "name": "flytekit", "levelname": "INFO", "message": "Exiting timed context: Copying (<s3://my-flyte/flyte/metadata/propeller/project-development-agbd42n8ph94v8rwcbgv/n0/data/inputs.pb> -> /tmp/flyte-i8059ewt/sandbox/local_flytekit/inputs.pb) [Wall Time: 8.701755479996791s, Process Time: 0.027927802s]"}
{"asctime": "2022-12-28 05:20:41,534", "name": "flytekit", "levelname": "INFO", "message": "Invoking flyte.workflows.my_workflow.read_input_records with inputs: {'num_requests': 1000, 'input_path': '<s3://my_bucket/some_file_name.parquet>'}"}
Kevin Su
12/28/2022, 6:29 AM
Rupsha Chaudhuri
12/28/2022, 6:32 AM
Limits:
  cpu: 100m
  memory: 200Mi
Requests:
  cpu: 100m
  memory: 200Mi
Kevin Su
12/28/2022, 6:34 AM
Rupsha Chaudhuri
12/28/2022, 6:35 AM
kubectl describe pod flytepropeller-6984f5cd7-ws65t
Limits:
  cpu: 200m
  ephemeral-storage: 100Mi
  memory: 200Mi
Requests:
  cpu: 10m
  ephemeral-storage: 50Mi
  memory: 100Mi
Kevin Su
12/28/2022, 6:37 AM
Rupsha Chaudhuri
12/28/2022, 6:38 AM
k8s.yaml: |
  plugins:
    k8s:
      default-cpus: 100m
      default-env-vars: []
      default-memory: 100Mi
Kevin Su
12/28/2022, 6:45 AM
Rupsha Chaudhuri
12/28/2022, 6:46 AM
task_resource_defaults.yaml: |
  task_resources:
    defaults:
      cpu: 100m
      ephemeral-storage: 1Gi
      gpu: 0
      memory: 200Mi
      storage: 200Mi
    limits:
      cpu: 9
      ephemeral-storage: 28Gi
      gpu: 0
      memory: 18Gi
      storage: 20Gi
Kevin Su
12/28/2022, 6:54 AM
Rupsha Chaudhuri
12/28/2022, 7:00 AM
Kevin Su
12/28/2022, 7:01 AM
task_resources.defaults. To change the limit value, you have to update the limit in the task decorator.
Rupsha Chaudhuri
12/28/2022, 7:11 AM
limits in the decorator? There’s also requests in the decorator
Rupsha Chaudhuri
12/28/2022, 7:13 AM
Kevin Su
12/28/2022, 7:14 AM
task_resources by default
Rupsha Chaudhuri
12/28/2022, 7:23 AM
Ketan (kumare3)
Rupsha Chaudhuri
12/28/2022, 5:13 PM
Exiting timed context: Writing (/tmp/flyte2qtcr_i0/local_flytekit/f4484561b67506b8c8bfe18c1b4a315d -> <s3://my-flyte/2x/axd9n8qhbwlgszfb9slf-n0-0/939989a9a40c75f65c631bdf11070354/f4484561b67506b8c8bfe18c1b4a315d>) [Wall Time: 6.298165863845497s, Process Time: 0.029386856000002126s]
Output of command '['aws', 's3', 'cp', '--acl', 'bucket-owner-full-control', '/tmp/flyte2qtcr_i0/local_flytekit/f4484561b67506b8c8bfe18c1b4a315d', '<s3://my-flyte/2x/axd9n8qhbwlgszfb9slf-n0-0/939989a9a40c75f65c631bdf11070354/f4484561b67506b8c8bfe18c1b4a315d>']':
b'Completed 8.9 KiB/8.9 KiB (47.7 KiB/s) with 1 file(s) remaining\rupload: ../tmp/flyte2qtcr_i0/local_flytekit/f4484561b67506b8c8bfe18c1b4a315d to <s3://tesla-flyte/2x/axd9n8qhbwlgszfb9slf-n0-0/939989a9a40c75f65c631bdf11070354/f4484561b67506b8c8bfe18c1b4a315d>\n'
Entering timed context: Writing (/tmp/flyte2qtcr_i0/local_flytekit/8016c26332f3085b27a3d08692b5df94 -> <s3://my-flyte/2x/axd9n8qhbwlgszfb9slf-n0-0/ebb807108d3cd57bbfff58b978a8fa22/8016c26332f3085b27a3d08692b5df94>)
Kevin Su
12/28/2022, 6:37 PM
Rupsha Chaudhuri
12/28/2022, 6:38 PM
Kevin Su
12/28/2022, 7:13 PM
Rupsha Chaudhuri
12/28/2022, 7:14 PM
Kevin Su
12/28/2022, 7:16 PM
Rupsha Chaudhuri
12/28/2022, 7:16 PM
Kevin Su
12/28/2022, 7:18 PM
Rupsha Chaudhuri
12/28/2022, 8:25 PM
Ketan (kumare3)
Rupsha Chaudhuri
12/28/2022, 8:26 PM
Ketan (kumare3)
Kevin Su
12/28/2022, 8:27 PM
Rupsha Chaudhuri
12/28/2022, 8:29 PM
Kevin Su
12/28/2022, 8:29 PM
Rupsha Chaudhuri
12/28/2022, 8:32 PM
import awswrangler as wr
import pandas as pd
import boto3
import flytekit
from time import perf_counter
from typing import Any, Dict, List
from flytekit import Resources, Secret, task

# SECRET_GROUP and AWS_REGION are assumed to be defined elsewhere in the module.


@task(
    # cache=True,
    # cache_version="1.0",
    limits=Resources(mem="10Gi"),
    disable_deck=True,
    secret_requests=[
        Secret(group=SECRET_GROUP, key="aws_secret_access_key"),
        Secret(group=SECRET_GROUP, key="aws_access_key_id"),
    ],
)
def read_input_records(input_path: str, num_requests: int) -> List[Dict[str, Any]]:
    # Fetch the AWS credentials requested via secret_requests above.
    aws_access_key_id = flytekit.current_context().secrets.get(
        SECRET_GROUP, "aws_access_key_id"
    )
    aws_secret_access_key = flytekit.current_context().secrets.get(
        SECRET_GROUP, "aws_secret_access_key"
    )
    session = boto3.Session(
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name=AWS_REGION,
    )

    # Read the parquet file and time it.
    start = perf_counter()
    df = wr.s3.read_parquet(path=input_path, boto3_session=session)
    stop = perf_counter()
    flytekit.current_context().logging.info(
        f"Read {len(df)} records in {stop - start} s"
    )

    # Truncate to num_requests rows unless -1 (meaning "all") was passed.
    if num_requests != -1 and num_requests < len(df):
        df = df[:num_requests]
    stop_2 = perf_counter()
    flytekit.current_context().logging.info(
        f"Truncated to {len(df)} records in {stop_2 - stop} s"
    )

    # Convert to a list of per-row dicts.
    records = df.to_dict("records")
    stop_3 = perf_counter()
    flytekit.current_context().logging.info(
        f"Converted to records in {stop_3 - stop_2} s"
    )
    return records
Please adjust AWS credentials etc. as per your setup.
Kevin Su
12/28/2022, 11:36 PM
Any, so the data will be serialized to pickle, which means you will have thousands of pickle files. In addition, flytekit uploads the files serially, so it becomes very slow.
any to python type?
import typing
from typing import List, Dict, Any
import numpy as np
import pandas as pd
from flytekit import task, Resources, workflow, StructuredDataset


@task(
    limits=Resources(mem="4Gi"),
    disable_deck=True,
)
def create_parquet() -> StructuredDataset:
    # Build a 100000 x 3 DataFrame of categorical strings.
    df = pd.DataFrame(np.random.choice(['foo','bar','baz'], size=(100000, 3)), columns=["a", "b", "c"])
    df = df.apply(lambda col: col.astype('category'))
    return StructuredDataset(dataframe=df)


@task(
    limits=Resources(mem="4Gi"),
    disable_deck=True,
)
def read_input_records(sd: StructuredDataset) -> List[Dict[str, Any]]:
    df = sd.open(pd.DataFrame).all()
    records = typing.cast(pd.DataFrame, df).to_dict()
    return [records]*1000 # 1000 * 3


@workflow
def wf():
    sd = create_parquet()
    read_input_records(sd=sd)


if __name__ == "__main__":
    wf()
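As a rough illustration of the point about Any above: the sketch below uses only the standard library, not flytekit, and simply counts how many files result when every dict value is pickled on its own versus pickling the whole list once. The one-file-per-value layout and the helper names (write_per_value, write_single) are assumptions made for illustration; flytekit's actual storage layout may differ.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any, Dict, List


def write_per_value(records: List[Dict[str, Any]], out_dir: Path) -> int:
    """Pickle every dict value into its own file and return the file count.

    Assumption: this mimics Any-typed values each being stored separately.
    """
    count = 0
    for i, record in enumerate(records):
        for key, value in record.items():
            (out_dir / f"{i}-{key}.pkl").write_bytes(pickle.dumps(value))
            count += 1
    return count


def write_single(records: List[Dict[str, Any]], out_dir: Path) -> int:
    """Pickle the whole list into one file and return the file count."""
    (out_dir / "records.pkl").write_bytes(pickle.dumps(records))
    return 1


# 1000 records with 3 keys each, matching the "1000 * 3" comment above.
records = [{"a": "foo", "b": "bar", "c": "baz"} for _ in range(1000)]

with tempfile.TemporaryDirectory() as tmp:
    per_value = write_per_value(records, Path(tmp))
with tempfile.TemporaryDirectory() as tmp:
    single = write_single(records, Path(tmp))

print(per_value, single)
```

If each of those files is then uploaded one after another, the per-value variant pays the per-upload overhead thousands of times, which is consistent with the slow uploads seen in the logs.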
Rupsha Chaudhuri
12/29/2022, 3:31 AM
Ketan (kumare3)
Rupsha Chaudhuri
12/29/2022, 7:18 AM
Babis Kiosidis
12/29/2022, 9:09 AM
task_resource_defaults.yaml: |
  task_resources:
    defaults:
      cpu: 100m
      ephemeral-storage: 1Gi
      gpu: 0
      memory: 200Mi
      storage: 200Mi
    limits:
      cpu: 9
      ephemeral-storage: 28Gi
      gpu: 0
      memory: 18Gi
      storage: 20Gi
The task_resources.defaults refers to the default values that apply to any pod.
The task_resources.limits refers to the upper limit that can be set from the user's repo.
If no request/limit is set on a task, the task_resources.defaults are applied as both.
Kevin Su
12/29/2022, 8:08 PM
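A minimal sketch of the rule described above, for memory only, in plain Python. This is a simplified model and not Flyte's code: the function names (to_mi, resolve_memory) are hypothetical, and two behaviours are assumptions not stated in the thread: mirroring the one value that is set when only a request or only a limit is given, and clamping values above the platform limit rather than rejecting them.

```python
from typing import Optional, Tuple

# Memory suffixes handled by this sketch, expressed in Mi.
UNITS = {"Mi": 1, "Gi": 1024}


def to_mi(quantity: str) -> int:
    """Convert a quantity such as '200Mi' or '18Gi' to a number of Mi."""
    for suffix, factor in UNITS.items():
        if quantity.endswith(suffix):
            return int(quantity[: -len(suffix)]) * factor
    raise ValueError(f"unsupported quantity: {quantity}")


def resolve_memory(
    task_request: Optional[str],
    task_limit: Optional[str],
    platform_default: str,
    platform_limit: str,
) -> Tuple[str, str]:
    """Return the (request, limit) pair a pod would end up with in this model."""
    if task_request is None and task_limit is None:
        # Nothing set on the task: the platform default is used as both.
        request = limit = platform_default
    else:
        # Assumption: if only one of the two is set, mirror it to the other.
        request = task_request if task_request is not None else task_limit
        limit = task_limit if task_limit is not None else task_request

    # Assumption: values above the platform limit are clamped, not rejected.
    if to_mi(limit) > to_mi(platform_limit):
        limit = platform_limit
    if to_mi(request) > to_mi(limit):
        request = limit
    return request, limit


# No resources on the task: matches the 200Mi/200Mi seen in kubectl describe.
unset = resolve_memory(None, None, "200Mi", "18Gi")
# limits=Resources(mem="10Gi") on the task decorator.
with_limit = resolve_memory(None, "10Gi", "200Mi", "18Gi")
# A value above the 18Gi platform limit.
too_big = resolve_memory(None, "32Gi", "200Mi", "18Gi")
print(unset, with_limit, too_big)
```

Under this model, a pod showing 200Mi for both request and limit would mean the task-level setting was not picked up, which is what the kubectl describe check is meant to reveal.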