Accuracy, Parameter Count, Training Time (s), Inference Latency (ms).
Accuracy/Parameter Count, Accuracy/Training Time
Writing a bonus paragraph about pruning % and compression impact?
Phase-1 code starter template
The code below is provided for your reference; feel free to change it partially
or fully.
Please make sure it does not contain any bugs or mistakes. The code authors DO
NOT claim the code is bug-free. It is the student's responsibility to ensure its correctness.
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.datasets import fashion_mnist, cifar10
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping
import time
import numpy as np
from tqdm import tqdm
def create_base_model(input_shape, num_classes):
    """Build and compile the small baseline CNN classifier.

    The network is one 3x3 convolution (16 filters) followed by 2x2 max
    pooling, a flatten, a 2-unit ReLU dense layer and a softmax output layer.

    Args:
        input_shape: Shape of one input sample, passed to the first layer as
            ``input_shape``.
        num_classes: Number of units in the softmax output layer.

    Returns:
        The compiled ``Sequential`` model (Adam optimizer, categorical
        cross-entropy loss, accuracy metric).
    """
    model = models.Sequential([
        layers.Conv2D(16, (3, 3), activation='relu', input_shape=input_shape),
        layers.MaxPooling2D((2, 2)),
        layers.Flatten(),
        # NOTE(review): only 2 hidden units feed the softmax layer, which is a
        # very tight bottleneck. Presumably deliberate for the assignment --
        # confirm before relying on the reported accuracy.
        layers.Dense(2, activation='relu'),
        layers.Dense(num_classes, activation='softmax'),
    ])
    model.compile(optimizer='adam',
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model
def prepare_dataset(dataset_name):
    """Load a supported dataset, scale pixels to [0, 1] and one-hot the labels.

    Args:
        dataset_name: Either ``'fashion_mnist'`` or ``'cifar10'``.

    Returns:
        ``((x_train, y_train), (x_test, y_test), input_shape, num_classes)``.
        Images are float32 arrays divided by 255; labels are one-hot encoded
        with ``to_categorical``.

    Raises:
        ValueError: If ``dataset_name`` is not one of the supported names.
    """
    if dataset_name == 'fashion_mnist':
        (x_train, y_train), (x_test, y_test) = fashion_mnist.load_data()
        num_classes = 10
        input_shape = (28, 28, 1)
        # Add the explicit single channel axis expected by Conv2D.
        x_train = x_train.reshape(-1, 28, 28, 1)
        x_test = x_test.reshape(-1, 28, 28, 1)
    elif dataset_name == 'cifar10':
        (x_train, y_train), (x_test, y_test) = cifar10.load_data()
        num_classes = 10
        input_shape = (32, 32, 3)
    else:
        raise ValueError(f"Unsupported dataset: {dataset_name}")

    # Shared post-processing for every dataset: this must run after the
    # branch above, not inside one of its arms.
    x_train = x_train.astype('float32') / 255
    x_test = x_test.astype('float32') / 255
    y_train = to_categorical(y_train, num_classes)
    y_test = to_categorical(y_test, num_classes)
    return (x_train, y_train), (x_test, y_test), input_shape, num_classes
def evaluate_model(dataset_name, max_epoch, device):
    """Train the baseline model on a dataset and evaluate it on the test split.

    Args:
        dataset_name: Dataset key accepted by ``prepare_dataset``.
        max_epoch: Upper bound on the number of training epochs. Early
            stopping on ``val_loss`` (patience 3, best weights restored) may
            end training sooner.
        device: TensorFlow device string (e.g. ``'/GPU:0'``) used for model
            construction, training and evaluation.

    Returns:
        ``(model, num_param, x_test, test_accuracy, train_time, eval_time)``
        where both times are wall-clock seconds.
    """
    train_split, test_split, input_shape, num_classes = prepare_dataset(
        dataset_name)
    x_train, y_train = train_split
    x_test, y_test = test_split

    with tf.device(device):
        model = create_base_model(input_shape, num_classes)
        early_stop = EarlyStopping(monitor='val_loss', patience=3,
                                   restore_best_weights=True)

        # perf_counter is monotonic, so the measured durations cannot be
        # distorted by system clock adjustments.
        start_time = time.perf_counter()
        model.fit(x_train, y_train, epochs=max_epoch, batch_size=64,
                  validation_split=0.2,
                  callbacks=[early_stop], verbose=1)
        train_time = time.perf_counter() - start_time

        start_time = time.perf_counter()
        test_loss, test_accuracy = model.evaluate(x_test, y_test, verbose=0)
        eval_time = time.perf_counter() - start_time

    num_param = model.count_params()  # counting number of model's parameters
    print(f"{dataset_name.upper()} Test Accuracy: {test_accuracy * 100:.2f} %")
    print(f"{dataset_name.upper()} Number of Parameters: {num_param}")
    return model, num_param, x_test, test_accuracy, train_time, eval_time
def profile_workload(model, device, dev_name, image, iterations=30):
    """Measure the average single-call inference latency of a Keras model.

    Ten untimed warm-up calls are made first, then ``iterations`` timed calls.
    The predicted class for ``image`` is printed afterwards.

    Args:
        model: Callable Keras model; invoked as ``model(image, training=False)``.
        device: TensorFlow device string the calls are placed on.
        dev_name: Human-readable device name used in the printed messages.
        image: Input batch fed to the model on every call.
        iterations: Number of timed calls; must be at least 1.

    Returns:
        Mean latency of the timed calls in milliseconds.

    Raises:
        ValueError: If ``iterations`` is smaller than 1 (the mean of zero
            measurements is undefined).
    """
    if iterations < 1:
        raise ValueError("iterations must be at least 1")

    print(f"Profiling on {dev_name}...")
    latencies = []
    with tf.device(device):
        for _ in tqdm(range(10), desc="Warm-up..."):
            model(image, training=False)

        for _ in tqdm(range(iterations), desc="Profiling"):
            start = time.perf_counter()
            # .numpy() copies the result to the host, so the timed region
            # only ends once the output values are actually available --
            # device work may otherwise still be in flight when the call
            # returns.
            model(image, training=False).numpy()
            latencies.append((time.perf_counter() - start) * 1000)

        avg_latency = np.mean(latencies)
        print(f"Average Latency on {dev_name}: {avg_latency:.2f} ms")

        prediction = model(image, training=False)
        predicted_class = tf.argmax(prediction, axis=1).numpy()[0]
        print(f"Predicted Class: {predicted_class}")
    return avg_latency
# Device priority: GPU > CPU.
# TensorFlow has no 'MPS' device type (that is a PyTorch backend name), so a
# list_physical_devices('MPS') query can never match. With the tensorflow-metal
# plugin an Apple GPU is reported under the regular 'GPU' type and is therefore
# already covered by the first branch.
if tf.config.list_physical_devices('GPU'):
    device = '/GPU:0'
    dev_name = 'GPU'
else:
    device = '/CPU:0'
    dev_name = 'CPU'
print(f'using {dev_name}')

datasets = ['fashion_mnist', 'cifar10']
for dataset in datasets:
    print(f"\nProcessing {dataset}...")
    model, num_param, x_test, acc, train_t, eval_t = evaluate_model(
        dataset, max_epoch=25, device=device)
    # Report the timings as well; they were previously measured and dropped.
    print(f"{dataset.upper()} Training Time: {train_t:.2f} s")
    print(f"{dataset.upper()} Evaluation Time: {eval_t:.4f} s")

    # Latency is profiled on a single test sample (batch of one).
    test_image = tf.convert_to_tensor(x_test[:1], dtype=tf.float32)
    profile_workload(model, device, dev_name, test_image)
Phase-3 code starter template
The code below is provided for your reference; feel free to change it partially
or fully.
Please make sure it does not contain any bugs or mistakes. The code authors DO
NOT claim the code is bug-free. It is the student's responsibility to ensure its correctness.
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.datasets import fashion_mnist, cifar10
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping
import time
import numpy as np
import os
from tqdm import tqdm
# Hard stop on any TensorFlow version other than exactly 2.15.0.
# NOTE(review): the message says "WARNING" but an assert aborts the script
# (and is skipped entirely under `python -O`); confirm whether a soft warning
# was intended.
assert tf.__version__ == '2.15.0', (
    'WARNING!!! different TensorFlow version may '
    'produce an error while quantizing.')
def create_base_model(input_shape, num_classes):
    """Build and compile the three-stage CNN used for the quantization study.

    The network stacks three (3x3 convolution with 16 filters, 2x2 max
    pooling) stages, then a flatten, a 2-unit ReLU dense layer and a softmax
    output layer.

    Args:
        input_shape: Shape of one input sample, passed to the first layer as
            ``input_shape``.
        num_classes: Number of units in the softmax output layer.

    Returns:
        The compiled ``Sequential`` model (Adam optimizer, categorical
        cross-entropy loss, accuracy metric).
    """
    model = models.Sequential([
        # Only the first layer declares the input shape; later layers take
        # their input shape from the preceding layer's output.
        layers.Conv2D(16, (3, 3), activation='relu', input_shape=input_shape),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(16, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(16, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Flatten(),
        # NOTE(review): only 2 hidden units feed the softmax layer, which is a
        # very tight bottleneck. Presumably deliberate for the assignment --
        # confirm before relying on the reported accuracy.
        layers.Dense(2, activation='relu'),
        layers.Dense(num_classes, activation='softmax'),
    ])
    model.compile(optimizer='adam',
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model
def prepare_dataset(dataset_name):
    """Load a supported dataset, scale pixels to [0, 1] and one-hot the labels.

    Args:
        dataset_name: Either ``'fashion_mnist'`` or ``'cifar10'``.

    Returns:
        ``((x_train, y_train), (x_test, y_test), input_shape, num_classes)``.
        Images are float32 arrays divided by 255; labels are one-hot encoded
        with ``to_categorical``.

    Raises:
        ValueError: If ``dataset_name`` is not one of the supported names.
    """
    if dataset_name == 'fashion_mnist':
        (x_train, y_train), (x_test, y_test) = fashion_mnist.load_data()
        num_classes = 10
        input_shape = (28, 28, 1)
        # Add the explicit single channel axis expected by Conv2D.
        x_train = x_train.reshape(-1, 28, 28, 1)
        x_test = x_test.reshape(-1, 28, 28, 1)
    elif dataset_name == 'cifar10':
        (x_train, y_train), (x_test, y_test) = cifar10.load_data()
        num_classes = 10
        input_shape = (32, 32, 3)
    else:
        raise ValueError(f"Unsupported dataset: {dataset_name}")

    # Shared post-processing for every dataset: this must run after the
    # branch above, not inside one of its arms.
    x_train = x_train.astype('float32') / 255
    x_test = x_test.astype('float32') / 255
    y_train = to_categorical(y_train, num_classes)
    y_test = to_categorical(y_test, num_classes)
    return (x_train, y_train), (x_test, y_test), input_shape, num_classes
def evaluate_model(model, x_test, y_test, device):
    """Evaluate a compiled Keras model on a test set and time the evaluation.

    Args:
        model: Compiled model whose ``evaluate`` returns ``(loss, accuracy)``.
        x_test: Test inputs.
        y_test: Test targets.
        device: TensorFlow device string the evaluation is placed on.

    Returns:
        ``(acc, eval_time)``: test accuracy and wall-clock seconds spent in
        ``model.evaluate``.
    """
    with tf.device(device):
        # perf_counter is monotonic, unlike the wall clock.
        start_time = time.perf_counter()
        _loss, acc = model.evaluate(x_test, y_test, verbose=0)
        eval_time = time.perf_counter() - start_time
    return acc, eval_time
def profile_workload(model, device, dev_name, image, iterations=30):
    """Measure the average single-call inference latency of a Keras model.

    Ten untimed warm-up calls are made first, then ``iterations`` timed calls.

    Args:
        model: Callable Keras model; invoked as ``model(image, training=False)``.
        device: TensorFlow device string the calls are placed on.
        dev_name: Human-readable device name used in the printed messages.
        image: Input batch fed to the model on every call.
        iterations: Number of timed calls; must be at least 1.

    Returns:
        Mean latency of the timed calls in milliseconds.

    Raises:
        ValueError: If ``iterations`` is smaller than 1 (the mean of zero
            measurements is undefined).
    """
    if iterations < 1:
        raise ValueError("iterations must be at least 1")

    print(f"Profiling on {dev_name}...")
    latencies = []
    with tf.device(device):
        for _ in tqdm(range(10), desc="Warm-up"):
            model(image, training=False)

        for _ in tqdm(range(iterations), desc="Profiling"):
            start = time.perf_counter()
            # .numpy() copies the result to the host, so the timed region
            # only ends once the output values are actually available --
            # device work may otherwise still be in flight when the call
            # returns.
            model(image, training=False).numpy()
            latencies.append((time.perf_counter() - start) * 1000)

    avg_latency = np.mean(latencies)
    print(f"Average Latency on {dev_name}: {avg_latency:.2f} ms")
    return avg_latency
def profile_tflite_model(interpreter, input_tensor, iterations=30):
    """Measure the average latency of one TFLite set-input + invoke cycle.

    Ten untimed warm-up cycles are run first, then ``iterations`` timed ones.
    Each timed cycle includes both ``set_tensor`` and ``invoke``.

    Args:
        interpreter: Interpreter-like object providing ``get_input_details``,
            ``set_tensor`` and ``invoke``. Tensors are assumed to be
            allocated already -- this function does not allocate them.
        input_tensor: Value written to the first input tensor on every cycle.
        iterations: Number of timed cycles; must be at least 1.

    Returns:
        Mean latency of the timed cycles in milliseconds.

    Raises:
        ValueError: If ``iterations`` is smaller than 1 (the mean of zero
            measurements is undefined).
    """
    if iterations < 1:
        raise ValueError("iterations must be at least 1")

    # The input slot does not change between cycles, so look it up once.
    input_index = interpreter.get_input_details()[0]['index']

    for _ in range(10):
        interpreter.set_tensor(input_index, input_tensor)
        interpreter.invoke()

    latencies = []
    for _ in range(iterations):
        start = time.perf_counter()
        interpreter.set_tensor(input_index, input_tensor)
        interpreter.invoke()
        latencies.append((time.perf_counter() - start) * 1000)

    avg_latency = np.mean(latencies)
    return avg_latency
def quantize_model_to_int8(model, representative_data_gen,
                           save_path="model_int8.tflite"):
    """Convert a Keras model to a full-integer (int8) TFLite model on disk.

    The converter is restricted to int8 built-in ops and both the model input
    and output tensors are int8, so callers must quantize inputs themselves
    before feeding the resulting model.

    Args:
        model: Keras model to convert.
        representative_data_gen: Zero-argument generator function assigned to
            ``converter.representative_dataset``; used by the converter to
            calibrate quantization ranges. Its samples should therefore have
            the same value range as the data the model was trained on.
        save_path: Destination path of the ``.tflite`` file.
    """
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    converter.representative_dataset = representative_data_gen
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    converter.inference_input_type = tf.int8
    converter.inference_output_type = tf.int8
    tflite_quant_model = converter.convert()

    with open(save_path, "wb") as f:
        f.write(tflite_quant_model)
    print(f"INT8 Quantized model saved at {save_path}")
def get_file_size(file_path):
    """Return the size of the file at ``file_path`` in kibibytes (bytes / 1024)."""
    return os.path.getsize(file_path) / 1024  # KB
def _quantize_for_input(samples, input_detail):
    """Convert float samples to the dtype/quantization of a TFLite input."""
    scale, zero_point = input_detail['quantization']
    dtype = input_detail['dtype']
    if not scale:
        # A scale of 0 means the input tensor is not quantized.
        return samples.astype(dtype)
    limits = np.iinfo(dtype)
    quantized = np.round(samples / scale + zero_point)
    # Clip before casting: an out-of-range value would otherwise wrap around.
    return np.clip(quantized, limits.min, limits.max).astype(dtype)


def evaluate_tflite_accuracy(tflite_model_path, x_test, y_test):
    """Compute top-1 accuracy of a TFLite model, one sample at a time.

    Inputs are quantized with the scale and zero point the interpreter reports
    for its input tensor. Casting ``x * 255`` straight to int8 is incorrect:
    values above 127 overflow and the model's zero point is ignored.

    Args:
        tflite_model_path: Path of the ``.tflite`` file to load.
        x_test: Float test inputs in the same value range the original model
            was trained on; the first axis indexes samples.
        y_test: One-hot encoded labels aligned with ``x_test``.

    Returns:
        Fraction of samples whose arg-max output matches the arg-max label.

    Raises:
        ValueError: If ``x_test`` contains no samples.
    """
    total = x_test.shape[0]
    if total == 0:
        raise ValueError("x_test must contain at least one sample")

    interpreter = tf.lite.Interpreter(model_path=tflite_model_path)
    interpreter.allocate_tensors()
    input_detail = interpreter.get_input_details()[0]
    output_index = interpreter.get_output_details()[0]['index']
    input_index = input_detail['index']

    labels = np.argmax(y_test, axis=1)
    correct = 0
    for i in range(total):
        input_data = _quantize_for_input(x_test[i:i + 1], input_detail)
        interpreter.set_tensor(input_index, input_data)
        interpreter.invoke()
        output = interpreter.get_tensor(output_index)
        # The quantized output can be compared directly: dequantization is a
        # monotonic mapping, so it does not change which class is the arg-max.
        if np.argmax(output) == labels[i]:
            correct += 1
    accuracy = correct / total
    return accuracy
# -------------------
# Main Code
# -------------------
# Device priority: GPU > CPU.
# TensorFlow has no 'MPS' device type (that is a PyTorch backend name), so a
# list_physical_devices('MPS') query can never match. With the tensorflow-metal
# plugin an Apple GPU is reported under the regular 'GPU' type and is therefore
# already covered by the first branch.
if tf.config.list_physical_devices('GPU'):
    device = '/GPU:0'
    dev_name = 'GPU'
else:
    device = '/CPU:0'
    dev_name = 'CPU'
print(f'Using {dev_name}')

datasets = ['fashion_mnist', 'cifar10']
for dataset in datasets:
    print(f"\nProcessing {dataset}...")
    (x_train, y_train), (x_test, y_test), input_shape, num_classes = (
        prepare_dataset(dataset))

    with tf.device(device):
        model = create_base_model(input_shape, num_classes)
        early_stop = EarlyStopping(monitor='val_loss', patience=3,
                                   restore_best_weights=True)
        start_train = time.time()
        model.fit(x_train, y_train, epochs=25, batch_size=64,
                  validation_split=0.2,
                  callbacks=[early_stop], verbose=1)
        train_time = time.time() - start_train

    test_acc_fp32, eval_time_fp32 = evaluate_model(model, x_test, y_test,
                                                   device)
    num_params = model.count_params()
    test_image = tf.convert_to_tensor(x_test[:1], dtype=tf.float32)
    orig_latency = profile_workload(model, device, dev_name, test_image)

    # Save original model
    model.save('model_fp32.h5')
    fp32_size = get_file_size('model_fp32.h5')
    print(f"Original Model Size: {fp32_size:.2f} KB")

    # Quantization
    def representative_data_gen():
        """Yield 100 single-sample batches used to calibrate quantization."""
        # Calibration samples must look like what the model was trained on:
        # float32 pixels already scaled to [0, 1], so no extra * 255 here.
        # Training samples are used so the test set stays untouched until the
        # accuracy measurement below.
        calibration = tf.data.Dataset.from_tensor_slices(x_train)
        for input_value in calibration.batch(1).take(100):
            yield [tf.cast(input_value, tf.float32)]

    quantize_model_to_int8(model, representative_data_gen,
                           save_path="model_int8.tflite")
    int8_size = get_file_size("model_int8.tflite")
    print(f"Quantized Model Size: {int8_size:.2f} KB")

    # Accuracy after quantization
    test_acc_int8 = evaluate_tflite_accuracy("model_int8.tflite", x_test,
                                             y_test)

    # Inference latency after quantization
    interpreter = tf.lite.Interpreter(model_path="model_int8.tflite")
    interpreter.allocate_tensors()
    input_details = interpreter.get_input_details()
    # Quantize the probe image with the model's own input scale/zero point
    # and clip to the dtype range; a plain cast of x * 255 to int8 overflows
    # for values above 127.
    in_scale, in_zero_point = input_details[0]['quantization']
    in_dtype = input_details[0]['dtype']
    in_limits = np.iinfo(in_dtype)
    test_image_int8 = np.clip(
        np.round(x_test[:1] / in_scale + in_zero_point),
        in_limits.min, in_limits.max).astype(in_dtype)
    int8_latency = profile_tflite_model(interpreter, test_image_int8)

    print("\nSummary:")
    print(f"{'Metric':<25} {'Before Quantization':<20} {'After Quantization'}")
    print(f"{'-'*70}")
    print(f"{'Number of Parameters':<25} {num_params:<20} {num_params}")
    print(f"{'Test Accuracy (%)':<25} {test_acc_fp32*100:.2f}%{'':<12} {test_acc_int8*100:.2f}%")
    print(f"{'Training Time (s)':<25} {train_time:.2f}{'':<16} {'-'}")
    print(f"{'Evaluation Time (s)':<25} {eval_time_fp32:.4f}{'':<14} {'-'}")
    print(f"{'Inference Latency (ms)':<25} {orig_latency:.2f}{'':<14} {int8_latency:.2f}")
    print(f"{'Model Size (KB)':<25} {fp32_size:.2f}{'':<14} {int8_size:.2f}")