Chapter 7: Working with Keras

This document summarizes three key points about Keras:
1. Keras has two main model-building APIs: the Sequential API for linear stacks of layers and the Functional API for more complex models.
2. The Sequential API builds models layer by layer, while the Functional API lets you specify arbitrary connectivity between layers.
3. Keras models can be trained and evaluated with the built-in training loops, or by writing custom training logic and metrics.


This is a companion notebook for the book Deep Learning with Python, Second Edition. For readability, it only contains runnable code blocks and section titles, and omits everything else in the book: text paragraphs, figures, and pseudocode. If you want to be able to follow what's going on, I recommend reading the notebook side by side with your copy of the book.

This notebook was generated for TensorFlow 2.6.

Working with Keras: A deep dive


A spectrum of workflows
Different ways to build Keras models
The Sequential model
The Sequential class
from tensorflow import keras
from tensorflow.keras import layers

model = keras.Sequential([
    layers.Dense(64, activation="relu"),
    layers.Dense(10, activation="softmax")
])

Incrementally building a Sequential model


model = keras.Sequential()
model.add(layers.Dense(64, activation="relu"))
model.add(layers.Dense(10, activation="softmax"))

Calling a model for the first time to build it


model.build(input_shape=(None, 3))
model.weights

The summary method


model.summary()

Naming models and layers with the name argument


model = keras.Sequential(name="my_example_model")
model.add(layers.Dense(64, activation="relu", name="my_first_layer"))
model.add(layers.Dense(10, activation="softmax", name="my_last_layer"))
model.build((None, 3))
model.summary()
Specifying the input shape of your model in advance
model = keras.Sequential()
model.add(keras.Input(shape=(3,)))
model.add(layers.Dense(64, activation="relu"))

model.summary()

model.add(layers.Dense(10, activation="softmax"))
model.summary()

The Functional API

A simple example
A simple Functional model with two Dense layers
inputs = keras.Input(shape=(3,), name="my_input")
features = layers.Dense(64, activation="relu")(inputs)
outputs = layers.Dense(10, activation="softmax")(features)
model = keras.Model(inputs=inputs, outputs=outputs)

inputs = keras.Input(shape=(3,), name="my_input")

inputs.shape   # TensorShape([None, 3]): the batch dimension is left as None

inputs.dtype   # tf.float32

features = layers.Dense(64, activation="relu")(inputs)

features.shape # TensorShape([None, 64])

outputs = layers.Dense(10, activation="softmax")(features)


model = keras.Model(inputs=inputs, outputs=outputs)

model.summary()

Multi-input, multi-output models


A multi-input, multi-output Functional model
vocabulary_size = 10000
num_tags = 100
num_departments = 4

title = keras.Input(shape=(vocabulary_size,), name="title")
text_body = keras.Input(shape=(vocabulary_size,), name="text_body")
tags = keras.Input(shape=(num_tags,), name="tags")

features = layers.Concatenate()([title, text_body, tags])
features = layers.Dense(64, activation="relu")(features)

priority = layers.Dense(1, activation="sigmoid", name="priority")(features)
department = layers.Dense(
    num_departments, activation="softmax", name="department")(features)

model = keras.Model(inputs=[title, text_body, tags],
                    outputs=[priority, department])

Training a multi-input, multi-output model


Training a model by providing lists of input & target arrays
import numpy as np

num_samples = 1280

title_data = np.random.randint(0, 2, size=(num_samples, vocabulary_size))
text_body_data = np.random.randint(0, 2, size=(num_samples, vocabulary_size))
tags_data = np.random.randint(0, 2, size=(num_samples, num_tags))

priority_data = np.random.random(size=(num_samples, 1))
department_data = np.random.randint(
    0, 2, size=(num_samples, num_departments))

model.compile(optimizer="rmsprop",
              loss=["mean_squared_error", "categorical_crossentropy"],
              metrics=[["mean_absolute_error"], ["accuracy"]])
model.fit([title_data, text_body_data, tags_data],
          [priority_data, department_data],
          epochs=1)
model.evaluate([title_data, text_body_data, tags_data],
               [priority_data, department_data])
priority_preds, department_preds = model.predict(
    [title_data, text_body_data, tags_data])

Training a model by providing dicts of input & target arrays


model.compile(optimizer="rmsprop",
              loss={"priority": "mean_squared_error",
                    "department": "categorical_crossentropy"},
              metrics={"priority": ["mean_absolute_error"],
                       "department": ["accuracy"]})
model.fit({"title": title_data, "text_body": text_body_data, "tags": tags_data},
          {"priority": priority_data, "department": department_data},
          epochs=1)
model.evaluate({"title": title_data, "text_body": text_body_data, "tags": tags_data},
               {"priority": priority_data, "department": department_data})
priority_preds, department_preds = model.predict(
    {"title": title_data, "text_body": text_body_data, "tags": tags_data})

The power of the Functional API: Access to layer connectivity
keras.utils.plot_model(model, "ticket_classifier.png")

keras.utils.plot_model(model, "ticket_classifier_with_shape_info.png",
show_shapes=True)

Retrieving the inputs or outputs of a layer in a Functional model


model.layers

model.layers[3].input

model.layers[3].output

Creating a new model by reusing intermediate layer outputs


features = model.layers[4].output
difficulty = layers.Dense(3, activation="softmax", name="difficulty")(features)

new_model = keras.Model(
    inputs=[title, text_body, tags],
    outputs=[priority, department, difficulty])

keras.utils.plot_model(new_model, "updated_ticket_classifier.png",
                       show_shapes=True)

Subclassing the Model class

Rewriting our previous example as a subclassed model


A simple subclassed model
class CustomerTicketModel(keras.Model):

    def __init__(self, num_departments):
        super().__init__()
        self.concat_layer = layers.Concatenate()
        self.mixing_layer = layers.Dense(64, activation="relu")
        self.priority_scorer = layers.Dense(1, activation="sigmoid")
        self.department_classifier = layers.Dense(
            num_departments, activation="softmax")

    def call(self, inputs):
        title = inputs["title"]
        text_body = inputs["text_body"]
        tags = inputs["tags"]
        features = self.concat_layer([title, text_body, tags])
        features = self.mixing_layer(features)
        priority = self.priority_scorer(features)
        department = self.department_classifier(features)
        return priority, department

model = CustomerTicketModel(num_departments=4)

priority, department = model(
    {"title": title_data, "text_body": text_body_data, "tags": tags_data})

model.compile(optimizer="rmsprop",
              loss=["mean_squared_error", "categorical_crossentropy"],
              metrics=[["mean_absolute_error"], ["accuracy"]])
model.fit({"title": title_data,
           "text_body": text_body_data,
           "tags": tags_data},
          [priority_data, department_data],
          epochs=1)
model.evaluate({"title": title_data,
                "text_body": text_body_data,
                "tags": tags_data},
               [priority_data, department_data])
priority_preds, department_preds = model.predict(
    {"title": title_data, "text_body": text_body_data, "tags": tags_data})

Beware: What subclassed models don't support
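
The book's discussion is omitted here; the gist is that a subclassed model is a black box: its topology is hidden inside the body of call(), so Keras cannot plot it with plot_model() or expose the layer connectivity used for feature extraction above. A minimal sketch of the limitation (this cell is not in the original notebook):

# Not in the original notebook: once built, a subclassed model exposes
# no graph of layers, so graph-based introspection raises an error.
subclassed_model = CustomerTicketModel(num_departments=4)
subclassed_model(
    {"title": title_data, "text_body": text_body_data, "tags": tags_data})
try:
    subclassed_model.layers[0].output  # available on Functional models only
except Exception as e:
    print(f"No layer graph to inspect: {e}")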

Mixing and matching different components


Creating a Functional model that includes a subclassed model
class Classifier(keras.Model):

    def __init__(self, num_classes=2):
        super().__init__()
        if num_classes == 2:
            num_units = 1
            activation = "sigmoid"
        else:
            num_units = num_classes
            activation = "softmax"
        self.dense = layers.Dense(num_units, activation=activation)

    def call(self, inputs):
        return self.dense(inputs)

inputs = keras.Input(shape=(3,))
features = layers.Dense(64, activation="relu")(inputs)
outputs = Classifier(num_classes=10)(features)
model = keras.Model(inputs=inputs, outputs=outputs)

Creating a subclassed model that includes a Functional model
inputs = keras.Input(shape=(64,))
outputs = layers.Dense(1, activation="sigmoid")(inputs)
binary_classifier = keras.Model(inputs=inputs, outputs=outputs)

class MyModel(keras.Model):

    def __init__(self, num_classes=2):
        super().__init__()
        self.dense = layers.Dense(64, activation="relu")
        self.classifier = binary_classifier

    def call(self, inputs):
        features = self.dense(inputs)
        return self.classifier(features)

model = MyModel()

Remember: Use the right tool for the job

Using built-in training and evaluation loops


The standard workflow: compile(), fit(), evaluate(), predict()
from tensorflow.keras.datasets import mnist

def get_mnist_model():
    inputs = keras.Input(shape=(28 * 28,))
    features = layers.Dense(512, activation="relu")(inputs)
    features = layers.Dropout(0.5)(features)
    outputs = layers.Dense(10, activation="softmax")(features)
    model = keras.Model(inputs, outputs)
    return model

(images, labels), (test_images, test_labels) = mnist.load_data()


images = images.reshape((60000, 28 * 28)).astype("float32") / 255
test_images = test_images.reshape((10000, 28 * 28)).astype("float32") / 255
train_images, val_images = images[10000:], images[:10000]
train_labels, val_labels = labels[10000:], labels[:10000]

model = get_mnist_model()
model.compile(optimizer="rmsprop",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
model.fit(train_images, train_labels,
          epochs=3,
          validation_data=(val_images, val_labels))
test_metrics = model.evaluate(test_images, test_labels)
predictions = model.predict(test_images)
Writing your own metrics
Implementing a custom metric by subclassing the Metric class
import tensorflow as tf

class RootMeanSquaredError(keras.metrics.Metric):

    def __init__(self, name="rmse", **kwargs):
        super().__init__(name=name, **kwargs)
        self.mse_sum = self.add_weight(name="mse_sum", initializer="zeros")
        self.total_samples = self.add_weight(
            name="total_samples", initializer="zeros", dtype="int32")

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_true = tf.one_hot(y_true, depth=tf.shape(y_pred)[1])
        mse = tf.reduce_sum(tf.square(y_true - y_pred))
        self.mse_sum.assign_add(mse)
        num_samples = tf.shape(y_pred)[0]
        self.total_samples.assign_add(num_samples)

    def result(self):
        return tf.sqrt(self.mse_sum / tf.cast(self.total_samples, tf.float32))

    def reset_state(self):
        self.mse_sum.assign(0.)
        self.total_samples.assign(0)

model = get_mnist_model()
model.compile(optimizer="rmsprop",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy", RootMeanSquaredError()])
model.fit(train_images, train_labels,
          epochs=3,
          validation_data=(val_images, val_labels))
test_metrics = model.evaluate(test_images, test_labels)

Using callbacks

The EarlyStopping and ModelCheckpoint callbacks


Using the callbacks argument in the fit() method
callbacks_list = [
    keras.callbacks.EarlyStopping(
        monitor="val_accuracy",
        patience=2,
    ),
    keras.callbacks.ModelCheckpoint(
        filepath="checkpoint_path.keras",
        monitor="val_loss",
        save_best_only=True,
    )
]
model = get_mnist_model()
model.compile(optimizer="rmsprop",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
model.fit(train_images, train_labels,
          epochs=10,
          callbacks=callbacks_list,
          validation_data=(val_images, val_labels))

model = keras.models.load_model("checkpoint_path.keras")

Writing your own callbacks


Creating a custom callback by subclassing the Callback class
from matplotlib import pyplot as plt

class LossHistory(keras.callbacks.Callback):
    def on_train_begin(self, logs):
        self.per_batch_losses = []

    def on_batch_end(self, batch, logs):
        self.per_batch_losses.append(logs.get("loss"))

    def on_epoch_end(self, epoch, logs):
        plt.clf()
        plt.plot(range(len(self.per_batch_losses)), self.per_batch_losses,
                 label="Training loss for each batch")
        plt.xlabel(f"Batch (epoch {epoch})")
        plt.ylabel("Loss")
        plt.legend()
        plt.savefig(f"plot_at_epoch_{epoch}")
        self.per_batch_losses = []

model = get_mnist_model()
model.compile(optimizer="rmsprop",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
model.fit(train_images, train_labels,
          epochs=10,
          callbacks=[LossHistory()],
          validation_data=(val_images, val_labels))

Monitoring and visualization with TensorBoard


model = get_mnist_model()
model.compile(optimizer="rmsprop",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])

tensorboard = keras.callbacks.TensorBoard(
    log_dir="/full_path_to_your_log_dir",
)
model.fit(train_images, train_labels,
          epochs=10,
          validation_data=(val_images, val_labels),
          callbacks=[tensorboard])

%load_ext tensorboard
%tensorboard --logdir /full_path_to_your_log_dir

Writing your own training and evaluation loops


Training versus inference
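
The book's explanation is omitted here; the key point is that some layers, such as Dropout, behave differently during training and during inference, which is why the step functions below call the model with an explicit training argument. A quick illustrative sketch (this cell is not from the book's code):

# Illustrative only: Dropout zeroes out entries (and rescales the rest)
# when training=True, and is an identity transform when training=False.
dropout = layers.Dropout(0.5)
x = tf.ones((1, 4))
print(dropout(x, training=True))   # some entries 0, the rest scaled to 2.0
print(dropout(x, training=False))  # identical to x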

Low-level usage of metrics


metric = keras.metrics.SparseCategoricalAccuracy()
targets = [0, 1, 2]
predictions = [[1, 0, 0], [0, 1, 0], [0, 0, 1]]
metric.update_state(targets, predictions)
current_result = metric.result()
print(f"result: {current_result:.2f}")

values = [0, 1, 2, 3, 4]
mean_tracker = keras.metrics.Mean()
for value in values:
    mean_tracker.update_state(value)
print(f"Mean of values: {mean_tracker.result():.2f}")

A complete training and evaluation loop


Writing a step-by-step training loop: the training step function
model = get_mnist_model()

loss_fn = keras.losses.SparseCategoricalCrossentropy()
optimizer = keras.optimizers.RMSprop()
metrics = [keras.metrics.SparseCategoricalAccuracy()]
loss_tracking_metric = keras.metrics.Mean()

def train_step(inputs, targets):
    with tf.GradientTape() as tape:
        predictions = model(inputs, training=True)
        loss = loss_fn(targets, predictions)
    gradients = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(gradients, model.trainable_weights))

    logs = {}
    for metric in metrics:
        metric.update_state(targets, predictions)
        logs[metric.name] = metric.result()

    loss_tracking_metric.update_state(loss)
    logs["loss"] = loss_tracking_metric.result()
    return logs

Writing a step-by-step training loop: resetting the metrics


def reset_metrics():
    for metric in metrics:
        metric.reset_state()
    loss_tracking_metric.reset_state()

Writing a step-by-step training loop: the loop itself


training_dataset = tf.data.Dataset.from_tensor_slices(
    (train_images, train_labels))
training_dataset = training_dataset.batch(32)
epochs = 3
for epoch in range(epochs):
    reset_metrics()
    for inputs_batch, targets_batch in training_dataset:
        logs = train_step(inputs_batch, targets_batch)
    print(f"Results at the end of epoch {epoch}")
    for key, value in logs.items():
        print(f"...{key}: {value:.4f}")

Writing a step-by-step evaluation loop


def test_step(inputs, targets):
    predictions = model(inputs, training=False)
    loss = loss_fn(targets, predictions)

    logs = {}
    for metric in metrics:
        metric.update_state(targets, predictions)
        logs["val_" + metric.name] = metric.result()

    loss_tracking_metric.update_state(loss)
    logs["val_loss"] = loss_tracking_metric.result()
    return logs

val_dataset = tf.data.Dataset.from_tensor_slices((val_images, val_labels))
val_dataset = val_dataset.batch(32)
reset_metrics()
for inputs_batch, targets_batch in val_dataset:
    logs = test_step(inputs_batch, targets_batch)
print("Evaluation results:")
for key, value in logs.items():
    print(f"...{key}: {value:.4f}")

Make it fast with tf.function
Adding a tf.function decorator to our evaluation-step function
@tf.function
def test_step(inputs, targets):
    predictions = model(inputs, training=False)
    loss = loss_fn(targets, predictions)

    logs = {}
    for metric in metrics:
        metric.update_state(targets, predictions)
        logs["val_" + metric.name] = metric.result()

    loss_tracking_metric.update_state(loss)
    logs["val_loss"] = loss_tracking_metric.result()
    return logs

val_dataset = tf.data.Dataset.from_tensor_slices((val_images, val_labels))
val_dataset = val_dataset.batch(32)
reset_metrics()
for inputs_batch, targets_batch in val_dataset:
    logs = test_step(inputs_batch, targets_batch)
print("Evaluation results:")
for key, value in logs.items():
    print(f"...{key}: {value:.4f}")

Leveraging fit() with a custom training loop


Implementing a custom training step to use with fit()
loss_fn = keras.losses.SparseCategoricalCrossentropy()
loss_tracker = keras.metrics.Mean(name="loss")

class CustomModel(keras.Model):
    def train_step(self, data):
        inputs, targets = data
        with tf.GradientTape() as tape:
            predictions = self(inputs, training=True)
            loss = loss_fn(targets, predictions)
        gradients = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(
            zip(gradients, self.trainable_weights))

        loss_tracker.update_state(loss)
        return {"loss": loss_tracker.result()}

    @property
    def metrics(self):
        return [loss_tracker]

inputs = keras.Input(shape=(28 * 28,))
features = layers.Dense(512, activation="relu")(inputs)
features = layers.Dropout(0.5)(features)
outputs = layers.Dense(10, activation="softmax")(features)
model = CustomModel(inputs, outputs)

model.compile(optimizer=keras.optimizers.RMSprop())
model.fit(train_images, train_labels, epochs=3)

class CustomModel(keras.Model):
    def train_step(self, data):
        inputs, targets = data
        with tf.GradientTape() as tape:
            predictions = self(inputs, training=True)
            loss = self.compiled_loss(targets, predictions)
        gradients = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(
            zip(gradients, self.trainable_weights))
        self.compiled_metrics.update_state(targets, predictions)
        return {m.name: m.result() for m in self.metrics}

inputs = keras.Input(shape=(28 * 28,))
features = layers.Dense(512, activation="relu")(inputs)
features = layers.Dropout(0.5)(features)
outputs = layers.Dense(10, activation="softmax")(features)
model = CustomModel(inputs, outputs)

model.compile(optimizer=keras.optimizers.RMSprop(),
              loss=keras.losses.SparseCategoricalCrossentropy(),
              metrics=[keras.metrics.SparseCategoricalAccuracy()])
model.fit(train_images, train_labels, epochs=3)

Summary
