GitHub
09/02/2023, 12:38 AM
string cron_expression one so that SDK users can eventually move to using the richer CronSchedule message.
Goal: What should the final outcome look like, ideally?
FlyteAdmin should support reading from the CronSchedule field in Schedule messages, and the SDK should expose this field for end users to configure (always defaulting the offset to 0, given that the offset is used specifically by Styx).
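A minimal sketch of the intended precedence, assuming simplified stand-ins for the IDL messages: the dataclasses below only loosely mirror Schedule and CronSchedule (the field names cron_expression, cron_schedule, schedule and offset are assumptions for illustration), and resolve_cron is a hypothetical helper. It prefers the richer CronSchedule, falls back to the legacy string, and defaults the offset to "0".

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class CronSchedule:
    # Stand-in for the richer message: a cron expression plus an offset.
    schedule: str
    offset: Optional[str] = None


@dataclass
class Schedule:
    # Stand-in for Schedule: the legacy string field and the new message field.
    cron_expression: Optional[str] = None
    cron_schedule: Optional[CronSchedule] = None


def resolve_cron(s: Schedule) -> Tuple[str, str]:
    """Return (expression, offset), preferring cron_schedule over cron_expression."""
    if s.cron_schedule is not None:
        # The offset defaults to "0" when the user does not set one.
        return s.cron_schedule.schedule, s.cron_schedule.offset or "0"
    if s.cron_expression:
        # Legacy path: the plain string carries no offset, so use the default.
        return s.cron_expression, "0"
    raise ValueError("schedule has no cron expression")
```

Reading both fields during the transition lets existing launch plans keep working while new ones move to CronSchedule.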
Describe alternatives you've considered
The current implementation.
Flyte component
☐ Overall
☐ Flyte Setup and Installation scripts
☐ Flyte Documentation
☐ Flyte communication (slack/email etc)
☐ FlytePropeller
☑︎ FlyteIDL (Flyte specification language)
☐ Flytekit (Python SDK)
☑︎ FlyteAdmin (Control Plane service)
☐ FlytePlugins
☐ DataCatalog
☐ FlyteStdlib (common libraries)
☐ FlyteConsole (UI)
☐ Other
[Optional] Propose: Link/Inline
If you have ideas about the implementation, please propose the change. If inline, keep it short; if larger, link to an external document.
Additional context
Related #529
Is this a blocker for you to adopt Flyte
N/A
flyteorg/flyte
GitHub
09/02/2023, 12:38 AM
• Pod reported success despite being OOMKilled, seen when a process in the task pod was OOMKilled but pyflyte was not, so the task still completes. Reporting success despite a bad condition like OOMKilled is confusing. Did the task succeed or not? What were the memory requests/limits? Which process was OOMKilled?
• containers with unready status: [execution_id]|context deadline exceeded, seen generally when a container image cannot be pulled from the registry. This combines an unclear k8s message (why is the container unready?) with a Go-specific error that a general user wouldn't understand (context deadline exceeded, i.e. timed out waiting). What should the user do?
• [3/3] currentAttempt done. Last Error: SYSTEM::object [execution_id] terminated in the background, manually, seen generally when the task pod was on an instance that was spot pre-empted or otherwise removed. What does 'currentAttempt done' mean? What does 'terminated in the background, manually' mean, and what manually terminated it? This error is actually benign in most cases because the retry should succeed, but the message gives no indication of that.
Goal: What should the final outcome look like, ideally?
The error message should strike a balance. It should:
• be understandable for the general user (little k8s knowledge)
• contain enough information for an engineer to troubleshoot
• be actionable without being so prescriptive that it rules out other causes
For example, the first error message above could read:
Task ended with one or more processes killed due to lack of memory. Current memory request is X, limit is Y. Process killed was Z. Consider increasing the memory request/limit of this task.
Alternatively, maybe an error code that a user can look up elsewhere with more info would be a better way to keep the necessary k8s detail in one place, and the general user explanation in another.
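The error-code idea could be sketched as a small lookup table that maps a recognizable fragment of the raw message to a code, a plain-language summary, and a suggested action, while the raw k8s text is still kept for engineers. The codes (E-OOM and so on), the Explanation type, and the explain helper are all hypothetical; the patterns simply match the three messages quoted above.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class Explanation:
    code: str     # hypothetical code a user could look up in the docs
    summary: str  # plain-language explanation for the general user
    action: str   # suggested next step, not the only possible cause


# Hypothetical rules; each pattern matches one of the raw messages quoted above.
RULES = [
    (re.compile(r"OOMKilled"), Explanation(
        "E-OOM", "A process in the task was killed because it ran out of memory.",
        "Consider increasing the memory request/limit of this task.")),
    (re.compile(r"context deadline exceeded"), Explanation(
        "E-START-TIMEOUT", "The task container did not become ready in time.",
        "Check that the container image exists and can be pulled from the registry.")),
    (re.compile(r"terminated in the background"), Explanation(
        "E-NODE-LOST", "The machine running the task was removed (e.g. spot pre-emption).",
        "This is usually benign if a retry succeeds.")),
]


def explain(raw: str) -> Optional[Explanation]:
    """Return the first matching explanation, or None so the raw message is shown as-is."""
    for pattern, explanation in RULES:
        if pattern.search(raw):
            return explanation
    return None
```

Returning None for unknown messages keeps the fallback honest: anything not yet catalogued is shown verbatim rather than guessed at.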
Flyte component
☐ Overall
☐ Flyte Setup and Installation scripts
☑︎ Flyte Documentation
☐ Flyte communication (slack/email etc)
☐ FlytePropeller
☐ FlyteIDL (Flyte specification language)
☐ Flytekit (Python SDK)
☐ FlyteAdmin (Control Plane service)
☑︎ FlytePlugins
☐ DataCatalog
☐ FlyteStdlib (common libraries)
☑︎ FlyteConsole (UI)
☐ Other
Additional context
See https://flyte-org.slack.com/archives/CNMKCU6FR/p1611329101012800 for some further discussion. Also seems like #512 and #535 are similar issues.
Is this a blocker for you to adopt Flyte
Nope, already a user :)
flyteorg/flyte
GitHub
09/02/2023, 12:38 AM
@workflow
def my_wf():
    # pull data from datacatalog
    a = datacatalog.query(tag="abc")
    x = my_task(input=a)
    # push data to datacatalog
    datacatalog.publish(dataset=x, tag="xyz")
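The datacatalog.query and datacatalog.publish calls above are a proposal, not an existing flytekit API. To make the intended pull/compute/push semantics concrete, here is a plain-Python sketch with a hypothetical InMemoryCatalog standing in for the catalog service and an ordinary function standing in for my_task; nothing here talks to the real DataCatalog.

```python
from typing import Any, Dict, List


class InMemoryCatalog:
    """Hypothetical stand-in: maps a tag to the datasets published under it."""

    def __init__(self) -> None:
        self._by_tag: Dict[str, List[Any]] = {}

    def publish(self, dataset: Any, tag: str) -> None:
        # Append so that earlier versions under the same tag are kept.
        self._by_tag.setdefault(tag, []).append(dataset)

    def query(self, tag: str) -> Any:
        # Return the most recently published dataset for the tag.
        versions = self._by_tag.get(tag)
        if not versions:
            raise KeyError(f"no dataset tagged {tag!r}")
        return versions[-1]


datacatalog = InMemoryCatalog()


def my_task(input: List[int]) -> List[int]:
    # Placeholder computation for the task in the workflow above.
    return [v * 2 for v in input]


def my_wf() -> None:
    a = datacatalog.query(tag="abc")           # pull
    x = my_task(input=a)
    datacatalog.publish(dataset=x, tag="xyz")  # push
```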
FlytePlugins WebAPI
AWS Athena Plugin
Flyte component
☐ Overall
☐ Flyte Setup and Installation scripts
☐ Flyte Documentation
☐ Flyte communication (slack/email etc)
☐ FlytePropeller
☐ FlyteIDL (Flyte specification language)
☑︎ Flytekit (Python SDK)
☐ FlyteAdmin (Control Plane service)
☑︎ FlytePlugins
☐ DataCatalog
☐ FlyteStdlib (common libraries)
☐ FlyteConsole (UI)
☐ Other
flyteorg/flyte
GitHub
09/02/2023, 12:38 AM
kubectl explain deployment
KIND:     Deployment
VERSION:  apps/v1

DESCRIPTION:
     Deployment enables declarative updates for Pods and ReplicaSets.

FIELDS:
   apiVersion   <string>
     APIVersion defines the versioned schema of this representation of an
     object. Servers should convert recognized schemas to the latest internal
     value, and may reject unrecognized values. More info:
     <https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources>

   kind   <string>
     Kind is a string value representing the REST resource this object
     represents. Servers may infer this from the endpoint the client submits
     requests to. Cannot be updated. In CamelCase. More info:
     <https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds>

   metadata   <Object>
     Standard object metadata.

   spec   <Object>
     Specification of the desired behavior of the Deployment.

   status   <Object>
     Most recently observed status of the Deployment.
flyteorg/flyte
GitHub
09/02/2023, 12:38 AM
LaunchPlan class does the patching, effectively overwriting the execute function, it doesn't work on launch plans today (launch plans don't have an execute function because __call__ invocations are dispatched to the underlying workflow). Can you try to think of a way around this?
As part of this, let's also rename the task_mock function; it's already a misnomer because it can already patch workflows. Think of an appropriate name, but let's keep the task_mock alias in flytekit/__init__.py so that we don't break existing users.
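One way to keep the old name working after the rename, as a sketch: patch_entity is a hypothetical new name (the issue leaves the choice open), and its body is a placeholder rather than flytekit's patching logic. The old name becomes a thin wrapper that emits a DeprecationWarning before delegating, and flytekit/__init__.py could export both names.

```python
import warnings
from typing import Any


def patch_entity(target: Any) -> str:
    """Hypothetical new name; placeholder body standing in for the real patching logic."""
    return f"patched {target}"


def task_mock(*args: Any, **kwargs: Any) -> Any:
    """Old name kept as an alias so existing callers keep working."""
    warnings.warn(
        "task_mock is deprecated; use patch_entity instead",
        DeprecationWarning,
        stacklevel=2,  # point the warning at the caller, not this wrapper
    )
    return patch_entity(*args, **kwargs)
```

If no warning is wanted, a plain assignment (task_mock = patch_entity) is enough; the wrapper just gives users a nudge toward the new name.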
flyteorg/flyte
GitHub
09/02/2023, 12:38 AM
import pytest
from mock import patch as _system_patch

from flytekit.core.task import task
from flytekit.core.testing import patch as flyte_patch
from flytekit.core.workflow import ImperativeWorkflow, workflow


@task
def t1(a: str) -> str:
    return a + " world"


wb = ImperativeWorkflow(name="my.workflow")
wb.add_workflow_input("in1", str)
node = wb.add_entity(t1, a=wb.inputs["in1"])
wb.add_workflow_output("from_n0t1", node.outputs["o0"])


@_system_patch("flytekit.core.workflow.ImperativeWorkflow.execute")
def test_return_none_errors(mock_execute):
    mock_execute.return_value = None
    with pytest.raises(Exception):
        wb(in1="hello")


@flyte_patch(wb)
def test_imperative_patching(mock_wb):
    mock_wb.return_value = "hi"

    @workflow
    def my_functional_wf(a: str) -> str:
        x = wb(in1=a)
        return x

    assert my_functional_wf(a="hello") == "hi"
It is unclear what is happening. Somehow, the flyte_patch patching logic is interfering with the normal mock.patch. The mere presence of the second test makes it seem as if the mock.patch isn't applied to the first test: ImperativeWorkflow.execute is not patched and returns what it normally would, so the exception isn't raised.
If you copy and paste the wb workflow into a new variable for the second test, both tests pass. Understanding how the two patching mechanisms interfere with each other will take some digging into the mock library.
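One mechanism that would produce this symptom, offered as a hypothesis to check against flytekit's actual patch code rather than as a diagnosis: if patching assigns execute on the wb instance and later "restores" it by assigning the saved bound method back, the instance keeps its own execute attribute. An instance attribute takes precedence over a plain function on the class, so a class-level mock.patch of ImperativeWorkflow.execute no longer reaches that object, while a fresh workflow object is unaffected. This would only explain the test ordering if such an assignment happens before the first test runs. The Workflow class and naive_patch_and_restore below are hypothetical stand-ins, and the sketch uses unittest.mock rather than the mock package.

```python
from unittest import mock


class Workflow:
    def execute(self) -> str:
        return "real result"


def naive_patch_and_restore(target: Workflow) -> str:
    """Hypothetical save/restore, not flytekit's code: it leaves an instance attribute behind."""
    saved = target.execute  # bound method looked up through the class
    target.execute = mock.MagicMock(return_value="hi")  # instance attribute shadows the class
    try:
        return target.execute()
    finally:
        target.execute = saved  # the "restore" still stores an attribute on the instance


def class_patched_result(target: Workflow) -> str:
    # Patch the class attribute, as @mock.patch("...ImperativeWorkflow.execute") does.
    with mock.patch.object(Workflow, "execute", return_value=None):
        return target.execute()


shadowed = Workflow()
naive_patch_and_restore(shadowed)
fresh = Workflow()

print(class_patched_result(fresh))     # None: the class patch applies
print(class_patched_result(shadowed))  # real result: the instance attribute wins
```

If this turns out to be the cause, restoring with del target.execute (or patching the instance via mock.patch.object, which removes the attribute it added on exit) would avoid leaving the shadow behind.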
flyteorg/flyte
GitHub
09/02/2023, 12:38 AM
FAIL_AFTER_EXECUTABLE_NODES_COMPLETE enabled on the parent workflow or allowed_failure_ratio enabled on the dynamic_task (which only works with sub-python_tasks). This automatically aborts all dependent downstream tasks. But sometimes we would like to allow a certain percentage of the tasks to fail, since we may not always need 100% of the output.
Goal: What should the final outcome look like, ideally?
There should be an option to allow a % of the sub-workflows or sub-dynamic_tasks to fail.
Describe alternatives you've considered
I have gone through the discussions in #191 and tried using FAIL_AFTER_EXECUTABLE_NODES_COMPLETE and allowed_failure_ratio, but neither works in this scenario.
[Optional] Propose: Link/Inline OR Additional context
I can think of two ways to do this:
1. Expand allowed_failure_ratio to include sub-workflows and sub-dynamic_tasks.
2. Add something similar to a try-catch in the dynamic_task that we could use to catch any sub-execution failures and make the parent task succeed (as suggested here).
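The two options can be combined in a plain-Python sketch: each sub-execution is wrapped in a try/except, failures are counted, and the parent fails only when the failed fraction exceeds allowed_failure_ratio. The run_with_failure_ratio helper and TooManyFailures exception are hypothetical; a real implementation would presumably live in FlytePropeller or flytekit rather than in user code.

```python
from typing import Any, Callable, List, Optional, Sequence


class TooManyFailures(Exception):
    """Raised when the share of failed sub-executions exceeds the allowed ratio."""


def run_with_failure_ratio(
    subtasks: Sequence[Callable[[], Any]], allowed_failure_ratio: float
) -> List[Optional[Any]]:
    """Run every subtask; return outputs with None for failures, or raise if too many fail."""
    if not 0.0 <= allowed_failure_ratio <= 1.0:
        raise ValueError("allowed_failure_ratio must be between 0 and 1")
    outputs: List[Optional[Any]] = []
    failures = 0
    for run in subtasks:
        try:
            outputs.append(run())
        except Exception:
            # Record the failure instead of aborting the remaining subtasks.
            failures += 1
            outputs.append(None)
    if subtasks and failures / len(subtasks) > allowed_failure_ratio:
        raise TooManyFailures(f"{failures}/{len(subtasks)} sub-executions failed")
    return outputs
```

Downstream consumers then see None placeholders for the missing outputs, which matches the use case of not always needing 100% of the output.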
flyteorg/flyte
GitHub
09/02/2023, 12:38 AM
[2021-04-07T04:45:51Z] status = StatusCode.UNAVAILABLE
[2021-04-07T04:45:51Z] details = "upstream connect error or disconnect/reset before headers. reset reason: connection failure"
[2021-04-07T04:45:51Z] debug_error_string = "{"created":"@1617770750.462629552","description":"Error received from peer","file":"src/core/lib/surface/call.cc","file_line":1039,"grpc_message":"upstream connect error or disconnect/reset before headers. reset reason: connection failure","grpc_status":14}"
Expected behavior
On network-related error responses, Flytekit should retry the request. The retries should follow a specified retry policy, whether that is exponential backoff, incremental intervals, randomization, or some other policy.
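A sketch of one such policy, exponential backoff with full jitter, in plain Python. TransientError is a hypothetical stand-in for a retryable failure such as a gRPC call failing with StatusCode.UNAVAILABLE, and call_with_retry and its defaults are illustrative choices, not flytekit settings. (gRPC itself also supports declarative retry policies through its service config, which may be worth evaluating as an alternative.)

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Stand-in for a retryable failure such as gRPC StatusCode.UNAVAILABLE."""


def call_with_retry(
    fn: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
    rng: Optional[random.Random] = None,
) -> T:
    """Retry fn on TransientError using exponential backoff with full jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    rng = rng or random.Random()
    for attempt in range(max_attempts):
        try:
            return fn()
        except TransientError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # The cap doubles each attempt; the actual wait is random in [0, cap].
            cap = min(max_delay, base_delay * (2 ** attempt))
            sleep(rng.uniform(0, cap))
    raise AssertionError("unreachable")
```

Injecting sleep and rng keeps the policy testable without real delays; non-retryable errors propagate immediately because only TransientError is caught.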
[Optional] Additional context
To Reproduce
Steps to reproduce the behavior:
1.
2.
Screenshots
If applicable, add screenshots to help explain your problem.
flyteorg/flyte
GitHub
09/03/2023, 12:39 AM
SdkWorkflow, SdkLaunchPlan, and SdkRemoteWorkflow (to be added, similarly to SdkRemoteTask).
flyteorg/flyte
GitHub
09/03/2023, 12:39 AM
public abstract class SdkDynamicWorkflow<InputT> {
  public abstract void expand(SdkWorkflowBuilder builder, InputT input);
}
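The same shape can be sketched in Python with an abstract base class: expand receives a builder and the runtime input, so the graph is decided only once the input is known. WorkflowBuilder, its add_node method, and FanOut are hypothetical illustrations, not flytekit-java or flytekit APIs.

```python
from abc import ABC, abstractmethod
from typing import Generic, List, Tuple, TypeVar

InputT = TypeVar("InputT")


class WorkflowBuilder:
    """Hypothetical builder that just records (node_id, task_name) pairs."""

    def __init__(self) -> None:
        self.nodes: List[Tuple[str, str]] = []

    def add_node(self, node_id: str, task_name: str) -> None:
        self.nodes.append((node_id, task_name))


class SdkDynamicWorkflow(ABC, Generic[InputT]):
    @abstractmethod
    def expand(self, builder: WorkflowBuilder, input: InputT) -> None:
        """Add nodes to builder based on the input available at run time."""


class FanOut(SdkDynamicWorkflow[int]):
    # Example: the number of nodes depends on the input value.
    def expand(self, builder: WorkflowBuilder, input: int) -> None:
        for i in range(input):
            builder.add_node(f"n{i}", "process_shard")
```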
flyteorg/flyte
GitHub
09/03/2023, 12:39 AM
SdkBindingData should become SdkBindingData<T>, where T reflects the corresponding Java or Scala type. This will surface type information that is currently only available in the workflow graph as type information embedded in Java/Scala types. We should update all existing methods to preserve type information where possible, for instance:
class SdkBindingData<T> {
  // signature only; body omitted
  static SdkBindingData<String> literalOfString(String value);
}
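The benefit of the type parameter can be sketched with Python generics: a factory annotated to return SdkBindingData[str] lets a static type checker reject passing it where an integer binding is expected. This Python class is a hypothetical analog of the Java proposal; literal_of_integer and add_one are illustrative additions, and the type parameter is enforced only by a checker such as mypy, not at run time.

```python
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class SdkBindingData(Generic[T]):
    value: T

    @staticmethod
    def literal_of_string(value: str) -> "SdkBindingData[str]":
        return SdkBindingData(value)

    @staticmethod
    def literal_of_integer(value: int) -> "SdkBindingData[int]":
        return SdkBindingData(value)


def add_one(x: SdkBindingData[int]) -> SdkBindingData[int]:
    # A type checker flags add_one(SdkBindingData.literal_of_string("a")).
    return SdkBindingData(x.value + 1)
```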
flyteorg/flyte