GitHub
10/11/2023, 4:28 AM
pd.DataFrame with dataclasses raises an error:
File "/Users/nielsbantilan/miniconda3/envs/flyte-vscode-demo/lib/python3.9/site-packages/dataclasses_json/core.py", line 201, in _decode_dataclass
init_kwargs[field.name] = _decode_generic(field_type,
File "/Users/nielsbantilan/miniconda3/envs/flyte-vscode-demo/lib/python3.9/site-packages/dataclasses_json/core.py", line 258, in _decode_generic
xs = _decode_items(type_.__args__[0], value, infer_missing)
AttributeError: type object 'DataFrame' has no attribute '__args__'
Expected behavior
This should work like StructuredDataset does.
Additional context to reproduce
Using this type in a task
@dataclass_json
@dataclass
class TrainArgs:
    hyperparameters: dict
    data: pd.DataFrame

@task
def prepare_train_args(hp_grid: List[dict], data: pd.DataFrame) -> List[TrainArgs]:
    return [TrainArgs(hp, data) for hp in hp_grid]
will lead to the error above
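The traceback makes sense once you see what dataclasses_json assumes: any non-basic field type is treated as a parameterized generic whose `__args__` can be read. A plain class such as pd.DataFrame has no `__args__`, which is exactly the AttributeError above. A minimal stdlib-only illustration (no pandas required):

```python
import typing
from typing import List

class Plain:
    """Stands in for a non-generic class like pd.DataFrame."""

# Parameterized generics expose their type arguments...
assert typing.get_args(List[int]) == (int,)

# ...but plain classes have no __args__ at all, so code that reads
# type_.__args__[0] unconditionally (as _decode_generic does) blows up.
assert not hasattr(Plain, "__args__")
assert typing.get_args(Plain) == ()  # the safe way to introspect
```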
Screenshots
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
David DeStefano
10/17/2023, 10:39 AM
GitHub
10/17/2023, 3:22 PM
map_task with a partially applied function will raise an exception depending on the order of arguments in the original function definition.
Expected behavior
Argument order ought not to matter with a partially applied function being mapped. This would align with Python syntax.
Additional context to reproduce
Workflow python file `workflows/example.py`:
from functools import partial
from flytekit import map_task, task, workflow

@task()
def create_names() -> list[str]:
    return ["a", "bb", "ccc"]

@task()
def get_number() -> int:
    return 3

@task()
def multiply_name(
    number: int,
    name: str,
) -> str:
    return " ".join([name] * number)

@workflow
def process_all() -> list[str]:
    name_list = create_names()
    number = get_number()
    multiply_name_partial = partial(
        multiply_name,
        number=number,
    )
    result = map_task(multiply_name_partial)(name=name_list)
    return result
Python 3.11.5
requirements.in:
black==23.9.1
dataclasses-json==0.6.1
flytekit==1.8.0
marshmallow-enum==1.5.1
ipykernel==6.25.2
isort==5.12.0
To reproduce: pyflyte run workflows/example.py process_all
It will work if you change the argument order in the `multiply_name` task to this:
@task()
def multiply_name(
    name: str,
    number: int,
) -> str:
    return " ".join([name] * number)
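For reference, plain Python's functools.partial is indifferent to where the keyword-bound parameter sits in the signature, which is the behavior the issue expects map_task to mirror (sketch, outside of Flyte):

```python
from functools import partial

def multiply_name(number: int, name: str) -> str:
    return " ".join([name] * number)

def multiply_name_swapped(name: str, number: int) -> str:
    return " ".join([name] * number)

# Binding `number` by keyword works regardless of its position in the
# signature, so both partials behave identically.
f = partial(multiply_name, number=3)
g = partial(multiply_name_swapped, number=3)
assert f(name="ab") == "ab ab ab"
assert g(name="ab") == "ab ab ab"
```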
Screenshots
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
10/18/2023, 10:34 PM
Annotated[FlyteFile, HashMethod(...)] for calculating a custom hash for a FlyteFile doesn't work out-of-the-box, failing an `issubclass` check in flytekit/types/file/file.py. This makes it impossible to use custom hash functions with FlyteFile, e.g., to implement content-aware caching.
Note that the same approach with Annotated[FlyteDirectory, HashMethod(...)] works as expected out of the box.
Example workflow code
import hashlib
from typing import Annotated
import pandas as pd
from flytekit import task, workflow
from flytekit.core.hash import HashMethod
from flytekit.types.file import FlyteFile
def calc_hash(f: FlyteFile) -> str:
    """Calculate SHA1 hash of a file"""
    h = hashlib.sha1(usedforsecurity=False)
    with open(f.path, "rb") as fp:
        while chunk := fp.read(4096):
            h.update(chunk)
    return str(h.hexdigest())

CachedFlyteFile = Annotated[FlyteFile, HashMethod(calc_hash)]

@task
def write_file() -> CachedFlyteFile:
    print("write_file")
    local_path = "data.parquet"
    df = pd.DataFrame(data={"a": [1, 2, 3], "b": [3, 4, 5]})
    df.to_parquet(local_path)
    return FlyteFile(local_path, remote_path=f"s3://test-repo/main/{local_path}")

@task(cache=True, cache_version="1")
def print_file(file: FlyteFile) -> None:
    file.download()
    print(pd.read_parquet(file))

@workflow
def wf() -> None:
    f = write_file()
    print_file(file=f)

if __name__ == "__main__":
    wf()
    wf()  # don't expect output from `print_file`, since it should be cached
Expected behavior
The first execution of wf should run both the write_file and print_file tasks; the second execution should only run write_file and hit the cache for print_file.
In reality, an exception is raised in `flytekit/types/file/file.py, in to_literal()`:
TypeError: issubclass() arg 1 must be a class
Additional context to reproduce
See example workflow above. I was able to solve the issue by inserting the following before the issubclass check in flytekit/types/file/file.py (I haven't run the flytekit test suite, so this might not be suitable for a mergeable PR just yet - should I submit a draft PR regardless?):
@@ -284,6 +288,10 @@
"None value cannot be converted to a file."
)
+ # Handle Annotated[FlyteFile, ...] correctly by extracting the wrapped type
+ if issubclass(typing.get_origin(python_type), typing.Annotated):
+ python_type = typing.get_args(python_type)[0]
+
if not (python_type is os.PathLike or issubclass(python_type, FlyteFile)):
raise ValueError(
f"Incorrect type {python_type}, must be either a FlyteFile or os.PathLike"
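As a side note on the proposed patch: typing.get_origin(Annotated[X, ...]) returns the Annotated special form itself, which is not a class, so an identity check is a bit more defensive than issubclass here. A small stand-alone sketch (not flytekit's actual code):

```python
import typing
from typing import Annotated

def unwrap_annotated(t):
    # get_origin(Annotated[X, ...]) is Annotated itself; an identity
    # check avoids calling issubclass() on non-class special forms.
    if typing.get_origin(t) is Annotated:
        return typing.get_args(t)[0]
    return t

assert unwrap_annotated(Annotated[int, "meta"]) is int
assert unwrap_annotated(int) is int  # non-annotated types pass through
```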
Screenshots
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
10/19/2023, 9:58 PM
@task(cache=True, cache_version="v1")
def t(log_level: int, a: str) -> str:
    ...
According to the docs, one of the inputs to cache key calculation is the task signature, but in the case of this example, it'd be great if we could ignore log_level as part of the cache key calculation.
Goal: What should the final outcome look like, ideally?
We should be able to do something along the lines of:
@task(cache=True, cache_version="v1", ignore_input_vars=["log_level"])
def t(log_level: int, a: str) -> str:
    ...
This would essentially skip some of the parameters for cache key calculation purposes.
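To make the proposal concrete, here is a hypothetical sketch of the desired semantics (cache_key and its arguments are invented for illustration; this is not flytekit's real cache-key code): inputs named in ignore_input_vars simply never reach the hash.

```python
import hashlib
import json

def cache_key(task_name, version, inputs, ignore_input_vars=()):
    # Hypothetical illustration of the proposed semantics: inputs named
    # in ignore_input_vars never participate in the key.
    kept = {k: v for k, v in inputs.items() if k not in ignore_input_vars}
    payload = json.dumps([task_name, version, kept], sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()

# Same key despite a different log_level...
k1 = cache_key("t", "v1", {"log_level": 10, "a": "x"}, ignore_input_vars=("log_level",))
k2 = cache_key("t", "v1", {"log_level": 20, "a": "x"}, ignore_input_vars=("log_level",))
assert k1 == k2

# ...but a change to a non-ignored input still busts the cache.
k3 = cache_key("t", "v1", {"log_level": 10, "a": "y"}, ignore_input_vars=("log_level",))
assert k3 != k1
```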
Describe alternatives you've considered
We have the ability to override the hashing mechanism used to translate python types into Flyte types, as described in https://docs.flyte.org/projects/cookbook/en/latest/auto/core/flyte_basics/task_cache.html#caching-of-non-flyte-offloaded-objects.
One could use this idea and provide constant hashes for the arguments they want to ignore, for example:
def constant_function(x: int) -> str:
    return "const"

@task
def t_produce_annotated_literals() -> Annotated[int, HashMethod(constant_function)]:
    log_level = ...
    return log_level

@task(cache=True, cache_version="v1")
def t(log_level: int, a: str) -> str:
    ...

@workflow
def wf() -> str:
    log_level = t_produce_annotated_literals()
    return t(log_level=log_level, a="some string")
Propose: Link/Inline OR Additional context
Expose ignore_input_vars in the @task decorator and ensure the new interface is used during cache key calculation in both local and remote executions.
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
10/20/2023, 10:24 PM
• pyflyte run by @kumare3 in #1785
• Add is none function by @pingsutw in #1757
• Dynamic workflow should not throw nested task warning by @oliverhu in #1812
• Add a manual image building GH action by @wild-endeavor in #1816
• Enable Azure Workload Identity for fsspec
in flytekit
by @fiedlerNr9 in #1813
• Fix list of annotated structured dataset by @wild-endeavor in #1817
• Support the flytectl config.yaml admin.clientSecretEnvVar option in flytekit by @chaohengstudent in #1819
• Async agent delete function for while loop case by @Future-Outlier in #1802
• fix docs warnings by @samhita-alla in #1827
• Fix extract_task_module by @pingsutw in #1829
• Feat: Add type support for pydantic BaseModels by @ArthurBook in #1660
• Make FlyteRemote example slightly more copy/pastable by @katrogan in #1830
• Pyflyte meta inputs by @kumare3 in #1823
• Use mashumaro to serialize/deserialize dataclass by @hhcs9527 in #1735
• Databricks Agent by @Future-Outlier in #1797
• Prometheus metrics by @pingsutw in #1815
• Pyflyte register optionally activates schedule by @kumare3 in #1832
• Remove versions 3.9 and 3.10 by @wild-endeavor in #1831
• Snowflake agent by @hhcs9527 in #1799
• Update agent metric name by @pingsutw in #1835
• MemVerge MMCloud Agent by @edwinyyyu in #1821
• Add download badges to the readme by @pingsutw in #1836
• Eager local entrypoint and support for offloaded types by @cosmicBboy in #1833
• update requirements and add snowflake agent to api reference by @samhita-alla in #1838
• Fix: Make sure decks created in elastic task workers are transferred to parent process by @fg91 in #1837
• add accept grpc by @wild-endeavor in #1841
• Feat: Enable flytekit
to authenticate with proxy in front of FlyteAdmin by @fg91 in #1787
• Backfill command now supports failure-policy by @kumare3 in #1840
• Pass cluster pool when creating executions by @iaroslav-ciupin in #1208
• Add more clear error message when fetching secrets by @ysysys3074 in #1847
• Pyflyte run workflows correctly handles Optional[TYPE] = None by @cosmicBboy in #1849
• Bump gitpython from 3.1.32 to 3.1.35 in /tests/flytekit/integration/remote/mock_flyte_repo/workflows by @dependabot in #1828
• Bump cryptography from 41.0.3 to 41.0.4 in /tests/flytekit/integration/remote/mock_flyte_repo/workflows by @dependabot in #1844
• update codecov config in pythonbuild ci by @cosmicBboy in #1852
• Improved gate node handling by @kumare3 in #1850
• update codecov yaml, make eager wf test more stable by @cosmicBboy in #1854
• more codecov updates by @cosmicBboy in #1856
• make mlflow plugin work with python 3.11 by @bcvanmeurs in #1855
• Agent get secret function by @Future-Outlier in #1851
• Better Azure blob storage support by @Tom-Newton in #1842
• Remove Envd from dockerfile by @pingsutw in #1814
• Update agent uri format by @hhcs9527 in #1861
• modified setup.py in flytekit-mmcloud by @helenzhangyc in #1864
• Label request_failure metric with error_code by @pingsutw in #1862
• add back Edwin's name by @helenzhangyc in #1868
• Fix py38 aync agent service and add async agent test by @Future-Outlier in #1866
• Pin because not installing by @wild-endeavor in #1872
• Bump pillow from 10.0.0 to 10.0.1 in /tests/flytekit/integration/remote/mock_flyte_repo/workflows by @dependabot in #1867
• Prune hosted github runner to free up space for sandbox by @jeevb in #1875
• Fixes Auth issue, where refresh token is expired by @kumare3 in #1873
• Name field is incorrect assigned by @kumare3 in #1874
• [Docs]Updated examples for customizing resources by @LunarMarathon in #1871
• change key path to SNOWFLAKE_PRIVATE_K…
flyteorg/flytekit
GitHub
10/20/2023, 11:21 PM
flytectl to fetch all, archive, and activate launchplans
Propose: Link/Inline OR Additional context
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
10/21/2023, 12:14 AM
GitHub
10/21/2023, 4:35 AM
GitHub
10/23/2023, 12:06 AM
Ketan (kumare3)
• pyflyte run used to use a special way of handling certain objects; now that is centralized to use the TypeEngine
• pyflyte run and gate node used different string parsing systems; now that is centralized
• pyflyte run can now use multiple files in a directory locally to create a new remote FlyteDirectory representation
• pyflyte run code has been simplified
Improvements
• pyflyte info added
• pyflyte fetch <flyte-uri> added
• This introduces a new flyte-uri concept that is available in the UI; you can retrieve inputs and outputs from any execution (workflow or even a single node) instantaneously as it deeplinks into it.
• centralizes fsspec as the way to interact with data
Planned work
• Artifacts service that is unblocked and simplified with this
• improved launchform experience from flyte uris
• and many more data interaction improvements
GitHub
10/24/2023, 10:31 PM
• flyte:// file system and improve remote file handling by @wild-endeavor in #1674
• Improved rendering of output literals by @kumare3 in #1916
• Use Airflow task inside dynamic workflow by @pingsutw in #1912
Full Changelog: v1.10.0...v1.10.1b0
flyteorg/flytekit
GitHub
10/25/2023, 12:06 AM
GitHub
10/27/2023, 4:39 AM
ex1 = flyte_remote.execute(my_task, {"input": 1}, execution_name="foo")
ex2 = flyte_remote.execute(my_task, {"input": 2}, execution_name="foo")
creates only one execution (or zero, if an execution named "foo" already existed). The second set of inputs is silently ignored, and the existing execution is returned.
Goal: What should the final outcome look like, ideally?
I would suggest a way to generate a name with a human-readable prefix (with behavior similar to e.g. Kubernetes's generateName):
ex1 = flyte_remote.execute(my_task, {"input": 1}, execution_name_prefix="foo-")
ex2 = flyte_remote.execute(my_task, {"input": 2}, execution_name_prefix="foo-")
ex1.id.name  # Something like "foo-41e2c72fa869"
ex2.id.name  # Something like "foo-7742ba115212"
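The generateName behavior is easy to sketch in plain Python (generate_execution_name is a hypothetical helper, not a flytekit API):

```python
import secrets

def generate_execution_name(prefix: str) -> str:
    # Hypothetical helper mimicking Kubernetes generateName: append a
    # short random hex suffix so repeated submissions never collide.
    return prefix + secrets.token_hex(6)

n1 = generate_execution_name("foo-")
n2 = generate_execution_name("foo-")
assert n1.startswith("foo-") and n2.startswith("foo-")
assert n1 != n2                     # distinct executions get distinct names
assert len(n1) == len("foo-") + 12  # token_hex(6) -> 12 hex characters
```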
Describe alternatives you've considered
Propose: Link/Inline OR Additional context
Note: I'm happy to create a PR to flytekit if the idea seems reasonable.
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
10/29/2023, 12:06 AM
Declaring a config as a @dataclass or a NamedTuple looks basically the same:
@dataclass
class Config:
    epochs: int
    cv_splits: int
    ...

class Config(NamedTuple):
    epochs: int
    cv_splits: int
    ...
So it's a very easy trap to think they're basically interchangeable and then get confused and/or frustrated when flyte behaves in very different ways depending on which one you used. My team is trying to anticipate adding the rest of our team as flyte users (~8 people), as well as handfuls more teams (~5–10 teams) as users, and we think this is an important friction to get ahead of and have a simple recommendation and happy path for.
Goal: What should the final outcome look like, ideally?
Library pseudocode
• Define this once
• Document/explain to users to use Outputs instead of NamedTuple for task outputs
# Outputs is like NamedTuple except:
# - It fills in the type name for you -- it's a nuisance parameter, and flyte ignores it
# - You use it inline instead of inheriting from it, for both type and value usages
# - TODO Add metaclass stuff to make this code actually work as a type (and a value)
Outputs = lambda **kwargs: NamedTuple("Outputs", **kwargs)
Example user code:
from wherever import Outputs
@dataclass
class Config:
    ...

@dataclass
class TrainStats:
    ...
@task
def evaluate_model(
    config: Config,          # A user-defined dataclass
    model: tf.keras.Model,   # Some type from a library
    metrics: List[str],      # A normal python datatype
) -> Outputs(                # Use Outputs() inline as a type instead of declaring a NamedTuple
    success: bool,           # A normal python datatype
    stats: TrainStats,       # A user-defined dataclass
    thresholds: np.ndarray,  # Some type from a library
):
    ...
    return Outputs(          # Also use Outputs() as a value, matching the type above
        success=...,
        stats=...,
        thresholds=...,
    )

# Simple tasks can ofc still return single outputs too
# - With no name, i.e. flyte's default o1 naming
@task
def sample_train_data(X: pd.DataFrame) -> pd.DataFrame:
    ...
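A runnable version of the Outputs helper can be built from NamedTuple's functional form today (a sketch of the proposal, not the TODO'd metaclass version; the keyword form of NamedTuple is deprecated in newer Pythons, so (name, type) pairs are passed instead):

```python
from typing import NamedTuple

def Outputs(**fields):
    # Sketch of the proposed helper: a functional NamedTuple whose type
    # name ("Outputs") is filled in for you, since Flyte ignores it.
    return NamedTuple("Outputs", list(fields.items()))

Result = Outputs(success=bool, count=int)
r = Result(success=True, count=3)
assert r._fields == ("success", "count")
assert (r.success, r.count) == (True, 3)
```

This covers the type-declaration side; using the same call inline as a return *value* (as the pseudocode above wants) is the part that still needs the metaclass work.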
Describe alternatives you've considered
.
Propose: Link/Inline OR Additional context
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
Rahul Mehta
10/30/2023, 7:26 PM
fsspec that this change will resolve: https://github.com/flyteorg/flytekit/pull/1925. Curious how folks feel about this one, but it's holding us back on a few upgrades / seems like a reasonable change.
GitHub
10/30/2023, 7:27 PM
We used the map_task.min_success_ratio functionality to allow for some failures of our input files without stopping workflow execution. This worked great remotely. As I wrote the unit test for the workflow, I realized that the min_success_ratio field was not respected for local execution. As soon as any map_task task failed, the workflow exited.
To be clear, this is not a blocking issue for us, and is not a critical feature. We simply want parity between local and remote execution when possible.
Goal: What should the final outcome look like, ideally?
local execution of a map_task should respect the min_success_ratio
field if set below 1.0
Describe alternatives you've considered
This works remotely, so I skipped writing the unit test for the workflow.
Propose: Link/Inline OR Additional context
from dataclasses import dataclass
from dataclasses_json import dataclass_json
from flytekit import workflow, task, map_task, LaunchPlan
from flytekit.testing import task_mock
import functools
from typing import Optional

@dataclass_json
@dataclass
class AData:
    num: Optional[int]

@dataclass_json
@dataclass
class BData:
    num: Optional[int]

@task
def t1(a: AData, b: BData, c: str) -> int:
    print(c)
    if a.num == 2:
        raise ValueError("'a' input was 2")
    return a.num * b.num

@task
def coalesce(nums: list[Optional[int]]) -> int:
    total = 0
    for n in nums:
        total += n if n else 0
    return total

@workflow
def wf(a: list[AData], b: list[BData], c: str) -> int:
    t1_fixed_c = functools.partial(t1, c=c)
    results = map_task(t1_fixed_c, min_success_ratio=0.5)(a=a, b=b)
    return coalesce(nums=results)

def test_map_wf():
    a_list = [AData(num=1), AData(num=2), AData(num=3), AData(num=4)]
    b_list = [BData(num=5), BData(num=6), BData(num=7), BData(num=8)]
    results = wf(a=a_list, b=b_list, c="hello")
    assert results == 58
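The local-execution semantics being requested can be sketched outside Flyte like this (run_with_min_success_ratio is a hypothetical stand-in for what map_task's local runner would do, not a flytekit API):

```python
def run_with_min_success_ratio(fn, items, min_success_ratio):
    # Hypothetical stand-in for local map_task semantics: collect a
    # result per item, substituting None on failure, and only raise if
    # the overall success ratio drops below the threshold.
    results, successes = [], 0
    for item in items:
        try:
            results.append(fn(item))
            successes += 1
        except Exception:
            results.append(None)
    if successes / len(items) < min_success_ratio:
        raise RuntimeError(f"success ratio {successes / len(items):.2f} "
                           f"below {min_success_ratio}")
    return results

def work(x):
    if x == 2:
        raise ValueError("'x' input was 2")
    return x * 10

# One of four items fails: 0.75 >= 0.5, so the run succeeds with a hole.
assert run_with_min_success_ratio(work, [1, 2, 3, 4], 0.5) == [10, None, 30, 40]
```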
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
10/31/2023, 12:06 AM
Pod | Container, but flytekit allows using pod as a special task type. To achieve this, the user has to install flytekitplugins-pod.
A preferred way would be to allow swapping out container vs pod for all containerized task executions. Let us consider the following example:
@task(task_config=MyConfig(...))
def tk():
    ...
It should be possible to pass in the pod config as follows:
@task(task_config=MyConfig(...), pod=PodConfig())
def tk():
    ...
It is also ok to need flytekitplugins-pod to be installed to actually handle the right type
Goal: What should the final outcome look like, ideally?
The experience is detailed above. The reason to do this is that in complicated cases where users want to control the pod attributes, doing so should be trivial and possible.
Describe alternatives you've considered
This is not easily possible today. It has been implemented for ShellTasks as an example.
Propose: Link/Inline OR Additional context
NA
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
10/31/2023, 12:06 AM
@task
def my_task(foo: int, bar: str) -> bool:
    """
    :param foo: Some foo description here
    :param bar: Other bar description here
    :return: Description of return type here
    """
    ...
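Extracting those descriptions is cheap; a naive sketch of pulling :param lines out of a reST-style docstring with the stdlib (not Flyte's actual docstring parser):

```python
import inspect
import re

def param_descriptions(fn):
    # Naive sketch: pull ":param name: text" lines out of a reST-style
    # docstring -- the kind of metadata the issue asks Flyte to surface.
    doc = inspect.getdoc(fn) or ""
    return dict(re.findall(r":param (\w+): (.+)", doc))

def my_task(foo: int, bar: str) -> bool:
    """
    :param foo: Some foo description here
    :param bar: Other bar description here
    :return: Description of return type here
    """
    return True

assert param_descriptions(my_task) == {
    "foo": "Some foo description here",
    "bar": "Other bar description here",
}
```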
Description Entity Table
FlyteConsole: We should be able to see the description when listing tasks/workflows
GitHub
11/01/2023, 12:14 AM
@task(
    task_config=Pod(
        pod_spec=V1PodSpec(
            containers=[],
            node_selector={"node_group": "memory"},
        ),
    ),
    requests=Resources(
        mem="1G",
    ),
)
def my_pod_task():
    ...
Goal: What should the final outcome look like, ideally?
Improve the example and point to the new pod plugin
Describe alternatives you've considered
NA
Propose: Link/Inline OR Additional context
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/01/2023, 1:27 AM
In pyflyte run commands, you can specify a --envs flag and pass in a JSONified dictionary/object to propagate variables to the container running the workflows/tasks.
Like so:
pyflyte run --envs '{"var1":"hello","var2":"world"}' --image etcetcetc workflowfile
We are requesting an equivalent mode of passing an --envs-like dictionary in the context of running pyflyte register and pyflyte launchplan --activate/deactivate commands.
Context being:
pyflyte register -p flytesnacks --image imagename:1.0.5 --version v0.0.2a ./workflow.py
And/or
pyflyte launchplan -p flytesnacks -d staging launchplan-name v0.0.1a --activate
flyteorg/flyte
GitHub
11/02/2023, 9:23 PM
GitHub
11/02/2023, 9:23 PM
The input to if_ in the condition must be a promise; a native python type can't be used in the if statement.
@dynamic
def d1() -> bool:
    a = t1()
    return (
        conditional("train_estimator")
        .if_(a == True)
        .then(t2())
        .else_()
        .then(t2())
    )

@dynamic
def d1(a: bool) -> bool:
    return (
        conditional("train_estimator")
        .if_(a == True)  # <- fails because `a` isn't a promise
        .then(t2())
        .else_()
        .then(t2())
    )
Goal: What should the final outcome look like, ideally?
Both promise and native python types should be supported in the conditions
Describe alternatives you've considered
Users must create a task to convert it to a promise and use it in the conditions.
@task
def convert_to_promise(a: bool) -> bool:
    return a
Propose: Link/Inline OR Additional context
https://flyte-org.slack.com/archives/CREL4QVAQ/p1666110935856779
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:23 PM
Several aliases in numpy (such as np.bool) were deprecated in v1.20.0 and finally removed in v1.24.0 (check Deprecations in https://github.com/numpy/numpy/releases/tag/v1.20.0).
We should remove the mentions of those deprecated aliases from the FlyteSchemaTransformer.
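The fix direction is straightforward: the removed aliases were plain aliases of the Python builtins, so the column-type map can key on the builtins directly. A stdlib-only sketch (SCHEMA_COLUMN_TYPES is illustrative, not the transformer's real table):

```python
# The removed np.bool / np.int / np.float were plain aliases of the
# Python builtins, so a schema column-type map can key on the builtins
# (and, where needed, concrete NumPy scalar types like numpy.bool_)
# instead of the deprecated aliases. Illustrative table only:
SCHEMA_COLUMN_TYPES = {
    bool: "BOOLEAN",
    int: "INTEGER",
    float: "FLOAT",
    str: "STRING",
}

assert SCHEMA_COLUMN_TYPES[bool] == "BOOLEAN"
assert SCHEMA_COLUMN_TYPES[type(1.5)] == "FLOAT"
```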
What if we do not do this?
Users who install numpy>=1.24.0
will see this error in flytekit:
/tmp/tmp.pgqfAvNZOC/venv/lib/python3.10/site-packages/flytekit/types/schema/types.py:324: FutureWarning: In the future `np.bool` will be defined as the corresponding NumPy scalar. (This may have returned Python scalars in past versions.
_np.bool: SchemaType.SchemaColumn.SchemaColumnType.BOOLEAN, # type: ignore
Traceback (most recent call last):
File "/tmp/tmp.pgqfAvNZOC/venv/bin/pyflyte", line 5, in <module>
from flytekit.clis.sdk_in_container.pyflyte import main
File "/tmp/tmp.pgqfAvNZOC/venv/lib/python3.10/site-packages/flytekit/__init__.py", line 195, in <module>
from flytekit.types import directory, file, numpy, schema
File "/tmp/tmp.pgqfAvNZOC/venv/lib/python3.10/site-packages/flytekit/types/schema/__init__.py", line 1, in <module>
from .types import (
File "/tmp/tmp.pgqfAvNZOC/venv/lib/python3.10/site-packages/flytekit/types/schema/types.py", line 314, in <module>
class FlyteSchemaTransformer(TypeTransformer[FlyteSchema]):
File "/tmp/tmp.pgqfAvNZOC/venv/lib/python3.10/site-packages/flytekit/types/schema/types.py", line 324, in FlyteSchemaTransformer
_np.bool: SchemaType.SchemaColumn.SchemaColumnType.BOOLEAN, # type: ignore
File "/tmp/tmp.pgqfAvNZOC/venv/lib/python3.10/site-packages/numpy/__init__.py", line 284, in __getattr__
raise AttributeError("module {!r} has no attribute "
AttributeError: module 'numpy' has no attribute 'bool'. Did you mean: 'bool_'?
Related component(s)
flytekit
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:23 PM
GitHub
11/02/2023, 9:23 PM
@task
def t1() -> Annotated[pd.DataFrame, kwtypes(a=int)]:
    ...

@task
def t2(df: Annotated[pd.DataFrame, kwtypes(a=float)]):
    ...
It's conceivable that sometimes you want this type conversion (from int to float) to happen and sometimes you do not.
What is the correct way of allowing the user to control this behavior?
Also where should this be enforced? Enforcing this at run-time is likely best to give as much flexibility to the plugin author as possible.
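One possible run-time policy can be sketched with plain dicts standing in for dataframe columns (coerce_columns and allow_cast are hypothetical names for illustration, not a flytekit API): cast when allowed, fail loudly otherwise.

```python
def coerce_columns(rows, declared, allow_cast=True):
    """Enforce declared column types on a list of row dicts.

    Hypothetical policy sketch: values are cast to the declared type
    when allow_cast is True; otherwise any mismatch raises TypeError.
    """
    out = []
    for row in rows:
        new_row = dict(row)
        for name, dtype in declared.items():
            value = row[name]
            if not isinstance(value, dtype):
                if not allow_cast:
                    raise TypeError(
                        f"column {name!r}: got {type(value).__name__}, "
                        f"declared {dtype.__name__}"
                    )
                new_row[name] = dtype(value)
        out.append(new_row)
    return out

casted = coerce_columns([{"a": 1}, {"a": 2}], {"a": float})
assert all(isinstance(r["a"], float) for r in casted)

try:
    coerce_columns([{"a": 1}], {"a": float}, allow_cast=False)
except TypeError:
    pass  # strict mode surfaces the int -> float mismatch
```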
Implementation
?
Misc
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:23 PM
from flytekit import workflow
from flytekitplugins.dbt.task import DBTRun, DBTRunInput

dbt_task = DBTRun(name="the_name_of_the_task")

@workflow
def my_wf() -> None:
    input = DBTRunInput(
        project_dir="dbt_project_dir",
        profiles_dir="dbt_project_dir/docker-context",
        profile="default",
        select=["some_model"],
    )
    dbt_task(input=input)
Screenshots
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:23 PM
pyflyte run. We should add support for launchplans as well.
Goal: What should the final outcome look like, ideally?
We should be able to use the same experience currently offered in pyflyte run to kick off launchplans, i.e. users should be able to execute:
> pyflyte run --remote path/to/my/file.py my_launchplan
Go to http://flyte.example.com/console/projects/flytesnacks/domains/development/executions/f6e7113930e3043e79cc to see execution in the console.
...
Describe alternatives you've considered
launchplans are an indispensable entity type in the Flyte programming model, hence they should be supported in the CLIs.
Propose: Link/Inline OR Additional context
We should add launch plans to the list of entities handled by pyflyte run here.
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:24 PM
from typing import Dict, List, NamedTuple
from flytekit import task, workflow

class OpenFlightsData(NamedTuple):
    routes: List[Dict[str, str]]
    airlines: Dict[str, str]
    airports: Dict[str, Dict[str, str]]

@task()
def extract_reference_data() -> OpenFlightsData:
    pass
Fails with
[1/1] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
[fb5562eecbc2f40f9b0d-n0-0] terminated with exit code (137). Reason [Error]. Message:
.
Expected behavior
No failure
Additional context to reproduce
flytekit 1.2.1
flyte admin 1.1.46
Screenshots
Screen Shot 2022-10-13 at 10 36 58 pm▾
GitHub
11/02/2023, 9:24 PM
File "/Users/.../lib/python3.7/site-packages/flytekit/core/launch_plan.py", line 143, in create
    native_types=workflow.python_interface.inputs,
AttributeError: 'NoneType' object has no attribute 'inputs'
Code snippet:
from flytekit import LaunchPlan

flyte_workflow = remote.fetch_workflow(
    name="my_workflow", version="v1", project="flytesnacks", domain="development"
)
launch_plan = LaunchPlan.get_or_create(name="my_launch_plan", workflow=flyte_workflow)
Goal: What should the final outcome look like, ideally?
Should be able to create a launch plan for FlyteWorkflow
Describe alternatives you've considered
NA
Propose: Link/Inline OR Additional context
https://flyte-org.slack.com/archives/CP2HDHKE1/p1665377001295529
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte