Frank Shen
04/05/2023, 5:33 PM
Eduardo Matus
04/06/2023, 12:37 AM
Flyte User Role
Finally, create a role for Flyte users. This is the role that user pods will end up assuming when Flyte kicks them off.
Create a role named flyte-user-role from the IAM console. Select "AWS service" again for the type, and EC2 for the use case. Also add the AmazonS3FullAccess policy for now.
The Flyte user role is the one the Docker image will use to access resources (like S3, Elasticsearch, etc.)?
Pryce
04/06/2023, 3:52 AM
Broder Peters
04/06/2023, 6:26 AM
Felix Ruess
04/06/2023, 12:08 PM
Jan Fiedler
04/06/2023, 1:13 PM
Louis DiNatale
04/06/2023, 7:59 PM
Pryce
04/07/2023, 5:34 AM
from typing import List

from flytekit import dynamic, kwtypes, workflow
from flytekit.extras.tasks.shell import OutputLocation, ShellTask
from flytekit.types.file import FlyteFile

# Shell task that writes the head of the input file to outfile.txt
s1 = ShellTask(
    name="shorten",
    debug=True,
    script="""
set -ex
head {inputs.infile} > {outputs.i}
""",
    inputs=kwtypes(infile=FlyteFile),
    output_locs=[OutputLocation(var="i", var_type=FlyteFile, location="outfile.txt")],
)

@dynamic
def shorten_files(files_in: List[FlyteFile]) -> str:
    # Launch one shell task per input file
    for f in files_in:
        s1(infile=f)
    return "DONE"

@workflow
def wf(files_in: List[FlyteFile]) -> str:
    return shorten_files(files_in=files_in)

if __name__ == "__main__":
    # wf requires files_in; the original call passed no arguments
    print(f"Running wf() {wf(files_in=['xaa', 'xab'])}")
.. invoked with:
pyflyte run --remote dynamic_shell.py wf --files_in '["xaa", "xab"]'
Any help would be greatly appreciated!
Ena Škopelja
04/07/2023, 5:02 PM
OIDC userAuth (and Self appAuth, if it's relevant), meaning any time I want to register a workflow or list a resource with Flyte, a browser window opens to complete the auth flow. I'm trying to set up a service that will be able to run workflows but won't have access to a browser. I looked through the authentication page in the docs, but I don't really understand which clientId / clientSecret it refers to in my case. Has anyone set this up before?
seunggs
04/07/2023, 7:25 PM
Pryce
04/07/2023, 7:44 PM
Pryce
04/07/2023, 7:48 PM
k logs ff1da0f5f07414327b85-n0-0 -n flytesnacks-development
@seunggs
Frank Shen
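04/07/2023, 9:15 PM
import pandas as pd
from flytekit import Resources, dynamic, task

@dynamic(
    requests=Resources(cpu="4", mem="20Gi"),
)
def train_foreach_tenure_small(
    df: pd.DataFrame,
) -> None:
    for tenure in range(1, 3, 1):
        data = df[df["TENURE"] == tenure]
        # Pass the per-tenure slice by keyword; the original passed the full df and left data unused
        xgbse_training(df=data)

@task(requests=Resources(mem="5Gi"))
def xgbse_training(df: pd.DataFrame) -> None:
    # Signature assumed from the call above; the body was omitted in the original
    ...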
Frank Shen
04/07/2023, 9:17 PM
Frank Shen
04/07/2023, 9:36 PM
Frank Shen
04/07/2023, 9:37 PM
Ryan Kuk
04/08/2023, 12:06 AM
Pryce
04/09/2023, 8:36 PM
Yicheng Lu
04/10/2023, 3:42 AM
curl -sL https://ctl.flyte.org/install | sudo bash -s -- -b /usr/local/bin
flytectl demo start
export FLYTECTL_CONFIG=/root/.flyte/config-sandbox.yaml
# then, pyflyte run --remote ./test.py wf
This is unlikely to be caused by insufficient memory, since any task can trigger the problem above. Any hints on this? Thank you in advance!
Eric Song
04/10/2023, 2:52 PM
Annotated HashMethod to the input types of a task instead of the output. Then the HashMethod can be run on the inputs when a cached task is invoked, to determine the cache keys and check whether there's a hit.
I tried this out and it kind of works for local caching, but not for remote.
@task(cache=True, cache_version="1.0")
def my_task(obj: ty.Annotated[pd.DataFrame, HashMethod(hash_df)]) -> pd.DataFrame:
    redis_client.incr(REDIS_COUNTER_KEY)
    return obj

@dynamic
def my_workflow():
    obj = pd.DataFrame(
        {
            "name": ["a", "b"],
            "val": ["test1", "test2"],
        }
    )
    obj = my_task(obj=obj)
    obj = my_task(obj=obj)
    my_task(obj=obj)
In the example above, my_task will be called three times the first time my_workflow is called. This still doesn't match my expectation: I thought it would be called once for the first call, with cache hits on the second and third calls, since the input obj is the same.
However! The second run of my_workflow has a cache hit for all three calls to my_task, so it does work to some extent, even though I don't fully understand what it's doing.
For remote caching, this doesn't seem to work at all and there are no cache hits no matter how many times I run my_workflow.
Rahul Mehta
04/10/2023, 9:59 PM
flytekit versions? We have some other dependencies blocking a flytekit upgrade at present, but would like to keep the backend services as up to date as possible.
Sebastian Büttner
04/11/2023, 8:28 AM
Bryan Weber
04/11/2023, 2:19 PM
Nan Qin
04/11/2023, 3:35 PM
Frank Shen
04/11/2023, 5:11 PM
@workflow
def wf_train_parallel():
    for tenure in range(1, 25, 1):
        xgb_train(tenure=tenure)
    downstream_task()
...
@task
def xgb_train(tenure: int):
    ...
@task
def downstream_task():
    ...
Niels Bantilan
04/11/2023, 5:59 PM
@channel, the core Flyte team has heard reports of long import times when simply running a script locally that does import flytekit (the number I've heard of is ~8 seconds).
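One possible way to put a number on this is to time a fresh interpreter that does nothing but the import. A rough sketch, with time_import as a hypothetical helper name; "json" stands in for flytekit here so the snippet runs in any environment:

```python
import subprocess
import sys
import time


def time_import(module: str, runs: int = 3) -> float:
    """Best wall-clock time (seconds) to import `module` in a fresh interpreter."""
    best = float("inf")
    for _ in range(runs):
        start = time.perf_counter()
        # A new process each time, so nothing is already cached in sys.modules
        subprocess.run([sys.executable, "-c", f"import {module}"], check=True)
        best = min(best, time.perf_counter() - start)
    return best


# Interpreter startup alone, as a baseline ("sys" is always already loaded)
baseline = time_import("sys")
# Swap "json" for "flytekit" in an environment where it is installed
elapsed = time_import("json")
print(f"baseline={baseline:.3f}s import={elapsed:.3f}s")
```

CPython's built-in python -X importtime -c "import flytekit" prints a per-module breakdown to stderr, which may help show where the time goes.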
I've been unable to reproduce this locally, but can anyone with this experience provide instructions on how to reproduce this behavior? We'd like to look into reducing this import time.
justin hallquist
04/11/2023, 6:25 PM
Viljem Skornik
04/11/2023, 8:45 PM
LaunchPlan.get_or_create(… max_parallelism=100)?
Ketan (kumare3)
Jeongwon Song
04/11/2023, 11:01 PM