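"""Extract deep features from videos with a pretrained backbone (ResNet-50, VGG-16 or ViT).

For each video, sampled frames (or frame differences / optical-flow maps) are passed
through the network, per-frame activations are pooled into feature vectors, and the
per-video averages are written to a .mat file.
"""
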
from PIL import Image
import pandas as pd
import numpy as np
import os
from pathlib import Path
import scipy.io
import shutil
import torch
import time
import cv2
from torchvision import models, transforms

from utils.logger_setup import logger
from extractor import visualise_vgg_layer, visualise_resnet_layer, visualise_vit_layer, vf_extract


def load_metadata(video_type):
    """Load the metadata CSV for the requested dataset."""
    print(f'video_type: {video_type}\n')

    if video_type == 'test':
        return pd.read_csv("../metadata/test_videos.csv")
    elif video_type == 'resolution_ugc':
        resolution = '360P'
        return pd.read_csv(f"../metadata/YOUTUBE_UGC_{resolution}_metadata.csv")
    else:
        return pd.read_csv(f'../metadata/{video_type.upper()}_metadata.csv')


def get_video_paths(network_name, video_type, videodata, i):
    """Resolve the video path, sampled-frame directory and feature name for metadata row i."""
    video_name = videodata['vid'][i]
    video_width = videodata['width'][i]
    video_height = videodata['height'][i]
    pixfmt = videodata['pixfmt'][i]
    framerate = videodata['framerate'][i]
    common_path = os.path.join('..', 'video_sampled_frame')

    if video_type == 'test':
        video_path = f"../ugc_original_videos/{video_name}.mp4"
    elif video_type == 'konvid_1k':
        video_path = Path("D:/video_dataset/KoNViD_1k/KoNViD_1k_videos") / f"{video_name}.mp4"
    elif video_type in ('lsvq_train', 'lsvq_test', 'lsvq_test_1080P'):
        print(f'video_name: {video_name}')
        video_path = Path("D:/video_dataset/LSVQ") / f"{video_name}.mp4"
        print(f'video_path: {video_path}')
        video_name = os.path.splitext(os.path.basename(video_path))[0]
    elif video_type == 'live_vqc':
        video_path = Path("D:/video_dataset/LIVE-VQC/video") / f"{video_name}.mp4"
    elif video_type == 'live_qualcomm':
        video_path = Path("D:/video_dataset/LIVE-Qualcomm") / f"{video_name}.yuv"
        video_name = os.path.splitext(os.path.basename(video_path))[0]
    elif video_type == 'cvd_2014':
        video_path = Path("D:/video_dataset/CVD2014") / f"{video_name}.avi"
        video_name = os.path.splitext(os.path.basename(video_path))[0]
    elif video_type == 'youtube_ugc':
        video_path = Path("D:/video_dataset/ugc-dataset/youtube_ugc/") / f"{video_name}.mkv"
        video_name = os.path.splitext(os.path.basename(video_path))[0]

    sampled_frame_path = os.path.join(common_path, 'pool', f'video_{i + 1}')
    feature_name = f"{network_name}_feature_map"

    if video_type == 'resolution_ugc':
        resolution = '360P'
        video_path = Path(f"D:/video_dataset/ugc-dataset/youtube_ugc/original_videos/{resolution}") / f"{video_name}.mkv"
        sampled_frame_path = os.path.join(common_path, f'ytugc_sampled_frame_{resolution}', f'video_{i + 1}')
        feature_name = f"{network_name}_feature_map_{resolution}"

    return video_name, video_path, sampled_frame_path, feature_name, video_width, video_height, pixfmt, framerate


def get_deep_feature(network_name, video_name, frame, frame_number, model, device, layer_name):
    """Run one frame through the chosen backbone and return its layer activations."""
    png_path = f'../visualisation/{network_name}_{layer_name}/{video_name}/'
    os.makedirs(png_path, exist_ok=True)

    if network_name == 'resnet50':
        if layer_name == 'pool':
            visual_layer = 'resnet50.avgpool'
            resnet50 = model
            activations_dict, _, total_flops, total_params = visualise_resnet_layer.process_video_frame(
                video_name, frame, frame_number, visual_layer, resnet50, device)
    elif network_name == 'vgg16':
        if layer_name == 'pool':
            visual_layer = 'fc2'
            vgg16 = model
            activations_dict, _, total_flops, total_params = visualise_vgg_layer.process_video_frame(
                video_name, frame, frame_number, visual_layer, vgg16, device)
    elif network_name == 'vit':
        patch_size = 16
        activations_dict, _, total_flops, total_params = visualise_vit_layer.process_video_frame(
            video_name, frame, frame_number, model, patch_size, device)

    return png_path, activations_dict, total_flops, total_params


def process_video_feature(video_feature, network_name, layer_name):
    """Pool per-frame activations into fixed-length per-frame feature vectors."""
    averaged_frames = []

    for frame in video_feature:
        frame_features = []

        if layer_name == 'pool':
            if network_name == 'vit':
                # Global statistics over the token dimension.
                global_mean = torch.mean(frame, dim=0)
                global_max = torch.max(frame, dim=0)[0]
                global_std = torch.std(frame, dim=0)

                combined_features = torch.hstack([global_mean, global_max, global_std])
                frame_features.append(combined_features)
            elif network_name == 'resnet50':
                # as_tensor avoids the copy/warning that torch.tensor() raises
                # when the activations are already a tensor.
                frame = torch.squeeze(torch.as_tensor(frame))

                global_mean = torch.mean(frame, dim=0)
                global_max = torch.max(frame, dim=0)[0]
                global_std = torch.std(frame, dim=0)

                combined_features = torch.hstack([frame, global_mean, global_max, global_std])
                frame_features.append(combined_features)

        processed_frame = torch.hstack(frame_features)
        averaged_frames.append(processed_frame)

    averaged_frames = torch.stack(averaged_frames)

    logger.debug(f"Shape of feature vector after global pooling: {averaged_frames.shape}")
    return averaged_frames

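# Example of the resulting dimensionality (assuming vit_base token activations of
# shape (num_tokens, 768) per frame): mean/max/std pooling over the token axis
# yields a 3 * 768 = 2304-dimensional vector per frame.
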
def flow_to_rgb(flow):
    """Encode a dense optical-flow field as an HSV image: hue = direction, value = magnitude."""
    mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])

    # Map the angle from [0, 2*pi] radians to OpenCV's hue range [0, 180].
    hue = ang * 180 / np.pi / 2

    hsv = np.zeros((flow.shape[0], flow.shape[1], 3), dtype=np.uint8)
    hsv[..., 0] = hue
    hsv[..., 1] = 255
    hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)

    # Note: COLOR_HSV2BGR yields BGR channel order; the result is only used as a
    # 3-channel visualisation fed to the network.
    rgb = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
    return rgb

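# Quick sanity check (synthetic input, assumed shapes):
#   flow = np.random.randn(64, 64, 2).astype(np.float32)   # (H, W, 2) displacement field
#   vis = flow_to_rgb(flow)                                 # (64, 64, 3) uint8; hue encodes direction
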
def compute_frame_difference(frame_tensor, frame_next_tensor):
    """Absolute residual between two consecutive frame tensors."""
    residual = torch.abs(frame_next_tensor - frame_tensor)
    return residual


def compute_optical_flow(frame, frame_next, device):
|
|
|
flow = cv2.calcOpticalFlowFarneback(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY),
|
|
|
cv2.cvtColor(frame_next, cv2.COLOR_BGR2GRAY),
|
|
|
None, 0.5, 3, 15, 3, 5, 1.2, 0)
|
|
|
opticalflow_rgb = flow_to_rgb(flow)
|
|
|
opticalflow_rgb_tensor = transforms.ToTensor()(opticalflow_rgb).unsqueeze(0).to(device)
|
|
|
return opticalflow_rgb_tensor
|
|
|
|
|
|
|
|
|
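# Sketch of intended use (hypothetical file names):
#   frame, frame_next = cv2.imread('frame_001.png'), cv2.imread('frame_002.png')
#   flow_tensor = compute_optical_flow(frame, frame_next, torch.device('cpu'))
#   flow_tensor.shape   # torch.Size([1, 3, H, W])
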
if __name__ == '__main__':
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    if device.type == "cuda":
        torch.cuda.set_device(0)

    video_type = 'test'
    frame_name = 'sampled_frame'
    network_name = 'vit'
    layer_name = 'pool'

    # Note: newer torchvision releases replace pretrained=True with the weights= argument.
    if network_name == 'vit':
        model = visualise_vit_layer.VitGenerator('vit_base', 16, device, evaluate=True, random=False, verbose=True)
    elif network_name == 'resnet50':
        model = models.resnet50(pretrained=True).to(device)
    else:
        model = models.vgg16(pretrained=True).to(device)

    logger.info(f"video type: {video_type}, frame name: {frame_name}, network name: {network_name}, layer name: {layer_name}")
    logger.info(f"torch cuda: {torch.cuda.is_available()}")

    videodata = load_metadata(video_type)
    valid_video_types = ['test', 'resolution_ugc', 'konvid_1k', 'live_vqc', 'cvd_2014', 'live_qualcomm',
                         'lsvq_train', 'lsvq_test', 'lsvq_test_1080P']

    begin_time = time.time()
    if video_type in valid_video_types:
        for i in range(len(videodata)):
            start_time = time.time()

            video_name, video_path, sampled_frame_path, feature_name, video_width, video_height, pixfmt, framerate = get_video_paths(network_name, video_type, videodata, i)
            frames, frames_next = vf_extract.process_video_residual(video_type, video_name, framerate, video_path, sampled_frame_path)

            logger.info(f'{video_name}')
            all_frame_activations_feats = []
            for j, (frame, frame_next) in enumerate(zip(frames, frames_next)):
                frame_number = j + 1

                if frame_name in ['frame_diff', 'optical_flow']:
                    frame_tensor = transforms.ToTensor()(frame).unsqueeze(0).to(device)
                    frame_next_tensor = transforms.ToTensor()(frame_next).unsqueeze(0).to(device)

                    if frame_name == 'frame_diff':
                        residual = compute_frame_difference(frame_tensor, frame_next_tensor)
                        png_path, frag_activations, total_flops, total_params = get_deep_feature(network_name, video_name, residual, frame_number, model, device, layer_name)
                    elif frame_name == 'optical_flow':
                        opticalflow_rgb_tensor = compute_optical_flow(frame, frame_next, device)
                        png_path, frag_activations, total_flops, total_params = get_deep_feature(network_name, video_name, opticalflow_rgb_tensor, frame_number, model, device, layer_name)
                elif frame_name == 'sampled_frame':
                    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    frame_rgb_tensor = transforms.ToTensor()(frame_rgb).unsqueeze(0).to(device)
                    png_path, frag_activations, total_flops, total_params = get_deep_feature(network_name, video_name, frame_rgb_tensor, frame_number, model, device, layer_name)

                all_frame_activations_feats.append(frag_activations)

            averaged_frames_feats = process_video_feature(all_frame_activations_feats, network_name, layer_name)
            print("Features shape:", averaged_frames_feats.shape)

            # Clean up intermediate visualisations and sampled frames.
            shutil.rmtree(png_path)
            shutil.rmtree(sampled_frame_path)

            averaged_npy = averaged_frames_feats.cpu().numpy()

            output_npy_path = f'../features/{video_type}/{frame_name}_{network_name}_{layer_name}/'
            os.makedirs(output_npy_path, exist_ok=True)
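            # Assumed save step (hypothetical filename): the directory is created above but
            # the original write call is missing, so save one .npy per video.
            np.save(os.path.join(output_npy_path, f'{video_name}.npy'), averaged_npy)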

            run_time = time.time() - start_time
            print(f"Execution time for {video_name} feature extraction: {run_time:.4f} seconds\n")

            # Average across frames to get one vector per video, then collect all videos.
            average_data = np.mean(averaged_npy, axis=0)
            if i == 0:
                feats_matrix = np.zeros((len(videodata),) + average_data.shape)
            feats_matrix[i] = average_data

    print(f'All features shape: {feats_matrix.shape}')
    logger.debug(f'\n All features shape: {feats_matrix.shape}')

    mat_file_path = f'../features/{video_type}/'
    mat_file_name = f'{mat_file_path}{video_type}_{frame_name}_{network_name}_{layer_name}_feats.mat'
    scipy.io.savemat(mat_file_name, {video_type: feats_matrix})
    logger.debug(f'Successfully created {mat_file_name}')
    logger.debug(f"Execution time for all feature extraction: {time.time() - begin_time:.4f} seconds\n")