#ask-the-community

Stefan Avesand

05/24/2022, 1:17 PM
Good morning! We’re trying out Dynamic tasks for distributed computing/forecasting and notice it takes over 8 minutes for Flyte to kick off the first task. Is this normal? (@Kelsey Messer FYI)

Ketan (kumare3)

05/24/2022, 1:44 PM
Can you check the timeline view?

Stefan Avesand

05/24/2022, 1:54 PM
message has been deleted

Ketan (kumare3)

05/24/2022, 1:56 PM
It seems the dynamic task itself was processing?
Cc @Eduardo Apolinario (eapolinario)

Stefan Avesand

05/24/2022, 2:01 PM
I don’t think so. As you can see in the Nodes view, some ~150 tasks are spun up. This is the mock example that we’re using to test dynamic tasks:
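# Imports assumed; the original snippet omitted them.
import typing

import pandas as pd
from flytekit import dynamic, task, workflow
from flytekit.types.structured.structured_dataset import StructuredDataset
from prophet import Prophet  # assumes the `prophet` package (older releases: `fbprophet`)


@task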
def forecast_task(sd: StructuredDataset) -> StructuredDataset:
    df = sd.open(pd.DataFrame).all()
    df = df.rename(columns={"date": "ds", "avg_subs": "y"})
    df["ds"] = pd.to_datetime(df["ds"])

    results = []
    for g, d in df.groupby(["country", "associate_type", "category", "subcategory"]):
        m = Prophet()
        m.fit(d)
        future = m.make_future_dataframe(periods=365)
        forecast = m.predict(future)
        forecast = forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]]
        forecast = forecast.merge(d, on="ds", how="left")
        results.append(forecast)

    forecast = pd.concat(results)
    res = StructuredDataset(dataframe=forecast)
    return res


@task
def forecast_reducer(results: typing.List[StructuredDataset]) -> StructuredDataset:
    df_array = [x.open(pd.DataFrame).all() for x in results]
    df = pd.concat(df_array)
    return StructuredDataset(dataframe=df)


@dynamic
def forecast_mapper(sd: StructuredDataset) -> StructuredDataset:
    df = sd.open(pd.DataFrame).all()

    results = []
    for g, d in df.groupby(["country"]):
        res = forecast_task(sd=StructuredDataset(dataframe=d))
        results.append(res)

    res = forecast_reducer(results=results)
    return res


@workflow
def forecast() -> StructuredDataset:
    subs = StructuredDataset(
        uri="bq://sp-one-model.quarterly_forecast_2022F1.subscribers_main_output"
    )
    sd = forecast_mapper(sd=subs)
    return sd

Ketan (kumare3)

05/24/2022, 2:36 PM
Interesting, is the group by taking time? Otherwise it seems like some perf issue in the platform. cc @Dan Rammer (hamersaw)

Stefan Avesand

05/24/2022, 2:39 PM
The groupby is super quick. Maybe it’s the copying to S3/GCS that takes time? There are some 150 groups, and maybe the copying is done sequentially?

Dan Rammer (hamersaw)

05/24/2022, 2:48 PM
@Stefan Avesand how big is the data? Right now I do believe the copies are performed sequentially as you suggested, so that could certainly be the culprit. This is something we have discussed changing to async for obvious perf reasons.
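
To illustrate why sequential copies could dominate with ~150 groups, here is a minimal sketch comparing sequential and concurrent uploads. It does not use flytekit's actual copy path: `fake_upload` is a hypothetical stand-in that only sleeps, and the group count, latency, and worker count are made-up values.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_upload(group_id: int) -> int:
    """Hypothetical stand-in for copying one group's data to S3/GCS."""
    time.sleep(0.02)  # simulated network latency (assumed value)
    return group_id


def upload_sequentially(group_ids):
    # One copy at a time: total time grows linearly with the number of groups.
    return [fake_upload(g) for g in group_ids]


def upload_concurrently(group_ids, max_workers=16):
    # Threads suit I/O-bound work; map() returns results in input order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fake_upload, group_ids))


def timed(fn, *args):
    # Run fn and return (result, elapsed seconds).
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start


groups = list(range(32))
seq_result, seq_seconds = timed(upload_sequentially, groups)
par_result, par_seconds = timed(upload_concurrently, groups)
print(f"sequential: {seq_seconds:.2f}s, concurrent: {par_seconds:.2f}s")
```

With real object storage the gain depends on payload size and bandwidth, so concurrency is not guaranteed to help.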

Stefan Avesand

05/24/2022, 4:50 PM
Any idea how to activate that flag from the flytectl command?

Dan Rammer (hamersaw)

05/24/2022, 5:13 PM
cc @Eduardo Apolinario (eapolinario) @Yuvraj

Eduardo Apolinario (eapolinario)

05/24/2022, 11:39 PM
@Stefan Avesand, can you double-check which version of flytekit you're using? To set that value you have a few options, including:
1. Set an env var named `FLYTE_GCP_GSUTIL_PARALLELISM` to `true` in the docker container that contains your code.
2. Set the value in the config file present in your image under the `gcp` section, e.g. contents of `~/.flyte/config.yaml`:
gcp:
  gsutil_parallelism: true

Stefan Avesand

05/25/2022, 12:58 AM
The config.yaml file change causes this error:
strict mode is on but received keys [map[gcp:{}]] to decode with no config assigned to receive them: failed strict mode check
Trying with the env variable:
FLYTE_GCP_GSUTIL_PARALLELISM=true flytectl -c /workspaces/business-casing/config.yaml create execution -p business-casing -d development --execFile exec_spec.yaml --files.outputLocationPrefix gs://business-casing-workflow-storage

Eduardo Apolinario (eapolinario)

05/25/2022, 1:00 AM
oh wait, this config file (or env var) needs to be set in the docker container, not in the flytectl call
in the docker container that will be running your python code
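
One way to confirm the variable actually reaches the process running the Python code is to check it from inside the container, for example at the top of a task. This is a minimal sketch: the helper name and the set of accepted truthy strings are assumptions, and it does not reproduce how flytekit itself parses the value.

```python
import os

ENV_VAR = "FLYTE_GCP_GSUTIL_PARALLELISM"  # name taken from the thread


def gsutil_parallelism_visible(environ=None) -> bool:
    """Return True if the variable is set to a truthy string in this process.

    The accepted strings are an assumption for illustration only.
    """
    environ = os.environ if environ is None else environ
    return environ.get(ENV_VAR, "").strip().lower() in {"1", "true", "yes"}


# Run inside the container (e.g. from a task) to see what the process sees.
print(f"{ENV_VAR} visible: {gsutil_parallelism_visible()}")
```

Setting the variable only on the `flytectl` command line affects the local CLI process, so this check would still report `False` inside the task container.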

Stefan Avesand

05/25/2022, 1:01 AM
Ohh, gotcha

Ketan (kumare3)

05/25/2022, 5:05 AM
@Stefan Avesand / @Eduardo Apolinario (eapolinario) / @Kevin Su I think this is a perfect use case for adding additional details to Flytedecks for performance deep dives

Stefan Avesand

05/25/2022, 12:07 PM
That would be super valuable!
Fascinating… I added FLYTE_GCP_GSUTIL_PARALLELISM=true to the docker container and it resulted in the job taking 32 minutes instead of 22.

Eduardo Apolinario (eapolinario)

05/25/2022, 5:04 PM
wow, that's interesting. Do you have visibility into the gcs side of things? Any logs on the container that could tell us about what's going on there?

Stefan Avesand

05/25/2022, 5:58 PM
The logs say nothing really 😞
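
When the container logs are uninformative, one option is to log per-phase timings from the task code itself. This is a minimal sketch using only the standard library; the logger name, phase names, and placeholder work are hypothetical, and in the real workflow the `with` blocks would wrap the groupby and the StructuredDataset handling.

```python
import logging
import time
from contextlib import contextmanager

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("forecast_timing")  # hypothetical logger name

timings = {}  # phase name -> elapsed seconds


@contextmanager
def phase(name: str):
    """Record and log the wall-clock time spent inside the with-block."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        timings[name] = elapsed
        logger.info("%s took %.3fs", name, elapsed)


with phase("groupby"):
    # Placeholder for df.groupby(...): split nine items into three groups.
    groups = [list(range(i, i + 3)) for i in range(0, 9, 3)]

with phase("serialize"):
    # Placeholder for the upload/serialization step.
    time.sleep(0.01)
```

Comparing the logged phases between runs with and without `FLYTE_GCP_GSUTIL_PARALLELISM` would show which step accounts for the difference.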