-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathelvi.py
executable file
·144 lines (128 loc) · 4.27 KB
/
elvi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python3
#
# ELVI: Extremely Lightweight Video Interleaved
#
# ELVI is a very naive container format for audio and video, designed
# to be as close as possible to a raw stream of pixels and audio
# samples. It's basically AVI, but with all the portability features
# left out and the interleave period set to one frame. The file format
# is as follows:
#
# header {
# <magic>
# <video width>
# <video height>
# <video bits per pixel>
# <audio sample rate>
# <audio channel count>
# <audio bits per sample>
# <max audio size in bytes (rounded up to multiple of 4)>
# }
# repeat {
# <video data>
# <audio size>
# <audio data>
# <padding to max audio size>
# }
#
# Each body chunk contains the pixel data for one video frame,
# followed by the size in bytes of the audio samples corresponding
# to that video frame, and then finally the audio sample data padded
# to the max audio size. The number of audio samples may vary from
# frame to frame, because there can be a non-integer number of audio
# samples for each video frame. All fields other than the pixel/sample
# data are unsigned 32-bit integers, little endian.
import argparse
import fractions
import math
import struct
import subprocess
import sys
# File magic: the ASCII bytes "ELVI" read as a little-endian 32-bit integer.
ELVI_MAGIC = int.from_bytes(b"ELVI", "little")

# Supported video depths (bits per pixel) -> ffmpeg pixel format name.
VIDEO_FORMATS = {
    16: "rgb565",
    24: "bgr24",
    32: "rgb32",
}

# Supported audio depths (bits per sample) -> ffmpeg raw PCM format name.
AUDIO_FORMATS = {
    8: "u8",
    16: "s16le",
}
# Command-line interface: every output parameter is mandatory, followed by
# the path of the media file to convert. The ELVI stream goes to stdout.
parser = argparse.ArgumentParser()
parser.add_argument("--video-width", type=int, required=True)
parser.add_argument("--video-height", type=int, required=True)
# Restrict the bit depths to the ones the format tables can map to an ffmpeg
# format, so a bad value is rejected here with a usage message instead of
# surfacing later as a KeyError after child processes have been started.
parser.add_argument(
    "--video-bits-per-pixel",
    type=int,
    required=True,
    choices=sorted(VIDEO_FORMATS),
)
parser.add_argument("--audio-sample-rate", type=int, required=True)
parser.add_argument("--audio-channel-count", type=int, required=True)
parser.add_argument(
    "--audio-bits-per-sample",
    type=int,
    required=True,
    choices=sorted(AUDIO_FORMATS),
)
parser.add_argument("file")
args = parser.parse_args()

