-
Notifications
You must be signed in to change notification settings - Fork 52
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Vivek Jayaram
authored and
Vivek Jayaram
committed
Apr 30, 2018
1 parent
ee0a886
commit 267c210
Showing
5 changed files
with
325 additions
and
172 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,25 @@ | ||
import argparse | ||
|
||
from pychorus.helpers import find_chorus | ||
from __future__ import division | ||
|
||
import argparse | ||
|
||
from pychorus.helpers import find_and_output_chorus | ||
|
||
|
||
def main(args):
    """Run chorus detection for the parsed command-line arguments.

    Args:
        args: namespace with ``input_file``, ``output_file`` and
            ``min_clip_length`` attributes, as produced by ``build_parser``.
    """
    find_and_output_chorus(args.input_file, args.output_file,
                           args.min_clip_length)


def build_parser():
    """Build the command-line parser for the chorus extraction script.

    Returns:
        An ``argparse.ArgumentParser`` that accepts the positional
        ``input_file`` and the optional ``--output_file`` and
        ``--min_clip_length`` flags.
    """
    parser = argparse.ArgumentParser(
        description="Select and output the chorus of a piece of music")
    parser.add_argument("input_file", help="Path to input audio file")
    parser.add_argument(
        "--output_file",
        default="chorus.wav",
        help="Output file")
    parser.add_argument(
        "--min_clip_length",
        # Without an explicit type argparse returns the raw string, which
        # breaks the arithmetic done on the clip length downstream.
        type=float,
        default=15,
        help="Minimum length (in seconds) to be considered a chorus")
    return parser


if __name__ == "__main__":
    main(build_parser().parse_args())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from pychorus.helpers import find_and_output_chorus, find_chorus |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,14 @@ | ||
# Denoising size in seconds | ||
SMOOTHING_SIZE_SEC = 1.2 | ||
SMOOTHING_SIZE_SEC = 2.5 | ||
|
||
# Number of samples to consider in one chunk. | ||
# Smaller values take more time, but are more accurate | ||
N_FFT = 2**14 | ||
|
||
# For line detection | ||
LINE_THRESHOLD = 0.15 | ||
MIN_LINES = 8 | ||
NUM_ITERATIONS = 8 | ||
|
||
# We allow an error proportional to the length of the clip | ||
OVERLAP_PERCENT_MARGIN = 0.2 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,143 +1,178 @@ | ||
import librosa | ||
import librosa.display | ||
import numpy as np | ||
import scipy.signal | ||
import soundfile as sf | ||
|
||
from math import sqrt | ||
|
||
import matplotlib.pyplot as plt | ||
from pychorus.similarity_matrix import TimeTimeSimilarityMatrix, TimeLagSimilarityMatrix, Line | ||
from pychorus.constants import N_FFT, SMOOTHING_SIZE_SEC, LINE_THRESHOLD, MIN_LINES, \ | ||
NUM_ITERATIONS, OVERLAP_PERCENT_MARGIN | ||
|
||
from constants import N_FFT, SMOOTHING_SIZE_SEC | ||
|
||
|
||
class Line(object):
    """Record describing one detected segment: ``start``, ``end`` and ``lag``.

    The three constructor arguments are stored unchanged as attributes.
    Instances keep the default identity-based hashing, so they can be used
    as dictionary keys.
    """

    def __init__(self, start, end, lag):
        self.start, self.end, self.lag = start, end, lag

    def __repr__(self):
        fields = (self.start, self.end, self.lag)
        return "Line ({} {} {})".format(*fields)
|
||
def compute_time_lag_matrix(chroma):
    """Build the time-lag similarity matrix of a chromagram.

    Entry ``[lag, t]`` of the result is
    ``1 - ||chroma[:, t] - chroma[:, (t - lag) % n]|| / sqrt(12)``,
    where ``n`` is the number of chroma frames.

    Args:
        chroma: array with 12 rows and ``n`` columns (the 12 is hard-coded
            in the reshape and in the normalisation).
    Returns: ``n x n`` array of similarities, rows indexed by lag.
    """
    n_frames = chroma.shape[1]
    n_shifts = n_frames + 1

    # Each frame repeated along a new trailing axis: (12 x n x n+1)
    repeated = np.repeat(np.expand_dims(chroma, 2), n_shifts, axis=2)

    # Tiling the frames n+1 times and re-cutting the result into rows of
    # width n+1 makes every successive row start one frame later.
    tiled = np.tile(chroma, (1, n_shifts))
    shifted = tiled.reshape(12, n_frames, n_shifts)

    distance = np.linalg.norm(repeated - shifted, axis=0)
    similarity = 1 - distance / sqrt(12)

    # Rotate so that rows become lags, then drop the extra lag row.
    rotated = np.rot90(similarity, k=1, axes=(0, 1))
    return rotated[:n_frames, :n_frames]
