Skip to content

Commit

Permalink
[ADDED] Encode and decode separated for compression
Browse files Browse the repository at this point in the history
  • Loading branch information
danperazzo committed Jun 15, 2023
1 parent 4038a39 commit 221cc1e
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 66 deletions.
152 changes: 103 additions & 49 deletions compression_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,78 +22,100 @@ def __init__(self, order, block_size):
self.block_size = block_size

@abstractmethod
def compression_block(self,image):
def encode_block(self,image):
pass

@abstractmethod
def decode_block(self,image):
pass

# Code based on https://github.com/SonuDileep/KL-transform-for-Image-Data-Compression/blob/master/KL_Transform%20for%20data:image%20compression.ipynb
def im2block(self, image): # to divide the image into blocks
image_block = []

def im2encoded_blocks(self, image): # to divide the image into blocks
image_block_list = []
for j in range(0, image.shape[1], self.block_size):
for i in range(0, image.shape[0], self.block_size):
image_block.append(image[i:i+self.block_size, j:j+self.block_size])
image_block = np.asarray(image_block).astype(float)
image_block = image[i:i+self.block_size, j:j+self.block_size]
encoded_blocks = self.encode_block(image_block)
image_block_list.append(encoded_blocks)

image_block = image_block_list

return image_block

def block2im(self, mtx, image_size): # to combine the blocks back into image
sx = image_size[1]
sy = image_size[0]
def encoded_block2im(self, block_list, image_size): # to combine the blocks back into image
width = image_size[1]
height = image_size[0]
result = np.zeros(image_size)
col = 0
for j in range(0,sx,self.block_size):
for i in range(0,sy,self.block_size):
result[i:i+self.block_size, j:j+self.block_size] = mtx[col]
col += 1
index_block = 0
for j in range(0,width,self.block_size):
for i in range(0,height,self.block_size):
result[i:i+self.block_size, j:j+self.block_size] = self.decode_block(block_list[index_block])
index_block += 1
return result

def block_compressor(self,image_block_list):
image_blocks_compressed = []
for image_block in image_block_list:

image_compressed = self.compression_block(image_block)
image_blocks_compressed.append(image_compressed)

image_compressed = np.stack(image_blocks_compressed)

return image_compressed

def compreses_channel(self,image):
def encode_channel(self,image):
image = image.astype(np.double)
image_blocks = self.im2block(image)
image_compressed = self.block_compressor(image_blocks)

block_codes = self.im2encoded_blocks(image)

image_comp = self.block2im(image_compressed, (image.shape[0],image.shape[1]))
encoded_image = {'block_codes':block_codes, 'orig_dims':image.shape}
return encoded_image

def decode_channel(self,encoded_image):

block_codes = encoded_image['block_codes']
orig_dims = encoded_image['orig_dims']

image_comp = self.encoded_block2im(block_codes, orig_dims)

return image_comp


def compress_rgb(self, img_rgb):
def encode_rgb(self, img_rgb):

img_c_stack = []
img_encoded_rgb_list = []
for i in range(3):
img_c = img_rgb[:,:,i]
img_c_compressed = self.compreses_channel(img_c)
img_c_stack.append(img_c_compressed)
img_c_coded = self.encode_channel(img_c)
img_encoded_rgb_list.append(img_c_coded)

return img_encoded_rgb_list

def decode_rgb(self, img_encoded_rgb_list):

img_decoded_rgb_list = []
for i in range(3):
img_c_decoded = self.decode_channel(img_encoded_rgb_list[i])
img_decoded_rgb_list.append(img_c_decoded)

img_rgb = np.dstack(img_c_stack)
img_rgb = np.dstack(img_decoded_rgb_list)
img_rgb = np.around(img_rgb).astype(int)

return img_rgb


class DFTCompression(BaseBlockCompression):

def compression_block(self, image):
def encode_block(self, image):
image_dft = np.fft.fft2(image)

image_filter = np.zeros(image.shape)

num_freq = int(self.block_size*((self.order)**0.5))
image_filter[:num_freq,:num_freq] = image_dft[:num_freq,:num_freq]
image_compressed = image_dft[:num_freq,:num_freq]

code_dict = {'image_compressed':image_compressed, 'orig_dim':image.shape}

return code_dict

def decode_block(self, code_dict):
image_compressed = code_dict['image_compressed']
orig_dim = code_dict['orig_dim']

num_freq = image_compressed.shape[0]

image_reconstruction_freq = np.zeros(orig_dim)
image_reconstruction_freq[:num_freq,:num_freq] = image_compressed

image_compressed = np.fft.ifft2(image_filter).real
image_compressed = np.fft.ifft2(image_reconstruction_freq).real

return image_compressed

Expand All @@ -114,21 +136,33 @@ def idct2(self,f):
return np.transpose(fftpack.idct(
np.transpose(fftpack.idct(f, norm = "ortho")), norm = "ortho"))

def compression_block(self, image):
def encode_block(self, image):
image_dct = self.dct2(image)

image_filter = np.zeros(image_dct.shape)
num_freq = int(self.block_size*((self.order)**0.5))
image_filter[:num_freq,:num_freq] = image_dct[:num_freq,:num_freq]
image_compressed = image_dct[:num_freq,:num_freq]

