Commit b5995d6

Add tutorial for parallel decoding (#778)
1 parent: 79151f6

4 files changed: +275 -0 lines changed

docs/requirements.txt

Lines changed: 1 addition & 0 deletions

@@ -8,4 +8,5 @@ torchvision
 ipython
 fsspec
 aiohttp
+joblib
 -e git+https://github.com/pytorch/pytorch_sphinx_theme.git#egg=pytorch_sphinx_theme

docs/source/conf.py

Lines changed: 1 addition & 0 deletions

@@ -80,6 +80,7 @@ def __call__(self, filename):
     "file_like.py",
     "approximate_mode.py",
     "sampling.py",
+    "parallel_decoding.py",
 ]
 else:
     assert "examples/encoding" in self.src_dir

docs/source/index.rst

Lines changed: 9 additions & 0 deletions

@@ -68,6 +68,14 @@ Decoding
 
    How to efficiently decode videos from the cloud
 
+   .. grid-item-card:: :octicon:`file-code;1em`
+      Parallel decoding
+      :img-top: _static/img/card-background.svg
+      :link: generated_examples/decoding/parallel_decoding.html
+      :link-type: url
+
+      How to decode a video with multiple processes or threads.
+
    .. grid-item-card:: :octicon:`file-code;1em`
       Clip sampling
       :img-top: _static/img/card-background.svg
@@ -76,6 +84,7 @@ Decoding
 
    How to sample regular and random clips from a video
 
+
 Encoding
 ^^^^^^^^
 
examples/decoding/parallel_decoding.py

Lines changed: 264 additions & 0 deletions

@@ -0,0 +1,264 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
=============================================================
Parallel video decoding: multi-processing and multi-threading
=============================================================

In this tutorial, we'll explore different approaches to parallelize video
decoding of a large number of frames from a single video. We'll compare three
parallelization strategies:

1. **FFmpeg-based parallelism**: Using FFmpeg's internal threading capabilities
2. **Joblib multiprocessing**: Distributing work across multiple processes
3. **Joblib multithreading**: Using multiple threads within a single process

We'll use `joblib <https://joblib.readthedocs.io/en/latest/>`_ for
parallelization, as it provides very convenient APIs for distributing work
across multiple processes or threads. But this is just one of many ways to
parallelize work in Python; you can absolutely use a different thread or
process pool manager.
"""

# %%
# Let's first define some utility functions for benchmarking and data
# processing. We'll also download a video and create a longer version by
# repeating it multiple times. This simulates working with long videos that
# require efficient processing. You can ignore this part and jump straight to
# :ref:`start_parallel_decoding`.

from typing import List
import torch
import requests
import tempfile
from pathlib import Path
import subprocess
from time import perf_counter_ns
from datetime import timedelta

from joblib import Parallel, delayed, cpu_count
from torchcodec.decoders import VideoDecoder


def bench(f, *args, num_exp=3, warmup=1, **kwargs):
    """Benchmark a function by running it multiple times and measuring execution time."""
    for _ in range(warmup):
        f(*args, **kwargs)

    times = []
    for _ in range(num_exp):
        start = perf_counter_ns()
        result = f(*args, **kwargs)
        end = perf_counter_ns()
        times.append(end - start)

    return torch.tensor(times).float(), result


def report_stats(times, unit="s"):
    """Report median and standard deviation of benchmark times."""
    mul = {
        "ns": 1,
        "µs": 1e-3,
        "ms": 1e-6,
        "s": 1e-9,
    }[unit]
    times = times * mul
    std = times.std().item()
    med = times.median().item()
    print(f"median = {med:.2f}{unit} ± {std:.2f}")
    return med


def split_indices(indices: List[int], num_chunks: int) -> List[List[int]]:
    """Split a list of indices into approximately equal chunks."""
    chunk_size = len(indices) // num_chunks
    chunks = []

    for i in range(num_chunks - 1):
        chunks.append(indices[i * chunk_size:(i + 1) * chunk_size])

    # Last chunk may be slightly larger
    chunks.append(indices[(num_chunks - 1) * chunk_size:])
    return chunks
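
# As a quick illustration of the helper above: splitting 10 indices into 3
# chunks gives two chunks of 3 and a slightly larger final chunk.
print(split_indices(list(range(10)), num_chunks=3))
# [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]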


def generate_long_video(temp_dir: str):
    # Video source: https://www.pexels.com/video/dog-eating-854132/
    # License: CC0. Author: Coverr.
    url = "https://videos.pexels.com/video-files/854132/854132-sd_640_360_25fps.mp4"
    response = requests.get(url, headers={"User-Agent": ""})
    if response.status_code != 200:
        raise RuntimeError(f"Failed to download video. {response.status_code = }.")

    short_video_path = Path(temp_dir) / "short_video.mp4"
    with open(short_video_path, 'wb') as f:
        for chunk in response.iter_content():
            f.write(chunk)

    # Create a longer video by repeating the short one 50 times
    long_video_path = Path(temp_dir) / "long_video.mp4"
    ffmpeg_command = [
        "ffmpeg", "-y",
        "-stream_loop", "49",  # repeat video 50 times
        "-i", str(short_video_path),
        "-c", "copy",
        str(long_video_path)
    ]
    subprocess.run(ffmpeg_command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    return short_video_path, long_video_path


temp_dir = tempfile.mkdtemp()
short_video_path, long_video_path = generate_long_video(temp_dir)

decoder = VideoDecoder(long_video_path, seek_mode="approximate")
metadata = decoder.metadata

short_duration = timedelta(seconds=VideoDecoder(short_video_path).metadata.duration_seconds)
long_duration = timedelta(seconds=metadata.duration_seconds)
print(f"Original video duration: {int(short_duration.total_seconds() // 60)}m{int(short_duration.total_seconds() % 60):02d}s")
print(f"Long video duration: {int(long_duration.total_seconds() // 60)}m{int(long_duration.total_seconds() % 60):02d}s")
print(f"Video resolution: {metadata.width}x{metadata.height}")
print(f"Average FPS: {metadata.average_fps:.1f}")
print(f"Total frames: {metadata.num_frames}")


# %%
# .. _start_parallel_decoding:
#
# Frame sampling strategy
# -----------------------
#
# For this tutorial, we'll sample frames from our long video at a target rate
# of 2 frames per second. This simulates a common scenario where you only need
# to process a subset of frames, e.g. for LLM inference.

TARGET_FPS = 2
step = max(1, round(metadata.average_fps / TARGET_FPS))
all_indices = list(range(0, metadata.num_frames, step))

print(f"Sampling frames at a target rate of {TARGET_FPS} FPS")
print(f"We'll decode every {step}th frame")
print(f"Total frames to decode: {len(all_indices)}")


# %%
# Method 1: Sequential decoding (baseline)
# ----------------------------------------
#
# Let's start with a sequential approach as our baseline. This processes
# frames one by one without any parallelization.

def decode_sequentially(indices: List[int], video_path=long_video_path):
    """Decode frames sequentially using a single decoder instance."""
    decoder = VideoDecoder(video_path, seek_mode="approximate")
    return decoder.get_frames_at(indices)


times, result_sequential = bench(decode_sequentially, all_indices)
sequential_time = report_stats(times, unit="s")


# %%
# Method 2: FFmpeg-based parallelism
# ----------------------------------
#
# FFmpeg has built-in multithreading capabilities that can be controlled
# via the ``num_ffmpeg_threads`` parameter. This approach uses multiple
# threads within FFmpeg itself to accelerate decoding operations.

def decode_with_ffmpeg_parallelism(
    indices: List[int],
    num_threads: int,
    video_path=long_video_path
):
    """Decode frames using FFmpeg's internal threading."""
    decoder = VideoDecoder(video_path, num_ffmpeg_threads=num_threads, seek_mode="approximate")
    return decoder.get_frames_at(indices)


NUM_CPUS = cpu_count()

times, result_ffmpeg = bench(decode_with_ffmpeg_parallelism, all_indices, num_threads=NUM_CPUS)
ffmpeg_time = report_stats(times, unit="s")
speedup = sequential_time / ffmpeg_time
print(f"Speedup compared to sequential: {speedup:.2f}x with {NUM_CPUS} FFmpeg threads.")


# %%
# Method 3: Joblib multiprocessing
# --------------------------------
#
# Process-based parallelism distributes work across multiple Python processes:
# each process decodes its own chunk of frame indices with its own decoder.

def decode_with_multiprocessing(
    indices: List[int],
    num_processes: int,
    video_path=long_video_path
):
    """Decode frames using multiple processes with joblib."""
    chunks = split_indices(indices, num_chunks=num_processes)

    # loky is a multi-processing backend for joblib: https://github.com/joblib/loky
    results = Parallel(n_jobs=num_processes, backend="loky", verbose=0)(
        delayed(decode_sequentially)(chunk, video_path) for chunk in chunks
    )

    return torch.cat([frame_batch.data for frame_batch in results], dim=0)


times, result_multiprocessing = bench(decode_with_multiprocessing, all_indices, num_processes=NUM_CPUS)
multiprocessing_time = report_stats(times, unit="s")
speedup = sequential_time / multiprocessing_time
print(f"Speedup compared to sequential: {speedup:.2f}x with {NUM_CPUS} processes.")


# %%
# Method 4: Joblib multithreading
# -------------------------------
#
# Thread-based parallelism uses multiple threads within a single process.
# TorchCodec releases the GIL, so this can be very effective.

def decode_with_multithreading(
    indices: List[int],
    num_threads: int,
    video_path=long_video_path
):
    """Decode frames using multiple threads with joblib."""
    chunks = split_indices(indices, num_chunks=num_threads)

    results = Parallel(n_jobs=num_threads, prefer="threads", verbose=0)(
        delayed(decode_sequentially)(chunk, video_path) for chunk in chunks
    )

    # Concatenate results from all threads
    return torch.cat([frame_batch.data for frame_batch in results], dim=0)


times, result_multithreading = bench(decode_with_multithreading, all_indices, num_threads=NUM_CPUS)
multithreading_time = report_stats(times, unit="s")
speedup = sequential_time / multithreading_time
print(f"Speedup compared to sequential: {speedup:.2f}x with {NUM_CPUS} threads.")


# %%
# Validation and correctness check
# --------------------------------
#
# Let's verify that all methods produce identical results.

torch.testing.assert_close(result_sequential.data, result_ffmpeg.data, atol=0, rtol=0)
torch.testing.assert_close(result_sequential.data, result_multiprocessing, atol=0, rtol=0)
torch.testing.assert_close(result_sequential.data, result_multithreading, atol=0, rtol=0)
print("All good!")

# %%
# Clean up the temporary directory and the downloaded videos.
import shutil
shutil.rmtree(temp_dir)