GitHub
12/31/2022, 12:43 AM
List[Any]. This slows down serialization: it takes more than 15 min to upload the pickles to S3 when the list has 1000 elements.
People don't care about how we serialize List[Any]. We can just convert the entire list into a single pickle file, which reduces the time required for serialization.
Goal: What should the final outcome look like, ideally?
It will make serialization faster.
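The proposal can be sketched with plain pickle (list contents and sizes here are illustrative, not flytekit's actual serialization path):

```python
import pickle

# 1000-element list standing in for a typical List[Any] payload.
data = [{"idx": i, "payload": "x" * 64} for i in range(1000)]

# Current behavior: each element becomes its own pickle, i.e. 1000
# separate blobs (and 1000 separate S3 uploads).
per_element = [pickle.dumps(item) for item in data]

# Proposed behavior: pickle the entire list once -> a single blob,
# a single upload.
whole_list = pickle.dumps(data)

assert len(per_element) == 1000
assert pickle.loads(whole_list) == data
```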
Describe alternatives you've considered
• Raise an error when using a large list
• Add a detailed warning
Propose: Link/Inline OR Additional context
Slack Thread
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
01/12/2023, 6:57 PM
In pyflyte, the -d switch was overloaded: it was used for both the domain and the destination-dir that flytekit extracts user code into when using the fast-register feature. The latter has been updated to -D. This may require some users to change commands in CI/CD if the short form of these switches is used.
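For example, a fast-register invocation would now look like this (hypothetical project layout; only the -d/-D switches are the point):

```shell
# -d is now unambiguously --domain; -D is the new short form of --destination-dir.
pyflyte register -d development -D /root workflows/
```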
Protobuf Support
In May of last year Google announced a breaking change to protobuf support in Python. Starting with this release we have moved to this new version of the protobuf tooling; specifically, this affects some of the flytekit dependencies, including grpcio and grpcio-status.
Some flytekit dependencies, namely tensorflow, haven't upgraded to the new protobuf version yet (more details in tensorflow/tensorflow#53234), so for the time being, flytekit v1.2.x will be maintained for bug fixes (and possibly features as well).
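Users who need tensorflow can stay on the maintained 1.2 line in the meantime; a requirements pin sketch (the version bound is an assumption based on the note above, not an official recommendation):

```
flytekit>=1.2,<1.3  # stays on the pre-protobuf-4 tooling
```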
* * *
Additional
Please see Flyte 1.3 Release Notes for additional changes (Databricks, Human-in-the-loop Tasks, Dask, and FlyteRemote updates) and usage.
* * *
What's Changed
• Sanitize query template input in sqlite task by @eapolinario in #1359
• TypeTransformer for reading and writing from TensorFlowRecord format by @ryankarlos in #1240
• Bump numpy from 1.21.6 to 1.22.0 in /tests/flytekit/integration/remote/mock_flyte_repo/workflows by @dependabot in #1170
• Update ray plugin dependency by @pingsutw in #1361
• Bring in buf protobufs by @eapolinario in #1363
• Set default format of structured dataset to empty by @pingsutw in #1159
• Adds CLI reference for pyflyte by @samhita-alla in #1362
• Update plugins requirements files versions by @eapolinario in #1365
• Signaling by @wild-endeavor in #1133
• Adding created and updated at to ExecutionClosure model by @wild-endeavor in #1371
• Restrict numpy due to deprecated aliases by @eapolinario in #1376
• Add Databricks config to Spark Job by @pingsutw in #1358
• Add overwrite_cache option to the calls of remote and local executions by @hfurkanvural in #1375
• Remove project/domain from being overridden with execution values in serialized context by @wild-endeavor in #1378
• Use TaskSpec instead of TaskTemplate for fetch_task and avoid network when loading module by @kumare3 in #1348
• Register Databricks config by @pingsutw in #1379
• PodSpec should not require primary_container name by @kumare3 in #1380
• fix(pyflyte register): change -d to -D for --destination-dir as -d is already used for --domain by @mcloney-ddm in #1381
• [Snyk] Security upgrade torch from 1.12.1 to 1.13.1 by @snyk-bot in #1374
• Handle Optional[FlyteFile] in Dataclass type transformer by @eapolinario in #1393
• Update flyte deck plugin's dependency by @pingsutw in #1395
• add FastSerializationSettings to docs by @cosmicBboy in #1386
• Added more pod tests and an example pod task by @kumare3 in #1382
• Convert default value of dict to json string in pyflyte run by @pingsutw in #1399
• docs: update register help, non-fast version is supported by @pbrogan12 in #1402
• Update log level for structured dataset by @pingsutw in #1394
• Add Niels to code owners by @pingsutw in #1404
• Signal use by @wild-endeavor in #1398
• User Documentation Proposal by @pingsutw in #1200
• Add support MLFlow plugin by @kumare3 in #1274
• fix remote API reference by @cosmicBboy in #1405
• Read structured dataset from a folder by @pingsutw in #1406
• Update default config to work out-of-the-box with flytectl demo by @cosmicBboy in #1384
• Add dask plugin #patch by @bstadlbauer in #1366
New Contributors
• @hfurkanvural made their first contribution in #1375
• @mcloney-ddm made their first contribution in #1381
• @pbrogan12 made their first contribution in #1402
Full Changelog: v1.2.5...v1.3.0
flyteorg/flytekit
honnix
01/24/2023, 4:50 PM
remote.execute. Can someone take a look at it? Thank you. https://github.com/flyteorg/flyte/issues/3261
GitHub
01/25/2023, 2:16 AM
Running make setup as part of the dev environment fails on Mac M1s with this error message:
ERROR: Ignored the following versions that require a different python version: 2.6.2 Requires-Python >=3.6, <3.9
ERROR: Could not find a version that satisfies the requirement tensorflow-io-gcs-filesystem==0.28.0 (from versions: none)
ERROR: No matching distribution found for tensorflow-io-gcs-filesystem==0.28.0
and
ERROR: No matching distribution found for tensorflow==2.8.1
The issue stems from the fact that tensorflow special-cases a dependency based on the platform+OS, while we currently only maintain a single set of requirements files.
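One possible direction, sketched only (package names, versions, and markers here are assumptions, not the project's actual fix): PEP 508 environment markers let a single requirements file special-case platform and OS:

```
tensorflow==2.8.1; platform_machine != "arm64"
tensorflow-macos==2.8.0; platform_machine == "arm64" and sys_platform == "darwin"
```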
Expected behavior
Mac M1 users should be able to run make setup successfully.
Additional context to reproduce
No response
Screenshots
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
Xuan Hu
01/28/2023, 9:18 AM
How does FLYTE_PLATFORM_AUTH work in flytekit, as described in https://docs.flyte.org/en/latest/deployment/cluster_config/auth_setup.html#continuous-integration-ci? I tried to investigate the source code of flytekit but failed. To be more specific: flytekit/configuration/__init__.py and flytekit/configuration/internal.py.
GitHub
01/30/2023, 7:01 AM
File "/home/dev/conda_dev/devenv/Linux/envs/devenv-3.8-c/lib/python3.8/site-packages/flytekit/remote/remote.py", line 847, in execute
return self.execute_remote_task_lp(
File "/home/dev/conda_dev/devenv/Linux/envs/devenv-3.8-c/lib/python3.8/site-packages/flytekit/remote/remote.py", line 924, in execute_remote_task_lp
return self._execute(
File "/home/dev/conda_dev/devenv/Linux/envs/devenv-3.8-c/lib/python3.8/site-packages/flytekit/remote/remote.py", line 715, in _execute
type_hints[k] = TypeEngine.guess_python_type(input_flyte_type_map[k].type)
File "/home/dev/conda_dev/devenv/Linux/envs/devenv-3.8-c/lib/python3.8/site-packages/flytekit/core/type_engine.py", line 856, in guess_python_type
return transformer.guess_python_type(flyte_type)
File "/home/dev/conda_dev/devenv/Linux/envs/devenv-3.8-c/lib/python3.8/site-packages/flytekit/core/type_engine.py", line 1125, in guess_python_type
return typing.Union[tuple(TypeEngine.guess_python_type(v.type) for v in literal_type.union_type.variants)]
File "/home/dev/conda_dev/devenv/Linux/envs/devenv-3.8-c/lib/python3.8/site-packages/flytekit/core/type_engine.py", line 1125, in <genexpr>
return typing.Union[tuple(TypeEngine.guess_python_type(v.type) for v in literal_type.union_type.variants)]
AttributeError: 'LiteralType' object has no attribute 'type'
The fix seems to be changing v.type -> v
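The shape of that fix can be sketched with stand-in classes (plain dataclasses here, not the real flytekit models): each element of union_type.variants is already a LiteralType, so there is no nested .type to dereference.

```python
from dataclasses import dataclass
from typing import List, Optional

# Stand-ins for flytekit's LiteralType/UnionType models (assumptions,
# kept minimal to show the bug shape only).
@dataclass
class UnionType:
    variants: List["LiteralType"]

@dataclass
class LiteralType:
    simple: Optional[str] = None
    union_type: Optional[UnionType] = None

lt = LiteralType(union_type=UnionType(
    variants=[LiteralType(simple="INTEGER"), LiteralType(simple="NONE")]
))

# Buggy: [v.type for v in lt.union_type.variants] raises AttributeError,
# because each v is already a LiteralType, not a wrapper around one.
# Fixed: use the variant itself.
variant_kinds = [v.simple for v in lt.union_type.variants]
assert variant_kinds == ["INTEGER", "NONE"]
```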
Expected behavior
It should work!
Additional context to reproduce
No response
Screenshots
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
02/03/2023, 12:37 AM
• pod_template and pod_template_name arguments for PythonAutoContainerTask, its downstream tasks, and @task, by @ByronHsu in #1425
New Contributors
• @ppiegaze made their first contribution in #1416
• @ByronHsu made their first contribution in #1425
Full Changelog: v1.3.1...v1.4.0b0
flyteorg/flytekit
GitHub
02/03/2023, 5:19 PM
from datetime import datetime

from croniter import croniter
from flytekit import CronSchedule, LaunchPlan, Workflow

def generate_backfill_workflow(
    start_date: datetime, end_date: datetime, base_lp: LaunchPlan
) -> Workflow:
    if base_lp.schedule is None:
        raise ValueError("Backfill can only be created for scheduled launchplans")
    if not isinstance(base_lp.schedule, CronSchedule):
        raise NotImplementedError("The launchplan schedule needs to be a cron schedule")
    if start_date >= end_date:
        raise ValueError("Start date should be less than end date")
    print(f"Generating backfill for {start_date} to {end_date}")
    wf = Workflow(name=f"backfill-{base_lp.name}")
    lp_iter = croniter(
        base_lp.schedule.cron_schedule.schedule, start_time=start_date, ret_type=datetime
    )
    while True:
        next_start_date = lp_iter.get_next()
        if next_start_date > end_date:
            break
        print(f"Adding -> {next_start_date}")
        wf.add_launch_plan(base_lp, kickoff_time=next_start_date)
    return wf
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
02/08/2023, 11:26 PM
• pod_template and pod_template_name arguments for PythonAutoContainerTask, its downstream tasks, and @task, by @ByronHsu in #1425
• Auto Backfill workflow by @kumare3 in #1420
• Fix primitive decoder when evaluating Promise by @samhita-alla in #1432
• set maximum python version to 3.10 by @cosmicBboy in #1433
• Revert "Remove project/domain from being overridden with execution values in serialized context (#1378)" by @eapolinario in #1460
New Contributors
• @ppiegaze made their first contribution in #1416
• @ByronHsu made their first contribution in #1425
Full Changelog: v1.3.1...v1.3.2
flyteorg/flytekit
Sam Eckert
02/17/2023, 9:54 PM
class Compounds1DTransformer(TypeTransformer[Compounds1D]):
    """
    Custom transformer for Compounds1D to be used with Flyte
    """

    def __init__(self):
        super().__init__("Compounds1D", Compounds1D)
        self._transformer = FlyteSchemaTransformer()

    def get_literal_type(self, t: Type[Compounds1D]) -> LiteralType:
        return self._transformer.get_literal_type(FlyteSchema)

    def to_literal(
        self, ctx: FlyteContext, python_val: Compounds1D, python_type: Type[Compounds1D], expected: LiteralType
    ) -> Literal:
        return self._transformer.to_literal(ctx, python_val=python_val.df, python_type=FlyteSchema, expected=expected)

    def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[Compounds1D]) -> Compounds1D:
        data = self._transformer.to_python_value(ctx, lv, expected_python_type=FlyteSchema)
        df = data.open().all()
        return Compounds1D(df)

TypeEngine.register(Compounds1DTransformer())
In this example, Compounds1D is a Pydantic model wrapping a dataframe called df. I call the workflow via FlyteRemote.execute, and as far as I can tell, the Compounds1D type is being passed around between tasks correctly. However, when I fetch the results using FlyteWorkflowExecution.outputs, the output comes back as a FlyteSchema. Does FlyteRemote not understand custom types?
GitHub
02/21/2023, 11:58 PM
{"json":{"exec_id":"atfkcwwv5cfr7wzhqq94","node":"n1","ns":"flytesnacks-development","res_ver":"5501760","routine":"worker-3","tasktype":"python-task","wf":"flytesnacks:development:example_test.wf"},"level":"error","msg":"DataCatalog failed to get outputs from artifact 45bd1d68-a013-43b1-a56b-b7597b559125, err: unexpected artifactData: [o0] type: [structured_dataset_type:<> ] does not match any task output type: [structured_dataset_type:<format:\"parquet\" > ]","ts":"2022-09-12T06:56:41Z"}
When the cache is enabled, we'll retrieve artifacts from datacatalog and check if the structured dataset's schema and format match the expected type.
However, the default format of the structured dataset in the expected type is always Parquet, but the format of the output structured dataset is "".
@task(cache=True, cache_version="1.0")
def t2(sd: StructuredDataset) -> StructuredDataset:  # The default format of the expected output type is Parquet here
    df = pd.DataFrame({"len": [len(sd.open(pd.DataFrame).all())]})
    return StructuredDataset(df, uri=bq_uri)  # The format of the returned structured dataset is ""
There are two ways to fix it:
1. Change these lines to:
if len(structuredDatasetType.Format) != 0 && !strings.EqualFold(structuredDatasetType.Format, t.literalType.GetStructuredDatasetType().Format) {
    return false
}
2. Change the default format of the expected type to "" in flytekit, and change these lines to the following. However, this will break existing users: if users upgrade flytekit, they have to upgrade the propeller as well.
if len(t.literalType.GetStructuredDatasetType().Format) != 0 && !strings.EqualFold(structuredDatasetType.Format, t.literalType.GetStructuredDatasetType().Format) {
    return false
}
Here structuredDatasetType is the input type, and t.literalType.GetStructuredDatasetType() is the expected type.
Expected behavior
BQ task should run successfully even if the cache is enabled
Additional context to reproduce
import uuid

import pandas as pd
from typing_extensions import Annotated
from flytekit import task, workflow, StructuredDataset, kwtypes

@task(cache=True, cache_version="2.0")
def t1() -> StructuredDataset:
    df = pd.DataFrame({
        "name": ["dylan", "steve"],
        "age": [33, 32]
    })
    return StructuredDataset(df)

@task(cache=True, cache_version="2.0")
def t2(sd: StructuredDataset) -> StructuredDataset:
    df = pd.DataFrame({"len": [len(sd.open(pd.DataFrame).all())]})
    table_id = str(uuid.uuid4())
    bq_uri = f"bq://flyte-test-340607.dataset.{table_id}"
    return StructuredDataset(df, uri=bq_uri)

@workflow
def wf() -> StructuredDataset:
    return t2(sd=t1())

if __name__ == "__main__":
    wf()
Screenshots
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
Pradithya Aria Pura
02/24/2023, 7:17 AM
Regarding the with_overrides feature in flytekit: the example demonstrates overriding a task's CPU & memory limits.
@workflow
def my_pipeline(x: typing.List[int]) -> int:
    return square_1(x=count_unique_numbers_1(x=x)).with_overrides(
        limits=Resources(cpu="6", mem="500Mi")
    )
Can it be done by using a workflow input, e.g.
@workflow
def my_pipeline(x: typing.List[int], cpu: str, mem: str) -> int:
    return square_1(x=count_unique_numbers_1(x=x)).with_overrides(
        limits=Resources(cpu=cpu, mem=mem)
    )
?
GitHub
02/27/2023, 10:09 PM
from typing import List
from pydantic import BaseModel

class Foo(BaseModel):
    count: int
    size: float = None
Support as a valid transform
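What "support as a valid transform" would amount to is a to_literal/to_python_value round trip; a stdlib-only sketch (a dataclass stands in for the pydantic model so the sketch is self-contained; a real transformer would lean on Foo.json()/Foo.parse_raw()):

```python
import json
from dataclasses import dataclass, asdict

# Stand-in for the pydantic model above, using a dataclass to keep the
# sketch dependency-free.
@dataclass
class Foo:
    count: int
    size: float = None

def to_literal(foo: Foo) -> str:
    # Serialize the model into a JSON string literal.
    return json.dumps(asdict(foo))

def to_python_value(raw: str) -> Foo:
    # Rebuild the model from the JSON literal.
    return Foo(**json.loads(raw))

roundtrip = to_python_value(to_literal(Foo(count=3, size=1.5)))
assert roundtrip == Foo(count=3, size=1.5)
```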
Describe alternatives you've considered
Dataclasses are already supported, but they allow limited extensibility for schema extraction since they use marshmallow underneath.
Propose: Link/Inline OR Additional context
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte