Louis DiNatale
12/08/2022, 4:55 PM
array size > max allowed for cache lookup. requested [15434]. writer allowed [10000] reader allowed [10000]
Evgeny Romanov
12/09/2022, 9:21 AM
varsha Parthasarathy
12/11/2022, 3:51 AM
Adrian Garcia Badaracco
12/12/2022, 1:45 AM
Nischel Kandru (Woven Planet)
12/12/2022, 3:01 AM
A couple of questions about the task_executions table in flyte AdminDb:
1. Can you please let me know when started_at and duration are set to null?
2. I am of the impression that { created_at and task_execution_created_at }, { updated_at and task_execution_updated_at } should be the same. Can you please confirm this?
cc @Yee @Alex Pozimenko (FYI)
Tom Stokes
12/12/2022, 1:38 PM
$ docker ps
>>>
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
dbf8f5dcb150 cr.flyte.org/flyteorg/flyte-sandbox:dind-bfa1dd4e6057b6fc16272579d61df7b1832b96a7 "tini flyte-entrypoi…" About an hour ago Up About an hour 0.0.0.0:30081-30082->30081-30082/tcp, 0.0.0.0:30084->30084/tcp, 2375-2376/tcp, 0.0.0.0:30086-30088->30086-30088/tcp flyte-sandbox
From which we can then find the images that exist inside the dbf8f5dcb150 container:
$ docker exec -it dbf8f5dcb150 docker image ls
>>>
REPOSITORY TAG IMAGE ID CREATED SIZE
papermill-exploration latest 3c40c6deb126 23 minutes ago 948MB
...
I can see my project in there under the tag papermill-exploration:latest.
I then serialize and submit my workflow spec as follows:
pyflyte --pkgs workflows package -f --image "papermill-exploration:latest"
flytectl register files --project flytesnacks --domain development --archive flyte-package.tgz --version v2
All of which works:
$ flytectl get workflows --project flytesnacks --domain development
>>>
--------- ------------------------------------ -----------------------------
| VERSION | NAME | CREATED AT |
--------- ------------------------------------ -----------------------------
| v2 | workflows.workflow.nb_to_python_wf | 2022-12-12T12:41:53.987960Z |
--------- ------------------------------------ -----------------------------
| v1 | workflows.workflow.nb_to_python_wf | 2022-12-12T12:33:08.295661Z |
--------- ------------------------------------ -----------------------------
2 rows
I then attempt to invoke the workflow, but the resulting pod cannot pull the image:
$ flytectl get execution --project flytesnacks --domain development azlfqvzfsbz4lr8pbmlt
>>>
---------------------- ------------------------------------ ------------- -------- ---------------- -------------------------------- --------------- -------------------- ---------------------------------------------------------
| NAME | LAUNCH PLAN NAME | TYPE | PHASE | SCHEDULED TIME | STARTED | ELAPSED TIME | ABORT DATA (TRUNC) | ERROR DATA (TRUNC) |
---------------------- ------------------------------------ ------------- -------- ---------------- -------------------------------- --------------- -------------------- ---------------------------------------------------------
| azlfqvzfsbz4lr8pbmlt | workflows.workflow.nb_to_python_wf | LAUNCH_PLAN | FAILED | | 2022-12-12T13:07:23.548693519Z | 23.161600293s | | [1/1] currentAttempt done. Last Error: USER::containers |
| | | | | | | | | with unready status: [azlfqvzfsbz4lr8pbmlt-n |
---------------------- ------------------------------------ ------------- -------- ---------------- -------------------------------- --------------- -------------------- ---------------------------------------------------------
1 rows
$ docker exec -it dbf8f5dcb150 kubectl -n flytesnacks-development describe pod azlfqvzfsbz4lr8pbmlt-n0-0
>>>
...
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled 27m default-scheduler Successfully assigned flytesnacks-development/azlfqvzfsbz4lr8pbmlt-n0-0 to dbf8f5dcb150
Normal Pulling 25m (x4 over 27m) kubelet Pulling image "papermill-exploration:latest"
Warning Failed 25m (x4 over 27m) kubelet Failed to pull image "papermill-exploration:latest": rpc error: code = Unknown desc = Error response from daemon: pull access denied for papermill-exploration, repository does not exist or may require 'docker login': denied: requested access to the resource is denied
Warning Failed 25m (x4 over 27m) kubelet Error: ErrImagePull
Warning Failed 25m (x6 over 27m) kubelet Error: ImagePullBackOff
Normal BackOff 2m22s (x106 over 27m) kubelet Back-off pulling image "papermill-exploration:latest"
Have I missed something here? Are the pods not authenticated against the docker repo? Or am I not specifying my images correctly?
George D. Torres
12/12/2022, 3:02 PM
n0 -> taskA, n1 -> taskB, etc
honnix
12/12/2022, 4:08 PM
Ruksana Kabealo
12/12/2022, 7:03 PM
Tarmily Wen
12/12/2022, 11:32 PM
# Imports assumed from context (flytekit plus its Ray plugin); TrainParameters,
# TrainOutputs, and pl_mnist_train_task are defined elsewhere in this project.
from flytekit import Resources, dynamic
from flytekit.types.file import FlyteFile
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig

@dynamic
def request_train_resources_task(
    compressed_dataset: FlyteFile,
    train_parameters: TrainParameters,
) -> TrainOutputs:
    task_config = RayJobConfig(
        head_node_config=HeadNodeConfig(
            ray_start_params={
                "block": "true",
            }
        ),
        worker_node_config=[
            WorkerNodeConfig(
                group_name="ray-group",
                replicas=train_parameters.num_nodes - 1,
            )
        ],
    )
    resources_request = Resources(
        cpu=str(train_parameters.cpus_per_gpu * train_parameters.num_gpus),
        mem=f"{train_parameters.memory_per_gpu * train_parameters.num_gpus}G",
        gpu=str(train_parameters.num_gpus),
        ephemeral_storage=f"{train_parameters.ephemeral_storage}G",
    )
    print(f"task_config: {task_config}")
    print(f"resources_request: {resources_request}")
    return pl_mnist_train_task(
        compressed_dataset=compressed_dataset,
        train_parameters=train_parameters,
    ).with_overrides(
        task_config=task_config,
        requests=resources_request,
    )
seunggs
12/13/2022, 2:29 AM
Adrian Rumpold
12/13/2022, 12:07 PM
What's the recommended way to get hold of a FlyteRemote or similar (i.e., flytekit.clients.friendly) API client instance when running in a container?
I have seen Config.auto(), but it needs a config file to do its magic. Also, there's FlyteContext.flyte_client, which looks promising but is set to None when I try to look it up on a context obtained via FlyteContextManager.current_context() at runtime.
Any pointers are highly appreciated! 🙏
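A minimal sketch of one way to build such a client without a config file, assuming the Admin endpoint is known ahead of time; the endpoint, project, and domain values below are placeholders, not taken from this thread:
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

# Build a FlyteRemote straight from an endpoint instead of a config file.
# "flyte.example.com", "flytesnacks", and "development" are placeholders.
remote = FlyteRemote(
    config=Config.for_endpoint(endpoint="flyte.example.com"),
    default_project="flytesnacks",
    default_domain="development",
)
# remote.client exposes the underlying friendly client (SynchronousFlyteClient).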
Jay Ganbat
12/13/2022, 5:37 PM
seunggs
12/13/2022, 11:25 PM
Rupsha Chaudhuri
12/14/2022, 6:09 PM
Tarmily Wen
12/14/2022, 10:49 PM
Laura Lin
12/14/2022, 11:22 PM
Laura Lin
12/15/2022, 1:11 AM
Kyle B
12/15/2022, 5:21 AM
Is there a way to pass the --net flag to the flyte execution so that it can network to other containers using their name (e.g. https://<dockercontainername>:5000) instead of the IP? Is there a better way of setting up mlflow instead of it having its own docker container that is manually started using docker run?
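For reference, a sketch of the plain-Docker mechanism being described here: containers attached to the same user-defined bridge network can reach each other by container name (the default bridge provides no name-based DNS). The network name, image, and port below are illustrative assumptions:
# user-defined bridge network: provides container-name DNS
docker network create flyte-net
# hypothetical mlflow container; image name and port are assumptions
docker run -d --name mlflow --network flyte-net ghcr.io/mlflow/mlflow:latest \
    mlflow server --host 0.0.0.0 --port 5000
# any container started on flyte-net can now reach http://mlflow:5000
docker run --rm --network flyte-net curlimages/curl:latest http://mlflow:5000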
varsha Parthasarathy
12/15/2022, 12:08 PM
Louis DiNatale
12/15/2022, 4:34 PM
Failed to check the existence of deck file.
Jay Ganbat
12/15/2022, 10:09 PM
Thomas Kobber Panum
12/16/2022, 10:35 AM
I'm trying to use storage.type: custom and storage.custom: {} to make Stow use Azure Blob Storage (non-S3 compatible). Does anyone have an example of using storage.type: custom correctly?
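Not a verified example, but the shape such a config would presumably take, given how stow-backed storage is nested elsewhere in Flyte's config; the azure kind and the account/key field names are assumptions:
storage:
  type: custom
  custom:
    kind: azure          # assumed stow backend kind
    config:
      account: <storage-account-name>   # assumed stow-azure config key
      key: <storage-account-key>        # assumed stow-azure config key
  container: <blob-container-name>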
Muthukrishnan Ramya
12/16/2022, 2:55 PM
Ketan (kumare3)
12/17/2022, 8:07 AM
Bernhard Stadlbauer
12/19/2022, 12:35 PM
The current version of flytekit on conda-forge (link) is 1.1.0.
From what it looks like, most of what this would need is merging the associated PRs in the flytekit-feedstock.
Is someone maintaining this at the moment? If not, happy to help here.
cc @Sugato Ray
Katrina P
12/19/2022, 3:04 PM
Running into an error with pyflyte package and flytectl register:
Successfully serialized 3 flyte objects
Packaging app_module.workflow.extract_emails -> 0_app_module.workflow.xx_1.pb
Packaging app_module.workflow.cx -> 1_app_module.workflow.a_2.pb
Packaging app_module.workflow.cx -> 2_app_module.workflow.a_3.pb
Successfully packaged 3 flyte objects into /root/flyte-package.tgz
tar: Removing leading `/' from member names
drwx------ root/root 0 2022-12-19 14:39 /
-rw-r--r-- root/root 6109 2022-12-19 14:39 0_app_module.workflow.xx_1.pb
-rw-r--r-- root/root 217 2022-12-19 14:39 1_app_module.workflow.a_2.pb
-rw-r--r-- root/root 194 2022-12-19 14:39 2_app_module.workflow.a_3.pb
Error: input package have some invalid files. try to run pyflyte package again [flyte-package.tgz]
{"json":{},"level":"error","msg":"input package have some invalid files. try to run pyflyte package again [flyte-package.tgz]","ts":"2022-12-19T14:39:46Z"}
Louis DiNatale
12/19/2022, 4:54 PM
# Imports assumed: the Pod plugin and the Kubernetes client models.
from flytekit import task
from flytekitplugins.pod import Pod
from kubernetes.client import V1Container, V1PodSpec

@task(
    task_config=Pod(
        pod_spec=V1PodSpec(
            containers=[V1Container(name="primary")],  # required by the k8s model
            node_selector={'node_group': 'memory'},
        ),
    ),
)
def my_task() -> None:  # hypothetical stub; the original snippet showed only the decorator
    ...
David Cupp
12/19/2022, 10:33 PM
David Cupp
12/20/2022, 4:24 PM
get_dag( job_input ) -> DAG of task inputs
In Flyte, I need to read the response, which is a DAG represented as an adjacency list. The directed edges say which tasks need to be executed before which other tasks, and each node in the graph has a payload which is the input to the task.
This get_dag() call happens at runtime, so I understand from [1] that I can do this:
@dynamic
def start_workflow(...) -> ...:
    dag = get_dag(...)
    ???

@workflow
def my_workflow(...) -> None:
    return start_workflow(...)
but I can't figure out how to use "imperative workflows"[2] at the same time. How do I get a wf = Workflow(name="my.imperative.workflow.example")?
I did find the dynamic workflow in the API docs: https://docs.flyte.org/projects/flytekit/en/latest/generated/flytekit.dynamic.html
But how do I use it? Do I need to create a "sub workflow" or is there a more direct way?
[1] https://docs.flyte.org/projects/cookbook/en/stable/auto/core/control_flow/dynamics.html
[2] https://docs.flyte.org/projects/cookbook/en/stable/auto/core/flyte_basics/imperative_wf_style.html
David Cupp
12/20/2022, 4:24 PM
def _run_task(task_id, adj_list, submitted_tasks):
    if task_id in submitted_tasks:
        # we already ran it
        return submitted_tasks[task_id]

    mylist = adj_list.get(int(task_id))
    dep_ids = []
    if mylist is not None:
        dep_ids = list(adj_list[int(task_id)])

    markers = []
    for dep_id in dep_ids:
        if dep_id in submitted_tasks:
            markers.append(submitted_tasks[dep_id])
        else:
            markers.append(_run_task(dep_id, adj_list, submitted_tasks))

    marker = run_task(task_input="TODO", markers=markers)
    submitted_tasks[task_id] = marker
    return submitted_tasks[task_id]

@dynamic
def wf4(n: int = 5) -> None:
    nodes, adj_list = get_dag()
    submitted_tasks = {}
    for node_id in nodes:
        _run_task(node_id, adj_list, submitted_tasks)
Yee
12/20/2022, 10:03 PM
David Cupp
12/20/2022, 10:56 PM
> to run generic workflows or something
This is actually exactly what I want to do. My company has a lot of critical jobs that run as DAGs, and our existing system has many flaws, including unreliability and a lack of scalability. These jobs are currently owned by many different teams. The requirement for fully dynamic DAGs came from one of the learnings of that system. Right now the current system uses a plugin model where multiple teams all own code in the same process. This is terrible because:
1. every team needs to understand how the DAG execution system works
2. no team actually understands how it works; the first time someone sees the code they own is at 3am during an incident. Instead, the team that keeps the system online becomes the de facto owner of everyone's code.
So, for the brand new world of scheduling, I have instilled one requirement: no business logic in the scheduling/DAG execution system. ALL business logic must live in (preferably stateless) services outside the DAG execution system:
1. the definition of jobs comes from RPC methods like getJobs()
2. the definition of a task (which we have always been able to calculate at runtime) comes from an RPC method getDAG()
3. the actual business logic to run a task lives in an RPC method runTask()
So the requirement to keep all business logic out of the scheduler is something that I am trying to impose.
The requirement that DAGs must be calculated at runtime comes from the fact that our jobs already run this way -- we can compromise on this, but we would have to rewrite many jobs completely with the new constraint.
getDag() and passed without modification directly to runTask().
The serialization mechanism is up to the people who implement getDag() and runTask() -- I provide a version field to allow people to change serialization mechanisms in a backwards-compatible way.
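A rough sketch of the contract described above; only the three RPC names and the idea of an opaque payload plus version field come from the discussion, while the types and field names are hypothetical:
from dataclasses import dataclass
from typing import List, Protocol

@dataclass
class TaskNode:
    task_id: int
    payload: bytes         # opaque task input, produced by getDAG
    version: int           # lets owners evolve serialization compatibly
    depends_on: List[int]  # adjacency list: ids of tasks that must finish first

class JobService(Protocol):
    def getJobs(self) -> List[str]: ...
    def getDAG(self, job_input: bytes) -> List[TaskNode]: ...
    def runTask(self, payload: bytes, version: int) -> None: ...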
Yee
12/20/2022, 11:04 PM
David Cupp
12/20/2022, 11:07 PM