# ask-the-community
s
Good morning! We’re trying out Dynamic tasks for distributed computing/forecasting and notice it takes over 8 minutes for Flyte to kick off the first task. Is this normal? (@Kelsey Messer FYI)
k
Can you check the timeline view?
s
message has been deleted
k
It seems the dynamic task itself was processing?
Cc @Eduardo Apolinario (eapolinario)
s
I don’t think so. As you can see in the Nodes view, about 150 tasks were spun up. This is the mock example that we’re using to test dynamic tasks:
```python
import typing

import pandas as pd
from flytekit import StructuredDataset, dynamic, task, workflow
from prophet import Prophet


@task
def forecast_task(sd: StructuredDataset) -> StructuredDataset:
    df = sd.open(pd.DataFrame).all()
    # Prophet expects the time column as "ds" and the target as "y".
    df = df.rename(columns={"date": "ds", "avg_subs": "y"})
    df["ds"] = pd.to_datetime(df["ds"])

    results = []
    # Fit one Prophet model per (country, associate_type, category, subcategory) group.
    for _, d in df.groupby(["country", "associate_type", "category", "subcategory"]):
        m = Prophet()
        m.fit(d)
        future = m.make_future_dataframe(periods=365)
        forecast = m.predict(future)
        forecast = forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]]
        # Left-join the actuals back onto the forecast horizon.
        forecast = forecast.merge(d, on="ds", how="left")
        results.append(forecast)

    forecast = pd.concat(results)
    return StructuredDataset(dataframe=forecast)


@task
def forecast_reducer(results: typing.List[StructuredDataset]) -> StructuredDataset:
    # Fan-in: concatenate the per-country forecasts into one dataset.
    df_array = [x.open(pd.DataFrame).all() for x in results]
    df = pd.concat(df_array)
    return StructuredDataset(dataframe=df)


@dynamic
def forecast_mapper(sd: StructuredDataset) -> StructuredDataset:
    df = sd.open(pd.DataFrame).all()

    results = []
    for _, d in df.groupby(["country"]):
        # Each call passes a local DataFrame, so it has to be written to blob
        # storage (S3/GCS) before the downstream task pod can read it.
        res = forecast_task(sd=StructuredDataset(dataframe=d))
        results.append(res)

    return forecast_reducer(results=results)


@workflow
def forecast() -> StructuredDataset:
    subs = StructuredDataset(
        uri="bq://sp-one-model.quarterly_forecast_2022F1.subscribers_main_output"
    )
    return forecast_mapper(sd=subs)
```
k
Interesting. Is the groupby taking time? Otherwise, this looks like a performance issue in the platform. cc @Dan Rammer (hamersaw)
s
The groupby is super quick. Maybe it’s the copying to S3/GCS that takes time? There are some 150 groups, and maybe the copying is done sequentially?
d
@Stefan Avesand how big is the data? Right now I do believe the copies are performed sequentially, as you suggested, so that could certainly be the culprit. We have discussed changing this to async for the obvious performance benefit.
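To illustrate why roughly 150 sequential copies add up, here is a minimal sketch comparing uploads done one after another with uploads overlapped through a thread pool. `fake_upload` and the `gs://example-bucket/...` URIs are hypothetical stand-ins that just sleep to simulate per-object latency; this is not how flytekit actually performs its copies.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def fake_upload(key: str, latency_s: float = 0.01) -> str:
    """Hypothetical stand-in for one blob upload: sleep to mimic network latency."""
    time.sleep(latency_s)
    return f"gs://example-bucket/{key}.parquet"  # hypothetical URI


def upload_sequential(keys: List[str]) -> List[str]:
    # One upload at a time: total time is roughly len(keys) * latency.
    return [fake_upload(k) for k in keys]


def upload_concurrent(keys: List[str], max_workers: int = 16) -> List[str]:
    # Uploads overlap in worker threads; executor.map returns results in input order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fake_upload, keys))


if __name__ == "__main__":
    keys = [f"group-{i}" for i in range(150)]
    for fn in (upload_sequential, upload_concurrent):
        start = time.perf_counter()
        uris = fn(keys)
        print(f"{fn.__name__}: {len(uris)} uploads in {time.perf_counter() - start:.2f}s")
```

The gain assumes each copy is dominated by per-request latency rather than bandwidth; for large objects, overlapping uploads helps less.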
s
Any idea how to activate that flag from the flytectl command?
d
cc @Eduardo Apolinario (eapolinario) @Yuvraj
e
@Stefan Avesand, can you double-check which version of flytekit you're using? To set that value you have a few options, including:
1. Set an environment variable named `FLYTE_GCP_GSUTIL_PARALLELISM` to `true` in the Docker container that contains your code.
2. Set the value in the config file present in your image under the `gcp` section, e.g. the contents of `~/.flyte/config.yaml`:
```yaml
gcp:
  gsutil_parallelism: true
```
s
The config.yaml file change causes this error:
```
strict mode is on but received keys [map[gcp:{}]] to decode with no config assigned to receive them: failed strict mode check
```
Trying with the env variable instead:
```shell
FLYTE_GCP_GSUTIL_PARALLELISM=true flytectl -c /workspaces/business-casing/config.yaml create execution -p business-casing -d development --execFile exec_spec.yaml --files.outputLocationPrefix gs://business-casing-workflow-storage
```
e
Oh wait, this config file (or env var) needs to be set in the Docker container that will be running your Python code, not in the flytectl call.
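Because the setting has to reach the process that runs the Python code, a quick sanity check is to log it from inside a task, presumably `forecast_mapper`, since that is where the per-group DataFrames get written. This is a minimal sketch: `gsutil_parallelism_requested` and `log_storage_settings` are hypothetical helpers, and the truthy spellings they accept are an assumption, not flytekit's own parsing rules.

```python
import logging
import os
from typing import Mapping, Optional

logger = logging.getLogger(__name__)

ENV_VAR = "FLYTE_GCP_GSUTIL_PARALLELISM"


def gsutil_parallelism_requested(env: Optional[Mapping[str, str]] = None) -> bool:
    """Hypothetical helper: True if ENV_VAR holds a truthy value.

    The accepted spellings ("1", "true", "yes") are an assumption for this sketch.
    """
    env = os.environ if env is None else env
    value = env.get(ENV_VAR, "")
    return value.strip().lower() in {"1", "true", "yes"}


def log_storage_settings() -> None:
    # Call at the top of the task body; WARNING level reaches stderr even without
    # logging configuration, so the line shows up in the task pod's logs.
    logger.warning(
        "%s=%r (parsed as %s)",
        ENV_VAR,
        os.environ.get(ENV_VAR),
        gsutil_parallelism_requested(),
    )
```

If the pod logs show `None`, the variable was set somewhere other than the image that executes the task.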
s
Ohh, gotcha
k
@Stefan Avesand / @Eduardo Apolinario (eapolinario) / @Kevin Su I think this is a perfect use case for adding more details to Flyte Decks for performance deep dives.
s
That would be super valuable!
Fascinating… I added FLYTE_GCP_GSUTIL_PARALLELISM=true to the docker container and it resulted in the job taking 32 minutes instead of 22.
e
Wow, that's interesting. Do you have visibility into the GCS side of things? Any logs on the container that could tell us what's going on there?
s
The logs say nothing really 😞
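When the logs are silent, one option is to add your own timing around the suspect steps. A minimal sketch, assuming you can edit the task code: the hypothetical `timed` context manager below logs the wall-clock duration of each wrapped block, so wrapping, for example, the groupby and each `forecast_task(...)` call in `forecast_mapper` would show where the minutes go. The `time.sleep` calls only stand in for the real work.

```python
import logging
import time
from contextlib import contextmanager
from typing import Iterator, List, Optional, Tuple

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("timing")


@contextmanager
def timed(label: str, sink: Optional[List[Tuple[str, float]]] = None) -> Iterator[None]:
    """Log how long the wrapped block took; optionally record (label, seconds) in sink."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs even if the block raises, so failed steps are still timed.
        elapsed = time.perf_counter() - start
        logger.info("%s took %.3fs", label, elapsed)
        if sink is not None:
            sink.append((label, elapsed))


if __name__ == "__main__":
    records: List[Tuple[str, float]] = []
    with timed("groupby", records):
        time.sleep(0.05)  # stand-in for df.groupby(...)
    for i in range(3):
        with timed(f"forecast_task call {i}", records):
            time.sleep(0.01)  # stand-in for forecast_task(sd=StructuredDataset(dataframe=d))
    print("slowest step:", max(records, key=lambda r: r[1])[0])
```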