|
||
|
||
def compute_time_time_matrix(chroma):
    """Build the time-time similarity matrix of a chromagram.

    Entry ``[i, j]`` is ``1 - ||chroma[:, i] - chroma[:, j]|| / sqrt(12)``.

    Args:
        chroma: array of chroma frames, one frame per column (the
            normalisation assumes 12 rows).
    Returns: ``n x n`` array of pairwise frame similarities.
    """
    as_columns = chroma[:, :, np.newaxis]  # (12 x n x 1)
    as_rows = chroma[:, np.newaxis, :]  # (12 x 1 x n)
    distance = np.linalg.norm(as_columns - as_rows, axis=0)
    return 1 - distance / sqrt(12)
|
||
def local_maxima_rows(denoised_time_lag):
    """Return indices of rows whose normalized sum is a strict local maximum.

    Row ``i`` of an ``n``-row matrix has its sum divided by ``n - i`` before
    neighbouring rows are compared.

    Args:
        denoised_time_lag: 2-D array to scan row by row.
    Returns: 1-D array of row indices.
    """
    totals = np.sum(denoised_time_lag, axis=1)
    # Divisors count down n, n-1, ..., 1 from the first row to the last
    weights = np.arange(len(totals), 0, -1)
    peaks = scipy.signal.argrelextrema(totals / weights, np.greater)
    return peaks[0]
|
||
|
||
def detect_lines(denoised_time_lag, rows, threshold=0.15, min_length=50):
    """Detect horizontal runs of above-threshold values in the given rows.

    Each row is scanned from its diagonal element (column ``row``) to the
    last column. A run of consecutive values above ``threshold`` becomes a
    ``Line(start, end, row)`` when it spans more than ``min_length`` columns;
    ``end`` is the first column after the run.

    Args:
        denoised_time_lag: square 2-D array indexed as ``[row, col]``.
        rows: iterable of row indices to scan; rows below ``min_length``
            are skipped.
        threshold: value a cell must exceed to belong to a run.
        min_length: minimum run length, also the lowest row considered.
    Returns: list of ``Line`` objects in scan order.
    """
    num_samples = denoised_time_lag.shape[0]
    line_segments = []
    for row in rows:
        if row < min_length:
            continue

        # Start every row with no open run; otherwise a run still open at
        # the end of one row would leak its start column into the next row.
        cur_segment_start = None
        for col in range(row, num_samples):
            if denoised_time_lag[row, col] > threshold:
                if cur_segment_start is None:
                    cur_segment_start = col
                continue

            if (cur_segment_start is not None
                    and col - cur_segment_start > min_length):
                line_segments.append(Line(cur_segment_start, col, row))
            cur_segment_start = None

        # A run that reaches the last column never meets a below-threshold
        # cell, so it has to be closed explicitly.
        if (cur_segment_start is not None
                and num_samples - cur_segment_start > min_length):
            line_segments.append(Line(cur_segment_start, num_samples, row))

    return line_segments
|
||
|
||
def covering_lines(lines, margin):
    """Score each line by how often other lines cover it.

    For every ordered pair whose lags differ by more than 50, the covered
    line gets one point when the other line spans it in column range and
    another point when the other line spans it after subtracting each
    line's lag from its endpoints. ``margin`` loosens both comparisons.

    Args:
        lines: list of objects with ``start``, ``end`` and ``lag``.
        margin: slack allowed at each end of the covered line.
    Returns: dict mapping every line to its score.
    """
    scores = dict.fromkeys(lines, 0)

    for covered in lines:
        shifted_start = covered.start - covered.lag
        shifted_end = covered.end - covered.lag
        for other in lines:
            if not abs(other.lag - covered.lag) > 50:
                continue

            spans_columns = (other.start < covered.start + margin
                             and other.end > covered.end - margin)
            if spans_columns:
                scores[covered] += 1

            spans_shifted = (
                other.start - other.lag < shifted_start + margin
                and other.end - other.lag > shifted_end - margin)
            if spans_shifted:
                scores[covered] += 1

    return scores
