# ask-the-community

Rupsha Chaudhuri

12/29/2022, 8:13 PM
Hi team.. so flytekit needs pyarrow >= 4.0.0, but pyarrow > 2.0.0 (required by awswrangler) has this issue that I’m currently running into when trying to chunk and read a large parquet file. Has anyone else in the group encountered this, and if so, how did you get around it?

Kevin Su

12/29/2022, 9:35 PM
is there any reason why you don’t use StructuredDataset? it does the blob operations (reading/writing parquet files) under the hood for you.

Rupsha Chaudhuri

12/29/2022, 9:42 PM
oh.. I didn’t realize I had that option.. is there some sample code on how to populate it from a file/folder in s3?
uri= ?
is that it?

Kevin Su

12/29/2022, 9:43 PM
yup

Rupsha Chaudhuri

12/29/2022, 9:44 PM
what about aws credentials? how do I pass those?

Kevin Su

12/29/2022, 9:44 PM
sd = StructuredDataset(uri="<s3_path>")
Edit the flytepropeller config map and update the default env vars:
```yaml
default-cpus: 100m
default-env-vars:
  - FLYTE_AWS_ENDPOINT: http://minio.flyte.svc.cluster.local:9000
  - FLYTE_AWS_ACCESS_KEY_ID: minio
  - FLYTE_AWS_SECRET_ACCESS_KEY: miniostorage
```
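For context, a minimal sketch of the task-side pattern being described here, assuming the bucket is readable with the cluster’s default (flytepropeller-configured) credentials; the bucket path is hypothetical:

```python
import pandas as pd
from flytekit import task, workflow
from flytekit.types.structured import StructuredDataset

@task
def point_at_existing_data() -> StructuredDataset:
    # Wrap an existing parquet file/folder in s3 without copying it.
    return StructuredDataset(uri="s3://my-bucket/path/to/data")  # hypothetical path

@task
def consume(sd: StructuredDataset) -> int:
    # flytekit's default s3 client fetches the data when .open(...).all() is called.
    df = sd.open(pd.DataFrame).all()
    return len(df)

@workflow
def wf() -> int:
    return consume(sd=point_at_existing_data())
```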

Rupsha Chaudhuri

12/29/2022, 9:45 PM
that wouldn’t work for us 😞
our aws credentials are coming from vault
to access the s3 buckets that have the files we are interested in processing

Kevin Su

12/29/2022, 9:48 PM
cc @Yee do you know what’s the best practice to read credentials from Vault in flytekit?

Rupsha Chaudhuri

12/29/2022, 9:48 PM
We are using Secret in the task. The question is how to pass it to the StructuredDataset initialization.

Kevin Su

12/29/2022, 10:02 PM
flytekit downloads the input proto from s3 while running the task, so the pod should already have access to the s3 bucket. if not, how do you download the input file from s3?

Rupsha Chaudhuri

12/29/2022, 10:03 PM
I was using awswrangler to read the parquet file from s3.. it takes a boto3_session as a parameter which I instantiate with the right credentials
flyte has access to whatever bucket it needs for its blob storage… for all other artifacts I provide credentials (usually boto3)

Kevin Su

12/29/2022, 10:08 PM
I mean you don’t need to pass credentials to the pods yourself; the pods should already have them. Otherwise, the pod would fail to download the task input (the input can be any Python type: int, str, etc.).

Yee

12/29/2022, 10:08 PM
are these in different buckets?
so you need another s3 connection to download data that you want? and those credentials are only available in vault?

Rupsha Chaudhuri

12/29/2022, 10:09 PM
yes.. and available to the pod using flytekit secrets (`flytekit.secrets`)
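A rough sketch of the pattern Rupsha is describing: requesting the Vault-backed credentials as flytekit secrets and handing them to awswrangler via an explicit boto3 session. The secret group and key names here are hypothetical:

```python
import boto3
import awswrangler as wr
import pandas as pd
from flytekit import Secret, current_context, task

SECRET_GROUP = "external-s3"  # hypothetical group backed by Vault

@task(secret_requests=[
    Secret(group=SECRET_GROUP, key="access_key_id"),
    Secret(group=SECRET_GROUP, key="secret_access_key"),
])
def read_external_parquet(path: str) -> pd.DataFrame:
    secrets = current_context().secrets
    # Build a boto3 session from the injected secrets and pass it to awswrangler,
    # so this read uses the external-bucket credentials rather than the pod's defaults.
    session = boto3.Session(
        aws_access_key_id=secrets.get(SECRET_GROUP, "access_key_id"),
        aws_secret_access_key=secrets.get(SECRET_GROUP, "secret_access_key"),
    )
    return wr.s3.read_parquet(path=path, boto3_session=session)
```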

Yee

12/29/2022, 10:09 PM
that is a bit tricky… flytekit only assumes one s3 connection.
whatever credentials are used to download inputs are the same ones the default structured dataset readers and writers will use.
you can of course replace those.

Rupsha Chaudhuri

12/29/2022, 10:10 PM
argh.. that won’t work for us 😞
unless I use the credentials to download the files locally before use..

Yee

12/29/2022, 10:11 PM
let me see

Rupsha Chaudhuri

12/29/2022, 10:12 PM
feature request: StructuredDataset also takes an optional boto3_session parameter and uses that if it’s provided 🙂

Yee

12/29/2022, 10:12 PM
just out of curiosity… how long have you had boto3 as a pip dependency?
(i only ask because i’ve run into problems with it in the past)

Rupsha Chaudhuri

12/29/2022, 10:13 PM
a lot of our workflows have this flow… i.e. working with files in s3 that flyte directly doesn’t have access to
I don’t use s3fs so that hasn’t been an issue yet

Yee

12/29/2022, 10:14 PM
sweet
cc @Ketan (kumare3)
can we get back to you tomorrow on how best to do this?

Rupsha Chaudhuri

12/29/2022, 10:15 PM
sure.. until then I’ll continue building and testing the workflow with my smaller dataset
thanks @Yee and @Kevin Su

Ketan (kumare3)

12/29/2022, 11:44 PM
hi @Rupsha Chaudhuri are you around for a quick chat?

Rupsha Chaudhuri

12/29/2022, 11:45 PM
sure
@Kevin Su for a very large parquet dataset (100 million records), is it possible to read the data using StructuredDataset in chunks so that the task does not run out of memory?
or do I need to manually handle the chunking and reading the files 1 by 1?
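For the manual route, a minimal fallback sketch using pyarrow’s batch reader, assuming the parquet file has already been made accessible locally (e.g. downloaded with the credentialed session above); the batch size is an arbitrary choice:

```python
import pandas as pd
import pyarrow.parquet as pq

def iter_parquet_chunks(path: str, batch_size: int = 1_000_000):
    """Yield pandas DataFrames of at most `batch_size` rows, so the whole
    100M-row dataset never has to sit in memory at once."""
    parquet_file = pq.ParquetFile(path)
    for batch in parquet_file.iter_batches(batch_size=batch_size):
        yield batch.to_pandas()
```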

Kevin Su

01/09/2023, 8:06 PM
and use `sd.open(pd.DataFrame).iter()` to get the iterator
@Yee we don’t have any transformer that returns an iterator. I think we should add some defaults for pandas and spark.
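As a rough illustration of the API Kevin names (keeping in mind his caveat that flytekit did not yet ship an iterator-returning pandas transformer at the time), consuming a StructuredDataset chunk by chunk inside a task might look like this; the row counting is a stand-in for real per-chunk processing:

```python
import pandas as pd
from flytekit import task
from flytekit.types.structured import StructuredDataset

@task
def process_big_dataset(sd: StructuredDataset) -> int:
    rows = 0
    # Iterate over the dataset one DataFrame chunk at a time instead of
    # materializing all 100 million rows with .all().
    for chunk in sd.open(pd.DataFrame).iter():
        rows += len(chunk)
    return rows
```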

Yee

01/09/2023, 8:26 PM
yes
we should.
can we chat about this for five minutes later today?
cc @Niels Bantilan

Kevin Su

01/09/2023, 8:35 PM
Yes

Niels Bantilan

01/09/2023, 8:43 PM
@Rupsha Chaudhuri @Kevin Su here’s an implementation of an iterable reader: https://github.com/flyteorg/flyte-demos/blob/main/flyte_demo/workflows/data_iter.py We should consider adding it to the default implementation

Rupsha Chaudhuri

01/09/2023, 8:44 PM
oh thank you.. I was trying to figure out how to do this

Niels Bantilan

01/09/2023, 8:46 PM
it’s a little bit hacky (see how the partition_col is manually injected here), so we should generalize this so that the partition column is a valid kwarg in the StructuredDataset initializer