# ask-the-community
Frank Shen:
Hello, I am using dynamic to kick off parallel XGBSE training tasks. For a very small dataset it works fine. However, with a large dataset, the xgbse_training() task stays in [ContainersNotReady|ContainerCreating]: containers with unready status: [arlhbbz2jt56sq7fs2hj-n2-0-dn0-0]| for hours. If I am not using the dynamic workflow, the xgbse_training task succeeds for the same data size. Do you know what might be going on? Thank you!
```python
import pandas as pd
from flytekit import Resources, dynamic, task


@dynamic(
    requests=Resources(cpu="4", mem="20Gi"),
)
def train_foreach_tenure_small(
    df: pd.DataFrame,
) -> None:
    for tenure in range(1, 3, 1):
        # Pass the filtered slice, not the full dataframe.
        data = df[df['TENURE'] == tenure]
        xgbse_training(df=data)


@task(requests=Resources(mem="5Gi"))
def xgbse_training(df: pd.DataFrame) -> None:
    ...
```
Pryce:
Might be worth running kubectl describe on the pod mentioned in the status message to see what the last event was?
karthikraj:
Events are showing as none.
```
$ kubectl describe pod af49sl86c8p8kc2qqrh8-n2-0-dn0-0 -n marketing-development
Name:         af49sl86c8p8kc2qqrh8-n2-0-dn0-0
Namespace:    marketing-development
Priority:     0
Node:         ip-10-69-46-24.ec2.internal/10.69.46.24
Start Time:   Mon, 10 Apr 2023 07:51:35 +0000
Labels:       domain=development
              execution-id=af49sl86c8p8kc2qqrh8
              interruptible=false
              node-id=dn0
              project=marketing
              shard-key=3
              task-name=train-monthly-ltv-train-wf-xgbse-training
              workflow-name=train-monthly-ltv-train-wf-wf-train-small
Annotations:  cluster-autoscaler.kubernetes.io/safe-to-evict: false
              kubernetes.io/psp: eks.privileged
Status:       Running
IP:           10.69.47.41
IPs:
  IP:           10.69.47.41
Controlled By:  flyteworkflow/af49sl86c8p8kc2qqrh8
Containers:
  af49sl86c8p8kc2qqrh8-n2-0-dn0-0:
    Container ID:  docker://85c96fc54a94dc53944482c7091c802f01948f4f2a604c2b33e676074ad3c248
    Image:         613630599026.dkr.ecr.us-east-1.amazonaws.com/dai-mlp-flyte-spark-root-user:v-5
    Image ID:      docker-pullable://613630599026.dkr.ecr.us-east-1.amazonaws.com/dai-mlp-flyte-spark-root-user@sha256:f7331f453275fddba83e6793ca16179f04c2dcebd6ed7ed07dc37ffc0a845aee
    Port:          <none>
    Host Port:     <none>
    Args:
      pyflyte-fast-execute
      --additional-distribution
      s3://dev-wm-max-ml-flyte-us-east-1/6y/marketing/development/7NFZOXKPRBVVDC7QIGQGPL5J4A======/fastdf2af7949e4597d1ee1840f9322a9993.tar.gz
      --dest-dir
      /root
      --
      pyflyte-execute
      --inputs
      s3://dev-wm-max-ml-flyte-us-east-1/metadata/propeller/marketing-development-af49sl86c8p8kc2qqrh8/n2/data/0/dn0/inputs.pb
      --output-prefix
      s3://dev-wm-max-ml-flyte-us-east-1/metadata/propeller/marketing-development-af49sl86c8p8kc2qqrh8/n2/data/0/dn0/0
      --raw-output-data-prefix
      s3://dev-wm-max-ml-flyte-us-east-1/m3/af49sl86c8p8kc2qqrh8-n2-0-dn0-0
      --checkpoint-path
      s3://dev-wm-max-ml-flyte-us-east-1/m3/af49sl86c8p8kc2qqrh8-n2-0-dn0-0/_flytecheckpoints
      --prev-checkpoint
      ""
      --resolver
      flytekit.core.python_auto_container.default_task_resolver
      --
      task-module
      train.monthly_ltv_train_wf
      task-name
      xgbse_training
    State:          Running
      Started:      Mon, 10 Apr 2023 07:51:36 +0000
    Ready:          True
    Restart Count:  0
    Limits:
      cpu:     2
      memory:  40Gi
    Requests:
      cpu:     2
      memory:  40Gi
    Environment:
      FLYTE_INTERNAL_EXECUTION_WORKFLOW:  marketing:development:train.monthly_ltv_train_wf.wf_train_small
      FLYTE_INTERNAL_EXECUTION_ID:        af49sl86c8p8kc2qqrh8
      FLYTE_INTERNAL_EXECUTION_PROJECT:   marketing
      FLYTE_INTERNAL_EXECUTION_DOMAIN:    development
      FLYTE_ATTEMPT_NUMBER:               0
      FLYTE_INTERNAL_TASK_PROJECT:        marketing
      FLYTE_INTERNAL_TASK_DOMAIN:         development
      FLYTE_INTERNAL_TASK_NAME:           train.monthly_ltv_train_wf.xgbse_training
      FLYTE_INTERNAL_TASK_VERSION:        dW-H1Ra7T_2-SM6A3K-nkg==
      FLYTE_INTERNAL_PROJECT:             marketing
      FLYTE_INTERNAL_DOMAIN:              development
      FLYTE_INTERNAL_NAME:                train.monthly_ltv_train_wf.xgbse_training
      FLYTE_INTERNAL_VERSION:             dW-H1Ra7T_2-SM6A3K-nkg==
      DEFAULT_ENV_VAR:                    VALUE
      MY_NAME:                            KARTHIKRAJ
    Mounts:
      /var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-bcpld (ro)
Conditions:
  Type              Status
  Initialized       True
  Ready             True
  ContainersReady   True
  PodScheduled      True
Volumes:
  kube-api-access-bcpld:
    Type:                    Projected (a volume that contains injected data from multiple sources)
    TokenExpirationSeconds:  3607
    ConfigMapName:           kube-root-ca.crt
    ConfigMapOptional:       <nil>
    DownwardAPI:             true
QoS Class:                   Guaranteed
Node-Selectors:              <none>
Tolerations:                 node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                             node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:                      <none>
```
Hi @Pryce, any idea what might be causing this?
Pryce:
@karthikraj Tough to say, I'm very much a novice myself 😅 Are the large and small datasets coming from the same place? Wondering if it's maybe a permissions issue?
Babis Kiosidis:
Check your flytepropeller instance as well. Dynamic nodes are executed on flytepropeller, which might need to pull the data you are passing between tasks. If the data is too large and flytepropeller does not have the resources to fit it in memory, there might be an issue there.
I think it makes sense not to pass large datasets as inputs/outputs of tasks, to simplify the system's metadata handling a bit. If you could use a bucket/client to read and write the data somewhere else, it's worth a try.
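A minimal sketch of that suggestion, assuming a parquet-capable filesystem layer (e.g. s3fs/pyarrow) is available in the task image; the bucket prefix and helper names below are illustrative, not from the original workflow. Each tenure slice is written to object storage by the dynamic task, and only the URI string crosses the task boundary:

```python
import pandas as pd
from flytekit import Resources, dynamic, task

RAW_PREFIX = "s3://my-ml-bucket/xgbse-training"  # hypothetical prefix


@task(requests=Resources(mem="5Gi"))
def xgbse_training_from_uri(data_uri: str) -> None:
    # Read the slice back inside the training container.
    df = pd.read_parquet(data_uri)
    ...  # train the XGBSE model on df


@dynamic(requests=Resources(cpu="4", mem="20Gi"))
def train_foreach_tenure_small(df: pd.DataFrame) -> None:
    for tenure in range(1, 3, 1):
        uri = f"{RAW_PREFIX}/tenure={tenure}.parquet"
        # Write the slice to the bucket; only the small URI string
        # is passed to the training task.
        df[df['TENURE'] == tenure].to_parquet(uri)
        xgbse_training_from_uri(data_uri=uri)
```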
karthikraj:
@Frank Shen Could you please confirm if we are trying to pass huge datasets between tasks?
Babis Kiosidis:
By large dataset, I mean the dataframe passed to xgbse_training.
Frank Shen:
@Babis Kiosidis, I see. I think you nailed the issue. Thanks a lot! CC: @karthikraj
@Babis Kiosidis, regarding your comment below, is it meant for tasks in general, or only for tasks managed by dynamic workflows? I have a lot of workflows and I am passing large pandas DataFrames between them right now.
> I think it makes sense not to pass large datasets as inputs/outputs of tasks, to simplify the system's metadata handling a bit.
Babis Kiosidis:
Yeah, actually I think this behaviour is the same for all types of tasks. You should be able to verify it yourself from the Flyteconsole UI: navigate to the execution page and open a task; the Inputs tab will contain the exact input values that are passed from flytepropeller. But maybe dataframes should be serialized/deserialized and stored in a bucket from inside the docker container with flytekit, and not be loaded by flytepropeller? What do you think, @Ketan (kumare3)?
Ketan (kumare3):
@Babis Kiosidis Dataframes, files, and directories are automatically stored in the bucket; they never enter propeller, for safety and security.
So pass as much data as you want. Folks pass terabytes.
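For illustration, a small hypothetical example of that behaviour: when a task returns or accepts a pd.DataFrame, flytekit serializes it to blob storage (the --raw-output-data-prefix visible in the pod args above) and only a reference to it travels through the execution metadata:

```python
import pandas as pd
from flytekit import task, workflow


@task
def make_df() -> pd.DataFrame:
    # flytekit uploads this dataframe to blob storage and records only
    # a reference to it in the execution metadata.
    return pd.DataFrame({"TENURE": [1, 2, 3], "LTV": [10.0, 20.0, 30.0]})


@task
def count_rows(df: pd.DataFrame) -> int:
    # The dataframe is materialized inside this task's container,
    # not inside flytepropeller.
    return len(df)


@workflow
def offload_demo() -> int:
    return count_rows(df=make_df())
```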
Babis Kiosidis:
Is this the same behaviour for dynamic workflows?
Pryce:
@Frank Shen What does the code look like for the much larger dataset? Is the iterator in the range() function above what's significantly larger? If that's the case, then the delay with the larger dataset may come from dynamically building a DAG with very many task nodes. Perhaps refactoring to a map task would be helpful.
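A rough sketch of that map-task refactor, under the assumption that the tenure slices can be returned as a list output; the split task and workflow name are illustrative, not from the original code:

```python
from typing import List

import pandas as pd
from flytekit import Resources, map_task, task, workflow


@task
def split_by_tenure(df: pd.DataFrame) -> List[pd.DataFrame]:
    # One slice per tenure value; each becomes one mapped task instance.
    return [df[df['TENURE'] == tenure] for tenure in range(1, 3, 1)]


@task(requests=Resources(mem="5Gi"))
def xgbse_training(df: pd.DataFrame) -> None:
    ...  # train the XGBSE model on one tenure slice


@workflow
def wf_train_small_mapped(df: pd.DataFrame) -> None:
    map_task(xgbse_training)(df=split_by_tenure(df=df))
```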
Frank Shen:
@Pryce, it's not the range() getting bigger; it's the dataframe being passed to each parallel task that is getting much bigger.