|
||
|
||
def denoise_time_lag(input_matrix, time_time_matrix, smoothing_size):
    """Suppress non-horizontal structure in a time-lag matrix.

    For every cell the moving averages to the left/right are compared with
    the moving averages above/below and along the two directions taken from
    ``time_time_matrix``. A value derived from the non-horizontal averages
    is subtracted, the upper triangle of the result is blurred along each
    row, and negatives are clipped to zero.

    Args:
        input_matrix: square ``n x n`` time-lag matrix (the intermediate
            arrays are all built as ``n x n``).
        time_time_matrix: ``n x n`` time-time matrix of the same signal.
        smoothing_size: moving-average window length in samples (positive
            integer).
    Returns: ``n x n`` array, non-negative, with its first five rows zeroed.
    """
    # Imported from the public namespace; the ``scipy.ndimage.filters``
    # alias is deprecated and ``scipy.ndimage`` was never imported
    # explicitly by this module.
    from scipy.ndimage import gaussian_filter1d

    n = input_matrix.shape[0]

    # Moving averages along each row. In "full" mode, column k holds the
    # mean of the window ending at k, and column k + size - 1 holds the
    # mean of the window starting at k.
    horizontal_smoothing_window = np.ones((1, smoothing_size)) / smoothing_size
    horizontal_moving_average = scipy.signal.convolve2d(
        input_matrix, horizontal_smoothing_window, mode="full")
    left_average = horizontal_moving_average[:, 0:n]
    right_average = horizontal_moving_average[:, smoothing_size - 1:]
    max_horizontal_average = np.maximum(left_average, right_average)

    # Same idea along each column.
    vertical_smoothing_window = np.ones((smoothing_size, 1)) / smoothing_size
    vertical_moving_average = scipy.signal.convolve2d(
        input_matrix, vertical_smoothing_window, mode="full")
    down_average = vertical_moving_average[0:n, :]
    up_average = vertical_moving_average[smoothing_size - 1:, :]

    # Row averages of the time-time matrix, re-indexed into time-lag
    # coordinates (row x - y of the time-time matrix feeds cell [y, x]).
    diagonal_moving_average = scipy.signal.convolve2d(
        time_time_matrix, horizontal_smoothing_window, mode="full")
    ur = np.zeros((n, n))
    ll = np.zeros((n, n))
    for x in range(n):
        for y in range(x):
            ll[y, x] = diagonal_moving_average[x - y, x]
            ur[y, x] = diagonal_moving_average[x - y, x + smoothing_size - 1]

    non_horizontal_max = np.maximum.reduce([down_average, up_average, ll, ur])
    non_horizontal_min = np.minimum.reduce([up_average, down_average, ll, ur])

    # Where the horizontal average dominates, subtract only the smallest
    # competing average; elsewhere subtract the largest one.
    suppression = (
        (max_horizontal_average > non_horizontal_max) * non_horizontal_min
        + (max_horizontal_average <= non_horizontal_max) * non_horizontal_max)

    denoised_matrix = gaussian_filter1d(
        np.triu(input_matrix - suppression), 5 * smoothing_size, axis=1)
    denoised_matrix = np.maximum(denoised_matrix, 0)
    denoised_matrix[0:5, :] = 0
    return denoised_matrix
|
||
|
||
def find_chorus(input_file, clip_length):
    """Load a song, detect repeated lines and plot the denoised time-lag matrix.

    Args:
        input_file: path of the audio file to analyse.
        clip_length: currently unused by this implementation.
    Returns: dict mapping each detected line to its cover count, as
        produced by ``covering_lines``.
    """
    print("Loading file")
    y, sr = librosa.load(input_file)
    song_length_sec = y.shape[0] / float(sr)
    S = np.abs(librosa.stft(y, n_fft=N_FFT))**2
    chroma = librosa.feature.chroma_stft(S=S, sr=sr)
    num_samples = chroma.shape[1]

    print("Calculating time lag similarity matrix")
    time_time_similarity = compute_time_time_matrix(chroma)
    time_lag_similarity = compute_time_lag_matrix(chroma)

    # Convert the smoothing window from seconds to chroma frames
    chroma_sr = num_samples / song_length_sec
    smoothing_size_samples = int(SMOOTHING_SIZE_SEC * chroma_sr)
    denoised_time_lag = denoise_time_lag(
        time_lag_similarity, time_time_similarity, smoothing_size_samples)
    rows = local_maxima_rows(denoised_time_lag)
    lines = detect_lines(denoised_time_lag, rows)

    # The debugger breakpoint that used to follow this call stopped every
    # run; the scores are returned to the caller instead.
    covered_lines = covering_lines(lines, 10)

    # NOTE(review): sr=2756.25 looks like 22050 / (N_FFT / 2048) - confirm
    # before changing the load sample rate or N_FFT.
    librosa.display.specshow(
        denoised_time_lag, y_axis='time', x_axis='time', sr=2756.25)
    plt.colorbar()
    plt.set_cmap("hot_r")
    plt.show()

    return covered_lines
