GitHub
03/09/2023, 8:53 PM
03/09/2023, 9:42 PM
<http://localhost:8088/api/v1/project_domain_attributes/flytesnacks/staging?resource_type=6>
for example). If something is found, it shows this:
image▾
<http://localhost:8088/api/v1/project_attributes/flytesnacks?resource_type=6>
This request is special: unlike the higher-specificity requests (the project-domain one above and the project/domain/workflow/launch-plan requests), this endpoint also merges in and returns settings from Admin's system-level configuration, which is useful to surface to the user.
This ticket is to add a call to that endpoint and merge its response with the existing project-domain call, with the project-domain call taking higher priority, of course. Keep in mind that you should do a deep check on objects: if the security context object is present but both role and service account are empty strings, don't use it.
See flyteorg/flyte#2322 for the backend changes and more information, but note that the information there is somewhat confusing because that issue also describes a longer-term project.
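As a sketch of the merge behavior described above (hypothetical helper names, not the actual flyteconsole code): the project-domain response is layered over the project-level one, and a nested object such as the security context is only taken when it is "deeply" non-empty.

```python
def is_deeply_empty(value):
    """True if value is None, an empty/whitespace string, or a dict/list
    whose members are all deeply empty."""
    if value is None:
        return True
    if isinstance(value, str):
        return value.strip() == ""
    if isinstance(value, dict):
        return all(is_deeply_empty(v) for v in value.values())
    if isinstance(value, (list, tuple)):
        return all(is_deeply_empty(v) for v in value)
    return False


def merge_attributes(project_level, project_domain_level):
    """Merge attribute dicts, preferring project-domain values unless they
    are deeply empty (e.g. a securityContext whose role and serviceAccount
    are both empty strings)."""
    merged = dict(project_level or {})
    for key, value in (project_domain_level or {}).items():
        if not is_deeply_empty(value):
            merged[key] = value
    return merged
```

This keeps a project-level security context when the project-domain one exists but carries only empty strings.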
flyteorg/flyteconsole
GitHub
03/09/2023, 9:44 PM
[1/1] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
```
[adbrq7s7x2kbwln2k5df-n0-0] terminated with exit code (1). Reason [Error]. Message:
  a_worflows.py", line 211, in
    def process_data_frames(lc_df: pd.DataFrame=None, data_frame: pd.DataFrame=None)->pd.DataFrame:
  File "/usr/local/lib/python3.9/site-packages/flytekit/core/task.py", line 209, in task
    return wrapper(_task_function)
  File "/usr/local/lib/python3.9/site-packages/flytekit/core/task.py", line 193, in wrapper
    task_instance = TaskPlugins.find_pythontask_plugin(type(task_config))(
  File "/usr/local/lib/python3.9/site-packages/flytekit/core/tracker.py", line 30, in __call__
    o = super(InstanceTrackingMeta, cls).__call__(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/flytekit/core/python_function_task.py", line 118, in __init__
    super().__init__(
  File "/usr/local/lib/python3.9/site-packages/flytekit/core/python_auto_container.py", line 72, in __init__
    super().__init__(
  File "/usr/local/lib/python3.9/site-packages/flytekit/core/base_task.py", line 389, in __init__
    interface=transform_interface_to_typed_interface(interface),
  File "/usr/local/lib/python3.9/site-packages/flytekit/core/interface.py", line 210, in transform_interface_to_typed_interface
    inputs_map = transform_variable_map(interface.inputs, input_descriptions)
  File "/usr/local/lib/python3.9/site-packages/flytekit/core/interface.py", line 318, in transform_variable_map
    res[k] = transform_type(v, descriptions.get(k, k))
  File "/usr/local/lib/python3.9/site-packages/flytekit/core/interface.py", line 332, in transform_type
    return _interface_models.Variable(type=TypeEngine.to_literal_type(x), description=description)
  File "/usr/local/lib/python3.9/site-packages/flytekit/core/type_engine.py", line 611, in to_literal_type
    transformer = cls.get_transformer(python_type)
  File "/usr/local/lib/python3.9/site-packages/flytekit/core/type_engine.py", line 584, in get_transformer
    raise ValueError(f"Generic Type {python_type.__origin__} not supported currently in Flytekit.")
ValueError: Generic Type typing.Union not supported currently in Flytekit.
```
The data frames have columns with strings, so the dtypes for those columns were object.
Would it be possible for someone to let me know what the problem is, or if I'm missing something? Or is this because I'm running it on a Windows machine?
Thanks and regards
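For what it's worth, one plausible reading of the traceback above (an assumption, since the full signature isn't shown) is that the real annotations used `Optional[pd.DataFrame]`, which is literally `typing.Union[pd.DataFrame, None]` and is exactly what `get_transformer` rejects. On flytekit versions without `Union` support, the workaround is a concrete, non-optional annotation:

```python
import typing

# Optional[X] is sugar for Union[X, None]; older flytekit versions rejected
# any typing.Union at the task interface, matching the ValueError above.
# (Using `str` as a stand-in for pd.DataFrame so the snippet needs no pandas.)
annotation = typing.Optional[str]
assert typing.get_origin(annotation) is typing.Union
assert typing.get_args(annotation) == (str, type(None))


# The workaround on such versions: annotate with the concrete type and require
# the argument instead of defaulting it to None.
def process_data_frames(lc_df: str, data_frame: str) -> str:
    return lc_df + data_frame
```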
flyteorg/flyteconsole
GitHub
03/09/2023, 9:47 PM
`Literal` type) that the user will enter to resume that node. The inputs should be the same as in the launch window (i.e., rendered with a type and empty-text helpers) and should validate in the same manner as the launch form.
Additionally, we should show the node's input. The comps are currently out of date but will soon be updated to no longer be click-to-show; instead, the input will always be shown as JSON (in the same manner we display node inputs in the node execution detail panel).
Screen Shot 2022-09-13 at 9 06 42 AM▾
Screen Shot 2022-09-13 at 9 14 24 AM▾
GitHub
03/09/2023, 9:58 PM
v1.1.7.
I believe the culprit is launch plan references in workflows that also call at least one other sub-workflow.
I've used the launchplan example from the Flyte docs to reproduce the issue.
The following workflow caused the second error message:
```python
import calendar
import datetime

from flytekit import dynamic, task, workflow, LaunchPlan


@task
def greet(day_of_week: str, number: int, am: bool) -> str:
    greeting = "Have a great " + day_of_week + " "
    greeting += "morning" if am else "evening"
    return greeting + "!" * number


@workflow
def go_greet(day_of_week: str, number: int, am: bool = False) -> str:
    return greet(day_of_week=day_of_week, number=number, am=am)


morning_greeting = LaunchPlan.create(
    "morning_greeting",
    go_greet,
    fixed_inputs={"am": True},
    default_inputs={"number": 1},
)


@workflow
def test_flyteconsole_graph_launchplanref() -> None:
    go_greet(day_of_week="Monday", number=1, am=True)  # Important, workflow must have sub-workflows!
    today = datetime.datetime.today()
    for n in range(7):
        day = today + datetime.timedelta(days=n)
        weekday = calendar.day_name[day.weekday()]
        if day.weekday() < 5:
            print(morning_greeting(day_of_week=weekday))
        else:
            print(morning_greeting(number=3, day_of_week=weekday))
```
If I remove the first line from `test_flyteconsole_graph_launchplanref` (and thus don't have any other sub-workflows), the graph renders just fine.
Our production workflow produces the following stacktrace:
```
TypeError: Cannot read properties of null (reading 'resourceType')
    at t.checkIfObjectsAreSame (main-56c8d9de.js:1:349519)
    at t.getSubWorkflowFromId (main-56c8d9de.js:1:350236)
    at m (main-56c8d9de.js:1:347158)
    at p (main-56c8d9de.js:1:347537)
    at f (main-56c8d9de.js:1:348547)
    at m (main-56c8d9de.js:1:347200)
    at p (main-56c8d9de.js:1:347537)
    at f (main-56c8d9de.js:1:348589)
    at t.transformerWorkflowToDag (main-56c8d9de.js:1:348639)
    at main-56c8d9de.js:1:345223
```
The actual stack trace displayed in my reproduction attempt leads to a slightly different place when accessing the graph:
```
TypeError: Cannot read properties of undefined (reading 'n0-0-n0')
    at main-56c8d9de.js:1:344707
    at t.WorkflowGraph (main-56c8d9de.js:1:344858)
    at we (react-dom.production.min.js:84:293)
    at zj (react-dom.production.min.js:226:496)
    at Th (react-dom.production.min.js:152:223)
    at tj (react-dom.production.min.js:152:152)
    at Te (react-dom.production.min.js:146:151)
    at react-dom.production.min.js:61:68
    at unstable_runWithPriority (react.production.min.js:25:260)
    at Da (react-dom.production.min.js:60:280)
```
Interestingly enough, the "Nodes" tab is already partially broken though:
error3
The stack trace printed when loading the nodes points to the same location as our production workflow.
It seems to me that `parseNode` in `components/WorkflowGraph/transformerWorkflowToDag.tsx` only handles `subworkflowRefs`, but not `launchplanRefs`, thus passing `null` as `id` to `getSubWorkflowFromId` and subsequently `checkIfObjectsAreSame`. According to my hastily slapped-on debug logging, the nodes causing the crash have no `subworkflowRef`, but a `launchplanRef` instead.
If the executed workflow doesn't have any sub-workflows, we don't run into this issue, because the comparison containing `checkIfObjectsAreSame` is never run.
I was able to "fix" this by having `checkIfObjectsAreSame` check for `null`/`undefined` before its `for` loop:
```ts
export const checkIfObjectsAreSame = (a, b) => {
  if ((!a && b) || (a && !b)) {
    return false;
  } else if (!a && !b) {
    return true;
  }
  for (const k in a) {
    if (a[k] !== b[k]) {
      return false;
    }
  }
  return true;
};
```
The above change "fixes" the graph (and the nodes view); however, I feel that's only halfway to solving the actual issue 😅
Would that additional check be enough in this case (if so, I can open a tiny PR if you'd like), or must `parseNode` handle the `launchplanRefs` as well?
I'm happy to help with further testing/development if so desired, but would appreciate some pointers on how best to proceed 🙂
flyteorg/flyte
GitHub
03/09/2023, 10:01 PM
`Phase` and `PhaseVersion` information that each plugin reports. If this has not changed (based on these two fields), FlytePropeller immediately returns and bypasses sending a `TaskExecutionEvent` reporting the task state to FlyteAdmin.
The problem is that plugins may report a new status with the same `Phase` and `PhaseVersion` but with an updated `Reason`; the `Reason` field contains information about the task execution. One such scenario is when a k8s Pod requests more resources than are available within the cluster. The initial `TaskExecutionEvent` will report a `Reason` of "task submitted to K8s". In a subsequent evaluation the plugin updates this `Reason` to "Unschedulable: 0/1 nodes are available: 1 Insufficient memory.", but this is never reported to FlyteAdmin because the `Phase` and `PhaseVersion` remain the same.
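A minimal Python sketch of the change-detection logic described above (FlytePropeller itself is written in Go; all names here are illustrative): an update should be considered new when the `Reason` changes, not only when `Phase`/`PhaseVersion` do.

```python
from dataclasses import dataclass


@dataclass
class PluginStatus:
    """Illustrative stand-in for the status a plugin reports."""
    phase: str
    phase_version: int
    reason: str


def status_changed(prev: PluginStatus, curr: PluginStatus,
                   include_reason: bool = True) -> bool:
    """Decide whether an event should be sent to admin.

    The buggy behavior compares only (phase, phase_version); the fix also
    treats a changed reason as an update worth reporting.
    """
    if (curr.phase, curr.phase_version) != (prev.phase, prev.phase_version):
        return True
    return include_reason and curr.reason != prev.reason
```

With `include_reason=False` this reproduces the reported behavior: the "Unschedulable" reason update would be silently dropped.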
Expected behavior
The FlyteConsole UI should always display the latest status. This requires FlytePropeller to correctly identify an update and therefore to send an event to FlyteAdmin reporting the change.
Additional context to reproduce
Start a workflow with resource requests larger than what is available in the cluster. In the task status pane in the UI, it will always display the "RUNNING" phase with a reason of "task submitted to K8s".
```python
@task(requests=Resources(mem="12G"))
def say_hello_resources(name: str) -> str:
    return f"hello {name}"


@workflow
def my_wf_resources(name: str) -> str:
    res = say_hello_resources(name=name)
    return res
```
Screenshots
image▾
GitHub
03/09/2023, 11:22 PM
`pyflyte-map-execute`. It does two things before running the task:
1. Update the sub-task input/output interface to a list interface
2. Append the job id to the output prefix
However, when using a raw container, the entrypoint will not be `pyflyte-map-execute`.
Therefore, we should update the output prefix in copilot, and support uploading collections in copilot.
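The two steps above can be sketched as follows (hypothetical function names; the real logic lives in flytekit's map-task entrypoint and, per this PR, in copilot):

```python
import os
import typing


def listify_interface(inputs: dict) -> dict:
    """Step 1: wrap each sub-task input/output type in List[...]."""
    return {name: typing.List[tp] for name, tp in inputs.items()}


def subtask_output_prefix(output_prefix: str,
                          env: typing.Mapping[str, str] = os.environ) -> str:
    """Step 2: append this sub-task's array index to the output prefix.

    The batch system stores the *name* of the index variable in
    BATCH_JOB_ARRAY_INDEX_VAR_NAME, so two lookups are needed (the same
    double lookup appears in the test.py example below).
    """
    index_var = env["BATCH_JOB_ARRAY_INDEX_VAR_NAME"]
    index = int(env[index_var])
    return f"{output_prefix.rstrip('/')}/{index}"
```

For a raw container the entrypoint is the user's own command, so copilot has to perform the prefix adjustment instead.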
Type
☐ Bug Fix
☑︎ Feature
☐ Plugin
Are all requirements met?
☐ Code completed
☑︎ Smoke tested
☐ Unit tests added
☐ Code documentation added
☐ Any pending items have an associated Issue
Complete description
image▾
GitHub
03/09/2023, 11:35 PM
`pyflyte-map-execute ....`
Type
☑︎ Bug Fix
☐ Feature
☐ Plugin
Are all requirements met?
☑︎ Code completed
☑︎ Smoke tested
☐ Unit tests added
☐ Code documentation added
☐ Any pending items have an associated Issue
Complete description
image▾
```python
import typing
from typing import List

from flytekit import map_task, task, workflow, ContainerTask, kwtypes

calculate_ellipse_area_shell = ContainerTask(
    name="ellipse-area-metadata-python",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=int),
    outputs=kwtypes(area=float),
    image="pingsutw/raw-container:v9",
    command=[
        "python",
        "test.py",
        "{{.inputs.a}}",
        "/var/outputs",
    ],
)


@task
def coalesce(b: List[str]) -> str:
    coalesced = "".join(b)
    return coalesced


@task
def g_l(n: int) -> List[int]:
    res = []
    for i in range(n):
        res.append(i)
    return res


@workflow
def wf(n: int = 2):
    l = g_l(n=n)
    map_task(calculate_ellipse_area_shell)(a=l)


if __name__ == "__main__":
    result = wf()
```
• dockerfile
```dockerfile
FROM python:3.10-slim-buster
WORKDIR /root
COPY *.py /root/
```
• test.py
```python
import math
import sys
import os


def write_output(output_dir, output_file, v):
    with open(f"{output_dir}/{output_file}", "w") as f:
        f.write(str(v))


def calculate_area(a, b):
    return math.pi * a * b


def main(a, output_dir):
    # parse the stringified list, e.g. "[0, 1]"
    li = a.strip('][').split(',')
    res = [eval(i) for i in li]
    # BATCH_JOB_ARRAY_INDEX_VAR_NAME holds the *name* of the env var that
    # carries this sub-task's array index, hence the double lookup
    index = int(os.environ.get(os.environ.get("BATCH_JOB_ARRAY_INDEX_VAR_NAME")))
    area = calculate_area(res[index], 3)
    write_output(output_dir, "area", f"{area}")
    write_output(output_dir, "_SUCCESS", "")


if __name__ == "__main__":
    a = sys.argv[1]
    output_dir = sys.argv[2]
    main(a, output_dir)
```
Tracking Issue
https://flyte-org.slack.com/archives/CP2HDHKE1/p1678230956906899
Follow-up issue
flyteorg/flytecopilot#54
flyteorg/flyteplugins#329
flyteorg/flytekit
✅ All checks have passed
30/30 successful checks
GitHub
03/09/2023, 11:37 PM
<https://github.com/flyteorg/flytekit/tree/master|master>
by wild-endeavor
<https://github.com/flyteorg/flytekit/commit/7c3c255655d25e70988028d8773b034c7519d6e8|7c3c2556>
- Prefer FLYTE_ prefixed AWS creds env vars (#1523)
flyteorg/flytekit
GitHub
03/09/2023, 11:40 PM
<https://github.com/flyteorg/flytekit/tree/master|master>
by pingsutw
<https://github.com/flyteorg/flytekit/commit/aee20eac7d0d65874b5f3c04dc1a5ddfed591063|aee20eac>
- Filter out remote entity when generating pb (#1545)
flyteorg/flytekit