Haytham Abuelfutuh
03/02/2022, 3:05 PMMike Seid
03/02/2022, 3:07 PMFarman Pirzada
03/02/2022, 5:14 PMHaytham Abuelfutuh
03/02/2022, 5:30 PMMike Seid
03/02/2022, 7:02 PMimport os
import pathlib
import gcsfs
from flytekit import kwtypes, task
from flytekit.core.context_manager import FlyteContextManager
from flytekit.types.file import HTMLPage, PythonNotebook
from flytekitplugins.papermill import NotebookTask
# Papermill-backed task that executes train.ipynb, which is looked up in the
# same directory as this module.
notebook = NotebookTask(
    name="run-notebook",
    # Absolute path of the notebook, rendered as a plain string.
    notebook_path=str(pathlib.Path(__file__).parent.absolute() / "train.ipynb"),
    # Declared notebook inputs: two integers and a string URI.
    inputs=kwtypes(a=int, b=int, uri=str),
    # Declared notebook output: a single integer named ``sum``.
    outputs=kwtypes(sum=int),
)
# NOTE(review): with cache=True a cache hit presumably skips the uploads
# below -- confirm that is the intended behavior for this side-effecting task.
@task(cache_version="1", cache=True)
def store_notebook_run(
    notebook: PythonNotebook, html: HTMLPage, output_uri: str
) -> str:
    """Copy an executed notebook and its HTML rendering under ``output_uri``.

    Args:
        notebook: Executed notebook; its ``path`` is uploaded as
            ``<output_uri>/notebook.ipynb``.
        html: Rendered notebook page; its ``path`` is uploaded as
            ``<output_uri>/notebook.html``.
        output_uri: Destination prefix the two object names are appended to.

    Returns:
        ``output_uri``, unchanged.
    """
    context = FlyteContextManager.current_context()
    # The pasted snippet had this call wrapped in Slack auto-link markup
    # (``<http://...|...>``), which is not valid Python. It is also logged
    # before the uploads so the message describes what is about to happen.
    context.user_space_params.logging.info("Writing files")

    fs = gcsfs.GCSFileSystem()
    fs.put(notebook.path, output_uri + "/notebook.ipynb")
    fs.put(html.path, output_uri + "/notebook.html")
    return output_uri
Haytham Abuelfutuh
03/02/2022, 9:58 PM@task(cache_version="1", cache=True)
def store_notebook_run(
notebook: PythonNotebook, html: HTMLPage, output_uri: str
) -> str:
# Suggested change: trigger a download of the HTML file before reading its path.
# NOTE(review): the ValueError quoted in the following message of this thread
# ("Attempting to trigger download on non-downloadable file") appears to be
# raised by this call when the file is already local -- confirm.
html.download()
# After the download, use html.path as the local file to upload.
If you want the original remote path (“gcs:/…”), you can use remote_source: https://docs.flyte.org/projects/flytekit/en/latest/generated/flytekit.types.file.FlyteFile.html#flytekit.types.file.FlyteFile.remote_source
Mike Seid
03/03/2022, 2:31 PMValueError: Attempting to trigger download on non-downloadable file /tmp/flyte/20220302_145646/raw/a0bfeb4e5acae41e9c7b531ab4e295e5/train-out.ipynb
Ketan (kumare3)
03/03/2022, 6:41 PMYee
03/03/2022, 6:53 PMstore_notebook_run
@Mike Seid, can you paste the workflow?
Mike Seid
03/03/2022, 6:55 PMimport datetime
from flytekit import LaunchPlan, workflow
...
from mseid_schedule_notebook.tasks import notebook, store_notebook_run
# Workflow that runs the notebook task and hands its outputs to
# store_notebook_run. Lines made up only of dots are presumably elisions
# left by the author of the snippet, not real code.
@workflow
def train_notebook_workflow(
styx_parameter: datetime.datetime, hades_overwrite: bool, uri_prefix: str
) -> int:
....
# Use remote task to generate the storage uri
generated_uri = ...
# Run the notebook task with fixed a/b values and the generated URI.
result = notebook(a=100, b=200, uri=generated_uri)
# Forward the notebook outputs (out_nb, out_rendered_nb) for storage;
# stored_uri is not read again in the visible code.
stored_uri = store_notebook_run(
notebook=result.out_nb, html=result.out_rendered_nb, output_uri=generated_uri
)
# The workflow's int result is the notebook task's `sum` output.
return result.sum
# Launch plan for train_notebook_workflow: uri_prefix is pinned as a fixed
# input and hades_overwrite defaults to False; styx_parameter is not set here.
lp_train_notebook = LaunchPlan.create(
    "train_notebook_workflow",
    train_notebook_workflow,
    fixed_inputs={
        # Plain bucket URI. The pasted snippet had it wrapped in Slack
        # auto-link angle brackets ("<gs://...>"), which would have been
        # passed verbatim as part of the prefix.
        "uri_prefix": "gs://mseid-schedule-notebook-storage",
    },
    default_inputs={
        "hades_overwrite": False,
    },
)
Yee
03/03/2022, 7:03 PMHaytham Abuelfutuh
03/03/2022, 7:03 PMMike Seid
03/03/2022, 7:04 PMYee
03/03/2022, 7:05 PMMike Seid
03/16/2022, 1:56 PM