import os
import cv2
import numpy as np
from glob import glob
from torchvision.transforms import Compose, Resize, ToTensor, Normalize
from torch.utils.data import DataLoader, Dataset
from timm import create_model
import torch
from torch import nn
from sklearn.mixture import GaussianMixture
from sklearn.metrics import classification_report
import matplotlib.pyplot as plt
from pgmpy.models import DynamicBayesianNetwork
from pgmpy.inference import DBNInference
# ========================================================
# Dataset-Specific Frame Extraction
# ========================================================
def extract_frames(video_path, output_dir, frame_rate=2):
    """
    Save every ``frame_rate``-th frame of a video as a JPEG file.

    Despite its name, ``frame_rate`` is a sampling stride rather than a
    frames-per-second value: a frame is written when its zero-based index is
    a multiple of ``frame_rate``. Files are named ``frame_<index>.jpg``; the
    index is not zero-padded, so consumers must sort the files numerically.

    Args:
        video_path: Path of the video file to read.
        output_dir: Directory that receives the JPEG files; created if missing.
        frame_rate: Positive stride between saved frames.

    Raises:
        ValueError: If ``frame_rate`` is smaller than 1.
    """
    if frame_rate < 1:
        # Fail up front; a zero stride would otherwise surface as a
        # ZeroDivisionError in the middle of decoding.
        raise ValueError(
            f"frame_rate must be a positive integer, got {frame_rate!r}"
        )
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    try:
        frame_count = 0
        while True:
            success, frame = cap.read()
            if not success:
                break
            if frame_count % frame_rate == 0:
                frame_filename = os.path.join(output_dir, f"frame_{frame_count}.jpg")
                cv2.imwrite(frame_filename, frame)
            frame_count += 1
    finally:
        # Release the capture handle even if reading or writing fails.
        cap.release()
# Example: Extract frames for UCF-Crime or Avenue
# for dataset in ["UCF-Crime/train", "Avenue/train"]:
# video_files = glob(os.path.join(dataset, "*.mp4"))
# for video_path in video_files:
# video_name = os.path.basename(video_path).split('.')[0]
# extract_frames(video_path, output_dir=f"{dataset}_frames/{video_name}")
# ========================================================
# Feature Extraction with TimeSformer
# ========================================================
# Load pre-trained TimeSformer model
# NOTE(review): confirm that 'timesformer_base_16x16' is a model name known to
# the installed timm version before relying on this line.
model = create_model('timesformer_base_16x16', pretrained=True, num_classes=400)
model.head = nn.Identity()  # Remove classification head
model.eval()
# Use the GPU when one is available, otherwise stay on the CPU instead of
# failing at import time on machines without CUDA.
model = model.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
# Dataset Loader for Frame Clips
class VideoDataset(Dataset):
    """
    Sliding-window dataset that turns an ordered list of frame files into clips.

    Item ``idx`` is the stack of the transformed frames
    ``frame_paths[idx:idx + clip_length]``, so consecutive items overlap by
    ``clip_length - 1`` frames.

    Args:
        frame_paths: Frame image paths in temporal order.
        transform: Callable applied to each RGB frame array. It must return a
            tensor, and all returned tensors must share one shape so that they
            can be stacked.
        clip_length: Number of consecutive frames per clip.
    """

    def __init__(self, frame_paths, transform, clip_length=16):
        self.frame_paths = frame_paths
        self.transform = transform
        self.clip_length = clip_length

    def __len__(self):
        # Clamp at zero: len() must never be negative, which would happen
        # when there are fewer frames than one full clip.
        return max(0, len(self.frame_paths) - self.clip_length + 1)

    def __getitem__(self, idx):
        n_clips = len(self)
        if idx < 0:
            idx += n_clips
        if not 0 <= idx < n_clips:
            raise IndexError("clip index out of range")

        clip = []
        for path in self.frame_paths[idx:idx + self.clip_length]:
            frame = cv2.imread(path)
            if frame is None:
                # cv2.imread reports failure by returning None, not by raising.
                raise OSError(f"Could not read frame image: {path}")
            # OpenCV decodes images as BGR; convert to RGB so the channel
            # order matches what RGB-based normalization statistics expect.
            clip.append(self.transform(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))
        return torch.stack(clip)
