seunggs
12/27/2022, 5:54 PM
/opt/venv/lib/python3.10/site-packages/flytekit/types/schema/types.py:323: FutureWarning: In the future `np.bool` will be defined as the corresponding NumPy scalar. (This may have returned Python scalars in past versions.
  _np.bool: SchemaType.SchemaColumn.SchemaColumnType.BOOLEAN, # type: ignore
Traceback (most recent call last):
  File "/opt/venv/bin/pyflyte", line 5, in <module>
    from flytekit.clis.sdk_in_container.pyflyte import main
  File "/opt/venv/lib/python3.10/site-packages/flytekit/__init__.py", line 195, in <module>
    from flytekit.types import directory, file, numpy, schema
  File "/opt/venv/lib/python3.10/site-packages/flytekit/types/schema/__init__.py", line 1, in <module>
    from .types import (
  File "/opt/venv/lib/python3.10/site-packages/flytekit/types/schema/types.py", line 313, in <module>
    class FlyteSchemaTransformer(TypeTransformer[FlyteSchema]):
  File "/opt/venv/lib/python3.10/site-packages/flytekit/types/schema/types.py", line 323, in FlyteSchemaTransformer
    _np.bool: SchemaType.SchemaColumn.SchemaColumnType.BOOLEAN, # type: ignore
  File "/opt/venv/lib/python3.10/site-packages/numpy/__init__.py", line 284, in __getattr__
    raise AttributeError("module {!r} has no attribute "
AttributeError: module 'numpy' has no attribute 'bool'. Did you mean: 'bool_'?
Kevin Su
12/27/2022, 6:45 PM
seunggs
12/27/2022, 7:51 PM
flytekit==1.2.3
Yee
seunggs
12/27/2022, 8:02 PM
grpcio-status<1.49.0
flytekit==1.2.3
flytekit==1.2.7
now?
Yee
seunggs
12/27/2022, 8:03 PM
Yee
seunggs
12/28/2022, 5:16 PM
Yee
Niels Bantilan
12/30/2022, 7:05 PM
seunggs
12/30/2022, 7:29 PM
`signal` node. Is that just a custom node to use as a flag, or is there something built into Flyte?
Niels Bantilan
12/30/2022, 7:39 PM
seunggs
12/30/2022, 7:41 PM
Sidharth(Sid)
01/19/2023, 10:41 AM
Niels Bantilan
01/19/2023, 2:51 PM
> Whereas Flyte on the other hand has caching at both workflow and task levels
Caching works with `@task`s and `@dynamic` workflows. Currently, caching is not supported for static workflows.
> You can also version the caches if needed.
Yes. Flyte’s opinion is that it’s too complicated to try to figure out whether a task’s upstream dependencies have changed (they could live in other modules, etc.), so you can use any version string to version the cache.
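To make the versioning idea concrete, here is a toy, pure-Python model of a versioned cache key. It is not Flyte’s implementation. The names `cache_key`, `VersionedCache` and `square`, and the choice of SHA-256 over sorted JSON, are assumptions made for illustration. What it shows is that the key depends only on the task name, the user-supplied version string and the input values. Editing some upstream module therefore invalidates nothing until you bump the version, which in flytekit you do through the `cache=True` / `cache_version=...` arguments of `@task`.

```python
import hashlib
import json
from typing import Any, Callable, Dict


def cache_key(task_name: str, cache_version: str, inputs: Dict[str, Any]) -> str:
    """Hypothetical cache key: digest of task name, version string, and inputs.

    Inputs must be JSON-serializable; sort_keys makes argument order irrelevant.
    """
    payload = json.dumps(
        {"task": task_name, "version": cache_version, "inputs": inputs},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class VersionedCache:
    """Toy in-memory stand-in for a task-output cache (not Flyte's real backend)."""

    def __init__(self) -> None:
        self._store: Dict[str, Any] = {}
        self.runs = 0  # how many times a task body actually executed

    def run(self, fn: Callable[..., Any], cache_version: str, **inputs: Any) -> Any:
        key = cache_key(fn.__name__, cache_version, inputs)
        if key not in self._store:  # cache miss: execute and store
            self.runs += 1
            self._store[key] = fn(**inputs)
        return self._store[key]


def square(x: int) -> int:
    return x * x


cache = VersionedCache()
cache.run(square, "1", x=3)  # miss: executes square
cache.run(square, "1", x=3)  # hit: same name, version, and inputs
cache.run(square, "2", x=3)  # miss: a new version string invalidates the entry
print(cache.runs)  # 2
```

The version string is opaque in this model, as described above: nothing inspects the function body, so changing `square` without bumping `"1"` would keep serving the old cached value.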
> Flyte cache can be based on the hash of the input and output of tasks as well.
The main use case for a user-defined hash method for inputs is blob-store-serialized objects like files, directories, dataframes, pickle files, etc. In this case, you need to define a `HashMethod`, which will incur some runtime cost as Flyte computes the hash of, e.g., a dataframe.
Sidharth(Sid)
01/19/2023, 3:04 PM
`HashMethod` means "both the input parameters to the function and the output", i.e., DFs/files/filepaths/pickles etc. will be hashed and stored in Flyte storage (which on AWS is an S3 bucket). Am I understanding this correctly?
This means if I run a large Spark-based "task1", and then the next "task2" requires "task1"'s output for some operation, using HashMethod and potentially "cache_version", I can run a workflow multiple times for evaluating "task2", which takes "task1"'s cached output, right?
Basically I'm trying to say that "hashed and versioned tasks" could potentially avoid multiple writes to disk (output_1.csv, output2.csv etc.) while a data science/data engineering "task2" is being ideated/refined?
I have one more question if my above understanding is correct, so please do clarify.
Niels Bantilan
01/19/2023, 4:56 PM
`HashMethod`-annotated outputs (e.g. for files, dataframes) will calculate a hash key based on the user-defined hash function, and this key will be used as the cache key.
Assume `task1` produces this output. When the output is passed into the downstream task `task2`, the hash key will be used to determine whether to re-run `task2` or just hit the cache to return the pre-computed value.
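A user-defined hash method is just a function from the output object to a string. To stay dependency-free, the sketch below stands in for a dataframe with a list of row dicts (an assumption). It hashes the table’s content, so two outputs with identical rows produce the same key even though they are different Python objects, and a downstream cached task keyed on that hash would hit the cache. In flytekit such a function is attached to a task output with `typing.Annotated[..., HashMethod(fn)]`. The function name `hash_rows` and the data here are hypothetical.

```python
import hashlib
import json
from typing import Any, Dict, List

Row = Dict[str, Any]


def hash_rows(rows: List[Row]) -> str:
    """Hypothetical user-defined hash: digest of the table's content, not its identity.

    Each row is serialized with sorted keys, so column order does not affect the hash;
    row order is preserved, so reordering rows does change it.
    """
    h = hashlib.sha256()
    for row in rows:
        h.update(json.dumps(row, sort_keys=True).encode("utf-8"))
        h.update(b"\n")  # row separator (json.dumps output never contains a raw newline)
    return h.hexdigest()


# Two separately produced but identical outputs of an upstream task, plus a changed one.
run_a = [{"id": 1, "value": 10.0}, {"id": 2, "value": 20.0}]
run_b = [{"value": 10.0, "id": 1}, {"value": 20.0, "id": 2}]
changed = [{"id": 1, "value": 10.0}, {"id": 2, "value": 99.0}]

print(hash_rows(run_a) == hash_rows(run_b))    # True: same key, downstream cache hit
print(hash_rows(run_a) == hash_rows(changed))  # False: new key, downstream task re-runs
```

Computing the hash requires reading the whole object, which is the runtime cost of a `HashMethod` mentioned earlier in the thread.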
> This means if I run a large Spark-based “task1”, and then the next “task2” requires “task1”’s output for some operation, using HashMethod and potentially “cache_version”, I can run a workflow multiple times for evaluating “task2”, which takes “task1”’s cached output, right?
Correct.
> Basically I'm trying to say that “hashed and versioned tasks” could potentially avoid multiple writes to disk (output_1.csv, output2.csv etc.) while a data science/data engineering “task2” is being ideated/refined?
Correct.
• `task1` is a relatively cheap Spark job that produces a parquet file. The output of this task has a `HashMethod`, so it has a cache key associated with it.
• `task2` is an expensive data processing Spark job that depends on `task1`, and is set to `cache=True` with `cache_version="1"`.
• Assuming that `cache_version` stays the same and the output of `task1` produces the same cache key, the first invocation of `task2` will run it, but subsequent invocations will hit the cache. However, since `task1` doesn’t have `cache=True`, `task1` will always run.
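The scenario in the bullets above can be simulated with a toy model. Everything here is hypothetical and only mimics the caching rules as described: `Catalog`, `task1`, `task2`, `workflow`, and the use of a list of rows as a stand-in for the parquet file. `task1` is uncached and runs on every execution. `task2` is cached with `cache_version="1"` and keyed on a content hash of `task1`’s output.

```python
import hashlib
import json
from typing import Any, Dict, List

Rows = List[Dict[str, Any]]


def content_hash(rows: Rows) -> str:
    """Stand-in for the HashMethod attached to task1's output."""
    return hashlib.sha256(json.dumps(rows, sort_keys=True).encode("utf-8")).hexdigest()


class Catalog:
    """Toy cache for task2's outputs; counts how often each task body executes."""

    def __init__(self) -> None:
        self.store: Dict[str, Any] = {}
        self.calls: Dict[str, int] = {"task1": 0, "task2": 0}


def task1(cat: Catalog, day: str) -> Rows:
    # Cheap, uncached job: runs on every workflow execution.
    cat.calls["task1"] += 1
    return [{"day": day, "n": 3}, {"day": day, "n": 4}]


def task2(cat: Catalog, rows: Rows, cache_version: str = "1") -> int:
    # Expensive, cached job: key = (task, version, hash of upstream output).
    key = f"task2:{cache_version}:{content_hash(rows)}"
    if key not in cat.store:
        cat.calls["task2"] += 1
        cat.store[key] = sum(r["n"] for r in rows)
    return cat.store[key]


def workflow(cat: Catalog, day: str) -> int:
    return task2(cat, task1(cat, day))


cat = Catalog()
for _ in range(3):
    result = workflow(cat, "2023-01-19")

print(result, cat.calls)  # 7 {'task1': 3, 'task2': 1}
```

Because `task1` produces identical rows each time, its hash (and therefore `task2`’s key) never changes, so only the first execution pays for `task2`.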
Now if `task1` is also cached based on some primitive datatype inputs (like `datetime`, `int`, `str`, etc.), then `task1` will not be run (avoiding multiple writes to disk) if a cache key for the output already exists.
Sidharth(Sid)
01/19/2023, 5:29 PM
Ketan (kumare3)
Niels Bantilan
01/19/2023, 6:44 PM
Sidharth(Sid)
01/19/2023, 7:16 PM
Yee
Niels Bantilan
01/19/2023, 7:32 PM
Slackbot
01/19/2023, 7:32 PM
Niels Bantilan
01/19/2023, 7:34 PM
Ketan (kumare3)
Sidharth(Sid)
01/20/2023, 8:54 AM
Peeter Piegaze
01/20/2023, 1:37 PM
Sidharth(Sid)
01/20/2023, 2:16 PM
Sidharth(Sid)
01/21/2023, 7:13 AM