# ask-the-community
r
Hi team.. so flytekit needs pyarrow >= 4.0.0, but pyarrow > 2.0.0 (required by awswrangler) has this issue that I’m currently running into when trying to chunk and read a large parquet file.. Has anyone else in the group encountered this, and if yes, how did you get around it?
k
is there any reason you don’t use StructuredDataset? it handles the underlying blob operations (reading/writing parquet files) for you.
r
oh.. I didn’t realize I had that option.. is there some sample code on how to populate it from a file/folder in s3?
uri= ?
is that it?
k
yup
r
what about aws credentials? how do I pass those?
k
sd = StructuredDataset(uri=<s3_path>)
edit flytepropeller config map, update the default env
default-cpus: 100m
default-env-vars:
- FLYTE_AWS_ENDPOINT: http://minio.flyte.svc.cluster.local:9000
- FLYTE_AWS_ACCESS_KEY_ID: minio
- FLYTE_AWS_SECRET_ACCESS_KEY: miniostorage
r
that wouldn’t work for us 😞
our aws credentials are coming from vault
to access the s3 buckets that have the files we are interested in processing
k
cc @Yee do you know what’s the best practice for reading credentials from vault in flytekit?
r
We are using Secret in task
the question is how to pass it to the StructuredDataset initializer
k
flytekit downloads the input proto from s3 while running the task, so the pod should already have access to the s3 bucket. if not, how do you download the input file from s3?
r
I was using awswrangler to read the parquet file from s3.. it takes a boto3_session as a parameter which I instantiate with the right credentials
flyte has access to whatever bucket it needs for its blob storage… for all other artifacts I provide credentials (usually boto3)
k
I mean you don’t need to pass credentials to the pods yourself; the pods should already have them. Otherwise, the pod would fail to download the task input (inputs can be any python type: int, str, etc.).
y
are these in different buckets?
so you need another s3 connection to download data that you want? and those credentials are only available in vault?
r
yes.. and available to the pod using “secrets”
flytekit.secrets
y
that is a bit tricky… flytekit only assumes one s3 connection.
whatever credentials are used to download inputs are the same ones the default structured dataset readers and writers will use.
you can of course replace those.
r
argh.. that won’t work for us 😞
unless I use the credentials to download the files locally before use..
y
let me see
r
feature request: StructuredDataset also takes an optional boto3_session parameter and uses that if it’s provided 🙂
y
just out of curiosity… how long have you had boto3 as a pip dependency?
(i only ask because i’ve run into problems with it in the past)
r
a lot of our workflows have this flow… i.e. working with files in s3 that flyte directly doesn’t have access to
I don’t use s3fs so that hasn’t been an issue yet
y
sweet
cc @Ketan (kumare3)
can we get back to you tomorrow on how best to do this?
r
sure.. until then I’ll continue building and testing the workflow with my smaller dataset
thanks @Yee and @Kevin Su
k
hi @Rupsha Chaudhuri are you around for a quick chat?
r
sure
@Kevin Su for a very large parquet dataset (100 million records), is it possible to read the data using StructuredDataset in chunks so that the task does not run out of memory?
or do I need to manually handle the chunking and reading the files 1 by 1?
k
you can use
sd.open(pd.DataFrame).iter()
to get the iterator
@Yee we don’t have any transformer that returns an iterator. I think we should add some defaults for pandas and spark
y
yes
we should.
can we chat about this for five later today?
cc @Niels Bantilan
k
Yes
n
@Rupsha Chaudhuri @Kevin Su here’s an implementation of an iterable reader: https://github.com/flyteorg/flyte-demos/blob/main/flyte_demo/workflows/data_iter.py We should consider adding it to the default implementation
r
oh thank you.. I was trying to figure out how to do this
n
it’s a little bit hacky (see how the partition_col is manually injected here), so we should generalize it so that the partition column is a valid kwarg in the StructuredDataset initializer