Project Source


Config.py

from pathlib import Path

# create a configuration
def get_config():
    return {
        "batch_size": 8,
        "num_epochs": 20,
        "lr": 10**-4,
        "seq_len": 350,
        "d_model": 512,
        "lang_src": "en",
        "lang_tgt": "it",
        "model_folder": "weights",
        "model_basename": "tmodel_",
        "preload": None,
        "tokenizer_file": "tokenizer_{0}.json",
        "experiment_name": "runs/tmodel_",
    }

# build the path to the weights file for a given epoch
def get_weights_file_path(config, epoch: str):
    model_folder = config["model_folder"]
    model_basename = config["model_basename"]
    model_filename = f"{model_basename}{epoch}.pt"
    return str(Path('.') / model_folder / model_filename)
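A minimal usage sketch (assuming this file is saved as config.py and importable; the epoch label "05" is just an example):

from config import get_config, get_weights_file_path

config = get_config()
print(config["seq_len"])                    # 350
print(get_weights_file_path(config, "05"))  # e.g. weights/tmodel_05.pt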


Corpus.py

import torch
import torch.nn as nn
from torch.utils.data import Dataset
from typing import Any


class BillingualDataset(Dataset):

    def __init__(self, dataset, tokenizer_src, tokenizer_tgt, src_lang, tgt_lang, seq_len):
        super().__init__()
        self.dataset = dataset
        self.tokenizer_src = tokenizer_src
        self.tokenizer_tgt = tokenizer_tgt
        self.src_lang = src_lang
        self.tgt_lang = tgt_lang
        self.seq_len = seq_len

        self.sos_token = torch.tensor([tokenizer_src.token_to_id('[SOS]')], dtype=torch.int64)
        self.eos_token = torch.tensor([tokenizer_src.token_to_id('[EOS]')], dtype=torch.int64)
        self.pad_token = torch.tensor([tokenizer_src.token_to_id('[PAD]')], dtype=torch.int64)

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, index) -> Any:
        src_target_pair = self.dataset[index]
        src_text = src_target_pair["translation"][self.src_lang]
        tgt_text = src_target_pair["translation"][self.tgt_lang]

        enc_input_tokens = self.tokenizer_src.encode(src_text).ids
        dec_input_tokens = self.tokenizer_tgt.encode(tgt_text).ids

        # padding needed: the encoder input gets SOS and EOS, the decoder input only SOS
        enc_num_padding_tokens = self.seq_len - len(enc_input_tokens) - 2
        dec_num_padding_tokens = self.seq_len - len(dec_input_tokens) - 1

        if enc_num_padding_tokens < 0 or dec_num_padding_tokens < 0:
            raise ValueError("Sentence is too long")

        # add SOS and EOS to the source text
        encoder_input = torch.cat(
            [
                self.sos_token,
                torch.tensor(enc_input_tokens, dtype=torch.int64),
                self.eos_token,
                torch.tensor([self.pad_token] * enc_num_padding_tokens, dtype=torch.int64),
            ]
        )

        # add SOS to the decoder input
        decoder_input = torch.cat(
            [
                self.sos_token,
                torch.tensor(dec_input_tokens, dtype=torch.int64),
                torch.tensor([self.pad_token] * dec_num_padding_tokens, dtype=torch.int64),
            ]
        )

        # add EOS to the label (what the decoder is expected to predict)
        label = torch.cat(
            [
                torch.tensor(dec_input_tokens, dtype=torch.int64),
                self.eos_token,
                torch.tensor([self.pad_token] * dec_num_padding_tokens, dtype=torch.int64),
            ]
        )

        assert encoder_input.size(0) == self.seq_len
        assert decoder_input.size(0) == self.seq_len
        assert label.size(0) == self.seq_len

        return {
            "encoder_input": encoder_input,  # (seq_len)
            "decoder_input": decoder_input,  # (seq_len)
            "encoder_mask": (encoder_input != self.pad_token).unsqueeze(0).unsqueeze(0).int(),  # (1, 1, seq_len)
            "decoder_mask": (decoder_input != self.pad_token).unsqueeze(0).unsqueeze(0).int()
            & casual_mask(decoder_input.size(0)),  # (1, 1, seq_len) & (1, seq_len, seq_len)
            "label": label,  # (seq_len)
            "src_text": src_text,
            "tgt_text": tgt_text,
        }


def casual_mask(size):
    # ones above the diagonal; == 0 keeps the lower triangle (causal mask)
    mask = torch.triu(torch.ones(1, size, size), diagonal=1).type(torch.int)
    return mask == 0
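A quick sanity check of the mask helper (a sketch; run from the same directory as corpus.py):

from corpus import casual_mask

# position i may only attend to positions <= i
print(casual_mask(4).int())
# tensor([[[1, 0, 0, 0],
#          [1, 1, 0, 0],
#          [1, 1, 1, 0],
#          [1, 1, 1, 1]]], dtype=torch.int32)
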
Model.py

# import libraries
import torch
import math
import torch.nn as nn

# d_model -> size of the embedding vector
# heads -> number of attention heads


class InputEmbeddings(nn.Module):

    def __init__(self, d_model: int, vocab_size: int):
        super().__init__()
        self.d_model = d_model
        self.vocab_size = vocab_size
        self.embedding = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        # scale the embeddings by sqrt(d_model), as in the original Transformer paper
        return self.embedding(x) * math.sqrt(self.d_model)


class PositionalEncoding(nn.Module):

    def __init__(self, d_model: int, sen_len: int, dropout: float) -> None:
        super().__init__()
        self.d_model = d_model
        self.sen_len = sen_len
        self.dropout = nn.Dropout(dropout)

        # create a matrix of shape (sen_len, d_model)
        pe = torch.zeros(sen_len, d_model)
        # create a vector of shape (sen_len, 1) holding the positions
        position = torch.arange(0, sen_len, dtype=torch.float).unsqueeze(1)
        # 1 / 10000^(2i/d_model), computed in log space for numerical stability
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        # apply sin to even indices
        pe[:, 0::2] = torch.sin(position * div_term)
        # apply cos to odd indices
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # shape: (1, sen_len, d_model)
        self.register_buffer("pe", pe)

    def forward(self, x):
        # add the (non-trainable) positional encoding to the input embeddings
        x = x + (self.pe[:, :x.shape[1], :]).requires_grad_(False)
        return self.dropout(x)
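A small shape check for the positional encoding (a sketch with arbitrary sizes; any batch and length up to sen_len works):

import torch
from model import PositionalEncoding

pos = PositionalEncoding(d_model=512, sen_len=350, dropout=0.1)
x = torch.zeros(2, 20, 512)   # (Batch, sen_len, d_model)
print(pos(x).shape)           # torch.Size([2, 20, 512])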

class LayerNormalization(nn.Module):

    def __init__(self, eps: float = 10**-6) -> None:
        super().__init__()
        self.eps = eps
        # alpha -> multiplicative parameter
        self.alpha = nn.Parameter(torch.ones(1))
        # beta -> additive parameter
        self.beta = nn.Parameter(torch.zeros(1))

    def forward(self, x):
        mean = x.mean(dim=-1, keepdim=True)
        std = x.std(dim=-1, keepdim=True)
        return self.alpha * (x - mean) / (std + self.eps) + self.beta


class FeedForwardBlock(nn.Module):

    def __init__(self, d_model: int, d_ff: int, dropout: float) -> None:
        super().__init__()
        self.linear_01 = nn.Linear(d_model, d_ff)  # w1 and b1
        self.dropout = nn.Dropout(dropout)
        self.linear_02 = nn.Linear(d_ff, d_model)  # w2 and b2

    def forward(self, x):
        # (Batch, sen_len, d_model) --> (Batch, sen_len, d_ff) --> (Batch, sen_len, d_model)
        return self.linear_02(self.dropout(torch.relu(self.linear_01(x))))


class MultiHeadAttention(nn.Module):

    def __init__(self, d_model: int, heads: int, dropout: float) -> None:
        super().__init__()
        self.d_model = d_model
        self.heads = heads
        assert d_model % heads == 0, "d_model is not divisible by heads"
        self.d_k = d_model // heads

        # projections for the query, key and value vectors
        self.w_q = nn.Linear(d_model, d_model)  # w_q
        self.w_k = nn.Linear(d_model, d_model)  # w_k
        self.w_v = nn.Linear(d_model, d_model)  # w_v
        # output projection
        self.w_o = nn.Linear(d_model, d_model)  # w_o
        self.dropout = nn.Dropout(dropout)

    @staticmethod
    def Attention(query, key, value, mask, dropout: nn.Dropout):
        d_k = query.shape[-1]
        # (Batch, heads, sen_len, d_k) --> (Batch, heads, sen_len, sen_len)
        attention_scores = (query @ key.transpose(-2, -1)) / math.sqrt(d_k)
        if mask is not None:
            attention_scores.masked_fill_(mask == 0, -1e9)
        attention_scores = attention_scores.softmax(dim=-1)  # (Batch, heads, sen_len, sen_len)
        if dropout is not None:
            attention_scores = dropout(attention_scores)
        return (attention_scores @ value), attention_scores

    def forward(self, q, k, v, mask):
        # (Batch, sen_len, d_model) --> (Batch, sen_len, d_model)
        query = self.w_q(q)
        key = self.w_k(k)
        value = self.w_v(v)

        # (Batch, sen_len, d_model) --> (Batch, sen_len, heads, d_k) --> (Batch, heads, sen_len, d_k)
        query = query.view(query.shape[0], query.shape[1], self.heads, self.d_k).transpose(1, 2)
        key = key.view(key.shape[0], key.shape[1], self.heads, self.d_k).transpose(1, 2)
        value = value.view(value.shape[0], value.shape[1], self.heads, self.d_k).transpose(1, 2)

        # call the attention mechanism
        x, self.attention_scores = MultiHeadAttention.Attention(query, key, value, mask, self.dropout)

        # (Batch, heads, sen_len, d_k) --> (Batch, sen_len, heads, d_k) --> (Batch, sen_len, d_model)
        x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.heads * self.d_k)

        # (Batch, sen_len, d_model) --> (Batch, sen_len, d_model)
        return self.w_o(x)
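A shape sketch for the attention block (arbitrary sizes; d_model must be divisible by heads):

import torch
from model import MultiHeadAttention

mha = MultiHeadAttention(d_model=512, heads=8, dropout=0.1)
x = torch.randn(2, 10, 512)     # (Batch, sen_len, d_model)
out = mha(x, x, x, mask=None)   # self-attention without a mask
print(out.shape)                # torch.Size([2, 10, 512])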

class ResidualConnection(nn.Module):

    def __init__(self, dropout: float):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.norm = LayerNormalization()

    def forward(self, x, sublayer):
        # pre-norm residual connection: x + dropout(sublayer(norm(x)))
        return x + self.dropout(sublayer(self.norm(x)))


class EncoderBlock(nn.Module):

    def __init__(self, self_attention_block: MultiHeadAttention, feed_forward_network: FeedForwardBlock,
                 dropout: float):
        super().__init__()
        self.self_attention_block = self_attention_block
        self.feed_forward_block = feed_forward_network
        self.residual_connection = nn.ModuleList([ResidualConnection(dropout) for _ in range(2)])

    def forward(self, x, src_mask):
        x = self.residual_connection[0](x, lambda x: self.self_attention_block(x, x, x, src_mask))
        x = self.residual_connection[1](x, self.feed_forward_block)
        return x


class Encoder(nn.Module):

    def __init__(self, layers: nn.ModuleList):
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization()

    def forward(self, x, mask):
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)


class DecoderBlock(nn.Module):

    def __init__(self, self_attention_block: MultiHeadAttention, cross_attention_block: MultiHeadAttention,
                 feed_forward_block: FeedForwardBlock, dropout: float):
        super().__init__()
        self.self_attention_block = self_attention_block
        self.cross_attention_block = cross_attention_block
        self.feed_forward_block = feed_forward_block
        self.residual_connections = nn.ModuleList([ResidualConnection(dropout) for _ in range(3)])

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        x = self.residual_connections[0](x, lambda x: self.self_attention_block(x, x, x, tgt_mask))
        x = self.residual_connections[1](x, lambda x: self.cross_attention_block(x, encoder_output,
                                                                                 encoder_output, src_mask))
        x = self.residual_connections[2](x, self.feed_forward_block)
        return x


class Decoder(nn.Module):

    def __init__(self, layers: nn.ModuleList):
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization()

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        for layer in self.layers:
            x = layer(x, encoder_output, src_mask, tgt_mask)
        return self.norm(x)


class ProjectionLayer(nn.Module):

    def __init__(self, d_model: int, vocab_size: int):
        super().__init__()
        self.linear = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        # (Batch, sen_len, d_model) --> (Batch, sen_len, vocab_size)
        return torch.log_softmax(self.linear(x), dim=-1)


class Transformer(nn.Module):

    def __init__(self, encoder: Encoder, decoder: Decoder, src_embed: InputEmbeddings,
                 tgt_embed: InputEmbeddings, src_pos: PositionalEncoding, tgt_pos: PositionalEncoding,
                 proj: ProjectionLayer):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.tgt_embed = tgt_embed
        self.src_pos = src_pos
        self.tgt_pos = tgt_pos
        self.projection = proj

    def encode(self, src, src_mask):
        src = self.src_embed(src)
        src = self.src_pos(src)
        return self.encoder(src, src_mask)

    def decode(self, encoder_output, src_mask, tgt, tgt_mask):
        tgt = self.tgt_embed(tgt)
        tgt = self.tgt_pos(tgt)
        return self.decoder(tgt, encoder_output, src_mask, tgt_mask)

    def project(self, x):
        return self.projection(x)


# build the transformer
# N -> number of encoder and decoder blocks
def build_transformer(src_vocab_size: int, tgt_vocab_size: int, src_seq_len: int, tgt_seq_len: int,
                      d_model: int = 512, N: int = 6, heads: int = 8, dropout: float = 0.1,
                      d_ff: int = 2048) -> Transformer:
    # create the embedding layers
    src_embed = InputEmbeddings(d_model, src_vocab_size)
    tgt_embed = InputEmbeddings(d_model, tgt_vocab_size)

    # create the positional encodings
    src_pos = PositionalEncoding(d_model, src_seq_len, dropout)
    tgt_pos = PositionalEncoding(d_model, tgt_seq_len, dropout)

    # create the encoder blocks
    encoder_blocks = []
    for _ in range(N):
        encoder_self_attention_block = MultiHeadAttention(d_model, heads, dropout)
        feed_forward_block = FeedForwardBlock(d_model, d_ff, dropout)
        encoder_block = EncoderBlock(encoder_self_attention_block, feed_forward_block, dropout)
        encoder_blocks.append(encoder_block)

    # create the decoder blocks
    decoder_blocks = []
    for _ in range(N):
        decoder_self_attention_block = MultiHeadAttention(d_model, heads, dropout)
        decoder_cross_attention_block = MultiHeadAttention(d_model, heads, dropout)
        feed_forward_block = FeedForwardBlock(d_model, d_ff, dropout)
        decoder_block = DecoderBlock(decoder_self_attention_block, decoder_cross_attention_block,
                                     feed_forward_block, dropout)
        decoder_blocks.append(decoder_block)

    # create the encoder and decoder
    encoder = Encoder(nn.ModuleList(encoder_blocks))
    decoder = Decoder(nn.ModuleList(decoder_blocks))

    # create the projection layer
    projection_layer = ProjectionLayer(d_model, tgt_vocab_size)

    # create the transformer
    transformer = Transformer(encoder, decoder, src_embed, tgt_embed, src_pos, tgt_pos, projection_layer)

    # initialize the parameters with Xavier uniform
    for p in transformer.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform_(p)

    return transformer
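A minimal end-to-end forward pass with tiny, made-up sizes (the vocabulary sizes, sequence length and hyperparameters below are illustrative only):

import torch
from model import build_transformer
from corpus import casual_mask

model = build_transformer(src_vocab_size=100, tgt_vocab_size=100,
                          src_seq_len=16, tgt_seq_len=16,
                          d_model=64, N=2, heads=4, d_ff=128)
src = torch.randint(0, 100, (1, 16))   # (Batch, seq_len) of token ids
tgt = torch.randint(0, 100, (1, 16))
enc_out = model.encode(src, None)                            # (1, 16, 64)
dec_out = model.decode(enc_out, None, tgt, casual_mask(16))  # (1, 16, 64)
print(model.project(dec_out).shape)                          # torch.Size([1, 16, 100])
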
Train.py

import torch
import warnings
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
from datasets import load_dataset
from tokenizers import Tokenizer
from tokenizers.models import WordLevel
from tokenizers.trainers import WordLevelTrainer
from tokenizers.pre_tokenizers import Whitespace
from pathlib import Path

from corpus import BillingualDataset, casual_mask
from model import build_transformer
from config import get_weights_file_path, get_config


def get_all_sentences(dataset, lang):
    for item in dataset:
        yield item['translation'][lang]


# build (or load) the tokenizer
def get_or_build_tokenizer(config, dataset, lang):
    # tokenizer path
    tokenizer_path = Path(config["tokenizer_file"].format(lang))
    if not Path.exists(tokenizer_path):
        tokenizer = Tokenizer(WordLevel(unk_token='[UNK]'))
        tokenizer.pre_tokenizer = Whitespace()
        trainer = WordLevelTrainer(special_tokens=["[UNK]", "[PAD]", "[SOS]", "[EOS]"], min_frequency=2)
        tokenizer.train_from_iterator(get_all_sentences(dataset, lang), trainer=trainer)
        tokenizer.save(str(tokenizer_path))
    else:
        tokenizer = Tokenizer.from_file(str(tokenizer_path))
    return tokenizer
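An encode/decode round trip with a trained tokenizer (a sketch; it assumes tokenizer_en.json has already been produced by get_or_build_tokenizer):

from tokenizers import Tokenizer

tokenizer = Tokenizer.from_file("tokenizer_en.json")
ids = tokenizer.encode("I love reading books").ids   # word-level ids, [UNK] for unseen words
print(ids)
print(tokenizer.decode(ids))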

# get the dataset
def get_dataset(config):
    dataset_raw = load_dataset("opus_books", f"{config['lang_src']}-{config['lang_tgt']}", split="train")

    # build the tokenizers
    tokenizer_src = get_or_build_tokenizer(config, dataset_raw, config["lang_src"])
    tokenizer_tgt = get_or_build_tokenizer(config, dataset_raw, config["lang_tgt"])

    # keep 90% of the data for training and 10% for validation
    train_ds_size = int(0.9 * len(dataset_raw))
    val_ds_size = len(dataset_raw) - train_ds_size
    train_ds_raw, val_ds_raw = random_split(dataset_raw, [train_ds_size, val_ds_size])

    train_ds = BillingualDataset(train_ds_raw, tokenizer_src, tokenizer_tgt, config["lang_src"],
                                 config["lang_tgt"], config["seq_len"])
    val_ds = BillingualDataset(val_ds_raw, tokenizer_src, tokenizer_tgt, config["lang_src"],
                               config["lang_tgt"], config["seq_len"])

    # find the longest sentence in each language (sanity check for seq_len)
    max_len_src = 0
    max_len_tgt = 0
    for item in dataset_raw:
        src_ids = tokenizer_src.encode(item["translation"][config["lang_src"]]).ids
        tgt_ids = tokenizer_tgt.encode(item["translation"][config["lang_tgt"]]).ids
        max_len_src = max(max_len_src, len(src_ids))
        max_len_tgt = max(max_len_tgt, len(tgt_ids))

    print(f"Max length of source sentence: {max_len_src}")
    print(f"Max length of target sentence: {max_len_tgt}")

    # create the data loaders
    train_dataloader = DataLoader(train_ds, batch_size=config["batch_size"], shuffle=True)
    val_dataloader = DataLoader(val_ds, batch_size=1, shuffle=True)

    return train_dataloader, val_dataloader, tokenizer_src, tokenizer_tgt


def get_model(config, vocab_src_len, vocab_tgt_len):
    # build the model
    model = build_transformer(vocab_src_len, vocab_tgt_len, config["seq_len"], config["seq_len"],
                              config["d_model"])
    return model


# the model training loop
def train_model(config):
    # define the device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # create the model folder
    Path(config["model_folder"]).mkdir(parents=True, exist_ok=True)

    # get the data loaders and tokenizers
    train_dataloader, val_dataloader, tokenizer_src, tokenizer_tgt = get_dataset(config)

    # build the model and move it to the device
    model = get_model(config, tokenizer_src.get_vocab_size(), tokenizer_tgt.get_vocab_size())
    model = model.to(device)

    # Tensorboard
    writer = SummaryWriter(config["experiment_name"])

    # set the optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=config["lr"], eps=1e-9)

    initial_epoch = 0
    global_step = 0
    if config["preload"]:
        model_filename = get_weights_file_path(config, config["preload"])
        print(f"Preloading model: {model_filename}")
        state = torch.load(model_filename)
        initial_epoch = state["epoch"] + 1
        model.load_state_dict(state["model_state_dict"])
        optimizer.load_state_dict(state["optimizer_state_dict"])
        global_step = state["global_step"]

    # set the loss function
    loss_fn = nn.CrossEntropyLoss(ignore_index=tokenizer_src.token_to_id("[PAD]"),
                                  label_smoothing=0.1).to(device)

    for epoch in range(initial_epoch, config["num_epochs"]):
        model.train()
        batch_iterator = tqdm(train_dataloader, desc=f"Processing epoch {epoch:02d}")
        for batch in batch_iterator:
            encoder_input = batch["encoder_input"].to(device)  # (Batch_size, seq_len)
            decoder_input = batch["decoder_input"].to(device)  # (Batch_size, seq_len)
            encoder_mask = batch["encoder_mask"].to(device)    # (Batch_size, 1, 1, seq_len)
            decoder_mask = batch["decoder_mask"].to(device)    # (Batch_size, 1, seq_len, seq_len)

            # run the tensors through the transformer
            encoder_output = model.encode(encoder_input, encoder_mask)  # (Batch_size, seq_len, d_model)
            decoder_output = model.decode(encoder_output, encoder_mask, decoder_input,
                                          decoder_mask)  # (Batch_size, seq_len, d_model)
            proj_output = model.project(decoder_output)  # (Batch_size, seq_len, tgt_vocab_size)

            label = batch['label'].to(device)  # (Batch_size, seq_len)

            # (Batch_size, seq_len, tgt_vocab_size) --> (Batch_size * seq_len, tgt_vocab_size)
            loss = loss_fn(proj_output.view(-1, tokenizer_tgt.get_vocab_size()), label.view(-1))
            batch_iterator.set_postfix({"loss": f"{loss.item():6.3f}"})

            # log the loss
            writer.add_scalar("train loss", loss.item(), global_step)
            writer.flush()

            # backpropagate the loss
            loss.backward()

            # update the weights
            optimizer.step()
            optimizer.zero_grad()

            global_step += 1

        # save the model at the end of every epoch
        model_filename = get_weights_file_path(config, f"{epoch:02d}")
        torch.save(
            {
                "epoch": epoch,
                "model_state_dict": model.state_dict(),
                "optimizer_state_dict": optimizer.state_dict(),
                "global_step": global_step
            },
            model_filename
        )


if __name__ == "__main__":
    warnings.filterwarnings("ignore")
    config = get_config()
    train_model(config)
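To resume training from a saved checkpoint, set "preload" to the epoch label used in the weights filename before calling train_model (a sketch; "05" is a hypothetical epoch whose checkpoint must already exist under weights/):

from config import get_config
from train import train_model

config = get_config()
config["preload"] = "05"    # restores weights/tmodel_05.pt and continues from the next epoch
train_model(config)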

Project Structure
