Update random_perspective transform_matrix logic #20885

Closed · wants to merge 12 commits
289 changes: 233 additions & 56 deletions in keras/src/layers/preprocessing/image_preprocessing/random_perspective.py
@@ -49,7 +49,7 @@ class RandomPerspective(BaseImagePreprocessingLayer):
def __init__(
self,
factor=1.0,
scale=0.3,
scale=0.5,
interpolation="bilinear",
fill_value=0.0,
seed=None,
@@ -122,9 +122,9 @@ def get_random_transformation(self, data, training=True, seed=None):
apply_perspective = random_threshold < transformation_probability

perspective_factor = self.backend.random.uniform(
minval=-self.scale,
maxval=self.scale,
shape=[batch_size, 4],
shape=(batch_size, 4, 2),
minval=-0.5 * self.scale,
maxval=0.5 * self.scale,
seed=seed,
dtype=self.compute_dtype,
)
@@ -138,18 +138,160 @@ def transform_images(self, images, transformation, training=True):
def transform_images(self, images, transformation, training=True):
images = self.backend.cast(images, self.compute_dtype)
if training and transformation is not None:
apply_perspective = transformation["apply_perspective"]
perspective_images = self._perspective_inputs(
images, transformation
)

images = self.backend.numpy.where(
apply_perspective[:, None, None, None],
perspective_images,
images,
)
images = self._perspective_inputs(images, transformation)
images = self.backend.cast(images, self.compute_dtype)
return images

def get_matrix_by_points(self, source_points, target_points):
batch_size = self.backend.shape(source_points)[0]

target_points = self.backend.numpy.tile(
target_points, [batch_size, 1, 1]
)

src_x1, src_y1 = source_points[:, 0, 0], source_points[:, 0, 1]
src_x2, src_y2 = source_points[:, 1, 0], source_points[:, 1, 1]
src_x3, src_y3 = source_points[:, 2, 0], source_points[:, 2, 1]
src_x4, src_y4 = source_points[:, 3, 0], source_points[:, 3, 1]

tgt_x1, tgt_y1 = target_points[:, 0, 0], target_points[:, 0, 1]
tgt_x2, tgt_y2 = target_points[:, 1, 0], target_points[:, 1, 1]
tgt_x3, tgt_y3 = target_points[:, 2, 0], target_points[:, 2, 1]
tgt_x4, tgt_y4 = target_points[:, 3, 0], target_points[:, 3, 1]

coefficient_matrix = self.backend.numpy.stack(
[
self.backend.numpy.stack(
[
src_x1,
src_y1,
self.backend.numpy.ones_like(src_x1),
self.backend.numpy.zeros_like(src_x1),
self.backend.numpy.zeros_like(src_x1),
self.backend.numpy.zeros_like(src_x1),
-tgt_x1 * src_x1,
-tgt_x1 * src_y1,
],
axis=-1,
),
self.backend.numpy.stack(
[
self.backend.numpy.zeros_like(src_x1),
self.backend.numpy.zeros_like(src_x1),
self.backend.numpy.zeros_like(src_x1),
src_x1,
src_y1,
self.backend.numpy.ones_like(src_x1),
-tgt_y1 * src_x1,
-tgt_y1 * src_y1,
],
axis=-1,
),
self.backend.numpy.stack(
[
src_x2,
src_y2,
self.backend.numpy.ones_like(src_x2),
self.backend.numpy.zeros_like(src_x2),
self.backend.numpy.zeros_like(src_x2),
self.backend.numpy.zeros_like(src_x2),
-tgt_x2 * src_x2,
-tgt_x2 * src_y2,
],
axis=-1,
),
self.backend.numpy.stack(
[
self.backend.numpy.zeros_like(src_x2),
self.backend.numpy.zeros_like(src_x2),
self.backend.numpy.zeros_like(src_x2),
src_x2,
src_y2,
self.backend.numpy.ones_like(src_x2),
-tgt_y2 * src_x2,
-tgt_y2 * src_y2,
],
axis=-1,
),
self.backend.numpy.stack(
[
src_x3,
src_y3,
self.backend.numpy.ones_like(src_x3),
self.backend.numpy.zeros_like(src_x3),
self.backend.numpy.zeros_like(src_x3),
self.backend.numpy.zeros_like(src_x3),
-tgt_x3 * src_x3,
-tgt_x3 * src_y3,
],
axis=-1,
),
self.backend.numpy.stack(
[
self.backend.numpy.zeros_like(src_x3),
self.backend.numpy.zeros_like(src_x3),
self.backend.numpy.zeros_like(src_x3),
src_x3,
src_y3,
self.backend.numpy.ones_like(src_x3),
-tgt_y3 * src_x3,
-tgt_y3 * src_y3,
],
axis=-1,
),
self.backend.numpy.stack(
[
src_x4,
src_y4,
self.backend.numpy.ones_like(src_x4),
self.backend.numpy.zeros_like(src_x4),
self.backend.numpy.zeros_like(src_x4),
self.backend.numpy.zeros_like(src_x4),
-tgt_x4 * src_x4,
-tgt_x4 * src_y4,
],
axis=-1,
),
self.backend.numpy.stack(
[
self.backend.numpy.zeros_like(src_x4),
self.backend.numpy.zeros_like(src_x4),
self.backend.numpy.zeros_like(src_x4),
src_x4,
src_y4,
self.backend.numpy.ones_like(src_x4),
-tgt_y4 * src_x4,
-tgt_y4 * src_y4,
],
axis=-1,
),
],
axis=1,
)

target_vector = self.backend.numpy.stack(
[tgt_x1, tgt_y1, tgt_x2, tgt_y2, tgt_x3, tgt_y3, tgt_x4, tgt_y4],
axis=-1,
)
target_vector = self.backend.numpy.expand_dims(target_vector, axis=-1)

coefficient_matrix = self.backend.cast(
coefficient_matrix, dtype="float32"
)
target_vector = self.backend.cast(target_vector, dtype="float32")

homography_matrix = self.backend.linalg.solve(
coefficient_matrix, target_vector
)
homography_matrix = self.backend.numpy.reshape(
homography_matrix, [-1, 8]
)
homography_matrix = self.backend.cast(
homography_matrix, dtype=self.compute_dtype
)

return homography_matrix
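
For reference, the 8x8 system assembled in `get_matrix_by_points` above is the standard four-point homography (direct linear transform) setup: each correspondence (x, y) -> (x', y') contributes one row for x' and one row for y', with the ninth homography parameter fixed to 1. A minimal stand-alone NumPy sketch of the same solve, useful for sanity-checking one batch element (the function name and sample points are illustrative only, not part of this diff):

```python
import numpy as np

def solve_homography(src, dst):
    """src, dst: (4, 2) arrays of corresponding (x, y) points."""
    rows, rhs = [], []
    for (x, y), (xt, yt) in zip(src, dst):
        # One row constrains the target x, the next the target y.
        rows.append([x, y, 1, 0, 0, 0, -xt * x, -xt * y])
        rows.append([0, 0, 0, x, y, 1, -yt * x, -yt * y])
        rhs.extend([xt, yt])
    h = np.linalg.solve(np.array(rows, dtype=np.float64),
                        np.array(rhs, dtype=np.float64))
    return h  # [a0, a1, a2, b0, b1, b2, c0, c1], with the last parameter fixed to 1

# Identity correspondence should recover the identity transform.
pts = np.array([[0.0, 0.0], [9.0, 0.0], [0.0, 9.0], [9.0, 9.0]])
print(solve_homography(pts, pts))  # approx. [1, 0, 0, 0, 1, 0, 0, 0]
```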

def _perspective_inputs(self, inputs, transformation):
if transformation is None:
return inputs
@@ -159,53 +301,83 @@ def _perspective_inputs(self, inputs, transformation):
if unbatched:
inputs = self.backend.numpy.expand_dims(inputs, axis=0)

perspective_factor = self.backend.core.convert_to_tensor(
transformation["perspective_factor"], dtype=self.compute_dtype
)
outputs = self.backend.image.affine_transform(
inputs,
transform=self._get_perspective_matrix(perspective_factor),
transform=self._get_perspective_matrix(transformation),
interpolation=self.interpolation,
fill_mode="constant",
fill_value=self.fill_value,
data_format=self.data_format,
)

apply_perspective = transformation["apply_perspective"]
outputs = self.backend.numpy.where(
apply_perspective[:, None, None, None],
outputs,
inputs,
)

if unbatched:
outputs = self.backend.numpy.squeeze(outputs, axis=0)
return outputs

def _get_perspective_matrix(self, perspectives):
perspectives = self.backend.core.convert_to_tensor(
perspectives, dtype=self.compute_dtype
def _get_perspective_matrix(self, transformation):
perspective_factor = self.backend.core.convert_to_tensor(
transformation["perspective_factor"], dtype=self.compute_dtype
)
input_shape = self.backend.core.convert_to_tensor(
transformation["input_shape"], dtype=self.compute_dtype
)
num_perspectives = self.backend.shape(perspectives)[0]
return self.backend.numpy.concatenate(

height, width = (
input_shape[self.height_axis],
input_shape[self.width_axis],
)
start_points = self.backend.convert_to_tensor(
[
self.backend.numpy.ones(
(num_perspectives, 1), dtype=self.compute_dtype
)
+ perspectives[:, :1],
perspectives[:, :1],
perspectives[:, 2:3],
perspectives[:, 1:2],
self.backend.numpy.ones(
(num_perspectives, 1), dtype=self.compute_dtype
)
+ perspectives[:, 1:2],
perspectives[:, 3:4],
self.backend.numpy.zeros((num_perspectives, 2)),
[
[0.0, 0.0],
[width - 1, 0.0],
[0.0, height - 1],
[width - 1, height - 1],
]
],
axis=1,
dtype=self.compute_dtype,
)

def _get_transformed_coordinates(self, x, y, transform):
a0, a1, a2, b0, b1, b2, c0, c1 = self.backend.numpy.split(
transform, 8, axis=-1
end_points = start_points + start_points * perspective_factor
return self.get_matrix_by_points(end_points, start_points)

def _get_transformed_coordinates(
self, x_coords, y_coords, transformation_matrix
):
backend = self.backend

batch_size = backend.shape(transformation_matrix)[0]

homogeneous_transform = backend.numpy.concatenate(
[transformation_matrix, backend.numpy.ones((batch_size, 1, 1))],
axis=-1,
)
homogeneous_transform = backend.numpy.reshape(
homogeneous_transform, (batch_size, 3, 3)
)

inverse_transform = backend.linalg.inv(homogeneous_transform)

ones_column = backend.numpy.ones_like(x_coords)
homogeneous_coords = backend.numpy.concatenate(
[x_coords, y_coords, ones_column], axis=-1
)

homogeneous_coords = backend.numpy.moveaxis(homogeneous_coords, -1, -2)
transformed_coords = backend.numpy.matmul(
inverse_transform, homogeneous_coords
)
transformed_coords = backend.numpy.moveaxis(transformed_coords, -1, -2)

x_transformed = (a1 * (y - b2) - b1 * (x - a2)) / (a1 * b0 - a0 * b1)
y_transformed = (b0 * (x - a2) - a0 * (y - b2)) / (a1 * b0 - a0 * b1)
x_transformed = transformed_coords[..., 0] / transformed_coords[..., 2]
y_transformed = transformed_coords[..., 1] / transformed_coords[..., 2]

return x_transformed, y_transformed
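
The coordinate mapping above appends the implicit ninth parameter, inverts the full 3x3 homography, and finishes with a perspective divide by the third homogeneous component. A small NumPy sketch of the same idea for a single 3x3 forward matrix (names are illustrative, not part of this diff):

```python
import numpy as np

def invert_and_map(points_xy, forward_matrix):
    """Map (N, 2) points through the inverse of a 3x3 homography."""
    ones = np.ones((points_xy.shape[0], 1))
    homogeneous = np.concatenate([points_xy, ones], axis=-1)   # (N, 3)
    mapped = homogeneous @ np.linalg.inv(forward_matrix).T     # (N, 3)
    # Perspective divide: normalize by the third homogeneous coordinate.
    return mapped[:, :2] / mapped[:, 2:3]
```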

@@ -215,7 +387,7 @@ def transform_bounding_boxes(
transformation,
training=True,
):
if training:
if training and transformation is not None:
if backend_utils.in_tf_graph():
self.backend.set_backend("tensorflow")

@@ -233,26 +405,29 @@ def transform_bounding_boxes(
)

boxes = bounding_boxes["boxes"]

x0, y0, x1, y1 = self.backend.numpy.split(boxes, 4, axis=-1)

perspective_factor = transformation["perspective_factor"]
transform = self._get_perspective_matrix(perspective_factor)
transform = self._get_perspective_matrix(transformation)
transform = self.backend.numpy.expand_dims(transform, axis=1)
transform = self.backend.cast(transform, dtype=self.compute_dtype)

x_1, y_1 = self._get_transformed_coordinates(x0, y0, transform)
x_2, y_2 = self._get_transformed_coordinates(x1, y1, transform)
x_3, y_3 = self._get_transformed_coordinates(x0, y1, transform)
x_4, y_4 = self._get_transformed_coordinates(x1, y0, transform)
corners = [
self._get_transformed_coordinates(x, y, transform)
for x, y in [(x0, y0), (x1, y1), (x0, y1), (x1, y0)]
]
x_corners, y_corners = zip(*corners)

xs = self.backend.numpy.concatenate([x_1, x_2, x_3, x_4], axis=-1)
ys = self.backend.numpy.concatenate([y_1, y_2, y_3, y_4], axis=-1)
xs = self.backend.numpy.stack(x_corners, axis=-1)
ys = self.backend.numpy.stack(y_corners, axis=-1)

min_x = self.backend.numpy.min(xs, axis=-1)
max_x = self.backend.numpy.max(xs, axis=-1)
min_y = self.backend.numpy.min(ys, axis=-1)
max_y = self.backend.numpy.max(ys, axis=-1)
min_x, max_x = (
self.backend.numpy.min(xs, axis=-1),
self.backend.numpy.max(xs, axis=-1),
)
min_y, max_y = (
self.backend.numpy.min(ys, axis=-1),
self.backend.numpy.max(ys, axis=-1),
)

min_x = self.backend.numpy.expand_dims(min_x, axis=-1)
max_x = self.backend.numpy.expand_dims(max_x, axis=-1)
@@ -280,6 +455,8 @@ def transform_bounding_boxes(
bounding_box_format="xyxy",
)

self.backend.reset()

return bounding_boxes
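
Since a perspective warp does not keep boxes axis-aligned, the code above maps all four corners of each box and then takes the per-axis min/max to form the tightest enclosing xyxy box. A sketch of that strategy for a single box, assuming some point-mapping callable (the helper name is illustrative, not part of this diff):

```python
import numpy as np

def warp_box_xyxy(box, map_xy):
    """box: (x0, y0, x1, y1); map_xy: callable mapping (N, 2) points to (N, 2)."""
    x0, y0, x1, y1 = box
    corners = np.array([[x0, y0], [x1, y1], [x0, y1], [x1, y0]], dtype=np.float64)
    warped = map_xy(corners)
    # Enclose the warped quadrilateral with an axis-aligned box.
    return (warped[:, 0].min(), warped[:, 1].min(),
            warped[:, 0].max(), warped[:, 1].max())
```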

def transform_labels(self, labels, transformation, training=True):
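
Finally, a quick smoke test of the layer as exposed publicly, assuming the exported name `keras.layers.RandomPerspective` and the constructor arguments shown in this diff:

```python
import numpy as np
import keras

layer = keras.layers.RandomPerspective(factor=1.0, scale=0.5, seed=0)
images = np.random.uniform(0, 255, size=(2, 32, 32, 3)).astype("float32")
augmented = layer(images, training=True)
print(augmented.shape)  # expected: (2, 32, 32, 3)
```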