GitHub
11/02/2023, 9:27 PM
`PythonInstanceTask`. Observed this when building a task plugin.
Task's `__init__()`:
def __init__(
    self,
    ...
    **kwargs,
):
    ...
    inputs.update({"runtime": (bool, False)})
    super(..., self).__init__(
        ...
        interface=Interface(inputs=inputs, outputs=outputs),
        **kwargs,
    )
Example:
task_object = ...(
    ...
    inputs=kwtypes(dataset=str),
    ...
)

@workflow
def valid_wf(dataset: str = "...") -> None:
    task_object(dataset=dataset)

valid_wf()
Error:
flytekit.common.exceptions.user.FlyteAssertion: Input was not specified for: runtime of type simple: BOOLEAN
Expected behavior
The above code shouldn't be throwing an error.
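One way to read the expected behavior is that an input declared with a default should be filled in when the caller omits it. Here is a minimal, stdlib-only sketch of that rule, assuming an input spec that maps each name either to a bare type or to a `(type, default)` tuple, as in the `inputs.update(...)` call above. `InputSpec` and `resolve_inputs` are hypothetical names for illustration, not flytekit API.

```python
from typing import Any, Dict, Tuple, Union

# Hypothetical stand-in for an interface's input spec: a name maps either to a
# bare type (required input) or to a (type, default) tuple (optional input).
InputSpec = Dict[str, Union[type, Tuple[type, Any]]]


def resolve_inputs(spec: InputSpec, provided: Dict[str, Any]) -> Dict[str, Any]:
    """Merge caller-supplied values with declared defaults (illustrative only)."""
    resolved: Dict[str, Any] = {}
    for name, declared in spec.items():
        if name in provided:
            resolved[name] = provided[name]
        elif isinstance(declared, tuple):
            # A default was declared, so a missing value is not an error.
            resolved[name] = declared[1]
        else:
            raise ValueError(f"Input was not specified for: {name}")
    return resolved


spec: InputSpec = {"dataset": str, "runtime": (bool, False)}
print(resolve_inputs(spec, {"dataset": "..."}))
```

Under this reading, only `dataset` would have to be supplied by the workflow; `runtime` would fall back to `False`.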
To Reproduce
Steps to reproduce the behavior:
1. Same as the above code flow.
flyteorg/flyteGitHub
11/02/2023, 9:27 PM
return FlyteFile["jpg"](path=out_path)
This causes my type checker (Pylance/Pyright) to complain because "jpg" is not defined. And when a type parameter isn't supplied in a signature (i.e., it's just FlyteFile), the return type is partially unknown and inferred as FlyteFile[Unknown].
NB: The same applies to FlyteDirectory.
flyteorg/flyteGitHub
11/02/2023, 9:27 PM
Warning: apiextensions.k8s.io/v1beta1 CustomResourceDefinition is deprecated in v1.16+, unavailable in v1.22+; use apiextensions.k8s.io/v1 CustomResourceDefinition
is pointed out when running the Kustomize deployment.
Expected behavior
The current version of K8s is 1.21; we should upgrade to refer to rbac.authorization.k8s.io/v1 by the v1.22+ release.
flyteorg/flyteGitHub
11/02/2023, 9:27 PM
@dynamic
def my_task(secret_key: str) -> str:
    @task(secret_requests=[Secret(group="my-group", key=secret_key)])
    def inner_task() -> str:
        ...

    return inner_task()
Describe alternatives you've considered
1. Loading all keys for a secret. This doesn't work with some secret providers (particularly AWS Secrets Manager).
2. This worked:
@task(secret_requests=[Secret(group="my-group", key="")])
def inner_task() -> str:
    ...

@dynamic
def my_task(secret_key: str) -> str:
    inner_task.secret_requests = [Secret(group="my-group", key=secret_key)]
    return inner_task()
flyteorg/flyteGitHub
11/02/2023, 9:27 PM
`queue_timeout` flag that can be set at a global scope through configs, or at a task scope (ideally also at the project/domain/workflow levels). When flytepropeller detects that a task hasn't started executing within that period, it should abort it.
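The check described here could look roughly like the sketch below: a task-scope timeout overrides the global one, and only tasks that have not started are considered. This is an illustration in plain Python, not flytepropeller code; `QueuedTask` and `tasks_to_abort` are hypothetical names, and the project/domain/workflow scopes are left out.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class QueuedTask:
    # Hypothetical record; real propeller state is not modeled here.
    name: str
    queued_at: datetime
    started_at: Optional[datetime] = None
    queue_timeout: Optional[timedelta] = None  # task-scope override, if any


def tasks_to_abort(
    tasks: List[QueuedTask], now: datetime, global_timeout: timedelta
) -> List[str]:
    """Return names of tasks that have waited in the queue past their timeout."""
    expired = []
    for task in tasks:
        if task.started_at is not None:
            continue  # already executing, so the queue timeout no longer applies
        timeout = task.queue_timeout if task.queue_timeout is not None else global_timeout
        if now - task.queued_at > timeout:
            expired.append(task.name)
    return expired


now = datetime(2021, 1, 1, 12, 0)
tasks = [
    QueuedTask("a", queued_at=now - timedelta(minutes=30)),
    QueuedTask("b", queued_at=now - timedelta(minutes=30), queue_timeout=timedelta(hours=1)),
    QueuedTask("c", queued_at=now - timedelta(minutes=30), started_at=now - timedelta(minutes=29)),
]
print(tasks_to_abort(tasks, now, global_timeout=timedelta(minutes=10)))
```

Here only task `a` exceeds its timeout: `b` has a longer task-scope override and `c` has already started.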
flyteorg/flyteGitHub
11/02/2023, 9:27 PM
flytectl register files -p flytesnacks -d development -a flyte-package.tgz -v v1-fast1
update_workflow_meta
    flytectl update workflow --activate -p -d
    flytectl update workflow --archive -p -d
update_task_meta
    flytectl update task --activate -p -d
    flytectl update task --archive -p -d
update_launch_plan_meta
    flytectl update launchplan --activate -p -d
    flytectl update launchplan --archive -p -d
update_cluster_resource_attributes
    flytectl update cluster-resource-attribute --attrFile cra.yaml
update_execution_queue_attributes
    flytectl update execution-queue-attribute --attrFile cra.yaml
update_execution_cluster_label
    flytectl update execution-cluster-label --attrFile cra.yaml
get_matching_attributes
    flytectl get task-resource-attribute -p -d
    flytectl get cluster-resource-attribute -p -d
    flytectl get execution-cluster-label -p -d
    flytectl get execution-queue-attribute -p -d
    flytectl get plugin-override -p -d
list_matching_attributes
    flytectl get task-resource-attribute -p -d
    flytectl get cluster-resource-attribute -p -d
    flytectl get execution-cluster-label -p -d
    flytectl get execution-queue-attribute -p -d
    flytectl get plugin-override -p -d
setup_config
    Not yet supported
Linked tickets
Pagination support #1146
flyteorg/flyteGitHub
11/02/2023, 9:27 PM
"corp.net/uniqueKey": "{{ .nodeNameWithRetry }}"
would get filled in correctly. This is potentially useful for metrics libraries.
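To illustrate the substitution being asked for, here is a rough Python sketch that fills Go-template-style placeholders in label values. It is not how Flyte renders templates; `render_labels` and the sample value `n0-1` are assumptions made up for this example.

```python
import re
from typing import Dict

# Matches Go-template-style placeholders such as "{{ .nodeNameWithRetry }}".
_PLACEHOLDER = re.compile(r"\{\{\s*\.(\w+)\s*\}\}")


def render_labels(labels: Dict[str, str], values: Dict[str, str]) -> Dict[str, str]:
    """Fill placeholders in label values; unknown placeholders are left untouched."""

    def substitute(match: re.Match) -> str:
        return values.get(match.group(1), match.group(0))

    return {key: _PLACEHOLDER.sub(substitute, template) for key, template in labels.items()}


labels = {"corp.net/uniqueKey": "{{ .nodeNameWithRetry }}"}
# "n0-1" is a made-up value standing in for a node name plus retry attempt.
print(render_labels(labels, {"nodeNameWithRetry": "n0-1"}))
```

Leaving unknown placeholders untouched is one possible design choice; failing loudly on them would be another.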
flyteorg/flyteGitHub
11/02/2023, 9:27 PM
`Exception` or at best `ValueError` in pretty much all cases.
We should clean this up:
• Create a proper exception hierarchy for system errors (one already exists for user exceptions).
• Potentially strip out the scoped exception business.
See flyteorg/flytekit#543 for more background.
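As a rough picture of what a system-error hierarchy could look like, the sketch below defines a root class with a couple of subclasses so that callers can branch on type instead of catching a bare `Exception`. All class names and error codes here are hypothetical and are not taken from flytekit.

```python
class PlatformError(Exception):
    """Hypothetical root of a system-error hierarchy (not a flytekit class)."""

    error_code = "SYSTEM:Unknown"


class PlatformUnavailableError(PlatformError):
    """The backend could not be reached."""

    error_code = "SYSTEM:Unavailable"


class PlatformTimeoutError(PlatformError):
    """The backend did not answer in time."""

    error_code = "SYSTEM:Timeout"


def describe(exc: Exception) -> str:
    """Callers can branch on the hierarchy instead of matching on messages."""
    if isinstance(exc, PlatformError):
        return f"{exc.error_code}: {exc}"
    return f"USER:Unknown: {exc}"


print(describe(PlatformTimeoutError("no response after 30s")))
print(describe(ValueError("bad input")))
```

With a shared root, one `except PlatformError` clause covers every system error while user errors pass through unchanged.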
What if we do not do this?
Lack of polish.
Related component
Possibly related to #740.
flyteorg/flyteGitHub
11/02/2023, 9:27 PM
// This is workflow layer metadata. These settings are only applicable to the workflow as a whole, and do not
// percolate down to child entities (like tasks) launched by the workflow.
message WorkflowMetadata {
  // Indicates the runtime priority of workflow executions.
  QualityOfService quality_of_service = 1;

  // Failure Handling Strategy
  enum OnFailurePolicy {
    // FAIL_IMMEDIATELY instructs the system to fail as soon as a node fails in the workflow. It'll automatically
    // abort all currently running nodes and clean up resources before finally marking the workflow execution as
    // failed.
    FAIL_IMMEDIATELY = 0;

    // FAIL_AFTER_EXECUTABLE_NODES_COMPLETE instructs the system to make as much progress as it can. The system will
    // not alter the dependencies of the execution graph, so any node that depends on the failed node will not be run.
    // Other nodes will be executed to completion before cleaning up resources and marking the workflow
    // execution as failed.
    FAIL_AFTER_EXECUTABLE_NODES_COMPLETE = 1;
  }

  // Defines how the system should behave when a failure is detected in the workflow execution.
  OnFailurePolicy on_failure = 2;
}

// The difference between these settings and the WorkflowMetadata ones is that these are meant to be passed down to
// a workflow's underlying entities (like tasks). For instance, 'interruptible' has no meaning at the workflow layer; it
// is only relevant when a task executes. The settings here are the defaults that are passed to all nodes
// unless explicitly overridden at the node layer.
// If you are adding a setting that applies to both the Workflow itself and everything underneath it, it should be
// added to both this object and the WorkflowMetadata object above.
message WorkflowMetadataDefaults {
  // Whether child nodes of the workflow are interruptible.
  bool interruptible = 1;
}
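The difference between the two `OnFailurePolicy` values can be seen with a toy executor. The sketch below is a simplification in plain Python: it runs nodes one at a time in a given topological order, whereas the real system runs nodes concurrently and aborts in-flight ones. `run_workflow` and the status strings are hypothetical names for illustration.

```python
from enum import Enum
from typing import Callable, Dict, List


class OnFailurePolicy(Enum):
    FAIL_IMMEDIATELY = 0
    FAIL_AFTER_EXECUTABLE_NODES_COMPLETE = 1


def run_workflow(
    deps: Dict[str, List[str]],       # node -> upstream nodes it depends on
    order: List[str],                 # a topological order of the nodes
    succeeds: Callable[[str], bool],  # stand-in for actually running a node
    policy: OnFailurePolicy,
) -> Dict[str, str]:
    """Toy sequential executor; returns a status for each node it considered."""
    status: Dict[str, str] = {}
    for node in order:
        if any(status.get(up) != "SUCCEEDED" for up in deps.get(node, [])):
            status[node] = "SKIPPED"  # depends on a failed or skipped node
            continue
        status[node] = "SUCCEEDED" if succeeds(node) else "FAILED"
        if status[node] == "FAILED" and policy is OnFailurePolicy.FAIL_IMMEDIATELY:
            break  # stop scheduling; the remaining nodes never run
    return status


deps = {"a": [], "b": ["a"], "c": [], "d": ["c"]}
order = ["a", "b", "c", "d"]
for policy in OnFailurePolicy:
    print(policy.name, run_workflow(deps, order, lambda n: n != "a", policy))
```

With node `a` failing, `FAIL_IMMEDIATELY` stops after `a`, while `FAIL_AFTER_EXECUTABLE_NODES_COMPLETE` skips `b` (which depends on `a`) but still runs `c` and `d`. In both cases the workflow as a whole would be marked failed.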
flyteorg/flyteGitHub
11/02/2023, 9:27 PM
`getType()` and `getCustom()` methods in `SdkRunnableTask`. All runnable tasks require users to implement the `run()` method. Many plugins don't need tasks to be runnable; for instance, here is pseudo-code for an Athena plugin:
abstract class AthenaTask extends SdkRunnableTask<Void, Void> {
    String getType() {
        return "athena";
    }

    abstract String getQuery();

    SdkStruct getCustom() { ... }

    Void run(Void inputs) {
        return null;
    }
}

class MyQueryTask extends AthenaTask {
    String getQuery() {
        return "SELECT 1";
    }
}
As you can see from this example, it's possible to hide the `run()` method from users. However, the SDK will see that the task is runnable and will stage jars for it. That can be avoided if we introduce non-runnable tasks.
We need to carefully iterate on the design of this feature and see how to fit it consistently into the existing testing support and the methods that work with runnable tasks.
flyteorg/flyteGitHub
11/02/2023, 9:27 PM
`SdkBindingData` should become `SdkBindingData<T>`, where `T` should reflect the corresponding Java or Scala type. This will surface type information that is currently available only in the workflow graph as type information embedded in Java/Scala types. We should update all existing methods to preserve type information where possible, for instance:
class SdkBindingData<T> {
    static SdkBindingData<String> literalOfString(String value);
}
flyteorg/flyteGitHub
11/02/2023, 9:27 PM
public abstract class SdkDynamicWorkflow<InputT> {
    public abstract void expand(SdkWorkflowBuilder builder, InputT input);
}
flyteorg/flyteGitHub
11/02/2023, 9:27 PM
`SdkWorkflow`, `SdkLaunchPlan`, and `SdkRemoteWorkflow` (to be added, similarly to `SdkRemoteTask`).
flyteorg/flyteGitHub
11/02/2023, 9:27 PM
[2021-04-07T04:45:51Z] status = StatusCode.UNAVAILABLE
[2021-04-07T04:45:51Z] details = "upstream connect error or disconnect/reset before headers. reset reason: connection failure"
[2021-04-07T04:45:51Z] debug_error_string = "{"created":"@1617770750.462629552","description":"Error received from peer","file":"src/core/lib/surface/call.cc","file_line":1039,"grpc_message":"upstream connect error or disconnect/reset before headers. reset reason: connection failure","grpc_status":14}"
Expected behavior
On network-related error responses, Flytekit should retry the request. The retry should follow a specified retry policy, whether that is exponential backoff, incremental intervals, randomization, or some other policy.
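One of the policies mentioned, exponential backoff with randomization, can be sketched as below. This is a generic stdlib illustration rather than Flytekit's client code: `retry_with_backoff` and its parameters are hypothetical, and `ConnectionError` stands in for a gRPC error whose status is `UNAVAILABLE`.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    call: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `call`, retrying on `retry_on` with capped exponential backoff and jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts, surface the last error
            # The upper bound doubles each attempt and is capped at max_delay.
            bound = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, bound))  # "full jitter": wait a random time up to the bound
    raise AssertionError("unreachable")


attempts = {"n": 0}


def flaky() -> str:
    # Fails twice with a network-style error, then succeeds.
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("upstream connect error")
    return "ok"


print(retry_with_backoff(flaky, sleep=lambda seconds: None))
```

The `sleep` parameter is injectable so the policy can be exercised without real delays; errors outside `retry_on` are raised immediately rather than retried.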
flyteorg/flyte