limited-train-92032
05/24/2022, 1:17 PMfreezing-airport-6809
limited-train-92032
05/24/2022, 1:54 PMfreezing-airport-6809
freezing-airport-6809
limited-train-92032
05/24/2022, 2:01 PM@task
def forecast_task(sd: StructuredDataset) -> StructuredDataset:
    """Fit one Prophet model per segment and return all forecasts combined.

    The input frame is loaded from ``sd``; its ``date`` and ``avg_subs``
    columns are renamed to ``ds`` and ``y`` (the column names Prophet
    expects) and ``ds`` is parsed to datetimes. The rows are then split by
    (country, associate_type, category, subcategory) and each segment gets
    its own model.

    Args:
        sd: Dataset holding the history, with at least the ``date``,
            ``avg_subs`` and the four segment columns.

    Returns:
        A dataset wrapping the concatenation of every segment's forecast
        (``ds``, ``yhat``, ``yhat_lower``, ``yhat_upper``) left-joined with
        that segment's original rows on ``ds``.
    """
    history = sd.open(pd.DataFrame).all()
    history = history.rename(columns={"date": "ds", "avg_subs": "y"})
    history["ds"] = pd.to_datetime(history["ds"])

    segment_keys = ["country", "associate_type", "category", "subcategory"]
    per_segment = []
    for _, segment in history.groupby(segment_keys):
        model = Prophet()
        model.fit(segment)
        # Extend the segment's timeline by 365 periods past its history.
        horizon = model.make_future_dataframe(periods=365)
        prediction = model.predict(horizon)
        prediction = prediction[["ds", "yhat", "yhat_lower", "yhat_upper"]]
        # Left join: forecast dates absent from the segment get NaN in
        # every column that comes from the segment.
        per_segment.append(prediction.merge(segment, on="ds", how="left"))

    return StructuredDataset(dataframe=pd.concat(per_segment))
@task
def forecast_reducer(results: typing.List[StructuredDataset]) -> StructuredDataset:
    """Merge several forecast datasets into a single one.

    Args:
        results: Datasets to combine; each is loaded as a pandas DataFrame.

    Returns:
        A dataset wrapping the row-wise concatenation of all input frames,
        in the order given.
    """
    frames = []
    for part in results:
        frames.append(part.open(pd.DataFrame).all())
    return StructuredDataset(dataframe=pd.concat(frames))
@dynamic
def forecast_mapper(sd: StructuredDataset) -> StructuredDataset:
    """Fan the input out by country, forecast each slice, and merge the results.

    Args:
        sd: Dataset holding the full history; must contain a ``country``
            column.

    Returns:
        The output of ``forecast_reducer`` applied to the per-country
        ``forecast_task`` results.
    """
    full_frame = sd.open(pd.DataFrame).all()
    # One forecast_task invocation per distinct country value.
    partial_forecasts = [
        forecast_task(sd=StructuredDataset(dataframe=country_frame))
        for _, country_frame in full_frame.groupby(["country"])
    ]
    return forecast_reducer(results=partial_forecasts)
@workflow
def forecast() -> StructuredDataset:
    """Run the subscriber forecast end to end from the BigQuery source table.

    Returns:
        The combined forecast dataset produced by ``forecast_mapper``.
    """
    # The URI previously read "<bq://sp-one-model.quarterly>_forecast_..."
    # -- chat auto-link markup had leaked into the literal and split the
    # table identifier. It is written here as bq://<project>.<dataset>.<table>.
    subs = StructuredDataset(
        uri="bq://sp-one-model.quarterly_forecast_2022F1.subscribers_main_output"
    )
    return forecast_mapper(sd=subs)
freezing-airport-6809
limited-train-92032
05/24/2022, 2:39 PMhallowed-mouse-14616
05/24/2022, 2:48 PMlimited-train-92032
05/24/2022, 4:50 PMlimited-train-92032
05/24/2022, 4:57 PMhallowed-mouse-14616
05/24/2022, 5:13 PMhigh-accountant-32689
05/24/2022, 11:39 PMFLYTE_GCP_GSUTIL_PARALLELISM
to true
in the docker container that contains your code.
2. Set the value in the config file present in your image under the `gcp`
section, e.g. contents of `~/.flyte/config.yaml`:
gcp:
  gsutil_parallelism: true
limited-train-92032
05/25/2022, 12:58 AMstrict mode is on but received keys [map[gcp:{}]] to decode with no config assigned to receive them: failed strict mode check
limited-train-92032
05/25/2022, 12:58 AMFLYTE_GCP_GSUTIL_PARALLELISM=true flytectl -c /workspaces/business-casing/config.yaml create execution -p business-casing -d development --execFile exec_spec.yaml --files.outputLocationPrefix gs://business-casing-workflow-storage
high-accountant-32689
05/25/2022, 1:00 AMhigh-accountant-32689
05/25/2022, 1:01 AMlimited-train-92032
05/25/2022, 1:01 AMfreezing-airport-6809
limited-train-92032
05/25/2022, 12:07 PMlimited-train-92032
05/25/2022, 4:02 PMhigh-accountant-32689
05/25/2022, 5:04 PMlimited-train-92032
05/25/2022, 5:58 PM