Skip to content

Commit fd0baf2

Browse files
committed
Large_Amendment
1. rename and add the annotation of all the code. 2. the whole code is more readable.
1 parent 63cb515 commit fd0baf2

File tree

8 files changed

+574
-258981
lines changed

8 files changed

+574
-258981
lines changed

H2S_clip_mediapipe.py

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
import os
2+
import cv2
3+
import mediapipe as mp
4+
import numpy as np
5+
import logging
6+
from glob import glob
7+
from typing import List, Dict
8+
from concurrent.futures import ProcessPoolExecutor
9+
import pandas as pd
10+
11+
import conf as c # Keeping original conf import name
12+
13+
logging.basicConfig(level=logging.DEBUG)
14+
logger = logging.getLogger(__name__)
15+
16+
mp_holistic = mp.solutions.holistic
17+
18+
19+
def read_timestamp_data(csv_file: str) -> Dict[str, List[float]]:
20+
"""
21+
Reads and processes timestamp data from a CSV file.
22+
23+
Args:
24+
csv_file (str): Path to the CSV file containing timestamp information
25+
26+
Returns:
27+
Dict[str, List[float]]: Dictionary mapping segment names to [start, end] timestamps
28+
"""
29+
try:
30+
df = pd.read_csv(csv_file, delimiter=",", on_bad_lines="skip")[
31+
["SENTENCE_NAME", "START", "END"]
32+
].dropna()
33+
return (
34+
df.set_index("SENTENCE_NAME")[["START", "END"]]
35+
.apply(lambda row: [row["START"], row["END"]], axis=1)
36+
.to_dict()
37+
)
38+
except Exception as e:
39+
logger.error(f"Error loading CSV file {csv_file}: {e}")
40+
return {}
41+
42+
43+
def get_video_filenames(directory: str, pattern="*.mp4") -> List[str]:
44+
"""
45+
Retrieves video filenames from specified directory without extensions.
46+
47+
Args:
48+
directory (str): Directory path to search for video files
49+
pattern (str): File pattern to match (default: "*.mp4")
50+
51+
Returns:
52+
List[str]: List of filenames without extensions
53+
"""
54+
return [
55+
os.path.splitext(os.path.basename(f))[0]
56+
for f in glob(os.path.join(directory, pattern))
57+
]
58+
59+
60+
def process_mediapipe_detection(image, model):
61+
"""
62+
Processes an image through MediaPipe detection model.
63+
64+
Args:
65+
image: Input image in BGR format
66+
model: MediaPipe model instance
67+
68+
Returns:
69+
MediaPipe detection results
70+
"""
71+
return model.process(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
72+
73+
74+
def extract_landmark_coordinates(results):
75+
"""
76+
Extracts landmark coordinates from MediaPipe detection results.
77+
78+
Args:
79+
results: MediaPipe detection results
80+
81+
Returns:
82+
np.ndarray: Concatenated array of pose, face, and hand landmarks
83+
"""
84+
85+
def convert_landmarks_to_array(landmarks, indices):
86+
return (
87+
np.array(
88+
[[landmarks[i].x, landmarks[i].y, landmarks[i].z] for i in indices]
89+
)
90+
if landmarks
91+
else np.zeros((len(indices), 3))
92+
)
93+
94+
# Extract landmarks for different body parts
95+
pose_landmarks = convert_landmarks_to_array(
96+
getattr(results.pose_landmarks, "landmark", None), c.POSE_IDX
97+
)
98+
left_hand = convert_landmarks_to_array(
99+
getattr(results.left_hand_landmarks, "landmark", None), c.HAND_IDX
100+
)
101+
right_hand = convert_landmarks_to_array(
102+
getattr(results.right_hand_landmarks, "landmark", None), c.HAND_IDX
103+
)
104+
face_landmarks = convert_landmarks_to_array(
105+
getattr(results.face_landmarks, "landmark", None), c.FACE_IDX
106+
)
107+
108+
return np.concatenate(
109+
[
110+
pose_landmarks.flatten(),
111+
face_landmarks.flatten(),
112+
left_hand.flatten(),
113+
right_hand.flatten(),
114+
]
115+
)
116+
117+
118+
def process_complete_video(video_path: str, output_file: str):
119+
"""
120+
Processes an entire video to extract holistic keypoints and saves them.
121+
122+
Args:
123+
video_path (str): Path to input video file
124+
output_file (str): Path to save extracted landmarks
125+
"""
126+
cap = cv2.VideoCapture(video_path)
127+
if not cap.isOpened():
128+
logger.error(f"Error opening video: {video_path}")
129+
return
130+
131+
# Get video properties and determine frame skip rate
132+
fps = cap.get(cv2.CAP_PROP_FPS)
133+
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
134+
135+
# Calculate frame skip based on configuration
136+
if c.LENGTH_BASED_MAX_FRAME and c.LENGTH_BASED_FRAME_SKIP:
137+
logger.error("Both LENGTH_BASED_MAX_FRAME and LENGTH_BASED_CONST are True.")
138+
return
139+
140+
frame_skip = (
141+
np.ceil(total_frames / c.MAX_FRAME)
142+
if c.LENGTH_BASED_MAX_FRAME
143+
else c.FRAME_SKIP if c.LENGTH_BASED_FRAME_SKIP else 1
144+
)
145+
146+
landmark_sequences = []
147+
with mp_holistic.Holistic(
148+
model_complexity=1,
149+
refine_face_landmarks=True,
150+
min_detection_confidence=0.5,
151+
min_tracking_confidence=0.5,
152+
) as holistic:
153+
current_frame = 0
154+
while current_frame < total_frames:
155+
ret, frame = cap.read()
156+
if not ret:
157+
break
158+
if current_frame % frame_skip == 0:
159+
results = process_mediapipe_detection(frame, holistic)
160+
landmark_sequences.append(extract_landmark_coordinates(results))
161+
current_frame += 1
162+
163+
cap.release()
164+
165+
# Save landmarks if valid data exists
166+
landmark_array = np.array(landmark_sequences)
167+
if landmark_array.size > 0 and np.any(landmark_array):
168+
os.makedirs(os.path.dirname(output_file), exist_ok=True)
169+
np.save(output_file, landmark_array)
170+
logger.info(f"Saved landmarks to {output_file}")
171+
else:
172+
logger.info(f"No valid landmarks for video {video_path}, not saving.")
173+
174+
175+
def main():
176+
"""
177+
Main function to orchestrate video processing and landmark extraction.
178+
Handles file management and parallel processing of complete videos.
179+
"""
180+
video_files = get_video_filenames(c.H2S_VIDEO_DIR)
181+
processed_files = get_video_filenames(c.H2S_OUTPUT_DIR, pattern="*.npy")
182+
183+
processing_tasks = []
184+
for video_name in video_files:
185+
video_path = os.path.join(c.H2S_VIDEO_DIR, f"{video_name}.mp4")
186+
output_path = os.path.join(c.H2S_OUTPUT_DIR, f"{video_name}.npy")
187+
if video_name not in processed_files:
188+
processing_tasks.append((video_path, output_path))
189+
else:
190+
logger.info(f"Skipping existing file: {output_path}")
191+
192+
# Process videos in parallel
193+
with ProcessPoolExecutor(max_workers=c.MAX_WORKERS) as executor:
194+
for video_path, output_path in processing_tasks:
195+
executor.submit(process_complete_video, video_path, output_path)
196+
197+
198+
if __name__ == "__main__":
199+
main()

H2S_mediapipe.py

Lines changed: 0 additions & 141 deletions
This file was deleted.

0 commit comments

Comments
 (0)