# Preprocessing applied to every frame before it is stacked into a clip.
# ToTensor runs first because frames arrive as NumPy arrays (cv2.imread
# output), while Resize only accepts PIL images or tensors.
transform = Compose([
    ToTensor(),
    Resize((224, 224), antialias=True),
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
# Extract Features
def extract_features(dataset_dir):
    """
    Compute one backbone feature row per sliding clip of a frame directory.

    The ``*.jpg`` files in ``dataset_dir`` are ordered by the numbers embedded
    in their names, grouped into overlapping clips by ``VideoDataset``, and
    passed one clip at a time through the module-level ``model``.

    Args:
        dataset_dir: Directory containing the extracted frame images.

    Returns:
        NumPy array with the per-clip model outputs stacked vertically.

    Raises:
        ValueError: If the directory holds fewer frames than one clip needs.
    """
    import re  # local import: only needed for the natural-sort key below

    def frame_order(path):
        # Split "frame_12.jpg" into ["frame_", 12, ".jpg"] so that numeric
        # parts compare as numbers; plain string sorting would put
        # "frame_10" before "frame_2" and scramble the temporal order.
        parts = re.split(r"(\d+)", os.path.basename(path))
        return [int(tok) if i % 2 else tok for i, tok in enumerate(parts)]

    frame_paths = sorted(glob(os.path.join(dataset_dir, "*.jpg")), key=frame_order)
    dataset = VideoDataset(frame_paths, transform)
    if len(frame_paths) < dataset.clip_length:
        raise ValueError(
            f"{dataset_dir!r} contains {len(frame_paths)} frame(s); at least "
            f"{dataset.clip_length} are needed to form one clip"
        )
    dataloader = DataLoader(dataset, batch_size=1, shuffle=False)

    # Run on whatever device the model already lives on.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    features = []
    with torch.no_grad():
        for clips in dataloader:
            # The loader already supplies the batch axis (batch_size=1), so
            # the clip is forwarded as delivered.
            # NOTE(review): the input is laid out as (1, clip_length, ...frame
            # dims); confirm this matches the layout the backbone expects.
            feature = model(clips.to(device))
            features.append(feature.cpu().numpy())
    return np.vstack(features)
# Example: Extract features
# train_features = extract_features("UCF-Crime/train_frames")
# test_features = extract_features("UCF-Crime/test_frames")
# ========================================================
# Anomaly Detection Models
# ========================================================
# Autoencoder for Reconstruction-Based Anomaly Detection
class Autoencoder(nn.Module):
    """
    Fully connected autoencoder used for reconstruction-based anomaly scoring.

    The encoder maps ``input_dim -> hidden_dim -> latent_dim`` with ReLU after
    each layer; the decoder mirrors it back to ``input_dim`` and ends with a
    linear layer (no activation on the output).

    Args:
        input_dim: Size of each input feature vector.
        hidden_dim: Width of the intermediate layer (default 512).
        latent_dim: Width of the bottleneck layer (default 128).
    """

    def __init__(self, input_dim, hidden_dim=512, latent_dim=128):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, latent_dim),
            nn.ReLU(),
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
        )

    def forward(self, x):
        """Return the reconstruction of ``x`` after encoding and decoding."""
        encoded = self.encoder(x)
        return self.decoder(encoded)