"""Find rows whose normalized sum is a local maxima""" | ||
row_sums = np.sum(denoised_time_lag, axis=1) | ||
divisor = np.arange(row_sums.shape[0], 0, -1) | ||
normalized_rows = row_sums / divisor | ||
local_minima_rows = scipy.signal.argrelextrema(normalized_rows, np.greater) | ||
return local_minima_rows[0] | ||
|
||
|
||
def detect_lines(denoised_time_lag, rows, min_length_samples):
    """Detect lines in the time lag matrix, relaxing the threshold as needed.

    Starts at ``LINE_THRESHOLD`` and lowers it by 5% after each attempt,
    for at most ``NUM_ITERATIONS`` attempts, stopping as soon as at least
    ``MIN_LINES`` lines are found.

    Args:
        denoised_time_lag: matrix passed through to ``detect_lines_helper``.
        rows: candidate row indices to scan.
        min_length_samples: minimum line length in samples.
    Returns: the lines from the first successful attempt, otherwise the
        lines from the last attempt.
    """
    threshold = LINE_THRESHOLD
    for _attempt in range(NUM_ITERATIONS):
        found = detect_lines_helper(denoised_time_lag, rows, threshold,
                                    min_length_samples)
        if len(found) >= MIN_LINES:
            break
        threshold = threshold * 0.95
    return found
|
||
|
||
def detect_lines_helper(denoised_time_lag, rows, threshold,
                        min_length_samples):
    """Detect lines where more than min_length_samples are above threshold.

    Each row is scanned from its diagonal element (column ``row``) to the
    last column. A run of consecutive values above ``threshold`` becomes a
    ``Line(start, end, row)`` when it spans more than ``min_length_samples``
    columns; ``end`` is the first column after the run.

    Args:
        denoised_time_lag: square 2-D array indexed as ``[row, col]``.
        rows: iterable of row indices to scan; rows below
            ``min_length_samples`` are skipped.
        threshold: value a cell must exceed to belong to a run.
        min_length_samples: minimum run length in samples.
    Returns: list of ``Line`` objects in scan order.
    """
    num_samples = denoised_time_lag.shape[0]
    line_segments = []
    for row in rows:
        if row < min_length_samples:
            continue

        # Start every row with no open run; otherwise a run still open at
        # the end of one row would leak its start column into the next row.
        cur_segment_start = None
        for col in range(row, num_samples):
            if denoised_time_lag[row, col] > threshold:
                if cur_segment_start is None:
                    cur_segment_start = col
                continue

            if (cur_segment_start is not None
                    and col - cur_segment_start > min_length_samples):
                line_segments.append(Line(cur_segment_start, col, row))
            cur_segment_start = None

        # A run that reaches the last column never meets a below-threshold
        # cell, so it has to be closed explicitly.
        if (cur_segment_start is not None
                and num_samples - cur_segment_start > min_length_samples):
            line_segments.append(Line(cur_segment_start, num_samples, row))

    return line_segments
|
||
|
||
def count_overlapping_lines(lines, margin, min_length_samples):
    """Score each line by the number of other lines that cover it.

    A line earns one point for every line whose lag differs from its own by
    more than ``min_length_samples`` and that spans it either in column
    range or after subtracting each line's lag from its endpoints.

    Args:
        lines: list of objects with ``start``, ``end`` and ``lag``.
        margin: slack allowed at each end of the covered line.
        min_length_samples: minimum lag difference for a pair to count.
    Returns: dict mapping every line to its score.
    """
    line_scores = dict.fromkeys(lines, 0)

    for covered in lines:
        for other in lines:
            lags_far_apart = (
                abs(other.lag - covered.lag) > min_length_samples)

            spans_columns = (other.start < covered.start + margin
                             and other.end > covered.end - margin)

            spans_shifted = (
                other.start - other.lag
                < covered.start - covered.lag + margin
                and other.end - other.lag
                > covered.end - covered.lag - margin)

            # Unlike two separate increments, a pair matching both ways
            # still contributes a single point.
            if lags_far_apart and (spans_columns or spans_shifted):
                line_scores[covered] += 1

    return line_scores
