wonderful-apple-6051
12/27/2024, 9:03 AM
k3d cluster create flyte-cluster -v $PWD/k3data:/data
After that, I went ahead and created a custom Docker image that has flytekit
in it. My simple task looks like this:
# Container image the task below runs in.
clustering_container_name = "espymur/summarization-clustering:flyte-latest-2"


@task(container_image=clustering_container_name)
def generate_date(days_ago: int) -> str:
    """Return the date `days_ago` days before now as a ``YYYY-MM-DD`` string.

    Args:
        days_ago: Number of days to subtract from the current local time.

    Returns:
        The resulting calendar date formatted with ``%Y-%m-%d``.
    """
    target_day = datetime.now() - timedelta(days=days_ago)
    return target_day.strftime("%Y-%m-%d")
The code for the workflow is here:
@workflow
def clustering_pipeline(
    environment: str,
    days_ago: int = 0,
    embedding_model_id: str = "dunzhang/stella_en_400M_v5",
) -> str:
    """Chain the clustering tasks end to end and return the saved-data path.

    The steps run in this order: generate_date -> pull_data ->
    compute_embeddings -> cluster_data -> save_data.

    Args:
        environment: Forwarded unchanged to ``pull_data`` and ``save_data``.
        days_ago: Forwarded to ``generate_date`` to pick the date to process.
        embedding_model_id: Forwarded to ``compute_embeddings``.

    Returns:
        Whatever ``save_data`` produces (annotated as ``str``).
    """
    run_date = generate_date(days_ago=days_ago)
    pulled = pull_data(environment=environment, date=run_date)

    # Both the embedding and the clustering steps consume the pulled data.
    embeddings = compute_embeddings(
        embedding_model_id=embedding_model_id,
        new_data=pulled,
    )
    important_news = cluster_data(
        embedded_documents=embeddings,
        new_data=pulled,
    )

    return save_data(
        environment=environment,
        important_news_df=important_news,
        date=run_date,
    )
if __name__ == "__main__":
    # Run the workflow locally. Arguments are passed by keyword because older
    # flytekit releases reject positional arguments when calling a task or
    # workflow; keyword arguments are accepted by every version.
    clustering_pipeline(
        environment="local",
        days_ago=0,
        embedding_model_id="dunzhang/stella_en_400M_v5",
    )
I am not sharing the code for the other tasks, for simplicity.
When I run that pipeline, I get the following error:
╭───────────────────── Traceback (most recent call last) ──────────────────────╮
│ /opt/venv/lib/python3.10/site-packages/flytekit/core/data_persistence.py:310 │
│ in get │
│ │
│ ❱ 310 │ │ │ │ dst = await file_system._get(from_path, to_path, recur │
│ │
│ /opt/venv/lib/python3.10/site-packages/fsspec/asyn.py:669 in _get │
│ │
│ ❱ 669 │ │ return await _run_coros_in_chunks( │
│ │
│ /opt/venv/lib/python3.10/site-packages/fsspec/asyn.py:268 in │
│ _run_coros_in_chunks │
│ │
│ ❱ 268 │ │ │ result, k = await done.pop() │
│ │
│ /opt/venv/lib/python3.10/site-packages/fsspec/asyn.py:245 in _run_coro │
│ │
│ ❱ 245 │ │ │ return await asyncio.wait_for(coro, timeout=timeout), i │
│ │
│ /usr/local/lib/python3.10/asyncio/tasks.py:408 in wait_for │
│ │
│ ❱ 408 │ │ return await fut │
│ │
│ /opt/venv/lib/python3.10/site-packages/fsspec/callbacks.py:81 in func │
│ │
│ ❱ 81 │ │ │ │ return await fn(path1, path2, callback=child, **kwargs │
│ │
│ /opt/venv/lib/python3.10/site-packages/s3fs/core.py:1355 in _get_file │
│ │
│ ❱ 1355 │ │ │ with open(lpath, "wb") as f0: │
╰──────────────────────────────────────────────────────────────────────────────╯
PermissionError: [Errno 13] Permission denied:
'/root/fast7e41e8718dd0f36659c79430be8d487d.tar.gz'
During handling of the above exception, another exception occurred:
╭───────────────────── Traceback (most recent call last) ──────────────────────╮
│ /opt/venv/lib/python3.10/site-packages/s3fs/core.py:734 in _lsdir │
│ │
│ ❱ 734 │ │ │ │ async for c in self._iterdir( │
│ │
│ /opt/venv/lib/python3.10/site-packages/s3fs/core.py:784 in _iterdir │
│ │
│ ❱ 784 │ │ async for i in it: │
│ │
│ /opt/venv/lib/python3.10/site-packages/aiobotocore/paginate.py:30 in │
│ __anext__ │
│ │
│ ❱ 30 │ │ │ response = await self._make_request(current_kwargs) │
│ │
│ /opt/venv/lib/python3.10/site-packages/aiobotocore/client.py:412 in │
│ _make_api_call │
│ │
│ ❱ 412 │ │ │ raise error_class(parsed_response, operation_name) │
╰──────────────────────────────────────────────────────────────────────────────╯
ClientError: An error occurred (AccessDenied) when calling the ListObjectsV2
operation: Access Denied.