def train_autoencoder(features, input_dim, epochs=10, batch_size=1):
    """
    Train an ``Autoencoder`` to reconstruct the given feature vectors.

    Uses Adam (learning rate 1e-3) with mean-squared-error reconstruction
    loss. Samples are visited in their given order, without shuffling.

    Args:
        features: 2-D array-like with one feature vector per row.
        input_dim: Length of each feature vector.
        epochs: Number of passes over ``features``.
        batch_size: Rows per optimizer step. The default of 1 performs one
            update per sample; larger values train faster.

    Returns:
        The trained ``Autoencoder``, on the GPU when CUDA is available and on
        the CPU otherwise.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    # Fall back to the CPU so training also works on machines without CUDA.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    autoencoder = Autoencoder(input_dim).to(device)
    optimizer = torch.optim.Adam(autoencoder.parameters(), lr=1e-3)
    criterion = nn.MSELoss()
    features = torch.as_tensor(features, dtype=torch.float32).to(device)

    for _ in range(epochs):
        # Index-based slicing yields no batches at all for empty input,
        # so no optimizer step is ever taken on an empty tensor.
        for start in range(0, len(features), batch_size):
            batch = features[start:start + batch_size]
            optimizer.zero_grad()
            loss = criterion(autoencoder(batch), batch)
            loss.backward()
            optimizer.step()
    return autoencoder
# Gaussian Mixture Model
def train_gmm(features, n_components=5):
    """
    Fit a Gaussian mixture with full covariance matrices to ``features``.

    Args:
        features: Training samples handed unchanged to ``GaussianMixture.fit``.
        n_components: Number of mixture components.

    Returns:
        The fitted ``GaussianMixture`` (seeded with ``random_state=42``).
    """
    settings = {
        "n_components": n_components,
        "covariance_type": 'full',
        "random_state": 42,
    }
    mixture = GaussianMixture(**settings)
    mixture.fit(features)
    return mixture
# Dynamic Bayesian Network
def train_dbn(features):
    """
    Build a two-variable dynamic Bayesian network and fit it to ``features``.

    The structure links F1 and F2 each to their own copy in the next time
    slice and adds an edge from F1 to F2 within slice 0.

    NOTE(review): pgmpy documents ``DynamicBayesianNetwork.fit`` as taking a
    pandas DataFrame whose columns are ``(node, time_slice)`` tuples holding
    discrete values - confirm that callers pass data in that form rather than
    a raw continuous feature matrix.

    Args:
        features: Training data handed unchanged to ``fit``.

    Returns:
        The fitted ``DynamicBayesianNetwork``.
    """
    temporal_edges = [
        (('F1', 0), ('F1', 1)),
        (('F2', 0), ('F2', 1)),
    ]
    intra_slice_edges = [(('F1', 0), ('F2', 0))]
    network = DynamicBayesianNetwork(temporal_edges + intra_slice_edges)
    network.fit(features)
    return network
# ========================================================
# Evaluate and Visualize Results
# ========================================================
def evaluate_anomalies(test_features, model, method="Autoencoder", threshold=95):
    """
    Score samples with a fitted detector and flag those above a percentile.

    Args:
        test_features: Non-empty sequence or array with one sample per row.
        model: Fitted detector matching ``method``: a torch module for
            "Autoencoder", an object with ``score_samples`` for "GMM", or a
            network accepted by ``DBNInference`` for "DBN".
        method: One of "Autoencoder", "GMM" or "DBN".
        threshold: Percentile (0-100) of the computed scores used as cut-off.
            The cut-off is derived from the scores of ``test_features``
            themselves, so about ``100 - threshold`` percent of the samples
            are flagged regardless of their absolute scores.

    Returns:
        Tuple ``(scores, threshold_value, anomalies)``: the per-sample scores
        (higher means more anomalous), the score at the requested percentile,
        and a boolean array marking scores strictly above it.

    Raises:
        ValueError: If ``method`` is unknown, ``test_features`` is empty, or
            no score could be computed.
    """
    if method not in ("Autoencoder", "GMM", "DBN"):
        # Previously an unknown method fell through to an unbound `scores`.
        raise ValueError(
            f"Unknown method {method!r}; expected 'Autoencoder', 'GMM' or 'DBN'"
        )
    if len(test_features) == 0:
        raise ValueError("test_features is empty; nothing to score")

    if method == "Autoencoder":
        # Run on whatever device the model already lives on.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")
        if not torch.is_tensor(test_features):
            test_features = np.asarray(test_features)
        inputs = torch.as_tensor(test_features, dtype=torch.float32)

        chunk_size = 1024  # bounds device memory while avoiding a per-sample loop
        chunk_errors = []
        with torch.no_grad():
            for start in range(0, len(inputs), chunk_size):
                batch = inputs[start:start + chunk_size].to(device)
                residual = model(batch) - batch
                # Mean squared error per sample (mean over all non-batch axes).
                per_sample = residual.pow(2).reshape(len(batch), -1).mean(dim=1)
                chunk_errors.append(per_sample.cpu())
        scores = torch.cat(chunk_errors).numpy().astype(np.float64)
    elif method == "GMM":
        # Negative log-likelihood: less likely samples get higher scores.
        scores = -model.score_samples(test_features)
    else:  # method == "DBN"
        # NOTE(review): pgmpy documents DBNInference.query as taking
        # (node, time_slice) tuples for variables/evidence and returning a
        # mapping of factors, and evidence is expected to be discrete states.
        # This branch is kept as originally written and likely needs rework -
        # confirm against the installed pgmpy version.
        dbn_inference = DBNInference(model)
        log_likelihoods = []
        for t in range(len(test_features) - 1):
            observation = {'F1': test_features[t][0], 'F2': test_features[t][1]}
            likelihood = dbn_inference.query(variables=['F1', 'F2'], evidence=observation)
            log_likelihoods.append(likelihood.log_probability(observation))
        scores = -np.array(log_likelihoods)

    if scores.size == 0:
        raise ValueError("No anomaly scores could be computed from test_features")

    # Define threshold
    threshold_value = np.percentile(scores, threshold)
    anomalies = scores > threshold_value
    return scores, threshold_value, anomalies
def visualize_anomalies(scores, threshold, title="Anomaly Detection"):
    """
    Plot anomaly scores together with the decision threshold and show the figure.

    Args:
        scores: Sequence of anomaly scores, drawn in the order given.
        threshold: Score value drawn as a dashed red horizontal line.
        title: Title placed above the plot.
    """
    _, axes = plt.subplots(figsize=(10, 6))
    axes.plot(scores, label="Anomaly Scores")
    axes.axhline(y=threshold, color='r', linestyle='--', label="Threshold")
    axes.set_title(title)
    axes.set_xlabel("Frames or Clips")
    axes.set_ylabel("Anomaly Score")
    axes.legend()
    plt.show()
# ========================================================
# Example Workflow for UCF-Crime and Avenue
# ========================================================
# UCF-Crime
# train_features = extract_features("UCF-Crime/train_frames")
# test_features = extract_features("UCF-Crime/test_frames")
# autoencoder = train_autoencoder(train_features, train_features.shape[1])
# scores, threshold, anomalies = evaluate_anomalies(test_features, autoencoder,
#                                                   method="Autoencoder")
# visualize_anomalies(scores, threshold, "Autoencoder-Based Anomaly Detection (UCF-Crime)")
# Avenue
# train_features = extract_features("Avenue/train_frames")
# test_features = extract_features("Avenue/test_frames")
# gmm = train_gmm(train_features)
# scores, threshold, anomalies = evaluate_anomalies(test_features, gmm, method="GMM")
# visualize_anomalies(scores, threshold, "GMM-Based Anomaly Detection (Avenue)")