|
||
|
||
def best_segment(line_scores):
    """Pick the line with the highest score, breaking ties by duration.

    Args:
        line_scores: non-empty dict mapping lines (objects with ``start``
            and ``end``) to their scores.
    Returns: the line with the largest ``(score, end - start)`` pair.
    """
    def rank(line):
        return (line_scores[line], line.end - line.start)

    ranked = sorted(line_scores, key=rank, reverse=True)
    return ranked[0]
|
||
|
||
def draw_lines(num_samples, sample_rate, lines):
    """Debugging aid: plot the detected lines on an empty time-lag canvas.

    Args:
        num_samples: side length of the square canvas.
        sample_rate: audio sample rate, rescaled by ``N_FFT / 2048`` for
            the plot axes.
        lines: objects with integer ``start``, ``end`` and ``lag``.
    """
    # Import here since this function is only for debugging
    import librosa.display
    import matplotlib.pyplot as plt

    canvas = np.zeros((num_samples, num_samples))
    for segment in lines:
        # Four rows thick so that the line is visible in the plot
        lag_band = slice(segment.lag, segment.lag + 4)
        time_span = slice(segment.start, segment.end + 1)
        canvas[lag_band, time_span] = 1

    display_rate = sample_rate / (N_FFT / 2048)
    librosa.display.specshow(
        canvas, y_axis='time', x_axis='time', sr=display_rate)
    plt.colorbar()
    plt.set_cmap("hot_r")
    plt.show()
|
||
|
||
def find_chorus(chroma, sr, song_length_sec, clip_length):
    """
    Find the most repeated chorus
    Args:
        chroma: 12 x n frequency chromogram
        sr: sample rate of the song, usually 22050
        song_length_sec: length in seconds of the song (lost in processing chroma)
        clip_length: minimum length in seconds we want our chorus to be (at least 10-15s)
    Returns: start time in seconds of the best chorus, or None when no
        line is detected
    """
    n_frames = chroma.shape[1]

    print("Calculating time lag similarity matrix")
    time_time = TimeTimeSimilarityMatrix(chroma, sr)
    time_lag = TimeLagSimilarityMatrix(chroma, sr)

    # Chroma frames per second, used to convert seconds <-> frames
    frames_per_sec = n_frames / song_length_sec

    # Denoise the time lag matrix
    smoothing_frames = int(SMOOTHING_SIZE_SEC * frames_per_sec)
    time_lag.denoise(time_time.matrix, smoothing_frames)

    # Detect lines in the image
    min_frames = clip_length * frames_per_sec
    peak_rows = local_maxima_rows(time_lag.matrix)
    candidates = detect_lines(time_lag.matrix, peak_rows, min_frames)
    if len(candidates) == 0:
        print("No choruses were detected. Try a smaller search duration")
        return None

    overlap_margin = OVERLAP_PERCENT_MARGIN * min_frames
    scores = count_overlapping_lines(candidates, overlap_margin, min_frames)
    winner = best_segment(scores)
    return winner.start / frames_per_sec
|
||
|
||
def find_and_output_chorus(input_file, output_file, clip_length):
    """
    Finds the most repeated chorus from input_file and outputs to output file.
    Args:
        input_file: string specifying the input file
        output_file: string where to write the chorus (wav only)
                     None means don't write anything
        clip_length: minimum length in seconds of the chorus; numeric
                     strings (e.g. straight from the command line) are
                     accepted
    Returns: Time in seconds of the start of the best chorus, or None if
             no chorus was found
    """
    # Command-line values arrive as strings; convert once so that the
    # arithmetic below and in find_chorus works on a number.
    clip_length = float(clip_length)

    print("Loading file")
    y, sr = librosa.load(input_file)
    song_length_sec = y.shape[0] / float(sr)
    S = np.abs(librosa.stft(y, n_fft=N_FFT))**2
    chroma = librosa.feature.chroma_stft(S=S, sr=sr)

    chorus_start = find_chorus(chroma, sr, song_length_sec, clip_length)
    if chorus_start is None:
        return None

    print("Best chorus found at {0:g} min {1:.2f} sec".format(
        chorus_start // 60, chorus_start % 60))

    if output_file is not None:
        # Convert the clip boundaries from seconds to audio sample indices
        first_sample = int(chorus_start * sr)
        last_sample = int((chorus_start + clip_length) * sr)
        sf.write(output_file, y[first_sample:last_sample], sr)

    return chorus_start
Oops, something went wrong.