2 changes: 2 additions & 0 deletions samples/dbt/README.md
@@ -10,6 +10,8 @@ It includes basic configurations and sample models to help you get started quickly.
- `dbt_project.yml`: configures your dbt project - **dbt_sample_project**.
- `dbt_bigframes_code_sample_1.py`: An example that reads BigQuery data and performs a basic transformation.
- `dbt_bigframes_code_sample_2.py`: An example that builds an incremental model leveraging BigFrames UDF capabilities.
- `prepare_table.py`: An ML example that consolidates various data sources into a single, unified table for later use.
- `prediction.py`: An ML example that trains a model and generates predictions using the prepared table (see the sketch below for how the two ML models relate).
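
The two ML samples form a pipeline: `prediction.py` consumes the table materialized by `prepare_table.py` through dbt's `ref` function, which is also how dbt infers the dependency between the two models. A minimal sketch of that wiring, using the model names from this sample:

```python
def model(dbt, session):
    dbt.config(submission_method="bigframes")
    # dbt infers the lineage prepare_table -> prediction from this ref.
    df = dbt.ref("prepare_table")
    return df  # prediction.py goes on to train on df rather than return it directly
```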

## Requirements

67 changes: 67 additions & 0 deletions samples/dbt/dbt_sample_project/models/ml_example/prediction.py
@@ -0,0 +1,67 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This dbt Python model prepares and trains a machine learning model to
# predict ozone levels.
# 1. Data Preparation: The model first gets the prepared dataset and splits it
#    into three subsets based on the year: training data (before 2017),
#    testing data (2017-2019), and prediction data (2020 and later).
# 2. Model Training: It then uses the LinearRegression model from the
#    BigFrames ML library. The model is trained on the historical data, using
#    the other atmospheric parameters to predict the 'o3' (ozone) levels.
# 3. Prediction: Finally, the trained model makes predictions on the most
#    recent data (from 2020 onwards) and returns the resulting DataFrame of
#    predicted ozone values.
#
# For more details, see the related blog post: https://docs.getdbt.com/blog/train-linear-dbt-bigframes


def model(dbt, session):
    dbt.config(submission_method="bigframes", timeout=6000)

    df = dbt.ref("prepare_table")

    # Define the rules for separating the training, test, and prediction data.
    train_data_filter = df.date_local.dt.year < 2017
    test_data_filter = (
        (df.date_local.dt.year >= 2017) & (df.date_local.dt.year < 2020)
    )
    predict_data_filter = df.date_local.dt.year >= 2020

    # Use the same index columns as in the prepare_table model.
    index_columns = ["state_name", "county_name", "site_num", "date_local", "time_local"]

    # Separate the training, test, and prediction data.
    df_train = df[train_data_filter].set_index(index_columns)
    df_test = df[test_data_filter].set_index(index_columns)
    df_predict = df[predict_data_filter].set_index(index_columns)

    # Finalize the training dataframe.
    X_train = df_train.drop(columns="o3")
    y_train = df_train["o3"]

    # Finalize the prediction dataframe.
    X_predict = df_predict.drop(columns="o3")

    # Import the LinearRegression model from the bigframes.ml module.
    from bigframes.ml.linear_model import LinearRegression

    # Train the model.
    model = LinearRegression()
    model.fit(X_train, y_train)

    # Make predictions using the trained model.
    df_pred = model.predict(X_predict)

    return df_pred
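
Note that `df_test` (the 2017-2019 split) is created but never used in this model; it is reserved for evaluation. A minimal sketch of how it could be scored inside `model()` before the return, assuming the sklearn-style `score` method that `bigframes.ml` models expose:

```python
    # Hypothetical evaluation step (not part of this sample): score the trained
    # model on the held-out 2017-2019 rows.
    X_test = df_test.drop(columns="o3")
    y_test = df_test["o3"]

    # For linear regression, score() returns a DataFrame of regression metrics
    # (e.g. r2_score) computed in BigQuery.
    metrics = model.score(X_test, y_test)
```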
93 changes: 93 additions & 0 deletions samples/dbt/dbt_sample_project/models/ml_example/prepare_table.py
@@ -0,0 +1,93 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This dbt Python model processes EPA historical air quality data from
# BigQuery using BigFrames. The primary goal is to merge several hourly
# summary tables into a single, unified DataFrame for later prediction. It
# includes the following steps:
# 1. Reading and Cleaning: It reads individual hourly summary tables from
#    BigQuery for various atmospheric parameters (such as CO, O3, temperature,
#    and wind speed). Each table is cleaned by sorting, removing duplicates,
#    and renaming columns for clarity.
# 2. Combining Data: It then merges these cleaned tables into a single,
#    comprehensive DataFrame. An inner join is used so that the final output
#    only includes records with complete data across all parameters.
# 3. Final Output: The unified DataFrame is returned as the model's output,
#    creating a corresponding BigQuery table for future use.
#
# For more details, see the related blog post: https://docs.getdbt.com/blog/train-linear-dbt-bigframes


import bigframes.pandas as bpd


def model(dbt, session):
    # Optional: override settings from dbt_project.yml.
    # When both are set, dbt.config takes precedence over dbt_project.yml.
    dbt.config(submission_method="bigframes", timeout=6000)

    # Define the dataset and the columns of interest representing various
    # parameters in the atmosphere.
    dataset = "bigquery-public-data.epa_historical_air_quality"
    index_columns = ["state_name", "county_name", "site_num", "date_local", "time_local"]
    param_column = "parameter_name"
    value_column = "sample_measurement"

    # Initialize a list for collecting dataframes for the individual parameters.
    params_dfs = []

    # Collect dataframes from tables that each contain data for a single parameter.
    table_param_dict = {
        "co_hourly_summary": "co",
        "no2_hourly_summary": "no2",
        "o3_hourly_summary": "o3",
        "pressure_hourly_summary": "pressure",
        "so2_hourly_summary": "so2",
        "temperature_hourly_summary": "temperature",
    }

    for table, param in table_param_dict.items():
        param_df = bpd.read_gbq(
            f"{dataset}.{table}",
            columns=index_columns + [value_column],
        )
        param_df = (
            param_df
            .sort_values(index_columns)
            .drop_duplicates(index_columns)
            .set_index(index_columns)
            .rename(columns={value_column: param})
        )
        params_dfs.append(param_df)

    # Collect the wind speed dataframe from the table containing wind data.
    # Optionally, collect dataframes for wind direction, NO, NOx, and NOy
    # from other tables as needed.
    wind_table = f"{dataset}.wind_hourly_summary"
    # Exploratory step: inspect which parameters the wind table contains.
    bpd.read_gbq(wind_table, columns=[param_column]).value_counts()

    wind_speed_df = bpd.read_gbq(
        wind_table,
        columns=index_columns + [value_column],
        filters=[(param_column, "==", "Wind Speed - Resultant")],
    )
    wind_speed_df = (
        wind_speed_df
        .sort_values(index_columns)
        .drop_duplicates(index_columns)
        .set_index(index_columns)
        .rename(columns={value_column: "wind_speed"})
    )
    params_dfs.append(wind_speed_df)

    # Combine the data for all the selected parameters. The inner join keeps
    # only rows with complete data across every parameter.
    df = bpd.concat(params_dfs, axis=1, join="inner")
    df = df.reset_index()

    return df
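
To make the `join="inner"` behavior above concrete, here is a toy illustration with made-up site labels; plain pandas is shown, and BigFrames mirrors pandas alignment semantics:

```python
import pandas as pd

co = pd.DataFrame({"co": [0.3, 0.4]}, index=["site_a", "site_b"])
o3 = pd.DataFrame({"o3": [0.02]}, index=["site_a"])

# Only index labels present in every frame survive the inner join, so the
# combined table has complete rows across all parameters.
print(pd.concat([co, o3], axis=1, join="inner"))  # one row: site_a
```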