
Anthony

09/28/2022, 12:53 PM
Hey everyone again 🙌 I'm seeing my execution get stuck when triggering the next task. My main workflow is depicted in the attached picture. The first `preproc_and_split` step executed successfully:
```
pyflyte-execute \
  --inputs s3://my-s3-bucket/metadata/propeller/flyte-anti-fraud-ml-development-a27rchl5z9ndpw297nk8/n0/data/inputs.pb \
  --output-prefix s3://my-s3-bucket/metadata/propeller/flyte-anti-fraud-ml-development-a27rchl5z9ndpw297nk8/n0/data/0 \
  --raw-output-data-prefix s3://my-s3-bucket/vo/a27rchl5z9ndpw297nk8-n0-0 \
  --checkpoint-path s3://my-s3-bucket/vo/a27rchl5z9ndpw297nk8-n0-0/_flytecheckpoints \
  --prev-checkpoint "" \
  --resolver flytekit.core.python_auto_container.default_task_resolver \
  -- task-module app.workflow task-name preproc_and_split
```
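The trailing arguments after the bare `--` are not `pyflyte-execute` options; they are handed to the resolver named by `--resolver`, which imports `task-module` and looks up `task-name` in it. A minimal sketch of that lookup, assuming flat key/value pairs as in the command above (the helper name is hypothetical, and this is not flytekit's source):

```python
import importlib
from typing import Callable, Dict, List


def load_task_from_loader_args(loader_args: List[str]) -> Callable:
    """Look up a task function from the args that follow `--`.

    Hypothetical helper, loosely modeled on flytekit's default task resolver.
    It assumes the args are flat key/value pairs, e.g.
    ["task-module", "app.workflow", "task-name", "preproc_and_split"].
    """
    # Even positions are keys, odd positions are their values.
    pairs: Dict[str, str] = dict(zip(loader_args[0::2], loader_args[1::2]))
    # Import the module by dotted path; it must be importable in the container.
    module = importlib.import_module(pairs["task-module"])
    # Fetch the task object by its attribute name in that module.
    return getattr(module, pairs["task-name"])


if __name__ == "__main__":
    # Demonstrated with a stdlib module, since app.workflow is not available here.
    sqrt = load_task_from_loader_args(["task-module", "math", "task-name", "sqrt"])
    print(sqrt(16.0))  # 4.0
```

This is one reason the module path matters: `app.workflow` has to be importable from the container's source path for the lookup to succeed.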
The output should be a small training dataset with ~50k records. In the node allocation I can see sufficient memory available. But after the first task succeeded, the execution hangs indefinitely at this step and Flyte doesn't launch the next task in the workflow.
My `task_resource_defaults` config is:
```
task_resource_defaults.yaml: |
    task_resources:
      defaults:
        cpu: 1
        memory: 3000Mi
        storage: 200Mi
      limits:
        cpu: 5
        gpu: 1
        memory: 8Gi
        storage: 500Mi
```
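The memory values in this config are Kubernetes binary quantities (`Mi` = 2^20 bytes, `Gi` = 2^30 bytes). A small sketch to convert them for comparison with the machine's RAM, assuming integer values with binary suffixes only (the helper is hypothetical and ignores the rest of the Kubernetes quantity grammar, such as decimal `M`/`G` suffixes):

```python
# Binary suffixes used by Kubernetes resource quantities (subset only).
_BINARY_SUFFIXES = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40}


def quantity_to_bytes(quantity: str) -> int:
    """Convert a quantity such as '3000Mi' or '8Gi' to bytes.

    Hypothetical helper: handles plain integers and the binary suffixes
    above, not the full Kubernetes quantity grammar.
    """
    for suffix, factor in _BINARY_SUFFIXES.items():
        if quantity.endswith(suffix):
            return int(quantity[: -len(suffix)]) * factor
    # No suffix: the value is already in bytes.
    return int(quantity)


if __name__ == "__main__":
    for name, q in [("default memory", "3000Mi"), ("memory limit", "8Gi")]:
        print(f"{name}: {q} = {quantity_to_bytes(q) / 2**30:.2f} GiB")
```

With these values the default memory works out to about 2.93 GiB and the configured limit to 8 GiB.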
I have one task that returns dataclass instances and another task that should take those instances as input parameters:
```
@workflow
def main_flow() -> Forecast:
    """
    Main Flyte WorkFlow consisting of three tasks:
        -  @preproc_and_split
        -  @train_xgboost_clf
        -  @get_predictions
    """
    logger.info(log="#START -- START Raw Preprocessing and Splitting", timestamp=None)
    train_cls, target_cls = preproc_and_split()

    logger.info(log="#START -- START Initialize Boosting Params", timestamp=None)
    saved_mpath = train_xgboost_clf(
        feat_cls=train_cls,
        target_cls=target_cls,
        xgb_params=xgb_params,
        cust_metric=BoostingCustMetric,
    )
    # ... (remaining steps omitted in this message)
```
where the first task is declared as:
```
def preproc_and_split() -> Tuple[Fraud_Raw_PostProc_Data_Class, Fraud_Raw_Target_Data_Class]:
    ...
```
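Dataclass values passed between tasks have to be serialized. Flytekit releases from around this time did that by converting the dataclass to JSON (via the `dataclasses_json` package) and storing it as a protobuf Struct. As a rough, stdlib-only illustration of what a JSON round trip means for size, the sketch below uses a hypothetical stand-in for `Fraud_Raw_PostProc_Data_Class` whose rows are plain lists of floats; it does not call flytekit:

```python
import json
from dataclasses import asdict, dataclass, field
from typing import List


@dataclass
class FeatureTable:
    """Hypothetical stand-in for a dataclass that carries a feature matrix."""
    columns: List[str]
    rows: List[List[float]] = field(default_factory=list)


def json_size_bytes(obj: FeatureTable) -> int:
    """Size of the UTF-8 JSON encoding of the dataclass, in bytes."""
    return len(json.dumps(asdict(obj)).encode("utf-8"))


def round_trip(obj: FeatureTable) -> FeatureTable:
    """Serialize to JSON and back, as a value would be between two tasks."""
    return FeatureTable(**json.loads(json.dumps(asdict(obj))))


if __name__ == "__main__":
    n_rows, n_cols = 1_000, 46
    table = FeatureTable(
        columns=[f"f{i}" for i in range(n_cols)],
        rows=[[0.123456789] * n_cols for _ in range(n_rows)],
    )
    assert round_trip(table) == table
    print(f"{n_rows} rows x {n_cols} cols -> {json_size_bytes(table)} bytes of JSON")
```

Scaling the printed size by the row count gives a feel for how large a ~50k-row table becomes once it is carried as JSON.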
Any advice on why I'm seeing this behaviour?

Ketan (kumare3)

09/28/2022, 1:57 PM
Hi Anthony, welcome to the community! This is odd.
The strange thing is that the next node is not even queued.
@Shivay Lamba, can you run this example so we can get a reproduction? It seems odd.

Shivay Lamba

09/28/2022, 2:27 PM
Sure, I will try to run this and share the result here

Anthony

09/28/2022, 2:46 PM
I can't share the dataset because it's private and contains PII, but I want to test Flyte on real cases (not toy examples), so I downsampled it to ~50k records for training. The first `preproc_and_split` task executed successfully, but I can't pass its output to the next training task. I have 14 GB of memory in total on my local machine.

Yee

09/28/2022, 9:12 PM
Are all the tasks defined in the same file?

Ketan (kumare3)

09/29/2022, 1:46 PM
But shouldn't that simply fail to load the task?

Anthony

09/29/2022, 4:48 PM
@Yee Yes, all tasks are defined within one script (see workflow.py). BTW, I ran the following experiments and got new odd behaviour:
• Re-initialized the flytectl sandbox.
• Edited the admin config: `kubectl -n flyte edit cm flyte-admin-base-config`
• Edited the propeller config: `kubectl edit configmap -n flyte flyte-propeller-config`
• Removed the task decorator from `preproc_and_split()`, so only two tasks are added to the workflow:
```
@task(
    cache=False,
    cache_version=CACHE_VERSION,
    requests=request_resources,
    # limits=limit_resources,
)
def train_xgboost_clf(feat_cls: Fraud_Raw_PostProc_Data_Class,
                      target_cls: Fraud_Raw_Target_Data_Class,
                      xgb_params: dict,
                      cust_metric: BoostingCustMetric) -> JoblibSerializedFile:
    ...
```
and the next one:
```
@task(
    cache=False,
    cache_version=CACHE_VERSION,
    requests=request_resources,
    # limits=limit_resources,
)
def get_predictions(feat_cls: Fraud_Raw_PostProc_Data_Class,
                    target_cls: Fraud_Raw_Target_Data_Class,
                    model_path: JoblibSerializedFile,
                    thresh: float) -> Forecast:
    ...
```
🚀 Workflow:
```
@workflow
def main_flow() -> Forecast:
    """
    Main Flyte WorkFlow consisting of three tasks:
        -  @preproc_and_split
        -  @train_xgboost_clf
        -  @get_predictions
    """
    logger.info(log="#START -- START Raw Preprocessing and Splitting", timestamp=None)
    # preproc_and_split is a plain function here (its task decorator was removed)
    train_cls, target_cls = preproc_and_split()

    logger.info(log="#START -- START Initialize Boosting Params", timestamp=None)
    saved_mpath = train_xgboost_clf(
        feat_cls=train_cls,
        target_cls=target_cls,
        xgb_params=xgb_params,
        cust_metric=BoostingCustMetric,
    )

    forecast = get_predictions(
        feat_cls=train_cls,
        target_cls=target_cls,
        model_path=saved_mpath,
        thresh=THRESH,
    )

    # serialize_model(model=xgb_clf)

    return forecast
```
• Run the workflow locally: `pyflyte run app/workflow.py main_flow`: SUCCESS. OK, moving on to serialise and `register`:
```
pyflyte -c .flyte/sandbox.config --pkgs app package \
    --force \
    --in-container-source-path /root \
    --image anti-fraud-ml-train:c20b6ee57601c0ddf12874fe671603b5ed78ece8
```
`pyflyte` first runs my `preproc_and_split` function, which produces:
```
<class 'pandas.core.frame.DataFrame'>, (53621, 46)
```
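For scale, the raw size of the values in that frame can be estimated from its shape. This assumes every one of the 46 columns is float64 (8 bytes per value), which the output above does not confirm:

```python
n_rows, n_cols = 53621, 46   # shape printed by preproc_and_split
bytes_per_value = 8          # assumption: every column is float64

cells = n_rows * n_cols
raw_bytes = cells * bytes_per_value

print(cells)                        # 2466566
print(raw_bytes)                    # 19732528
print(round(raw_bytes / 2**20, 1))  # 18.8 (MiB)
```

A text encoding of the same values (JSON, CSV) can be several times larger than this binary figure, depending on how many digits each value prints with.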
Here is the final log output from serialising all the .pb files; 4 Flyte objects were serialised and packaged successfully:
```
2022-09-29 16:37:04.830|anti-fraud-ml-train            ##END -- Feature Transformation Pipeline
2022-09-29 16:37:04.830|anti-fraud-ml-train            #START -- Train data info:
 <class 'pandas.core.frame.DataFrame'>, (53621, 46)
2022-09-29 16:37:04.879|anti-fraud-ml-train            #START -- START Initialize Boosting Params
{"asctime": "2022-09-29 17:37:20,155", "name": "flytekit", "levelname": "WARNING", "message": "Failed to extract schema for object <class 'fraud_preproc.model_dataclass.Forecast'>, (will run schemaless) error: unsupported field type <fields._TimestampField(dump_default=<marshmallow.missing>, attribute=None, validate=None, required=False, load_only=False, dump_only=False, load_default=<marshmallow.missing>, allow_none=False, error_messages={'required': 'Missing data for required field.', 'null': 'Field may not be null.', 'validator_failed': 'Invalid value.'})>If you have postponed annotations turned on (PEP 563) turn it off please. Postponedevaluation doesn't work with json dataclasses"}
Successfully serialized 4 flyte objects
  Packaging app.workflow.train_xgboost_clf -> 0_app.workflow.train_xgboost_clf_1.pb
  Packaging app.workflow.get_predictions -> 1_app.workflow.get_predictions_1.pb
  Packaging app.workflow.main_flow -> 2_app.workflow.main_flow_2.pb
  Packaging app.workflow.main_flow -> 3_app.workflow.main_flow_3.pb
Successfully packaged 4 flyte objects into /Users/aektov/pyflyte/anti-fraud-ml-train/flyte-package.tgz
```
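Before registering, one could check which packaged files exceed a size threshold. The sketch below lists archive members larger than 4 MiB (4194304 bytes), which is gRPC's default maximum receive size for a server; the function name and the choice of threshold are assumptions for illustration, and the registration request also carries some overhead beyond the file itself:

```python
import tarfile
from typing import List, Tuple

# gRPC's default maximum receive message size for a server: 4 MiB.
DEFAULT_GRPC_MAX_RECV_BYTES = 4 * 1024 * 1024


def oversized_members(archive_path: str,
                      limit: int = DEFAULT_GRPC_MAX_RECV_BYTES) -> List[Tuple[str, int]]:
    """Return (name, size) for every regular file in the archive larger than `limit` bytes."""
    # "r:*" lets tarfile detect the compression (e.g. gzip for .tgz) itself.
    with tarfile.open(archive_path, "r:*") as tar:
        return [(m.name, m.size) for m in tar.getmembers()
                if m.isfile() and m.size > limit]
```

Calling `oversized_members("flyte-package.tgz")` on the archive produced above would flag candidates for a rejected upload before `flytectl register` is run.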
Let's register all the objects:
```
flytectl -c .flyte/sandbox-config.yml \
    register files \
    --project flyte-anti-fraud-ml \
    --domain development \
    --archive flyte-package.tgz \
    --outputLocationPrefix s3://my-s3-bucket/anti-fraud-ml-train \
    --force \
    --version c20b6ee57601c0ddf12874fe671603b5ed78ece8
```
Oops, we got an error:
```
--------------------------------------------------------------- --------- --------------------------------------------------------
| NAME                                                          | STATUS  | ADDITIONAL INFO                                        |
 --------------------------------------------------------------- --------- --------------------------------------------------------
| /tmp/register2016944902/0_app.workflow.train_xgboost_clf_1.pb | Success | AlreadyExists                                          |
 --------------------------------------------------------------- --------- --------------------------------------------------------
| /tmp/register2016944902/1_app.workflow.get_predictions_1.pb   | Success | AlreadyExists                                          |
 --------------------------------------------------------------- --------- --------------------------------------------------------
| /tmp/register2016944902/2_app.workflow.main_flow_2.pb         | Failed  | Error registering file due to rpc error: code =        |
|                                                               |         | ResourceExhausted desc = grpc: received message larger |
|                                                               |         | than max (57552210 vs. 4194304)                        |
 --------------------------------------------------------------- --------- --------------------------------------------------------
3 rows
Error: rpc error: code = ResourceExhausted desc = grpc: received message larger than max (57552210 vs. 4194304)
make: *** [register] Error 1
```
4194304 equals 4 * 1024 * 1024, and this is the task memory limit I set up in `flyte-admin-base-config`. But I'm confused, because I see 57552210 (~54 GB) when trying to register the workflow proto file `/tmp/register2016944902/2_app.workflow.main_flow_2.pb`. As I mentioned, I can easily run my workflow locally, and the maximum memory consumption is no more than 4 GB.
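gRPC reports message sizes in bytes. Reading both numbers from the error as byte counts, the conversion works out as follows (plain arithmetic, no Flyte APIs involved):

```python
MIB = 1024 * 1024

grpc_limit = 4194304     # the "max" in the ResourceExhausted error
message_size = 57552210  # size of the rejected main_flow registration message

print(grpc_limit / MIB)                     # 4.0
print(round(message_size / MIB, 1))         # 54.9
print(round(message_size / grpc_limit, 1))  # 13.7
```

On that reading, the rejected `main_flow` message is about 55 MiB, roughly 13.7 times the 4 MiB maximum reported in the error, rather than tens of gigabytes.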