input_path = args.file
# Binary stdout: the container is raw bytes, not text.
output_file = sys.stdout.buffer
# Bytes needed to store one pixel, rounding any partial byte up.
video_bytes_per_pixel = (args.video_bits_per_pixel + 7) // 8
# Size in bytes of one raw video frame as written to the output.
video_frame_size = args.video_width * args.video_height * video_bytes_per_pixel
# Bytes for one audio sample across all channels.
audio_bytes_per_sample = args.audio_channel_count * (args.audio_bits_per_sample // 8)

# Ask ffprobe for the frame rate of the first video stream, printed as a bare
# "numerator/denominator" value with no section wrappers or key names.
ffprobe_output = subprocess.run([
    "ffprobe",
    "-v", "fatal",
    "-i", input_path,
    "-select_streams", "v:0",
    "-print_format", "default=noprint_wrappers=1:nokey=1",
    "-show_entries", "stream=r_frame_rate",
], stdout=subprocess.PIPE, check=True, text=True).stdout

# Only the first value is used: the output is empty when the file has no
# video stream, and may contain the entry more than once for some inputs.
frame_rate_values = ffprobe_output.split()
if not frame_rate_values:
    sys.exit(f"{input_path}: no video stream found")
try:
    frame_rate = fractions.Fraction(frame_rate_values[0])
except (ValueError, ZeroDivisionError):
    sys.exit(f"{input_path}: unusable frame rate {frame_rate_values[0]!r}")
if frame_rate <= 0:
    sys.exit(f"{input_path}: unusable frame rate {frame_rate_values[0]!r}")

# Exact (possibly non-integer) number of audio samples per video frame.
audio_samples_per_frame = args.audio_sample_rate / frame_rate
# Largest audio payload any frame can carry, rounded up to a multiple of 4.
max_audio_size = (math.ceil(audio_samples_per_frame) * audio_bytes_per_sample + 3) & -4
# Decoder for the first video stream: scaled to the requested size and
# emitted on stdout as raw pixels in the requested format. The "+" prefix
# asks ffmpeg to fail rather than silently pick another pixel format.
video_pixel_format = VIDEO_FORMATS[args.video_bits_per_pixel]
video_command = [
    "ffmpeg",
    "-v", "fatal",
    "-nostdin",
    "-i", input_path,
    "-map", "0:v:0",
    "-vf", "scale={}x{}".format(args.video_width, args.video_height),
    "-pix_fmt", "+" + video_pixel_format,
    "-c", "rawvideo",
    "-f", "rawvideo",
    "-",
]
video_stream = subprocess.Popen(video_command, stdout=subprocess.PIPE)

# Decoder for the first audio stream: resampled and remixed to the requested
# rate and channel count, emitted on stdout as headerless PCM.
audio_sample_format = AUDIO_FORMATS[args.audio_bits_per_sample]
audio_command = [
    "ffmpeg",
    "-v", "fatal",
    "-nostdin",
    "-i", input_path,
    "-map", "0:a:0",
    "-ar", str(args.audio_sample_rate),
    "-ac", str(args.audio_channel_count),
    "-c", "pcm_" + audio_sample_format,
    "-f", audio_sample_format,
    "-",
]
audio_stream = subprocess.Popen(audio_command, stdout=subprocess.PIPE)
# File header: the magic followed by the stream parameters, each packed as a
# little-endian unsigned 32-bit integer in the order the format defines.
header_fields = (
    ELVI_MAGIC,
    args.video_width,
    args.video_height,
    args.video_bits_per_pixel,
    args.audio_sample_rate,
    args.audio_channel_count,
    args.audio_bits_per_sample,
    max_audio_size,
)
header_bytes = struct.pack("<" + "I" * len(header_fields), *header_fields)
output_file.write(header_bytes)
# Interleave the two decoded streams: one raw video frame, then the audio
# belonging to it, for as long as the video decoder produces whole frames.
video_buf = bytearray(video_frame_size)
# Running total of audio samples due so far, kept as an exact fraction so the
# per-frame sample counts never drift from the true audio position.
audio_samples_read = fractions.Fraction(0)
try:
    while True:
        video_bytes_read = video_stream.stdout.readinto(video_buf)
        # Stop at end of stream. A short read means the last frame was cut
        # off; the rest of the buffer still holds pixels from the previous
        # frame, so the partial frame is dropped rather than written.
        if not video_bytes_read or video_bytes_read < video_frame_size:
            break
        output_file.write(video_buf)

        # Whole samples due by the end of this frame minus those already
        # emitted; this varies by one between frames when the ratio of
        # sample rate to frame rate is not an integer.
        old_audio_samples_read = audio_samples_read
        audio_samples_read += audio_samples_per_frame
        audio_samples_to_read = int(audio_samples_read) - int(old_audio_samples_read)
        audio_bytes_to_read = audio_samples_to_read * audio_bytes_per_sample

        # The audio may run out before the video does; whatever was read
        # (possibly nothing) is recorded in the size field and the chunk is
        # zero-padded up to the fixed maximum audio size.
        audio_buf = audio_stream.stdout.read(audio_bytes_to_read)
        audio_size = len(audio_buf)
        audio_padding_size = max_audio_size - audio_size
        output_file.write(struct.pack("<I", audio_size))
        output_file.write(audio_buf)
        output_file.write(b"\x00" * audio_padding_size)
    video_stream.wait()
finally:
    # Reap both decoders on every exit path. The audio decoder may still be
    # running once the video ends, and after a failed write either decoder
    # may be, so anything still alive is terminated before waiting on it.
    for decoder in (video_stream, audio_stream):
        if decoder.poll() is None:
            decoder.terminate()
        decoder.wait()