Anthony
09/28/2022, 12:53 PM
The preproc_and_split step was executed successfully:
pyflyte-execute
--inputs
s3://my-s3-bucket/metadata/propeller/flyte-anti-fraud-ml-development-a27rchl5z9ndpw297nk8/n0/data/inputs.pb
--output-prefix
s3://my-s3-bucket/metadata/propeller/flyte-anti-fraud-ml-development-a27rchl5z9ndpw297nk8/n0/data/0
--raw-output-data-prefix
s3://my-s3-bucket/vo/a27rchl5z9ndpw297nk8-n0-0
--checkpoint-path
s3://my-s3-bucket/vo/a27rchl5z9ndpw297nk8-n0-0/_flytecheckpoints
--prev-checkpoint
""
--resolver
flytekit.core.python_auto_container.default_task_resolver
--
task-module
app.workflow
task-name
preproc_and_split
The output should be a small training dataset of roughly 50k records.
In the node allocation I can see sufficient memory available.
But after the first task has succeeded, I see an eternal hang at this step, and Flyte doesn't kick off the next executions of the workflow.
The task_resource_defaults conf is the following:
task_resource_defaults.yaml: |
  task_resources:
    defaults:
      cpu: 1
      memory: 3000Mi
      storage: 200Mi
    limits:
      cpu: 5
      gpu: 1
      memory: 8Gi
      storage: 500Mi
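For context, these platform defaults only apply when a task doesn't declare its own resources; a task can override them with flytekit's Resources. A minimal sketch (values here are illustrative, not taken from this project):

from flytekit import Resources, task

@task(
    requests=Resources(cpu="1", mem="3000Mi"),  # illustrative values
    limits=Resources(cpu="2", mem="6Gi"),       # illustrative values
)
def some_heavy_step(n: int) -> int:
    # A task without explicit requests/limits falls back to the
    # task_resource_defaults shown above.
    return n * 2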
I have one task that produces dataclass instances as output, and another task that should take these instances as input parameters:
@workflow
def main_flow() -> Forecast:
    """
    Main Flyte WorkFlow consisting of three tasks:
    - @preproc_and_split
    - @train_xgboost_clf
    - @get_predictions
    """
    logger.info(log="#START -- START Raw Preprocessing and Splitting", timestamp=None)
    train_cls, target_cls = preproc_and_split()
    logger.info(log="#START -- START Initialize Boosting Params", timestamp=None)
    saved_mpath = train_xgboost_clf(
        feat_cls=train_cls,
        target_cls=target_cls,
        xgb_params=xgb_params,
        cust_metric=BoostingCustMetric
    )
Where
def preproc_and_split() -> Tuple[Fraud_Raw_PostProc_Data_Class, Fraud_Raw_Target_Data_Class]:
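For reference, flytekit serializes dataclass task I/O through dataclasses-json, so the classes would typically be declared along these lines; the field names below are assumptions, not the project's actual schema:

from dataclasses import dataclass
from dataclasses_json import dataclass_json

@dataclass_json
@dataclass
class Fraud_Raw_PostProc_Data_Class:
    # Hypothetical fields; the real class is not shown in this thread.
    features_path: str
    n_rows: int

If a field uses a type marshmallow cannot map (e.g. a datetime), flytekit logs a "Failed to extract schema" warning and falls back to schemaless handling, which is what the warning later in this thread appears to be.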
Any advice on why I'm seeing this behaviour?
Ketan (kumare3)
Shivay Lamba
09/28/2022, 2:27 PM
Anthony
09/28/2022, 2:46 PM
The preproc_and_split task has been executed successfully, but I cannot pass its output to the next training task.
I have 14 GB of memory on my local machine in total.
The task_resource_defaults conf is the following:
task_resource_defaults.yaml: |
  task_resources:
    defaults:
      cpu: 1
      memory: 3000Mi
      storage: 200Mi
    limits:
      cpu: 5
      gpu: 1
      memory: 8Gi
      storage: 500Mi
Yee
Ketan (kumare3)
Anthony
09/29/2022, 4:48 PM
• kubectl -n flyte edit cm flyte-admin-base-config
• Edit --> kubectl edit configmap -n flyte flyte-propeller-config
• !!REMOVE the task decorator from preproc_and_split()
So only two tasks have been added to the flow:
@task(
    cache=False,
    cache_version=CACHE_VERSION,
    requests=request_resources,
    # limits=limit_resources,
)
def train_xgboost_clf(feat_cls: Fraud_Raw_PostProc_Data_Class,
                      target_cls: Fraud_Raw_Target_Data_Class,
                      xgb_params: dict,
                      cust_metric: BoostingCustMetric) -> JoblibSerializedFile:
and the next one:
@task(
    cache=False,
    cache_version=CACHE_VERSION,
    requests=request_resources,
    # limits=limit_resources,
)
def get_predictions(feat_cls: Fraud_Raw_PostProc_Data_Class,
                    target_cls: Fraud_Raw_Target_Data_Class,
                    model_path: JoblibSerializedFile,
                    thresh: float) -> Forecast:
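Both decorators reference request_resources and limit_resources, which aren't shown in the snippet; a plausible definition using flytekit's Resources would be (hypothetical values, not the project's):

from flytekit import Resources

# Hypothetical values; the project's actual request_resources / limit_resources are not shown.
request_resources = Resources(cpu="1", mem="3000Mi", storage="200Mi")
limit_resources = Resources(cpu="2", mem="8Gi", storage="500Mi")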
🚀 Workflow:
@workflow
def main_flow() -> Forecast:
    """
    Main Flyte WorkFlow consisting of three tasks:
    - @preproc_and_split
    - @train_xgboost_clf
    - @get_predictions
    """
    logger.info(log="#START -- START Raw Preprocessing and Splitting", timestamp=None)
    train_cls, target_cls = preproc_and_split()
    logger.info(log="#START -- START Initialize Boosting Params", timestamp=None)
    saved_mpath = train_xgboost_clf(
        feat_cls=train_cls,
        target_cls=target_cls,
        xgb_params=xgb_params,
        cust_metric=BoostingCustMetric
    )
    forecast = get_predictions(feat_cls=train_cls,
                               target_cls=target_cls,
                               model_path=saved_mpath,
                               thresh=THRESH
                               )
    # serialize_model(model=xgb_clf)
    return forecast
• Run workflow locally: pyflyte run app/workflow.py main_flow
- SUCCESS ✅
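Equivalently, since @workflow functions are plain Python callables when run locally, a quick sanity check without pyflyte could look like this (a sketch, assuming the module path app.workflow used above):

# Local sanity check; runs the tasks in-process, no Flyte backend needed.
from app.workflow import main_flow

if __name__ == "__main__":
    forecast = main_flow()
    print(forecast)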
OK, going further to serialize and `register`:
pyflyte -c .flyte/sandbox.config --pkgs app package \
--force \
--in-container-source-path /root \
--image anti-fraud-ml-train:c20b6ee57601c0ddf12874fe671603b5ed78ece8
pyflyte first runs my function preproc_and_split, which produces:
<class 'pandas.core.frame.DataFrame'>, (53621, 46)
Here is the final log output for serializing all the pb files:
So we successfully serialized 4 Flyte objects and packaged them.
2022-09-29 16:37:04.830|anti-fraud-ml-train ##END -- Feature Transformation Pipeline
2022-09-29 16:37:04.830|anti-fraud-ml-train #START -- Train data info:
<class 'pandas.core.frame.DataFrame'>, (53621, 46)
2022-09-29 16:37:04.879|anti-fraud-ml-train #START -- START Initialize Boosting Params
{"asctime": "2022-09-29 17:37:20,155", "name": "flytekit", "levelname": "WARNING", "message": "Failed to extract schema for object <class 'fraud_preproc.model_dataclass.Forecast'>, (will run schemaless) error: unsupported field type <fields._TimestampField(dump_default=<marshmallow.missing>, attribute=None, validate=None, required=False, load_only=False, dump_only=False, load_default=<marshmallow.missing>, allow_none=False, error_messages={'required': 'Missing data for required field.', 'null': 'Field may not be null.', 'validator_failed': 'Invalid value.'})>If you have postponed annotations turned on (PEP 563) turn it off please. Postponedevaluation doesn't work with json dataclasses"}
Successfully serialized 4 flyte objects
Packaging app.workflow.train_xgboost_clf -> 0_app.workflow.train_xgboost_clf_1.pb
Packaging app.workflow.get_predictions -> 1_app.workflow.get_predictions_1.pb
Packaging app.workflow.main_flow -> 2_app.workflow.main_flow_2.pb
Packaging app.workflow.main_flow -> 3_app.workflow.main_flow_3.pb
Successfully packaged 4 flyte objects into /Users/aektov/pyflyte/anti-fraud-ml-train/flyte-package.tgz
Let's register all the objects:
flytectl -c .flyte/sandbox-config.yml \
register files \
--project flyte-anti-fraud-ml \
--domain development \
--archive flyte-package.tgz \
--outputLocationPrefix s3://my-s3-bucket/anti-fraud-ml-train \
--force \
--version c20b6ee57601c0ddf12874fe671603b5ed78ece8
Oops, we got an error:
--------------------------------------------------------------- --------- --------------------------------------------------------
| NAME | STATUS | ADDITIONAL INFO |
--------------------------------------------------------------- --------- --------------------------------------------------------
| /tmp/register2016944902/0_app.workflow.train_xgboost_clf_1.pb | Success | AlreadyExists |
--------------------------------------------------------------- --------- --------------------------------------------------------
| /tmp/register2016944902/1_app.workflow.get_predictions_1.pb | Success | AlreadyExists |
--------------------------------------------------------------- --------- --------------------------------------------------------
| /tmp/register2016944902/2_app.workflow.main_flow_2.pb | Failed | Error registering file due to rpc error: code = |
| | | ResourceExhausted desc = grpc: received message larger |
| | | than max (57552210 vs. 4194304) |
--------------------------------------------------------------- --------- --------------------------------------------------------
3 rows
Error: rpc error: code = ResourceExhausted desc = grpc: received message larger than max (57552210 vs. 4194304)
make: *** [register] Error 1
4194304 is exactly 4 * 1024 * 1024, and this is the mem limit for tasks I have set up in flyte-admin-base-config.
But I'm confused here, because I see 57552210 (~55 MB) while trying to register the workflow proto file /tmp/register2016944902/2_app.workflow.main_flow_2.pb.
If you remember, I can easily run my workflow locally, and the max memory consumption is no more than 4 GB.