Greg Linklater
02/07/2024, 2:00 PM
Glime
02/07/2024, 2:01 PM
```python
from flytekit import kwtypes, workflow
from flytekit.types.schema import FlyteSchema
from flytekitplugins.great_expectations import BatchRequestConfig, GreatExpectationsTask
from pandas import DataFrame

# Define the Great Expectations task at module level so Flyte can register it.
# The datasource, data connector, and data asset names are arguments of the task itself;
# BatchRequestConfig carries only batch-level options such as batch_identifiers.
# Parameter names follow the flytekitplugins-great-expectations constructor;
# check them against the plugin version you have installed.
my_ge_task = GreatExpectationsTask(
    name="great_expectations_task",
    datasource_name="my_datasource",
    inputs=kwtypes(dataframe=FlyteSchema),
    expectation_suite_name="my_expectation_suite",
    data_connector_name="my_data_connector",
    data_asset_name="my_data_asset",
    task_config=BatchRequestConfig(
        batch_identifiers={"default_identifier_name": "default_identifier_value"},
    ),
    context_root_dir="my_data_context_root_dir",
)


# Define a workflow that uses the Great Expectations task
@workflow
def my_workflow(input_df: DataFrame) -> DataFrame:
    # The workflow fails here if the data does not meet the expectation suite
    my_ge_task(dataframe=input_df)
    return input_df
```
In this example, `my_ge_task` is a task that uses Great Expectations to validate a `DataFrame`. The `GreatExpectationsTask` is configured with necessary parameters such as the expectation suite name, data context root directory, and batch request configuration. This task can then be used within a Flyte workflow (`my_workflow`) to ensure that the data being processed meets the defined expectations.

Please note, this is a simplified example to illustrate the concept. The actual implementation may vary based on your specific requirements, such as the data source, data connectors, and expectation suites you are using with Great Expectations. For more detailed information on integrating Great Expectations with Flyte, including setting up data sources and expectation suites, refer to the Flyte documentation and Great Expectations documentation.
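The example assumes a datasource and data connector already exist in the Great Expectations project. As a rough sketch of that side, a datasource matching the placeholder names `my_datasource`, `my_data_connector`, and `default_identifier_name` might be declared as below. This assumes the block-style datasource configuration used by Great Expectations releases before 1.0 and a pandas execution engine, neither of which the answer states; `undeclared_batch_identifiers` is a hypothetical helper for illustration, not part of either library.

```python
# Sketch only: block-style datasource config (Great Expectations pre-1.0 is an assumption).
# The names reuse the placeholders from the example above.
datasource_config = {
    "name": "my_datasource",
    "class_name": "Datasource",
    "execution_engine": {"class_name": "PandasExecutionEngine"},
    "data_connectors": {
        "my_data_connector": {
            # A runtime connector validates data handed over in memory or by path.
            "class_name": "RuntimeDataConnector",
            # Keys that every batch request must supply as batch_identifiers.
            "batch_identifiers": ["default_identifier_name"],
        }
    },
}


def undeclared_batch_identifiers(config, connector_name, batch_identifiers):
    """Hypothetical helper: list supplied identifier keys the connector does not declare."""
    declared = set(config["data_connectors"][connector_name]["batch_identifiers"])
    return sorted(set(batch_identifiers) - declared)


supplied = {"default_identifier_name": "default_identifier_value"}
print(undeclared_batch_identifiers(datasource_config, "my_data_connector", supplied))
```

In those Great Expectations versions a dictionary of this shape is typically registered with `context.add_datasource(**datasource_config)`. The identifier keys passed to the Flyte task need to match the ones the connector declares, which is the check the helper sketches.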