code_dict = {'image_compressed':image_compressed, 'orig_dim':image.shape}

return code_dict

def decode_block(self, code_dict):
image_compressed = code_dict['image_compressed']
orig_dim = code_dict['orig_dim']

num_freq = image_compressed.shape[0]

image_reconstruction_freq = np.zeros(orig_dim)
image_reconstruction_freq[:num_freq,:num_freq] = image_compressed

image_compressed = self.idct2(image_filter)
image_compressed = self.idct2(image_reconstruction_freq)

return image_compressed


class PCACompression(BaseBlockCompression):

def compression_block(self, image):
def encode_block(self, image):
size = image.shape[1]

mean = np.mean(image,axis=0)
Expand All @@ -142,23 +176,43 @@ def compression_block(self, image):
# Quantization of the transformed image
D = int(self.order*self.block_size)
PCA_compressed = PCA[:,size-D:size]
eig_compressed = eig[:,size-D:size]
eig_compressed = eig[:,size-D:size]

code_dict = {'PCA_compressed':PCA_compressed, 'eig_compressed':eig_compressed, 'mean':mean }

return code_dict

def decode_block(self, code_dict):
PCA_compressed = code_dict['PCA_compressed']
eig_compressed = code_dict['eig_compressed']
mean = code_dict['mean']

image_comp = (PCA_compressed @ np.transpose(eig_compressed)) + mean
return image_comp


class SVDCompression(BaseBlockCompression):

def compression_block(self, image):
def encode_block(self, image):
D = int(self.order*self.block_size)
[Umat,Smat,Vmat] = np.linalg.svd(image, full_matrices=False, compute_uv=True, hermitian=False)


S_compressed = Smat[:D]
U_compressed = Umat[:,:D]
V_compressed = Vmat[:D,:]

code_dict = {'S_compressed':S_compressed, 'U_compressed':U_compressed, 'V_compressed':V_compressed }

return code_dict

def decode_block(self, code_dict):
S_compressed = code_dict['S_compressed']
U_compressed = code_dict['U_compressed']
V_compressed = code_dict['V_compressed']


img_compressed = np.dot(U_compressed * S_compressed, V_compressed)

return np.round(img_compressed)


Expand Down
36 changes: 23 additions & 13 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@
from tqdm import tqdm



orders = [0.20, 0.30, 0.50]
all_methods = ['svd', 'dft', 'pca']
all_methods = ['dft', 'dct', 'pca', 'svd']
block_shapes = [8, 32, 64]
data_folder = 'datasets/kodak/'

Expand All @@ -22,39 +21,50 @@ def test_for_parameters(compressor, data_folder):
ssim_list = []
psnr_list = []
lpips_list = []
time_elapsed = []
time_encode = []
time_decode = []

for imname in tqdm(img_list):

img_rgb = imread(data_folder, imname)
start = time.time()
img_rgb_compress = compressor.compress_rgb(img_rgb)
end = time.time()
imwrite(results_folder , method ,imname,img_rgb_compress)
start_encode = time.time()
img_rgb_code = compressor.encode_rgb(img_rgb)
end_encode = time.time()

start_decode = time.time()
img_rgb_compress = compressor.decode_rgb(img_rgb_code)
end_decode = time.time()

ssim_val = ssim(img_rgb,img_rgb_compress)
psnr_val = PSNR(img_rgb,img_rgb_compress)
imwrite(results_folder, method, imname, img_rgb_compress)

ssim_val = ssim(img_rgb, img_rgb_compress)
psnr_val = PSNR(img_rgb, img_rgb_compress)
lpips_val = lpips(img_rgb, img_rgb_compress)
time_val = end - start

time_encode_val = end_encode - start_encode
time_decode_val = end_decode - start_decode


ssim_list.append(ssim_val)
psnr_list.append(psnr_val)
lpips_list.append(lpips_val)
time_elapsed.append(time_val)
time_encode.append(time_encode_val)
time_decode.append(time_decode_val)


ssim_avg = average_l(ssim_list)
psnr_avg = average_l(psnr_list)
lpips_avg = average_l(lpips_list)
time_avg = average_l(time_elapsed)
time_encode_avg = average_l(time_encode)
time_decode_avg = average_l(time_decode)


df = pd.DataFrame({'image_name': (img_list+['mean']),
'ssim': ssim_list + [ssim_avg],
'psnr': psnr_list + [psnr_avg],
'lpips': lpips_list + [lpips_avg],
'time':time_elapsed + [time_avg]})
'time_encode':time_encode + [time_encode_avg],
'time_decode':time_decode + [time_decode_avg]})

csv_path = results_folder + '/' + method +'/results_num.csv'
df.to_csv(csv_path)
Expand Down
8 changes: 4 additions & 4 deletions utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ def prepare_results_folder(order, block_shape, all_methods):
if not os.path.exists(results_folder):
os.mkdir(results_folder)

for method in all_methods:
method_folder = results_folder+'/'+method
if not os.path.exists(method_folder):
os.mkdir(method_folder)
for method in all_methods:
method_folder = results_folder+'/'+method
if not os.path.exists(method_folder):
os.mkdir(method_folder)

return results_folder

Expand Down

0 comments on commit 221cc1e

Please sign in to comment.