GitHub
11/02/2023, 9:25 PM
GitHub
11/02/2023, 9:25 PM
Code Example is redundant and can be removed.
[Example page](https://docs.flyte.org/projects/cookbook/en/latest/auto/integrations/kubernetes/k8s_spark/index.html#code-examples).
☐ Replace caution with warning.
☐ Indicate Tip is optional.
☐ Look into headers in the docs and come up with a common syntax for all the headers [H1 to H4].
☐ Change all headers and sub-headers to Title case.
☐ Change the content to simple, active, and direct language.
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:25 PM
The design doc link in the Note section of https://docs.flyte.org/projects/flytekit/en/latest/design/authoring.html should show a doc, not a page that leads to an access request.
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:25 PM
flytectl demo start --verbose
flytectl sandbox start --verbose
Have similar flags during teardown of the environments:
flytectl demo teardown --verbose
flytectl sandbox teardown --verbose
What if we do not do this?
Longer debugging cycles
Related component(s)
flytectl, flyte
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:25 PM
GitHub
11/02/2023, 9:25 PM
pyflyte run. We would like to at least consider supporting mixed-mode execution, where a locally declared workflow has tasks that run remotely and tasks that run locally.
GitHub
11/02/2023, 9:25 PM
Users should be able to write a @task-decorated task in a Jupyter notebook, and be able to register and run it from the same Jupyter notebook.
Details
This is part of the data-scientist first story we are pursuing in H2 2022 - continuing to tie together as seamlessly as possible the local/back-end execution story in support of a data science first iteration cycle.
This should probably happen through the FlyteRemote experience. We are thinking of using cloudpickle to pickle the task and ship it off to S3, much like how script mode works.
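A minimal sketch of the intended flow, assuming a reachable Flyte backend and a flytekit version whose FlyteRemote can register a notebook-defined (pickled) task; the project/domain names are placeholders:
```
from flytekit import task
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

@task
def t1(x: int) -> int:
    return x + 1

# FlyteRemote would pickle and ship t1 (much like script mode ships a code
# archive), register it, then run it on the backend.
remote = FlyteRemote(Config.auto(), default_project="flytesnacks", default_domain="development")
execution = remote.execute(t1, inputs={"x": 1}, wait=True)
print(execution.outputs)
```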
Some questions to think about
• Does cloudpickle actually work? What are the limitations, and how can we detect/prevent the user from hitting them (for example, cloudpickle probably doesn't work if you're pickling a function that has an import statement inside it)? We should at least list out all these limitations and make the user aware of them (see the sanity-check sketch after this list).
• What will be the task command, specifically how will the task resolver bit work? Unlike in script-mode, which merely updated the code and relied on the same usual python-module/task-finding mechanisms as in a regular non-fast run, this will not only need to download the file of the pickled code from S3, it will also need to unpickle it as part of the resolver process. There's already a cloudpickle resolver, can we use that?
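As a basic sanity check of the mechanism (nothing Flyte-specific here): cloudpickle can serialize a function defined in __main__ or a notebook by value, which plain pickle's serialize-by-reference cannot do:
```
import pickle

import cloudpickle

def t1(x: int) -> int:
    return x + 1

payload = cloudpickle.dumps(t1)   # serializes the function's code by value
restored = pickle.loads(payload)  # a plain unpickler can read the payload back
assert restored(1) == 2
```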
Playing around with cloudpickle
Testing notes
Cursory testing was done by going back and forth between a Jupyter notebook with one virtual environment and running code in PyCharm with another.
The version of Python has to match (per the docs). The version of cloudpickle also needs to match: there were issues between v2.0 and v2.1 of cloudpickle where something written by 2.1 was not readable by 2.0. Did not try the other way around, but it's probably best not to assume it will always work. We should just aim for minor version matching.
If in the Jupyter notebook I did from dyn_task import t1 and then used t1 inside a dynamic task, and I then pickled the dynamic task, both the Jupyter notebook and the PyCharm instance were able to unpickle and run it. If I did import dyn_task and then used dyn_task.t1 in the dynamic task, and then pickled it, it only worked in Jupyter, not in PyCharm. If you then add cloudpickle.register_pickle_by_value(dyn_task), then again both work.
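A sketch of that third case, assuming a local module dyn_task.py defining t1 exists:
```
import cloudpickle

import dyn_task  # assumption: a local module defining t1

# By default dyn_task.t1 is pickled by reference (module + name), so
# unpickling fails wherever dyn_task is not importable. Registering the
# module for pickle-by-value embeds its code in the payload instead.
cloudpickle.register_pickle_by_value(dyn_task)
payload = cloudpickle.dumps(dyn_task.t1)
```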
Resources
• GH repo for cloudpickle. The serialize by value or reference discussion is relevant.
• The background section of this stackoverflow thread is a good short read.
Misc
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:25 PM
GitHub
11/02/2023, 9:25 PM
GitHub
11/02/2023, 9:25 PM
#url = "<https://github.com/smadarab/flytelab/raw/main/census.csv>" # Make sure the url is the raw version of the file on GitHub
url="<https://github.com/smadarab/flytelab/raw/main/census> 2.csv"
download = requests.get(url).content
df = pd.read_csv(io.StringIO(download.decode('utf-8')),sep=',')
print("df is created",df.columns)
#df.dropna(inplace=True)
#df = df.reset_index()
print(df.columns)
return(df)
@task
def plot_dataset(df:pd.DataFrame) -> pyplot:
x = df['age'].value_counts().index
y = df['age'].value_counts().values
fig = pyplot.figure(figsize =(10, 7))
# Horizontal Bar Plot
pyplot.bar(x,y)
# Show Plot
#pyplot.show()
return(fig)
#@task
#def clean_dataset() -> pd.DataFrame:
pass
#@task
#def transform_cat() -> pd.DataFrame:
pass
#@task
#def transform_num() -> pd.DataFrame:
pass
@task
def train_model(train: pd.DataFrame) -> Tuple[AdaBoostClassifier,OneHotEncoder,MinMaxScaler]:
num_cols = ['age', 'education-num', 'capital-gain',
'capital-loss', 'hours-per-week']
cat_cols = ['workclass',
'marital-status', 'occupation',
'relationship', 'race',
'sex', 'native-country']
log_transform_cols = ['capital-loss', 'capital-gain']
def get_cat_cols(X):
return X[cat_cols]
def get_num_cols(X):
return X[num_cols]
def get_log_transform_cols(X):
return X[log_transform_cols]
def get_dummies(X):
print('\n \n',type(X))
return pd.get_dummies(pd.DataFrame(X))
def cat_imputer(X):
print(X.shape)
return(imputer_cat.fit_transform(X))
#return X.apply(lambda col: imputer_cat.fit_transform(col))
def one_hot_encode(X):
print("one hot encode")
ohe = OneHotEncoder(handle_unknown = 'ignore')
#print("DF:: ")
ohe.fit(pd.DataFrame(X))
global hi
hi=ohe
#dump(ohe, 'onehot.joblib')
return ohe.transform(pd.DataFrame(X)).toarray()
def min_max_scaling(X):
scaler = MinMaxScaler()
scaler.fit(X)
global scale
scale=scaler
return scaler.transform(pd.DataFrame(X)).tolist()
log_transform_pipeline = Pipeline([
('get_log_transform_cols', FunctionTransformer(get_log_transform_cols, validate=False)),
('imputer', SimpleImputer(strategy='mean')),
('log_transform', FunctionTransformer(np.log1p))
])
num_cols_pipeline = Pipeline([
('get_num_cols', FunctionTransformer(get_num_cols, validate=False)),
('imputer', SimpleImputer(strategy='mean')),
('min_max_scaler', FunctionTransformer(min_max_scaling, validate=False))
])
cat_cols_pipeline = Pipeline([
('get_cat_cols', FunctionTransformer(get_cat_cols, validate=False)),
('imputer', SimpleImputer(strategy="most_frequent")),
# ('get_dummies', FunctionTransformer(get_dummies, validate=False))
('one_hot_encode', FunctionTransformer(one_hot_encode, validate=False))
])
steps_ = FeatureUnion([
('log_transform', log_transform_pipeline),
('num_cols', num_cols_pipeline),
('cat_cols', cat_cols_pipeline)
])
full_pipeline = Pipeline([('steps_', steps_)])
y = train['income'].map({'<=50K': 0, '>50K': 1})
X = full_pipeline.fit_transform(train)
model = AdaBoostClassifier(n_estimators=300)
X_train, X_test, y_train, y_test = train_test_split(X, y,shuffle=False)
model = model.fit(X_train, y_train)
return model,hi,scale
#@task
#def test_model() -> pd.DataFrame:
pass
#@workflow
#def main1() -> pd.DataFrame:
return get_dataset()
@workflow
def main() -> pyplot:
return plot_dataset(df=get_dataset())
#@workflow
#def main() -> Tuple[AdaBoostClassifier,OneHotEncoder,MinMaxScaler]:
return train_model(train=get_dataset())
if name == "*main*":
print(f"trained model: {main()}")
Screenshots
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:25 PM
PythonFunctionTask and PythonFunctionWorkflow derive their names implicitly from the module and function where they are defined. While this is convenient, it makes refactoring difficult, as names often need to be stable identifiers for downstream consumption. The proposal is to include an optional name parameter to the decorators which allows overriding this behavior.
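A sketch of what the proposed API could look like (the name parameter is the proposal, not an existing flytekit feature):
```
from flytekit import task, workflow

@task(name="myorg.stable.preprocess")  # hypothetical: overrides the module-derived name
def preprocess(x: int) -> int:
    return x * 2

@workflow(name="myorg.stable.pipeline")  # hypothetical: identifier survives refactors
def pipeline(x: int) -> int:
    return preprocess(x=x)
```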
flyteorg/flyte
GitHub
11/02/2023, 9:25 PM
GitHub
11/02/2023, 9:25 PM
GitHub
11/02/2023, 9:25 PM
GitHub
11/02/2023, 9:25 PM
GitHub
11/02/2023, 9:25 PM
pyflyte run and FlyteRemote
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:25 PM
GitHub
11/02/2023, 9:25 PM
$ docker pull ubuntu@sha256:82becede498899ec668628e7cb0ad87b6e1c371cb8a1e597d83a47fac21d6af3
When working in Flyte, users should be able to do the same.
Questions
Should users be able to specify both the tag and a sha for it?
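A sketch of what digest pinning could look like in flytekit, assuming container_image accepted a digest-qualified reference (whether it does is the point of this issue):
```
from flytekit import task

# Pinning by digest makes the task image immutable, unlike a mutable tag.
@task(container_image="ubuntu@sha256:82becede498899ec668628e7cb0ad87b6e1c371cb8a1e597d83a47fac21d6af3")
def t() -> None:
    ...
```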
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:25 PM
[Screenshot 2022-04-12 at 10 45 08 AM]
GitHub
11/02/2023, 9:25 PM
GitHub
11/02/2023, 9:25 PM
flytectl get workflow -p flytesnacks -d development
Error: rpc error: code = Unknown desc = failed database operation with column "workflows.created_at" must appear in the GROUP BY clause or be used in an aggregate function
Similarly, it breaks when passing a field selector too:
flytectl get workflow -p flytesnacks -d development --filter.fieldSelector "state=0"
Error: rpc error: code = Unknown desc = failed database operation with column "workflows.created_at" must appear in the GROUP BY clause or be used in an aggregate function
It returns correctly when sorting by project, domain, or name:
flytectl get workflow -p flytesnacks -d development --filter.fieldSelector "state=0" --filter.sortBy "project"
--------------- ------------- ------------------------------------------------------------- ------------- -------
| PROJECT (45) | DOMAIN | NAME | DESCRIPTION | STATE |
--------------- ------------- ------------------------------------------------------------- ------------- -------
| flytesnacks | development | core.containerization.use_secrets.my_secret_workflow | | |
--------------- ------------- ------------------------------------------------------------- ------------- -------
| flytesnacks | development | core.control_flow.chain_tasks.chain_tasks_wf | | |
--------------- ------------- ------------------------------------------------------------- ------------- -------
Expected behavior
Named entities should be able to be sorted by created_at.
Additional context to reproduce
No response
Screenshots
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:25 PM
GitHub
11/02/2023, 9:25 PM
Flytekit supports the np.ndarray type, but it doesn't support the np.typing.NDArray generic: https://numpy.org/devdocs/reference/typing.html#numpy.typing.NDArray
This would enable flytekit users to express the actual datatypes of an ndarray.
Goal: What should the final outcome look like, ideally?
The user-facing API would be simple:
import numpy as np
import numpy.typing as npt

@task
def t1(input: npt.NDArray[np.float64]) -> npt.NDArray[np.int64]:
    ...
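For what it's worth, the scalar dtype is recoverable from the generic hint with standard typing introspection, so a transformer could dispatch on it (a sketch; assumes Python 3.9+ and a recent numpy):
```
from typing import get_args

import numpy as np
import numpy.typing as npt

hint = npt.NDArray[np.float64]          # alias for np.ndarray[Any, np.dtype[np.float64]]
_, dtype_alias = get_args(hint)         # (Any, np.dtype[np.float64])
(scalar_type,) = get_args(dtype_alias)  # (np.float64,)
assert scalar_type is np.float64
```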
Describe alternatives you've considered
Keeping the status quo, only supporting np.ndarray
Propose: Link/Inline OR Additional context
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:25 PM
GitHub
11/02/2023, 9:25 PM
flyteplugins-vaex supports automatic serialization and deserialization of vaex dataframes between consecutive tasks using parquet (flyteorg/flytekit#1230).
It would be good to extend this to HDF5 and Arrow for performance and interoperability when data sets are too large to fit into memory: https://vaex.readthedocs.io/en/latest/faq.html#What-is-the-optimal-file-format-to-use-with-vaex
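For context, these formats are already exposed through vaex's own export API (a sketch; assumes vaex is installed and that export_arrow is available in the installed version):
```
import vaex

df = vaex.from_arrays(x=[1, 2, 3], y=[4.0, 5.0, 6.0])
df.export_hdf5("data.hdf5")    # memory-mappable, vaex's preferred format
df.export_arrow("data.arrow")  # Arrow IPC, good for interoperability
```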
Goal: What should the final outcome look like, ideally?
Register extra handlers VaexDataFrameToHDF5EncodingHandler and VaexDataFrameToArrowEncodingHandler, so users can use Annotated to update the default format:
@task
def t1(f: vaex.dataframe.DataFrameLocal) -> Annotated[StructuredDataset, HDF5]:
    ...

@task
def t2(f: vaex.dataframe.DataFrameLocal) -> Annotated[StructuredDataset, Arrow]:
    ...
Describe alternatives you've considered
N/A
Propose: Link/Inline OR Additional context
See discussion thread here flyteorg/flytekit#1230 (comment)
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:25 PM
GitHub
11/02/2023, 9:26 PM
GitHub
11/02/2023, 9:26 PM
GitHub
11/02/2023, 9:26 PM
==============================================
set -ex
echo "Hey there! Let's run some bash scripts using Flyte's ShellTask."
echo "Showcasing Flyte's Shell Task." >> /tmp/flyte/20220317_112528/raw/cfa6f2d613e427d0e0ff8396bd830539/test.txt
if grep "Flyte" /tmp/flyte/20220317_112528/raw/cfa6f2d613e427d0e0ff8396bd830539/test.txt
then
echo "Found it!" >> /tmp/flyte/20220317_112528/raw/cfa6f2d613e427d0e0ff8396bd830539/test.txt
else
echo "Not found!"
fi
==============================================
+ echo 'Hey there! Let'\''s run some bash scripts using Flyte'\''s ShellTask.'
Hey there! Let's run some bash scripts using Flyte's ShellTask.
+ echo 'Showcasing Flyte'\''s Shell Task.'
+ grep Flyte /tmp/flyte/20220317_112528/raw/cfa6f2d613e427d0e0ff8396bd830539/test.txt
Showcasing Flyte's Shell Task.
+ echo 'Found it!'
Traceback (most recent call last):
File "shell.py", line 98, in <module>
print(f"Running wf() {wf()}")
File "/Users/mikezhong/opt/anaconda3/envs/pipeline/lib/python3.7/site-packages/flytekit/core/workflow.py", line 237, in __call__
return flyte_entity_call_handler(self, *args, **input_kwargs)
File "/Users/mikezhong/opt/anaconda3/envs/pipeline/lib/python3.7/site-packages/flytekit/core/promise.py", line 991, in flyte_entity_call_handler
result = cast(LocallyExecutable, entity).local_execute(child_ctx, **kwargs)
File "/Users/mikezhong/opt/anaconda3/envs/pipeline/lib/python3.7/site-packages/flytekit/core/workflow.py", line 252, in local_execute
function_outputs = self.execute(**kwargs)
File "/Users/mikezhong/opt/anaconda3/envs/pipeline/lib/python3.7/site-packages/flytekit/core/workflow.py", line 685, in execute
return exception_scopes.user_entry_point(self._workflow_function)(**kwargs)
File "/Users/mikezhong/opt/anaconda3/envs/pipeline/lib/python3.7/site-packages/flytekit/exceptions/scopes.py", line 198, in user_entry_point
return wrapped(*args, **kwargs)
File "shell.py", line 91, in wf
t1_out = t1(x=x)
File "/Users/mikezhong/opt/anaconda3/envs/pipeline/lib/python3.7/site-packages/flytekit/core/base_task.py", line 288, in __call__
return flyte_entity_call_handler(self, *args, **kwargs)
File "/Users/mikezhong/opt/anaconda3/envs/pipeline/lib/python3.7/site-packages/flytekit/core/promise.py", line 984, in flyte_entity_call_handler
return cast(LocallyExecutable, entity).local_execute(ctx, **kwargs)
File "/Users/mikezhong/opt/anaconda3/envs/pipeline/lib/python3.7/site-packages/flytekit/core/base_task.py", line 269, in local_execute
outputs_literal_map = self.dispatch_execute(ctx, input_literal_map)
File "/Users/mikezhong/opt/anaconda3/envs/pipeline/lib/python3.7/site-packages/flytekit/core/base_task.py", line 520, in dispatch_execute
raise TypeError(f"Output({k}) in task{self.name} received a tuple {v}, instead of {py_type}")
TypeError: Output(i) in tasktask_1 received a tuple (/tmp/flyte/20220317_112528/raw/cfa6f2d613e427d0e0ff8396bd830539/test.txt, /tmp/flyte/20220317_112528/raw/cfa6f2d613e427d0e0ff8396bd830539/test.txt), instead of <class 'flytekit.types.file.file.FlyteFile'>
Task 1 is apparently returning a tuple of length 2, with the two items being identical (the same file). We expect a single output from this shell task. We have already investigated this issue briefly and believe the cause is here
If we add print statements, we see that self._output_locs has length 1 as expected, and we can see that self._output_locs contains:
[OutputLocation(var='i', var_type=<class 'flytekit.types.file.file.FlyteFile'>, location='{inputs.x}')]
<class 'flytekit.extras.tasks.shell.OutputLocation'>
However, the two if statements both resolve as True.
[image: what FlyteFile and FlyteDirectory are inheriting from]
I am new to the code base, so it is not immediately obvious what the solution would be, but using issubclass() is not sufficient for determining whether the var_type is FlyteFile or FlyteDirectory.
Perhaps a simple solution would be:
for v in self._output_locs:
    if v.var_type is FlyteFile:
        final_outputs.append(FlyteFile(outputs[v.var]))
    elif v.var_type is FlyteDirectory:
        final_outputs.append(FlyteDirectory(outputs[v.var]))
EDIT UPDATE:
Discovered this issue occurs on Python 3.7.6 as well as 3.8.0, but not on 3.8.5.
3.8.0
Python 3.8.0 (default, Nov 6 2019, 15:49:01)
Type 'copyright', 'credits' or 'license' for more information
IPython 8.1.1 -- An enhanced Interactive Python. Type '?' for help.
In [1]: from flytekit.types.directory import FlyteDirectory
...:
...: from flytekit.types.file import FlyteFile
In [2]: issubclass(FlyteDirectory, FlyteFile)
Out[2]: True
In [3]: issubclass(FlyteFile, FlyteDirectory)
Out[3]: True
3.8.5
Python 3.8.5 (default, Sep 4 2020, 02:22:02)
Type 'copyright', 'credits' or 'license' for more information
IPython 8.1.1 -- An enhanced Interactive Python. Type '?' for help.
In [1]: from flytekit.types.directory import FlyteDirectory
...:
...: from flytekit.types.file import FlyteFile
In [2]: issubclass(FlyteDirectory, FlyteFile)
Out[2]: False
In [3]: issubclass(FlyteFile, FlyteDirectory)
Out[3]: False
EDIT UPDATE 2
The issue looks to have been raised and patched in Python 3.7.7 and 3.8.2 (https://bugs.python.org/issue38878), but this restricts usage of ShellTask (in its current state) to those versions or later.
Expected behavior
The expected behavior is for the example ShellTasks and workflow to actually execute and run to completion.
Additional context to reproduce
1. Install python 3.7.6
2. Copy the example shell tasks and workflow into a file: shell.py
3. Run locally using python shell.py
Screenshots
(Same console output as shown above.)
flyteorg/flyte
GitHub
11/02/2023, 9:26 PM
A tool (flytectx maybe?) to manage different authenticated sessions and possibly flytectl configs, and make switching between different flyte deployment environments seamless.
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte