# flyte-support
b
Hey all :) TL;DR: Did anyone develop a robust, generic workflow for model development that can be shared across data scientists? We are currently upgrading our model development environment and would like to create a workflow with the following task structure:
• preprocessing data
• processing features
• model training
• model evaluation
This workflow will be used by all our data scientists, so it has to be very flexible in terms of different types of models and code. The researcher should basically define a generic model config, alter some of the code, then run it. Did anyone develop this already? Would love to learn from your experience and exchange thoughts.
f
Folks usually create reference tasks and then point people to them
We built something like this with UnionML, take a look
But we stopped that project
b
where can I see that? why did it stop?
f
Just focus
Honestly I would love to put something like this in Flyte, if you have ideas
b
So UnionML is not usable? Sounds pretty much like what I'm searching for. Basically my idea was integrating Flyte with Neptune.ai for experiment tracking. On the Flyte side there would be an ExperimentExecutor class which:
• takes local/cluster execution as an input
• takes a config dataclass for each step (allowing for grid execution)
• programmatically builds a workflow
• presents an easy API for querying results and caching the initial dataset locally for follow-up local examination
f
We already have a Neptune.ai integration
Cc @broad-monitor-993 @flaky-parrot-42438 @astonishing-eve-54331
a
Hi Tom, this makes a lot of sense: programmatically executing extensible workflows from a dynamic set of local configurations (managed by Hydra), which are all locally validated with Pydantic. This is something I have been tinkering with for a few years now, and I have a draft of a tutorial on how to accomplish this with Flyte + Hydra + Pydantic here. In short, you may automatically, recursively instantiate your local `yaml` configurations and programmatically execute workflows from them. You do not need a rigorous "config dataclass for each step": Flyte allows you to access attributes of the dataclasses in the Flyte workflow DSL. Otherwise, if you also want to programmatically construct / define a workflow from the configs (in addition to programmatically executing them from configs), I think you would be best served by exploring how to integrate Hydra + Pydantic with Flyte's "Imperative Workflows". My tutorial above should also play nicely with imperative workflows, but this will take a little bit more work to set up, as you will need to "bind" your defined tasks to your configurations in some way. I have also been thinking about an automated way to do this with some metaprogramming, but it can be a little messy at times. Does this make sense for your use case?
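To make the idea concrete, here is a stdlib-only sketch of what that recursive instantiation looks like (all class and field names here are made up for illustration; in practice Hydra's `hydra.utils.instantiate` plus Pydantic validation would do this for you):

```python
from dataclasses import dataclass

# Hypothetical per-step configs -- in the real setup these would be
# Pydantic models validated locally before anything runs remotely.
@dataclass
class PreprocessConfig:
    drop_nulls: bool

@dataclass
class TrainConfig:
    model_path: str   # e.g. "xgboost.XGBClassifier"
    max_depth: int

@dataclass
class ExperimentConfig:
    preprocess: PreprocessConfig
    train: TrainConfig

# Stand-in for hydra.utils.instantiate: recursively build the nested
# dataclasses from a plain dict (what you'd get from composed YAML).
def instantiate(cls, cfg: dict):
    kwargs = {}
    for name, field_type in cls.__annotations__.items():
        value = cfg[name]
        kwargs[name] = instantiate(field_type, value) if isinstance(value, dict) else value
    return cls(**kwargs)

yaml_like = {
    "preprocess": {"drop_nulls": True},
    "train": {"model_path": "xgboost.XGBClassifier", "max_depth": 6},
}
cfg = instantiate(ExperimentConfig, yaml_like)
# cfg can now be passed straight into a @workflow; Flyte lets tasks receive
# attributes like cfg.train.max_depth directly in the workflow DSL.
```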
b
@freezing-airport-6809 awesome thanks, will check it out. I wanna check again about UnionML: is it still usable even though it was archived?
@astonishing-eve-54331 Wow, that's really good! Thanks a lot. This will definitely help us with standardisation of experiment execution and ease of configuration, especially with multi-runs. Together with Neptune that should cover 90% of what I need. The only thing to consider is how to run different models simply by changing a parameter. My options are:
• Creating a task/workflow for each model type, e.g. train_xgboost, then creating a dynamic workflow that calls the relevant task/workflow.
• Creating a generic "train_model" task inside a generic "model_flow" workflow. The problem is that different models have different inputs, so I'm currently thinking about 2 sub-options:
  ◦ The generic task takes a model config dataclass as input, which contains a str field for the "model import path" plus the model's parameters.
  ◦ Every model has a non-task function, e.g. "train_xgboost", that is called from the "train_model" task.
But the first option perhaps makes more sense: to have a workflow for each model. What do you think?
a
I am glad to hear it! This pattern is pretty powerful IMO. Getting configurations "done right" is surprisingly challenging. I also want to highlight that, given the above recommendation, the Union Console provides a built-in UI ("Launch Forms") such that model developers may create / modify such workflow executions, even for complex, arbitrarily nested dataclass configurations! (Screenshot below.) I personally think that you can currently address this last piece (programmatic workflow authoring) via Imperative Workflows. You could do this by programmatically constructing the imperative workflow inside of a Python match-case block as a function of the dataclass type (i.e. `XGBoostConfig` vs `SVMConfig`) in order to select the appropriate task. This will programmatically construct a new workflow on the fly, locally, from your configurations. Perhaps with some clever abstractions you could automate the mapping between configurations and tasks, as well as creating unique workflow names per unique DAG structure. In the future, I believe that your needs would be better suited by "Eager Workflows". Eager workflows will be more literal, ergonomic and extensible. These are still a work in progress, however (so I would not recommend them just yet), but my brilliant colleagues are investing resources into them over the next several months @wide-vegetable-51116 @flaky-parrot-42438
Screenshot 2024-11-10 at 11.47.48 AM.png
f
Isn’t the config static? Why not generate the workflow statically using imperative?
a
The config is static, but model developers may opt to change the configs via Hydra to, say, swap from a GBM to a GLM, SVM, NN, RF, etc. This means they would need to register a different workflow for each of the unique variations that the imperative workflow may programmatically author. Imperative workflows are definitely the way to manage all of this right now, I 100% agree, but Eager could effectively offload all of this programmatic authoring logic to inside the workflow itself. This will be much easier to manage and version. It will also be a bit more ergonomic, I think. For this reason, I think that Eager will be a better long-term solution for you and your team. But, once again, I would not currently recommend Eager because it is still experimental.
f
+1
one problem with eager is the onus of reproducibility lies on the implementer of the eager logic
a
Oh yeah, that is a very good point! With imperative workflows, it is static, compiled, and reproducible. With eager, it could be reproducible, but it will be more ergonomic and more easily managed. I think the ideal solution is going to be a function of how complex you need the programmatic authoring to be. If you want to support hundreds of potential permutations among multiple configuration groups (different model types, metric types, feature engineering logic, etc.), then Eager is more favorable. If you have, like, a dozen or so different model types, and everything else can be managed with dynamic workflows or conditionals, Imperative Workflows are going to be more principled. You can use Hydra's "multi-run" feature to programmatically register the variations. But, well, that gets more complex when you have a large set of permutations.
f
Maybe this is a Flyte plugin that ships out of the box and then the community makes it reproducible
a
TBH I feel like config-driven imperative workflow authoring would be very difficult to develop as a plugin. Map tasks (with partials), dynamic workflows, IO, overrides... lots of nuances. I'm sure it's possible. We do have an "omegaconf" plugin that is compatible with Hydra, but this loses the fine-grained caching, UI support, attribute access, and type checking you get with the automatic, recursive dataclass instantiation method I shared above.
f
I am not saying a literal plugin but an additional layer / like a supporting lib
b
Update @astonishing-eve-54331 @freezing-airport-6809 The current architecture is:
• Experiment Runner class
• Static model development flow steps (tasks):
  ◦ dataset_fetching
  ◦ map_task signal_processing (each task handles a separate data source)
  ◦ map_task transform_data_source (each task handles a separate data source)
  ◦ train_test_transform (given a train/test split strategy, fit_transform on train data, then transform on test data)
  ◦ model training, with 2 options:
    ▪︎ one type of task takes an import path to the model and runs generic training for a classical model / DL model
    ▪︎ a custom task for a specific model
  ◦ model evaluation (same structure as training)
  ◦ also, a "generate labels" task
• Researcher flow:
  ◦ initiate the experiment runner class:
    ▪︎ configure remote/local execution env, caching, Neptune project
    ▪︎ pass tasks that match the static model development steps, e.g. transform_dataset, while using default tasks that can be edited
    ▪︎ configure with a dataclass per static flow step; each flow step has static input types (config dataclasses), so the inputs need to be configured only in the dataclass, not in the task signature
  ◦ run the experiment, track progress, get the output dataclass
• Next step: integrate with Hydra for ease of configuration
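In skeletal form, the runner could look something like this (plain functions stand in for Flyte tasks / map_tasks here; every name is hypothetical, just to show how researchers swap default steps):

```python
from dataclasses import dataclass
from typing import Callable

# Skeletal sketch of the Experiment Runner idea. In the real system each
# step is a Flyte @task and the per-source steps run as map_tasks;
# researchers override any default step by passing their own task.
@dataclass
class ExperimentRunner:
    signal_processing: Callable
    transform_data_source: Callable
    train: Callable
    evaluate: Callable

    def run(self, sources):
        # would be map_task(signal_processing) over data sources
        processed = [self.signal_processing(s) for s in sources]
        # would be map_task(transform_data_source)
        transformed = [self.transform_data_source(p) for p in processed]
        model = self.train(transformed)
        return self.evaluate(model)

runner = ExperimentRunner(
    signal_processing=lambda s: s + ":processed",
    transform_data_source=lambda s: s + ":transformed",
    train=lambda xs: {"model": xs},
    evaluate=lambda m: len(m["model"]),
)
result = runner.run(["sensor_a", "sensor_b"])
```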
a
Hi Tom! Sounds like you have a good handle on this. I would like to provide a few tips around some "gotchas" here though!
1. Assuming you are inputting some `list[str]` or `list[MyDataSource]` to represent your data sources, I would highly recommend creating a small task to `sort` the items in the list. You want to ensure that downstream users don't unintentionally create unnecessary "cache misses" of the `transform` task by simply changing the order of data sources.
2. It may not be applicable or feasible in your case, but if the data sources you are merging in the `transform` task are already sorted row-wise, you might be able to simply `concatenate` instead of `join` / `merge`. This would likely be much faster and require less memory.
3. Given the configurations to `train_test_transform`, I would highly recommend trying to discretize the options here, i.e. only allow users to select `80/10/10`, `70/15/15`, `60/20/20`, or other such discrete strategies. You don't really want to give each user the ability to define continuous train-test split hyperparameters, because this could result in effectively useless cache misses. Also, you should ensure that this operation is idempotent / reproducible.
4. "one type of task takes import path to model" ... if you haven't already, I would highly recommend checking out the neat functionality from Hydra that will convert the path to the model and load it in for you. But right, you will need to pass in the name of the model path and instantiate it inside of the task, whereas with my previous recommendation Hydra would try to instantiate the model in your local environment (which could work but might result in weird behavior).
5. You might run into some issues around Pydantic's support for type unions and Flyte's requirement around strict typing. In other words, a config of type `XGBoostConfig | RandomForestConfig` might throw an error. I would instead recommend a "parent" dataclass that contains all of the possible children dataclasses, each of which is optional, and then you would override the `None` with your config for your specific model type.
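That parent-dataclass pattern might look like this (a minimal sketch; the class names and the `active()` helper are illustrative, not a Flyte API):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class XGBoostConfig:
    max_depth: int = 6

@dataclass
class RandomForestConfig:
    n_estimators: int = 100

# Instead of XGBoostConfig | RandomForestConfig (which strict typing may
# reject), hold every option as an Optional field and set exactly one.
@dataclass
class ModelConfig:
    xgboost: Optional[XGBoostConfig] = None
    random_forest: Optional[RandomForestConfig] = None

    def active(self):
        # return the single non-None child config
        chosen = [v for v in (self.xgboost, self.random_forest) if v is not None]
        assert len(chosen) == 1, "exactly one model config must be set"
        return chosen[0]

cfg = ModelConfig(xgboost=XGBoostConfig(max_depth=8))
```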
Really enjoying your project! Excited to hear more updates.
b
Thanks a lot for the tips, they do help 🙂 will keep you posted
a
Awesome! Thinking a bit more, I would also try to break each model training config into its own task that you stitch together with Imperative. I say that because I think you'll be better off setting the resources per model type. For example, GBMs are going to take way more memory than GLMs. Also, you might even consider trying to bootstrap your own resource allocations dynamically based on the configurations: say, if you are training a GBM with some max depth and some max iterations, request X amount of memory, whereas if you have a higher max depth and max iterations, use 2X the amount of memory. I am just thinking you could end up wasting a lot of money if you take a "one size fits all" approach here without some thought around memory requirements
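Something like this, roughly (all model types, sizes, and the doubling heuristic are made-up examples; in Flyte you would feed the result into a task's resource requests, e.g. via `with_overrides`):

```python
from dataclasses import dataclass

@dataclass
class ResourceSpec:
    cpu: str
    mem: str

# Illustrative defaults per model family.
DEFAULTS = {
    "glm": ResourceSpec(cpu="1", mem="2Gi"),
    "gbm": ResourceSpec(cpu="4", mem="8Gi"),
}

def resources_for(model_type: str, max_depth: int = 0) -> ResourceSpec:
    spec = DEFAULTS[model_type]
    if model_type == "gbm" and max_depth > 8:
        # deeper trees -> roughly double the memory request
        return ResourceSpec(cpu=spec.cpu, mem="16Gi")
    return spec
```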
b
I see what you are saying. To start with, I think it's good to have a mapping between model type and resources that can be accessed when creating the workflow, with a default resource configuration for classical/deep models. Btw, I'm currently trying to figure out how to perform the train-test split and the consequences it brings with it. I think I could have a task that returns sample indices for each fold, then run the cross-validation in a map_task. However, I believe models with short training times (like classical models) will be better off just performing the train/test split and training in a single task, in terms of overhead. Perhaps if no train_test_transform task is provided, the cross-validation fold will be skipped and we go straight into the training task. What's your view on that?
a
Yeah, I have encountered the same problem before and I found a good solution to it. Firstly, this is all under the assumption you only want to split your data randomly. That is, you don't care about out-of-time (OOT) splits. Given that assumption, you can do a pretty clever trick to get the best of both worlds. First, you'll want to identify the primary key for each observation. In my past modeling experience, I would use the customer ID as the primary key, even if I was using multiple transactions for each customer. But that is beside the point. You just need to identify the key along which you want to uniquely stratify your data. Once you have that, you can "hash" the key into a floating point value uniformly between 0 and 1. This guarantees reproducibility. I then just store this value (the "seed") alongside my data as a unique column. You only have to do this once for each dataset, because it is reproducible, and it is effectively instantaneous. You can then use this seed to determine the "split" of any observation at runtime. For example, any value above 0.8 is test; otherwise it should be used for training/validation in your cross-folding.
This worked wonders for me.
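The hashing trick, sketched with the stdlib (the function name and the choice of SHA-256 are just one way to do it):

```python
import hashlib

def split_seed(primary_key: str) -> float:
    """Deterministically hash a key to a float roughly uniform in [0, 1)."""
    digest = hashlib.sha256(primary_key.encode()).hexdigest()
    # first 8 hex chars -> 32-bit int -> scale into [0, 1)
    return int(digest[:8], 16) / 0x100000000

# Same key always gets the same seed, so the split is reproducible
# across runs, machines, and models; no random state to manage.
seed = split_seed("customer_12345")
is_test = seed > 0.8   # roughly 20% of keys land in the test split
```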
b
Can you clarify what added value this method brings?
a
Sorry if I wasn't clear. Well, let's break down the naive method, say `train_test_split` from `sklearn`. Executing this in its own separate task means that you are doubling the memory requirements and doubling the disk space of the data that you are backing to blob storage. This is pretty inefficient. If you instead use the aforementioned method, you can lazily evaluate your splits during model training, both for larger-than-memory observation streaming (torch IterableDataset or TF datasets) and for in-memory data loading for model training. For example, you could use a Polars LazyFrame to load in your unsplit data, and then evaluate a filter that hashes each observation's primary key. Because this is lazily evaluated, you only have to actually load your training / validation data for model training, and you only have to load your testing data during OOS evaluation. You get all the same functionality of `train_test_split` without having to duplicate your data or load in more data than necessary at any point in time. You also don't need to create "indices" for which observation belongs to each stratum, because that information is available given the hash of the primary key. It also guarantees reproducibility. This same method can be used for iterable observation filtering for your DL models.
Does that make sense?
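A stdlib-only sketch of the lazy-filter idea (a generator stands in for the Polars LazyFrame / IterableDataset here; the hash helper, column names, and 0.8 threshold are all illustrative):

```python
import hashlib

def split_seed(key: str) -> float:
    # reproducible hash: key -> float in [0, 1)
    return int(hashlib.sha256(key.encode()).hexdigest()[:8], 16) / 0x100000000

def iter_split(rows, split: str, threshold: float = 0.8):
    """Lazily yield only the rows belonging to the requested split.

    Nothing outside the requested split is ever materialized, and no
    index lists need to be stored or passed between tasks.
    """
    for row in rows:
        in_test = split_seed(row["customer_id"]) > threshold
        if (split == "test") == in_test:
            yield row

rows = [{"customer_id": f"cust_{i}", "x": i} for i in range(1000)]
train_rows = list(iter_split(rows, "train"))
test_rows = list(iter_split(rows, "test"))
# train and test partition the data with zero duplication
```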
b
Thanks for the clarification 🙂 So you're suggesting adding a hash column to the data, then each fold gets a floating point range and uses only a part of the data? I'm still trying to figure out 2 things:
1. If the hash isn't evenly distributed, we might end up with unpredictable behaviour, no?
2. I think if I use sklearn.model_selection.KFold I won't really need to load the data and save parts of it. I can simply present a list of client_ids, get a list of integer indices for each fold, then feed that list into a map_task, so that the data selection happens only during training.
a
Not necessarily. My point is that you can create the hash anywhere, virtually instantaneously, while guaranteeing it is reproducible. So, if you wanted to train 10 different models, you would just create this hash 10 different times whenever you load the data. This means you never duplicate the data, nor do you have to store a list of indices and pass it around among your tasks. Just create a function that will do this and then filter out your data via a lazy expression.
1. I am assuming you have a large set of data. It might be a little wonky around thousands of observations, but at millions? Nope. And even if it was a little off (71% / 29%), the 70% / 30% for train vs. validation/test is all arbitrary anyways :)
2. Your requirements might be different, but what I had in mind was this: you use this hash to determine whether something belongs in the training/validation split OR the testing split. In other words, in your model training tasks you would only ever load in the data where "seed < 0.70". The KFold could be used for the training and validation split, but never the testing (out-of-sample) split. Your validation split would be used in different ways, but basically it would help with preventing overfitting your model. So, for GBMs you would use it to determine when you should stop adding trees; for NNs you would use it to determine when to halt training and which previous checkpoint to restore; and for GLMs you might use it to help prune features. You can KFold over this or whatever - that is all happening inside your training task, and so you only have 70% of your data loaded in, because the other 30% was filtered out via the lazy expression. Now, after you create your final validated model, you return it from your training task and then execute an "evaluation" task that uses that model with your 30% testing split. This split will never change for the same dataset, even if you use completely different models.
It is a fair, reproducible collection of observations that you can absolutely guarantee your model never had a chance to ever use. Once again, this approach is guaranteed to be reproducible, you never have to duplicate your data, and is super fast and memory efficient.
I created this exact setup for a very large experiment and it was very pleasant and effective. Zero chance of overfitting, super memory efficient, and it prevents you from having to duplicate your data backed to blob storage.