-
Notifications
You must be signed in to change notification settings - Fork 54
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
246 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
import unittest | ||
import io | ||
import imghdr | ||
|
||
from PIL import Image as PILImage | ||
|
||
from willow.image import ( | ||
GIFImageFile, BadImageOperationError, WebMVP9ImageFile, OggTheoraImageFile, MP4H264ImageFile | ||
) | ||
from willow.plugins.ffmpeg import FFMpegLazyVideo, probe | ||
|
||
|
||
class TestFFMpegOperations(unittest.TestCase): | ||
def setUp(self): | ||
self.f = open('tests/images/newtons_cradle.gif', 'rb') | ||
self.image = FFMpegLazyVideo.open(GIFImageFile(self.f)) | ||
|
||
def tearDown(self): | ||
self.f.close() | ||
|
||
def test_get_size(self): | ||
width, height = self.image.get_size() | ||
self.assertEqual(width, 480) | ||
self.assertEqual(height, 360) | ||
|
||
def test_get_frame_count(self): | ||
frames = self.image.get_frame_count() | ||
self.assertEqual(frames, 34) | ||
|
||
def test_resize(self): | ||
resized_image = self.image.resize((100, 75)) | ||
self.assertEqual(resized_image.get_size(), (100, 75)) | ||
|
||
def test_crop(self): | ||
cropped_image = self.image.crop((10, 10, 100, 100)) | ||
|
||
# Cropping not supported, but image will be resized | ||
self.assertEqual(cropped_image.get_size(), (90, 90)) | ||
|
||
def test_rotate(self): | ||
rotated_image = self.image.rotate(90) | ||
width, height = rotated_image.get_size() | ||
|
||
# Not supported, image will not be rotated | ||
self.assertEqual((width, height), (480, 360)) | ||
|
||
def test_set_background_color_rgb(self): | ||
# Not supported, would do nothing | ||
red_background_image = self.image.set_background_color_rgb((255, 0, 0)) | ||
self.assertFalse(red_background_image.has_alpha()) | ||
|
||
def test_save_as_webm_vp9(self): | ||
output = io.BytesIO() | ||
return_value = self.image.save_as_webm_vp9(output) | ||
output.seek(0) | ||
|
||
probe_data = probe(output) | ||
|
||
self.assertEqual(probe_data['format']['format_name'], 'matroska,webm') | ||
self.assertEqual(probe_data['streams'][0]['codec_name'], 'vp9') | ||
self.assertIsInstance(return_value, WebMVP9ImageFile) | ||
self.assertEqual(return_value.f, output) | ||
|
||
def test_save_as_ogg_theora(self): | ||
output = io.BytesIO() | ||
return_value = self.image.save_as_ogg_theora(output) | ||
output.seek(0) | ||
|
||
probe_data = probe(output) | ||
|
||
self.assertEqual(probe_data['format']['format_name'], 'ogg') | ||
self.assertEqual(probe_data['streams'][0]['codec_name'], 'theora') | ||
self.assertIsInstance(return_value, OggTheoraImageFile) | ||
self.assertEqual(return_value.f, output) | ||
|
||
def test_save_as_mp4_h264(self): | ||
output = io.BytesIO() | ||
return_value = self. image.save_as_mp4_h264(output) | ||
output.seek(0) | ||
|
||
probe_data = probe(output) | ||
|
||
self.assertEqual(probe_data['format']['format_name'], 'mov,mp4,m4a,3gp,3g2,mj2') | ||
self.assertEqual(probe_data['streams'][0]['codec_name'], 'h264') | ||
self.assertIsInstance(return_value, MP4H264ImageFile) | ||
self.assertEqual(return_value.f, output) | ||
|
||
def test_has_alpha(self): | ||
has_alpha = self.image.has_alpha() | ||
self.assertFalse(has_alpha) | ||
|
||
def test_has_animation(self): | ||
has_animation = self.image.has_animation() | ||
self.assertTrue(has_animation) | ||
|
||
def test_transparent_gif(self): | ||
with open('tests/images/transparent.gif', 'rb') as f: | ||
image = FFMpegLazyVideo.open(GIFImageFile(f)) | ||
|
||
# Transparency not supported | ||
self.assertFalse(image.has_alpha()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
import subprocess | ||
import os.path | ||
from itertools import product | ||
import json | ||
from tempfile import NamedTemporaryFile, TemporaryDirectory | ||
|
||
from willow.image import Image | ||
|
||
from willow.image import ( | ||
GIFImageFile, | ||
WebPImageFile, | ||
WebMVP9ImageFile, | ||
OggTheoraImageFile, | ||
MP4H264ImageFile, | ||
) | ||
|
||
|
||
def probe(file): | ||
with NamedTemporaryFile() as src: | ||
src.write(file.read()) | ||
result = subprocess.run(["ffprobe", "-show_format", "-show_streams", "-loglevel", "quiet", "-print_format", "json", src.name], capture_output=True) | ||
return json.loads(result.stdout) | ||
|
||
|
||
def transcode(source_file, output_file, output_resolution, format, codec): | ||
with NamedTemporaryFile() as src, TemporaryDirectory() as outdir: | ||
src.write(source_file.read()) | ||
|
||
args = ["ffmpeg", "-i", src.name, "-f", format, "-codec:v", codec] | ||
|
||
if output_resolution: | ||
args += ["-s", f"{output_resolution[0]}x{output_resolution[1]}"] | ||
|
||
args.append(os.path.join(outdir, 'out')) | ||
|
||
subprocess.run(args) | ||
|
||
with open(os.path.join(outdir, 'out'), 'rb') as out: | ||
output_file.write(out.read()) | ||
|
||
|
||
class FFMpegLazyVideo(Image): | ||
def __init__(self, source_file, output_resolution=None): | ||
self.source_file = source_file | ||
self.output_resolution = output_resolution | ||
|
||
@Image.operation | ||
def get_size(self): | ||
if self.output_resolution: | ||
return self.output_resolution | ||
|
||
# Find the size from the source file | ||
data = probe(self.source_file.f) | ||
for stream in data['streams']: | ||
if stream['codec_type'] == 'video': | ||
return stream['width'], stream['height'] | ||
|
||
@Image.operation | ||
def get_frame_count(self): | ||
# Find the frame count from the source file | ||
data = probe(self.source_file.f) | ||
for stream in data['streams']: | ||
if stream['codec_type'] == 'video': | ||
return int(stream['nb_frames']) | ||
|
||
@Image.operation | ||
def has_alpha(self): | ||
# Alpha not supported | ||
return False | ||
|
||
@Image.operation | ||
def has_animation(self): | ||
return True | ||
|
||
@Image.operation | ||
def resize(self, size): | ||
return FFMpegLazyVideo(self.source_file, size) | ||
|
||
@Image.operation | ||
def crop(self, rect): | ||
# Not supported, but resize the image to match the crop rect size | ||
left, top, right, bottom = rect | ||
width = right - left | ||
height = bottom - top | ||
return FFMpegLazyVideo(self.source_file, (width, height)) | ||
|
||
@Image.operation | ||
def rotate(self, angle): | ||
# Not supported | ||
return self | ||
|
||
@Image.operation | ||
def set_background_color_rgb(self, color): | ||
# Alpha not supported | ||
return self | ||
|
||
@classmethod | ||
@Image.converter_from(GIFImageFile) | ||
@Image.converter_from(WebPImageFile) | ||
@Image.converter_from(WebMVP9ImageFile) | ||
@Image.converter_from(OggTheoraImageFile) | ||
@Image.converter_from(MP4H264ImageFile) | ||
def open(cls, file): | ||
return cls(file) | ||
|
||
@Image.operation | ||
def save_as_webm_vp9(self, f): | ||
transcode(self.source_file.f, f, self.output_resolution, 'webm', 'libvpx-vp9') | ||
return WebMVP9ImageFile(f) | ||
|
||
@Image.operation | ||
def save_as_ogg_theora(self, f): | ||
transcode(self.source_file.f, f, self.output_resolution, 'ogg', 'libtheora') | ||
return OggTheoraImageFile(f) | ||
|
||
@Image.operation | ||
def save_as_mp4_h264(self, f): | ||
transcode(self.source_file.f, f, self.output_resolution, 'mp4', 'libx264') | ||
return MP4H264ImageFile(f) | ||
|
||
|
||
willow_image_classes = [FFMpegLazyVideo] |