little-cricket-84530
12/28/2022, 5:23 AMglamorous-carpet-83516
12/28/2022, 6:15 AMglamorous-carpet-83516
12/28/2022, 6:16 AMkubectl describe
? just want to make sure flyte set the resource correctlylittle-cricket-84530
12/28/2022, 6:17 AM# -- Task default resources configuration
task_resource_defaults:
task_resources:
defaults:
cpu: 100m
memory: 200Mi
storage: 200Mi
ephemeral-storage: 1Gi
gpu: 0
limits:
cpu: 9
memory: 18Gi
storage: 20Gi
ephemeral-storage: 28Gi
gpu: 0
little-cricket-84530
12/28/2022, 6:18 AMglamorous-carpet-83516
12/28/2022, 6:19 AMlittle-cricket-84530
12/28/2022, 6:27 AM{"asctime": "2022-12-28 05:20:19,825", "name": "flytekit", "levelname": "INFO", "message": "Setting protocol to file"}
{"asctime": "2022-12-28 05:20:25,924", "name": "flytekit", "levelname": "INFO", "message": "We won't register PyTorchCheckpointTransformer, PyTorchTensorTransformer, and PyTorchModuleTransformer because torch is not installed."}
{"asctime": "2022-12-28 05:20:26,124", "name": "flytekit", "levelname": "INFO", "message": "Setting protocol to file"}
{"asctime": "2022-12-28 05:20:26,124", "name": "flytekit", "levelname": "INFO", "message": "Setting protocol to file"}
{"asctime": "2022-12-28 05:20:26,125", "name": "flytekit", "levelname": "INFO", "message": "Setting protocol to file"}
{"asctime": "2022-12-28 05:20:26,125", "name": "flytekit", "levelname": "INFO", "message": "Setting protocol to file"}
{"asctime": "2022-12-28 05:20:27,733", "name": "flytekit.entrypoint", "levelname": "INFO", "message": "Welcome to Flyte! Version: 1.2.1"}
{"asctime": "2022-12-28 05:20:27,734", "name": "flytekit.entrypoint", "levelname": "INFO", "message": "Using user directory /tmp/flyte-i8059ewt/sandbox/local_flytekit/9f57ef630f6df792206a67f0b4e593c5"}
{"asctime": "2022-12-28 05:20:32,830", "name": "flytekit", "levelname": "INFO", "message": "Entering timed context: Copying (<s3://my-flyte/flyte/metadata/propeller/project-development-agbd42n8ph94v8rwcbgv/n0/data/inputs.pb> -> /tmp/flyte-i8059ewt/sandbox/local_flytekit/inputs.pb)"}
{"asctime": "2022-12-28 05:20:41,531", "name": "flytekit", "levelname": "INFO", "message": "Output of command '['aws', 's3', 'cp', '<s3://my-flyte/flyte/metadata/propeller/project-development-agbd42n8ph94v8rwcbgv/n0/data/inputs.pb>', '/tmp/flyte-i8059ewt/sandbox/local_flytekit/inputs.pb']':\nb'Completed 130 Bytes/130 Bytes (697 Bytes/s) with 1 file(s) remaining\\rdownload: <s3://my-flyte/flyte/metadata/propeller/project-development-agbd42n8ph94v8rwcbgv/n0/data/inputs.pb> to ../tmp/flyte-i8059ewt/sandbox/local_flytekit/inputs.pb\\n'\n"}
{"asctime": "2022-12-28 05:20:41,532", "name": "flytekit", "levelname": "INFO", "message": "Exiting timed context: Copying (<s3://my-flyte/flyte/metadata/propeller/project-development-agbd42n8ph94v8rwcbgv/n0/data/inputs.pb> -> /tmp/flyte-i8059ewt/sandbox/local_flytekit/inputs.pb) [Wall Time: 8.701755479996791s, Process Time: 0.027927802s]"}
{"asctime": "2022-12-28 05:20:41,534", "name": "flytekit", "levelname": "INFO", "message": "Invoking flyte.workflows.my_workflow.read_input_records with inputs: {'num_requests': 1000, 'input_path': '<s3://my_bucket/some_file_name.parquet>'}"}
glamorous-carpet-83516
12/28/2022, 6:29 AMlittle-cricket-84530
12/28/2022, 6:32 AMLimits:
cpu: 100m
memory: 200Mi
Requests:
cpu: 100m
memory: 200Mi
little-cricket-84530
12/28/2022, 6:32 AMglamorous-carpet-83516
12/28/2022, 6:34 AMglamorous-carpet-83516
12/28/2022, 6:35 AMlittle-cricket-84530
12/28/2022, 6:35 AMlittle-cricket-84530
12/28/2022, 6:36 AMkubectl describe pod flytepropeller-6984f5cd7-ws65t
Limits:
cpu: 200m
ephemeral-storage: 100Mi
memory: 200Mi
Requests:
cpu: 10m
ephemeral-storage: 50Mi
memory: 100Mi
little-cricket-84530
12/28/2022, 6:37 AMglamorous-carpet-83516
12/28/2022, 6:37 AMglamorous-carpet-83516
12/28/2022, 6:37 AMlittle-cricket-84530
12/28/2022, 6:38 AMlittle-cricket-84530
12/28/2022, 6:42 AMk8s.yaml: |
plugins:
k8s:
default-cpus: 100m
default-env-vars: []
default-memory: 100Mi
little-cricket-84530
12/28/2022, 6:42 AMglamorous-carpet-83516
12/28/2022, 6:45 AMlittle-cricket-84530
12/28/2022, 6:46 AMkubectl edit cm flyte-admin-base-config
task_resource_defaults.yaml: |
task_resources:
defaults:
cpu: 100m
ephemeral-storage: 1Gi
gpu: 0
memory: 200Mi
storage: 200Mi
limits:
cpu: 9
ephemeral-storage: 28Gi
gpu: 0
memory: 18Gi
storage: 20Gi
glamorous-carpet-83516
12/28/2022, 6:54 AMglamorous-carpet-83516
12/28/2022, 7:00 AMlittle-cricket-84530
12/28/2022, 7:00 AMglamorous-carpet-83516
12/28/2022, 7:01 AMglamorous-carpet-83516
12/28/2022, 7:10 AMtask_resources.defaults.
To change the limit value, you have to update the limit in task decorator.little-cricket-84530
12/28/2022, 7:11 AMlittle-cricket-84530
12/28/2022, 7:13 AMlimits
in the decorator? There’s also requests
in the decoratorglamorous-carpet-83516
12/28/2022, 7:13 AMlittle-cricket-84530
12/28/2022, 7:13 AMglamorous-carpet-83516
12/28/2022, 7:14 AMglamorous-carpet-83516
12/28/2022, 7:16 AMtask_resources
by defaultlittle-cricket-84530
12/28/2022, 7:23 AMlittle-cricket-84530
12/28/2022, 7:27 AMlittle-cricket-84530
12/28/2022, 7:31 AMfreezing-airport-6809
little-cricket-84530
12/28/2022, 5:13 PMExiting timed context: Writing (/tmp/flyte2qtcr_i0/local_flytekit/f4484561b67506b8c8bfe18c1b4a315d -> <s3://my-flyte/2x/axd9n8qhbwlgszfb9slf-n0-0/939989a9a40c75f65c631bdf11070354/f4484561b67506b8c8bfe18c1b4a315d>) [Wall Time: 6.298165863845497s, Process Time: 0.029386856000002126s]
1614 Output of command '['aws', 's3', 'cp', '--acl', 'bucket-owner-full-control', '/tmp/flyte2qtcr_i0/local_flytekit/f4484561b67506b8c8bfe18c1b4a315d', '<s3://my-flyte/2x/axd9n8qhbwlgszfb9slf-n0-0/939989a9a40c75f65c631bdf11070354/f4484561b67506b8c8bfe18c1b4a315d']'>:
b'Completed 8.9 KiB/8.9 KiB (47.7 KiB/s) with 1 file(s) remaining\rupload: ../tmp/flyte2qtcr_i0/local_flytekit/f4484561b67506b8c8bfe18c1b4a315d to <s3://tesla-flyte/2x/axd9n8qhbwlgszfb9slf-n0-0/939989a9a40c75f65c631bdf11070354/f4484561b67506b8c8bfe18c1b4a315d>\n'
1615 Entering timed context: Writing (/tmp/flyte2qtcr_i0/local_flytekit/8016c26332f3085b27a3d08692b5df94 -> <s3://my-flyte/2x/axd9n8qhbwlgszfb9slf-n0-0/ebb807108d3cd57bbfff58b978a8fa22/8016c26332f3085b27a3d08692b5df94>)
little-cricket-84530
12/28/2022, 5:14 PMlittle-cricket-84530
12/28/2022, 6:31 PMglamorous-carpet-83516
12/28/2022, 6:37 PMglamorous-carpet-83516
12/28/2022, 6:37 PMlittle-cricket-84530
12/28/2022, 6:38 PMlittle-cricket-84530
12/28/2022, 7:11 PMglamorous-carpet-83516
12/28/2022, 7:13 PMlittle-cricket-84530
12/28/2022, 7:14 PMglamorous-carpet-83516
12/28/2022, 7:16 PMlittle-cricket-84530
12/28/2022, 7:16 PMglamorous-carpet-83516
12/28/2022, 7:18 PMlittle-cricket-84530
12/28/2022, 8:25 PMfreezing-airport-6809
freezing-airport-6809
little-cricket-84530
12/28/2022, 8:26 PMfreezing-airport-6809
glamorous-carpet-83516
12/28/2022, 8:27 PMlittle-cricket-84530
12/28/2022, 8:29 PMglamorous-carpet-83516
12/28/2022, 8:29 PMlittle-cricket-84530
12/28/2022, 8:32 PMimport awswrangler as wr
import pandas as pd
import boto3
from time import perf_counter
@task(
#cache=True,
#cache_version="1.0",
limits=Resources(mem="10Gi"),
disable_deck=True,
secret_requests=[
Secret(group=SECRET_GROUP, key="aws_secret_access_key"),
Secret(group=SECRET_GROUP, key="aws_access_key_id"),
],
)
def read_input_records(input_path: str, num_requests: int) -> List[Dict[str, Any]]:
aws_access_key_id = flytekit.current_context().secrets.get(
SECRET_GROUP, "aws_access_key_id"
)
aws_secret_access_key = flytekit.current_context().secrets.get(
SECRET_GROUP, "aws_secret_access_key"
)
session = boto3.Session(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=AWS_REGION,
)
start = perf_counter()
df = wr.s3.read_parquet(path=input_path, boto3_session=session)
stop = perf_counter()
flytekit.current_context().<http://logging.info|logging.info>(
f"Read {len(df)} records in {stop - start} s"
)
if num_requests != -1 and num_requests < len(df):
df = df[:num_requests]
stop_2 = perf_counter()
flytekit.current_context().<http://logging.info|logging.info>(
f"Truncated to {len(df)} records in {stop_2 - stop} s"
)
records = df.to_dict("records")
stop_3 = perf_counter()
flytekit.current_context().<http://logging.info|logging.info>(
f"Converted to records in {stop_3 - stop_2} s"
)
return records
Please adjust aws credentials etc as per your setupglamorous-carpet-83516
12/28/2022, 11:36 PMglamorous-carpet-83516
12/29/2022, 12:27 AMAny
, so it will be serialized the data to pickle, which means you will have thousand of pickle files. In addition, flytekit will upload the file serially, so it become very slow.glamorous-carpet-83516
12/29/2022, 12:27 AMany
to python type?glamorous-carpet-83516
12/29/2022, 12:28 AMimport typing
from typing import List, Dict, Any
import numpy as np
import pandas as pd
from flytekit import task, Resources, workflow, StructuredDataset
@task(
limits=Resources(mem="4Gi"),
disable_deck=True,
)
def create_parquet() -> StructuredDataset:
df = pd.DataFrame(np.random.choice(['foo','bar','baz'], size=(100000, 3)), columns=["a", "b", "c"])
df = df.apply(lambda col: col.astype('category'))
return StructuredDataset(dataframe=df)
@task(
limits=Resources(mem="4Gi"),
disable_deck=True,
)
def read_input_records(sd: StructuredDataset) -> List[Dict[str, Any]]:
df = sd.open(pd.DataFrame).all()
records = typing.cast(pd.DataFrame, df).to_dict()
return [records]*1000 # 1000 * 3
@workflow
def wf():
sd = create_parquet()
read_input_records(sd=sd)
if __name__ == "__main__":
wf()
little-cricket-84530
12/29/2022, 3:31 AMfreezing-airport-6809
little-cricket-84530
12/29/2022, 7:18 AMswift-animal-75798
12/29/2022, 9:09 AMtask_resource_defaults.yaml: |
task_resources:
defaults:
cpu: 100m
ephemeral-storage: 1Gi
gpu: 0
memory: 200Mi
storage: 200Mi
limits:
cpu: 9
ephemeral-storage: 28Gi
gpu: 0
memory: 18Gi
storage: 20Gi
The task_resources.defaults
refers to the defaults limits that apply on any POD.
The task.resources.limits
refers to the upper limit that can be set from the user's repo.
If no request/limit is set on a task, the task_resources.defaults are applied as both.
Users cannot set higher limits for their tasks than the task_resources.limits
.glamorous-carpet-83516
12/29/2022, 8:08 PM