cuddly-jelly-27016
01/10/2025, 10:36 AM--remote
flag, but produces a Maximum recursion depth exceeded
error when running with the flag.
workflows/pipeline.py
import datetime
from pathlib import Path
from time import sleep
import flytekit
from flytekit import task, workflow, FlyteDirectory
import polars as pl
@task()
def create_initial_dataset_task() -> pl.DataFrame:
df = pl.DataFrame(
{
"learner_int_id": [1, 1, 1, 1],
"content_int_id": [10, 20, 30, 40],
"score": [0.5, 0.6, 0.7, 0.8],
"outcome_int": [2, 0, 1, 1],
"play_rank": [1, 2, 3, 4],
}
)
return df
@task()
def create_second_dataset_task() -> pl.DataFrame:
df = pl.DataFrame(
{
"learner_int_id": [2, 2, 2, 2],
"content_int_id": [10, 20, 30, 40],
"score": [0.5, 0.6, 0.7, 0.8],
"outcome_int": [2, 0, 1, 1],
"play_rank": [1, 2, 3, 4],
}
)
return df
@task()
def combine_datasets_task(dataset_1: pl.DataFrame, dataset_2:pl.DataFrame) -> pl.DataFrame:
"""Combine datasets from a folder into a single parquet file."""
combined = pl.concat([dataset_1, dataset_2])
return combined
@task()
def train_model_task(dataset: pl.DataFrame) -> dict:
for i in range(10):
print(f"Epoch {i}...")
sleep(3)
print("Model trained!")
return {"status": "trained", "Other params": {"lr": 0.001, "drop-out": 0.5}}
@task()
def evaluate_model_task(model: dict) -> dict:
print("Testing model...")
lr = model["Other params"]["lr"]
for i in range(10):
print((i + 1) * lr)
return {"datetime": datetime.datetime.now().strftime("%Y-%m-%d--%H-%M-%S"), "status": "evaluated",
"my-metric": {"lr": lr, "accuracy": 0.9}}
@task()
def create_folder(name: str) -> FlyteDirectory:
folder = Path(flytekit.current_context().working_directory) / name
folder.mkdir(parents=True, exist_ok=True)
return FlyteDirectory(path=str(folder))
@workflow
def pipeline() -> None:
ds1 = create_initial_dataset_task()
ds2 = create_second_dataset_task()
dataset = combine_datasets_task(ds1, ds2)
model = train_model_task(dataset)
metrics = evaluate_model_task(model)
if name == '__main__':
# folder = Path("data") / datetime.datetime.now().strftime("%Y-%m-%d--%H-%M-%S")
pipeline()
Dockerfile
FROM python:3.12-slim-bookworm
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
# Install the project into /app
#WORKDIR /app
# Then, add the rest of the project source code and install it
# Installing separately from its dependencies allows optimal layer caching
COPY pyproject.toml pyproject.toml
COPY uv.lock uv.lock
RUN uv sync --frozen
# Place executables in the environment at the front of the path
# add /app/ in front for it to activate the environment
ENV PATH=".venv/bin:$PATH"
COPY src .
#COPY config.py .
COPY workflows .
# This tag is supplied by the build script and will be used to determine the version
# when registering tasks, workflows, and launch plans
ARG tag
ENV FLYTE_INTERNAL_IMAGE=$tag
# Reset the entrypoint, don't invoke uv
# This seems to work though
ENTRYPOINT ["uv", "run"]
Traceback:
Trace:
```
Traceback (most recent call last):
File "/usr/local/lib/python3.12/pathlib.py", line 441, in str
return self._str
^^^^^^^^^
AttributeError: 'PosixPath' object has no attribute '_str'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.12/pathlib.py", line 555, in drive
return self._drv
^^^^^^^^^
AttributeError: 'PosixPath' object has no attribute '_drv'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/.venv/lib/python3.12/site-packages/flytekit/bin/entrypoint.py", line 164, in _dispatch_execute
task_def = load_task()
^^^^^^^^^^^
File "/.venv/lib/python3.12/site-packages/flytekit/bin/entrypoint.py", line 583, in load_task
return resolver_obj.load_task(loader_args=resolver_args)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.12/site-packages/flytekit/core/utils.py", line 312, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.12/site-packages/flytekit/core/python_auto_container.py", line 271, in load_task
task_module = importlib.import_module(name=task_module) # type: ignore
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/importlib/__init__.py", line 90, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
File "<frozen importlib._bootstrap>", line 1331, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 935, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 999, in exec_module
File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
File "/workflows/pipeline.py", line 9, in <module>
@task()
^^^^^^
File "/.venv/lib/python3.12/site-packages/flytekit/core/task.py", line 359, in wrapper
task_instance = TaskPlugins.find_pythontask_plugin(type(task_config))(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.12/site-packages/flytekit/core/tracker.py", line 82, in call
o = super(InstanceTrackingMeta, cls).__call__(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.12/site-packages/flytekit/core/python_function_task.py", line 139, in init
name, _, _, _ = extract_task_module(task_function)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.12/site-packages/flytekit/core/tracker.py", line 382, in extract_task_module
mod_name = get_full_module_path(mod, mod_name)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.12/site-packages/flytekit/core/tracker.py", line 391, in get_full_module_path
new_mod_name = _mod_sanitizer.get_absolute_module_name(inspect.getabsfile(mod), package_root)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.12/site-packages/flytekit/core/tracker.py", line 328, in get_absolute_module_name
return self._resolve_abs_module_name(path, package_root)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.12/site-packages/flytekit/core/tracker.py", line 318, in _resolve_abs_module_name
mod_name = self._resolve_abs_module_name(dirname, package_root)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.12/site-packages/flytekit/core/tracker.py", line 318, in _resolve_abs_module_name
mod_name = self._resolve_abs_module_name(dirname, package_root)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.12/site-packages/flytekit/core/tracker.py", line 318, in _resolve_abs_module_name
mod_name = self._resolve_abs_module_name(dirname, package_root)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[Previous line repeated 964 more times]
File "/.venv/lib/python3.12/site-packag…
flyteorg/flytecuddly-jelly-27016
01/10/2025, 10:36 AM