File size: 5,552 Bytes
af758d1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import cv2
import argparse
def create_video_from_frames_opencv(frames, output_video_path, fps):
    """Encode a sequence of frames into an MP4 file using OpenCV.

    The ``avc1`` fourcc is tried first; if that writer cannot be opened the
    ``H264`` fourcc is tried as a fallback.

    Args:
        frames: Non-empty sequence of images. The first frame's
            ``.shape[:2]`` is taken as (height, width) of the output video,
            and every frame is handed to ``cv2.VideoWriter.write``.
        output_video_path: Destination path. ``.mp4`` is appended when the
            path does not already end with it (case-insensitive check).
        fps: Frame rate passed to ``cv2.VideoWriter``.

    Raises:
        ValueError: If ``frames`` is empty.
        RuntimeError: If the writer cannot be opened with either fourcc.
    """
    if len(frames) == 0:
        raise ValueError("No frames to write to video")

    height, width = frames[0].shape[:2]
    if not output_video_path.lower().endswith(".mp4"):
        output_video_path += ".mp4"

    # Try avc1 fourcc for H264
    fourcc = cv2.VideoWriter_fourcc(*"avc1")
    video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
    if not video_writer.isOpened():
        print("avc1 codec failed, trying H264...")
        # Release the failed writer before reopening the same output path.
        video_writer.release()
        fourcc = cv2.VideoWriter_fourcc(*"H264")
        video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
        if not video_writer.isOpened():
            video_writer.release()
            raise RuntimeError("Failed to open video writer with H264 codec. Check your OpenCV/ffmpeg installation.")

    # Guarantee the writer is released even if writing a frame raises.
    try:
        for frame in frames:
            video_writer.write(frame)
    finally:
        video_writer.release()
def _find_scene_indices(folder):
    """Return the sorted, de-duplicated scene indices of ``rgb_<int>_*.mp4`` files in *folder*."""
    found = set()
    for name in os.listdir(folder):
        if not (name.startswith("rgb_") and name.endswith(".mp4")):
            continue
        # Filenames look like rgb_{scene_idx}_view_idx_0_0.4.mp4
        try:
            found.add(int(name.split("_")[1]))
        except ValueError:
            # Second token is not an integer: not a scene video, ignore it.
            continue
    return sorted(found)


def _read_frame_at(video_path, position):
    """Read one frame from *video_path* at ``position % frame_count``; return None on failure."""
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Cannot open video {video_path}")
            return None

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Treat a non-positive count as "no frames" so the modulo below is
        # never taken with a zero or negative divisor.
        if total_frames <= 0:
            print(f"No frames found in video {video_path}")
            return None

        frame_idx = position % total_frames  # wrap frame idx if needed
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
        ret, frame = cap.read()
        if not ret or frame is None:
            print(f"Failed to read frame {frame_idx} from {video_path}")
            return None
        return frame
    finally:
        # Single release point covering every exit path above.
        cap.release()


def extract_frames_from_videos(base_path, sub_path_after_time_index, output_dir, max_time_idx, fps):
    """Build one video per scene by taking a single frame from each time-index video.

    Videos are looked up at
    ``{base_path}/{time_idx}/{sub_path_after_time_index}/rgb_{scene_idx}_view_idx_0_0.4.mp4``.
    Scene indices are discovered from the ``rgb_*.mp4`` filenames in the
    time index ``0`` folder. For each scene, the i-th existing time-index
    folder contributes frame ``i % frame_count`` of its video (``i`` is the
    position among the existing folders, not the time index itself). Missing
    or unreadable videos are reported and skipped.

    Args:
        base_path: Directory containing one sub-folder per time index.
        sub_path_after_time_index: Relative path inside each time-index folder.
        output_dir: Directory for the generated ``{scene_idx}.mp4`` files;
            created if it does not exist.
        max_time_idx: Largest time index (inclusive) to look for.
        fps: Frame rate forwarded to ``create_video_from_frames_opencv``.

    Raises:
        FileNotFoundError: If the time index ``0`` folder does not exist.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Find all scene indices by inspecting time_idx=0 folder for files like rgb_{scene_idx}_view_idx_0_0.4.mp4
    time0_path = os.path.join(base_path, "0", sub_path_after_time_index)
    if not os.path.isdir(time0_path):
        raise FileNotFoundError(f"Time index 0 path does not exist: {time0_path}")

    scene_indices = _find_scene_indices(time0_path)
    print(f"Found scene indices: {scene_indices}")

    # Collect existing time indices (folders) in base_path
    existing_time_indices = [
        ti
        for ti in range(max_time_idx + 1)
        if os.path.isdir(os.path.join(base_path, str(ti), sub_path_after_time_index))
    ]
    print(f"Using existing time indices: {existing_time_indices}")

    for scene_idx in scene_indices:
        frames = []
        for idx, time_idx in enumerate(existing_time_indices):
            video_path = os.path.join(
                base_path,
                str(time_idx),
                sub_path_after_time_index,
                f"rgb_{scene_idx}_view_idx_0_0.4.mp4",
            )
            if not os.path.isfile(video_path):
                print(f"Missing video for scene {scene_idx}, time_idx {time_idx}: {video_path}")
                continue

            frame = _read_frame_at(video_path, idx)
            if frame is not None:
                frames.append(frame)

        if not frames:
            print(f"No frames extracted for scene {scene_idx}, skipping video creation.")
            continue

        output_video_path = os.path.join(output_dir, f"{scene_idx}.mp4")
        print(f"Writing video for scene {scene_idx} with {len(frames)} frames to {output_video_path}")
        create_video_from_frames_opencv(frames, output_video_path, fps)
if __name__ == "__main__":
    # Command-line entry point: parse the options and run the extraction.
    cli_parser = argparse.ArgumentParser(
        description="Extract frames across time indices and create concatenated videos per scene."
    )

    # Mandatory string options, registered in display order: (flag, help text).
    required_path_options = (
        ("--base_path", "Base path containing time index folders"),
        ("--sub_path_after_time_index", "Sub path inside each time index folder"),
        ("--output_dir", "Output directory for generated videos"),
    )
    for flag, description in required_path_options:
        cli_parser.add_argument(flag, type=str, required=True, help=description)

    # Optional numeric options with their defaults.
    cli_parser.add_argument(
        "--max_time_idx", type=int, default=120, help="Maximum time index to consider"
    )
    cli_parser.add_argument(
        "--fps", type=float, default=10, help="Frames per second for output video"
    )

    args = cli_parser.parse_args()
    extract_frames_from_videos(
        base_path=args.base_path,
        sub_path_after_time_index=args.sub_path_after_time_index,
        output_dir=args.output_dir,
        max_time_idx=args.max_time_idx,
        fps=args.fps,
    )
|