diff --git a/samples/dbt/README.md b/samples/dbt/README.md
index c52b633116..986aa2eae3 100644
--- a/samples/dbt/README.md
+++ b/samples/dbt/README.md
@@ -10,6 +10,8 @@ It includes basic configurations and sample models to help you get started quick
 - `dbt_project.yml`: configures your dbt project - **dbt_sample_project**.
 - `dbt_bigframes_code_sample_1.py`: An example to read BigQuery data and perform basic transformation.
 - `dbt_bigframes_code_sample_2.py`: An example to build an incremental model that leverages BigFrames UDF capabilities.
+- `prepare_table.py`: An ML example to consolidate various data sources into a single, unified table for later use.
+- `prediction.py`: An ML example to train a model and then generate predictions using the prepared table.
 
 ## Requirements
diff --git a/samples/dbt/dbt_sample_project/models/ml_example/prediction.py b/samples/dbt/dbt_sample_project/models/ml_example/prediction.py
new file mode 100644
index 0000000000..d2fb54b384
--- /dev/null
+++ b/samples/dbt/dbt_sample_project/models/ml_example/prediction.py
@@ -0,0 +1,67 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This dbt Python model prepares and trains a machine learning model to predict
+# ozone levels.
+# 1. Data Preparation: The model first gets a prepared dataset and splits it
+#    into three subsets based on the year: training data (before 2017),
+#    testing data (2017-2019), and prediction data (2020 and later).
+# 2. Model Training: It then uses the LinearRegression model from the BigFrames
+#    ML library. The model is trained on the historical data, using other
+#    atmospheric parameters to predict the 'o3' (ozone) levels.
+# 3. Prediction: Finally, the trained model makes predictions on the most
+#    recent data (from 2020 onwards) and returns the resulting DataFrame of
+#    predicted ozone values.
+#
+# See more details in the related blog post: https://docs.getdbt.com/blog/train-linear-dbt-bigframes
+
+
+def model(dbt, session):
+    dbt.config(submission_method="bigframes", timeout=6000)
+
+    df = dbt.ref("prepare_table")
+
+    # Define the rules for separating the training, test, and prediction data.
+    train_data_filter = (df.date_local.dt.year < 2017)
+    test_data_filter = (
+        (df.date_local.dt.year >= 2017) & (df.date_local.dt.year < 2020)
+    )
+    predict_data_filter = (df.date_local.dt.year >= 2020)
+
+    # Define index_columns again here; they must match the index columns
+    # used in prepare_table.
+    index_columns = ["state_name", "county_name", "site_num", "date_local", "time_local"]
+
+    # Separate the training, test, and prediction data.
+    df_train = df[train_data_filter].set_index(index_columns)
+    df_test = df[test_data_filter].set_index(index_columns)
+    df_predict = df[predict_data_filter].set_index(index_columns)
+
+    # Finalize the training dataframe.
+    X_train = df_train.drop(columns="o3")
+    y_train = df_train["o3"]
+
+    # Finalize the prediction dataframe.
+    X_predict = df_predict.drop(columns="o3")
+
+    # Import the LinearRegression model from the bigframes.ml module.
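+    # Note: BigFrames' LinearRegression wraps BigQuery ML's linear_reg model,
+    # so the fit and predict calls below execute inside BigQuery rather than
+    # in the local Python process.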
+    from bigframes.ml.linear_model import LinearRegression
+
+    # Train the model.
+    model = LinearRegression()
+    model.fit(X_train, y_train)
+
+    # Make the prediction using the model.
+    df_pred = model.predict(X_predict)
+
+    return df_pred
diff --git a/samples/dbt/dbt_sample_project/models/ml_example/prepare_table.py b/samples/dbt/dbt_sample_project/models/ml_example/prepare_table.py
new file mode 100644
index 0000000000..23b54a9122
--- /dev/null
+++ b/samples/dbt/dbt_sample_project/models/ml_example/prepare_table.py
@@ -0,0 +1,93 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This dbt Python model processes EPA historical air quality data from BigQuery
+# using BigFrames. The primary goal is to merge several hourly summary
+# tables into a single, unified DataFrame for later prediction. It includes the
+# following steps:
+# 1. Reading and Cleaning: It reads individual hourly summary tables from
+#    BigQuery for various atmospheric parameters (like CO, O3, temperature,
+#    and wind speed). Each table is cleaned by sorting, removing duplicates,
+#    and renaming columns for clarity.
+# 2. Combining Data: It then merges these cleaned tables into a single,
+#    comprehensive DataFrame. An inner join is used to ensure the final output
+#    only includes records with complete data across all parameters.
+# 3. Final Output: The unified DataFrame is returned as the model's output,
+#    creating a corresponding BigQuery table for future use.
+#
+# See more details in the related blog post: https://docs.getdbt.com/blog/train-linear-dbt-bigframes
+
+
+import bigframes.pandas as bpd
+
+
+def model(dbt, session):
+    # Optional: override settings from dbt_project.yml.
+    # When both are set, dbt.config takes precedence over dbt_project.yml.
+    dbt.config(submission_method="bigframes", timeout=6000)
+
+    # Define the dataset and the columns of interest representing various
+    # parameters in the atmosphere.
+    dataset = "bigquery-public-data.epa_historical_air_quality"
+    index_columns = ["state_name", "county_name", "site_num", "date_local", "time_local"]
+    param_column = "parameter_name"
+    value_column = "sample_measurement"
+
+    # Initialize a list for collecting dataframes for individual parameters.
+    params_dfs = []
+
+    # Collect dataframes from tables that contain data for a single parameter.
+    table_param_dict = {
+        "co_hourly_summary": "co",
+        "no2_hourly_summary": "no2",
+        "o3_hourly_summary": "o3",
+        "pressure_hourly_summary": "pressure",
+        "so2_hourly_summary": "so2",
+        "temperature_hourly_summary": "temperature",
+    }
+
+    for table, param in table_param_dict.items():
+        param_df = bpd.read_gbq(
+            f"{dataset}.{table}",
+            columns=index_columns + [value_column]
+        )
+        param_df = (
+            param_df
+            .sort_values(index_columns)
+            .drop_duplicates(index_columns)
+            .set_index(index_columns)
+            .rename(columns={value_column: param})
+        )
+        params_dfs.append(param_df)
+
+    # Collect a dataframe from the table containing wind speed.
+    # Optionally, collect dataframes from other tables containing
+    # wind direction, NO, NOx, and NOy data as needed.
+    wind_table = f"{dataset}.wind_hourly_summary"
+
+    # The wind table mixes multiple parameters in a single table; to inspect
+    # them, run bpd.read_gbq(wind_table, columns=[param_column]).value_counts().
+    # Filter for the wind speed measurements only.
+    wind_speed_df = bpd.read_gbq(
+        wind_table,
+        columns=index_columns + [value_column],
+        filters=[(param_column, "==", "Wind Speed - Resultant")]
+    )
+    wind_speed_df = (
+        wind_speed_df
+        .sort_values(index_columns)
+        .drop_duplicates(index_columns)
+        .set_index(index_columns)
+        .rename(columns={value_column: "wind_speed"})
+    )
+    params_dfs.append(wind_speed_df)
+
+    # Combine data for all the selected parameters.
+    df = bpd.concat(params_dfs, axis=1, join="inner")
+    df = df.reset_index()
+
+    return df
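
Note on the new models: prediction.py splits out a 2017-2019 test set (df_test) but never scores it. If evaluation is wanted, a minimal sketch could be added inside model() after model.fit(...), reusing the variable names above together with the score() method that BigFrames ML estimators provide (it runs evaluation in BigQuery and returns a DataFrame of regression metrics); the eval_metrics name here is illustrative, not part of the diff:

    # Evaluate the trained model on the held-out 2017-2019 test split.
    X_test = df_test.drop(columns="o3")
    y_test = df_test["o3"]

    # score() returns a one-row DataFrame of regression metrics
    # (e.g. mean_absolute_error, r2_score).
    eval_metrics = model.score(X_test, y_test)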