Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Testing #4

Open
wants to merge 5 commits into
base: visualization
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified SJ_Haar_CNV/__pycache__/__init__.cpython-311.pyc
Binary file not shown.
Binary file modified SJ_Haar_CNV/__pycache__/base_search.cpython-311.pyc
Binary file not shown.
20 changes: 11 additions & 9 deletions SJ_Haar_CNV/decomposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ def decompose (signal, base):
# Initial set of coefficient
for wavelet in base:
# Compute the wavelet coefficients.
coefficients.append(signal * generate_wavelet_function (wavelet))
coefficients.append((signal * generate_wavelet_function (wavelet)).sum())

#Normalize the coefficients
coefficients = np.array(coefficients)
coefficients = coefficients / np.sum(coefficients)

def difference (coefficients, signal, base, difference_transformation = lambda x: np.abs(x)):
def difference (coefficients, signal, base, difference_transformation = lambda x: x**2):
"""
Compute the difference between the signal and the sum of wavelets.
"""
Expand All @@ -32,24 +32,26 @@ def generate_wavelet_function (wavelet):
"""
Generate a wavelet function from a wavelet.
"""
wavelet_parts = [np.repeat (v, l) for v, l in wavelet[2:]]
wavelet_parts = [np.repeat (v, l) for v, l in wavelet[3:]]
return np.concatenate (wavelet_parts)

def generate_function_from_wavelets (coefficients, base):
    """
    Generate a function from a set of wavelets.

    Each base entry is unpacked as ``[_, _, start, (va, na), (vb, nb)]``; its
    wavelet is scaled by the matching coefficient and added into the output
    over ``start : start + na + nb``. The output length is taken from the
    first base entry (``base[0][3][1]``).

    Raises AssertionError if the first base entry does not begin with
    ``0, 0``.
    """
    # The original line read `== 0 & base[0][1] == 0; "..."`: `&` binds tighter
    # than `==`, so base[0][1] was never actually checked, and the `;` turned
    # the message into a separate no-op statement.
    assert base[0][0] == 0 and base[0][1] == 0, "The base is not ordered as expected."
    wf = np.zeros (base[0][3][1])
    for c, b in zip(coefficients, base):
        _, _, start, (va, na), (vb, nb) = b
        wf[start:start+na+nb] += (c * generate_wavelet_function (b))

    return wf

def test ():
SA = np.sqrt (1/(250 - 0) - 1/(1000 - 0 + 1))
SB = np.sqrt (1/(1000 - 250) - 1/(1000 - 0 + 1))
base = [[0,0,(0,0), (10,1000),(0,0),(0,0)],
[1,0,(0,0), (SA,250), (SB, 750), (0,0)]]
base = [[0,0, 0, (10,1000),(0,0)],
[1,0, 0, (SA,250), (SB, 750)]]

coefficients = [1, 10]

Expand Down
5 changes: 4 additions & 1 deletion SJ_Haar_CNV/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import pandas as pd, numpy as np, plotly.express as px
import scipy.stats as sts


"""
I'm just shoving some functions here for now. I'll clean this up later.
Expand All @@ -13,4 +15,5 @@ def expand_wavelet(df):

def visualize_wavelet(df):
data = expand_wavelet(df)
px.line(data, y = 'value',line_shape='hv').show()
px.line(data, y = 'value',line_shape='hv').show()

84 changes: 84 additions & 0 deletions Transfer_Space/rle_difference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import matplotlib.pyplot as plt

# Decode RLE sequence into a regular sequence
def decode_rle(rle):
    """Expand run-length pairs into the flat list they describe.

    ``rle`` is an iterable of ``(value, count)`` pairs; each value is emitted
    ``count`` times, in the order the pairs appear.
    """
    return [value for value, count in rle for _ in range(count)]

# Encode a sequence into RLE format
def encode_rle(sequence):
    """Collapse a sequence into a list of ``(value, run_length)`` tuples.

    Consecutive equal items are merged into one run. An empty sequence
    yields an empty list.
    """
    if not sequence:
        return []

    runs = []
    run_value, run_length = sequence[0], 1
    for item in sequence[1:]:
        if item == run_value:
            # Still inside the current run.
            run_length += 1
            continue
        # Value changed: close the finished run and start a new one.
        runs.append((run_value, run_length))
        run_value, run_length = item, 1

    # The last run is never closed inside the loop.
    runs.append((run_value, run_length))
    return runs

# Compare two RLEs and return the RLE of their differences
def compare_rle_as_vectors(rle1, rle2):
    """Return the RLE of the element-wise difference ``rle1 - rle2``.

    Both inputs are decoded to flat lists, the shorter one is right-padded
    with zeros to the length of the longer one, and the per-position
    differences are re-encoded as run-length pairs.
    """
    left = decode_rle(rle1)
    right = decode_rle(rle2)

    # Pad the shorter vector with zeros so both cover the same positions.
    target = max(len(left), len(right))
    left.extend([0] * (target - len(left)))
    right.extend([0] * (target - len(right)))

    return encode_rle([a - b for a, b in zip(left, right)])

# Plot the RLE comparison and highlight differences with horizontal red lines
def plot_rle_comparison(rle1, rle2, rle_diff):
    """Plot two RLE sequences as step lines and mark where they differ.

    ``rle1`` and ``rle2`` are decoded and drawn as step plots. ``rle_diff``
    (the RLE of their element-wise difference) is decoded and every maximal
    run of non-zero entries is marked with a red horizontal line drawn at the
    value ``rle1`` has at the start of that run. The figure is shown with
    ``plt.show()``; nothing is returned.
    """
    decoded1 = decode_rle(rle1)
    decoded2 = decode_rle(rle2)
    decoded_diff = decode_rle(rle_diff)

    def level_at(position):
        # The difference vector can be longer than rle1 (when rle2 decodes to
        # more elements). compare_rle_as_vectors pads the shorter sequence with
        # zeros, so treat positions past the end of rle1 as 0 instead of
        # raising IndexError.
        return decoded1[position] if position < len(decoded1) else 0

    # Create the x-axis positions for each value in the decoded sequences
    x1 = list(range(len(decoded1)))
    x2 = list(range(len(decoded2)))

    plt.figure(figsize=(12, 6))

    # Plot RLE 1 and RLE 2 as step plots
    plt.step(x1, decoded1, where='mid', label='RLE 1', alpha=0.7)
    plt.step(x2, decoded2, where='mid', label='RLE 2', alpha=0.7)

    # Highlight the differences with horizontal red lines across ranges
    start_diff = None
    for i, delta in enumerate(decoded_diff):
        if delta != 0 and start_diff is None:
            start_diff = i  # Start of a difference
        elif delta == 0 and start_diff is not None:
            # End of a difference range, plot a horizontal line
            plt.hlines(y=level_at(start_diff), xmin=start_diff, xmax=i, color='red', lw=3)
            start_diff = None

    # If there's a difference at the very end
    if start_diff is not None:
        plt.hlines(y=level_at(start_diff), xmin=start_diff, xmax=len(decoded_diff), color='red', lw=3)

    plt.legend()
    plt.title('Comparison of Two RLE Sequences with Differences Highlighted')
    plt.xlabel('Position')
    plt.ylabel('Value')
    plt.show()

# Example usage: two short RLE sequences to compare.
rle1 = [(1, 3), (2, 4), (3, 10), (1, 1)]
rle2 = [(1, 3), (2, 3), (3, 3), (2, 5)]

# Element-wise difference of the decoded sequences, re-encoded as RLE.
rle_diff = compare_rle_as_vectors(rle1, rle2)
print("RLE of differences:", rle_diff, sep="\n")

# Draw both sequences and mark the ranges where they disagree.
plot_rle_comparison(rle1, rle2, rle_diff)
44 changes: 44 additions & 0 deletions runnig_tests.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from tests import test_data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"testing_cnv = [[(5,1000)],\n",
" [(5,350),(7,700)]]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Binary file modified tests/__pycache__/__init__.cpython-311.pyc
Binary file not shown.
Binary file modified tests/__pycache__/test_data.cpython-311.pyc
Binary file not shown.
186 changes: 186 additions & 0 deletions tests/trial_testing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import matplotlib.pyplot as plt
import numpy as np
import scipy.optimize as opt

# ------------------ Test Data Generation ------------------ #
def generate_test_data(cnv_string, noise):
    """
    Build a piecewise-constant signal and add noise to it.

    ``cnv_string`` is a list of ``(value, length)`` pairs; each value is
    repeated ``length`` times and the pieces are joined in order.
    ``noise`` is called once with the total length and its result is added
    element-wise to the signal.
    """
    pieces = [np.repeat(level, run) for level, run in cnv_string]
    total_length = np.sum([run for _, run in cnv_string])
    return np.concatenate(pieces) + noise(total_length)

def noise(n, s=2):
    """
    Draw ``n`` samples of zero-mean normal noise with standard deviation ``s``.

    Uses NumPy's global random state (``np.random.normal``).
    """
    return np.random.normal(loc=0, scale=s, size=n)

def test_data():
    """
    Return a noisy example signal made of four constant segments.

    The segments are (value, length) = (5, 100), (3, 1345), (4.9, 99),
    (3.1, 1345); noise comes from ``noise`` with its default spread.
    """
    segments = [(5, 100), (3, 1345), (4.9, 99), (3.1, 1345)]
    return generate_test_data(segments, noise)

# ------------------ Haar Wavelet Functions ------------------ #
def haar_high(s, b, e):
    """
    Height of the positive (left-of-break) part of the wavelet for (s, b, e).

    Computes sqrt(1 / (b - s + 1) - 1 / (e - s + 1)). The original author
    describes this as the first term of "the page 10 equation" (the source
    document is not identified in this file).
    """
    left_term = 1 / (b - s + 1)
    span_term = 1 / (e - s + 1)
    return np.sqrt(left_term - span_term)

def haar_low(s, b, e):
    """
    Height of the negative (right-of-break) part of the wavelet for (s, b, e).

    Computes -sqrt(1 / (e - b) - 1 / (e - s + 1)). The original author
    describes this as the second term of "the page 10 equation" (the source
    document is not identified in this file).
    """
    right_term = 1 / (e - b)
    span_term = 1 / (e - s + 1)
    return -np.sqrt(right_term - span_term)

def basis_vector(s, b, e):
    """
    Build one Haar-style basis vector of length ``e - s`` for break ``b``.

    The first ``b - s`` entries hold ``haar_high(s, b, e)`` and the remaining
    entries hold ``haar_low(s, b, e)``.
    """
    high = haar_high(s, b, e)
    low = haar_low(s, b, e)
    # Start from the low level everywhere, then overwrite the leading part.
    vector = np.full(e - s, low)
    vector[:b - s] = high
    return vector

def haar_matrix(s, e):
    """
    Stack the basis vectors for every break point strictly between s and e.

    Row ``i`` is ``basis_vector(s, s + 1 + i, e)``; the result has shape
    ``(e - s - 1, e - s)``.
    """
    width = e - s
    rows = np.zeros((width - 1, width))
    # Iterating a 2-D array yields row views, so the assignment fills `rows`.
    for row, b in zip(rows, range(s + 1, e)):
        row[:] = basis_vector(s, b, e)
    return rows

def choose_break(signal, s, e, p0=0.80, debug=False):
    """
    Choose the best break point for ``signal`` between ``s`` and ``e``.

    Every candidate break ``b`` with ``s < b < e`` is scored by the absolute
    inner product of ``signal[s:e]`` with the corresponding row of
    ``haar_matrix(s, e)``. Only the central slice of candidates,
    ``int((1 - p0) * n) : int(p0 * n)`` of the ``n`` scores, is considered;
    if that slice is empty (very short segments) all candidates are used
    instead of failing. Ties on the maximum score are resolved by taking the
    candidate closest to ``signal.size // 2``.

    Parameters:
        signal: 1-D NumPy array.
        s, e: start and end indices of the segment (``signal[s:e]`` is used).
        p0: fraction in [.5, 1) controlling how much of each edge is trimmed.
        debug: when true, also return the intermediate arrays.

    Returns:
        The chosen break index (absolute, into ``signal``), or the tuple
        ``(matrix, scores, best_options, solution)`` when ``debug`` is true.

    Raises:
        ValueError: if ``p0`` is outside [.5, 1) or the segment is too short
            to contain any break point.
    """
    if not (.5 <= p0 < 1):
        raise ValueError("p0 must be between [.5,1).")

    matrix = haar_matrix(s, e)
    scores = np.abs(np.matmul(matrix, signal[s:e]))
    if scores.size == 0:
        # Previously this surfaced as an opaque "zero-size array" ValueError
        # from np.nanmax; same exception type, clearer message.
        raise ValueError("segment [s, e) needs at least two samples to place a break.")

    lower = int((1 - p0) * len(scores))
    upper = int(p0 * len(scores))
    if upper <= lower:
        # The trimmed window is empty for short segments (e.g. one candidate);
        # fall back to considering every candidate.
        lower, upper = 0, len(scores)
    trunc_scores = scores[lower:upper]

    # Map window positions back to absolute break indices: score i <-> b = s + 1 + i.
    best_options = np.flatnonzero(trunc_scores == np.nanmax(trunc_scores)) + 1 + s + lower
    # NOTE(review): the tie-break centre is the middle of the whole signal, not
    # of the [s, e) segment — confirm this is intended.
    solution = best_options[np.abs(best_options - signal.size // 2).argmin()]

    if debug:
        return matrix, scores, best_options, solution
    return solution

def create_basis_form(s, b, e):
    """
    Describe the wavelet for (s, b, e) in the compact list form used as a base entry.

    Returns ``[0, e - s, s, (high, b - s), (low, e - b)]`` where ``high`` and
    ``low`` come from ``haar_high`` and ``haar_low``. The leading 0 is a
    placeholder that callers may overwrite.
    """
    left_part = (haar_high(s, b, e), b - s)
    right_part = (haar_low(s, b, e), e - b)
    return [0, e - s, s, left_part, right_part]

def generate_haar_basis(signal, p0=0.95, length=20, debug=False):
    """
    Build a list of wavelet descriptions by repeatedly splitting the signal.

    The first entry is the constant term
    ``[0, 0, 0, (signal.mean(), len(signal)), (0, 0)]``. Segments are then
    processed first-in, first-out, starting from the whole signal: a segment
    with at least ``length`` samples is split at ``choose_break(...)``, the
    resulting ``create_basis_form`` entry (with its first slot set to the
    split depth) is recorded if not already present, and both halves are
    queued. Shorter segments are dropped.

    ``length`` is either an int or a callable applied to ``signal.size`` whose
    result is truncated with ``int``. With ``debug`` true, queue sizes are
    printed for every split.
    """
    min_length = length if isinstance(length, int) else int(length(signal.size))
    basis = [[0, 0, 0, (signal.mean(), len(signal)), (0, 0)]]
    pending = [(0, 0, signal.size)]

    while pending:
        depth, start, stop = pending.pop(0)
        if stop - start < min_length:
            # Too short to split further.
            continue

        if debug:
            print(f"todo: {len(pending)}, done: {len(basis)}, len signal: {stop - start}")

        split = choose_break(signal, start, stop, p0)
        if split == start or split == stop:
            # Degenerate break on the segment edge: split in the middle instead.
            split = (start + stop) // 2

        entry = create_basis_form(start, split, stop)
        entry[0] = depth
        if entry not in basis:
            basis.append(entry)

        pending.append((depth + 1, start, split))
        # NOTE(review): the right child starts at split + 1, so index `split`
        # belongs to neither child although the wavelet's low part begins at
        # `split` — confirm whether this gap is intended.
        pending.append((depth + 1, split + 1, stop))

    return basis

# ------------------ Decomposition Functions ------------------ #
def decompose(signal, base):
    """
    Decompose the signal into a set of wavelets.

    An initial coefficient is computed for every base entry as the inner
    product of ``signal`` with that entry's wavelet placed at its start
    offset (``wavelet[2]``) in a zero vector of the signal's length. The
    coefficients are scaled by their sum and then used as the starting point
    for ``scipy.optimize.minimize``, which minimises the summed absolute
    difference between ``signal`` and the reconstruction from
    ``generate_function_from_wavelets``.

    Returns the optimiser's result object; the fitted coefficients are in
    its ``x`` attribute.
    """
    coefficients = []

    # Compute wavelet coefficients
    for wavelet in base:
        start = wavelet[2]
        stop = start + wavelet[3][1] + wavelet[4][1]
        full_wavelet = np.zeros(len(signal))
        full_wavelet[start:stop] = generate_wavelet_function(wavelet)
        coefficients.append((signal * full_wavelet).sum())

    # Normalize the coefficients
    coefficients = np.array(coefficients)
    total = np.sum(coefficients)
    if total != 0:
        # A zero sum would turn the initial guess into NaN/inf and hand the
        # optimiser an unusable starting point; keep the raw values instead.
        coefficients = coefficients / total

    # Define the difference function for optimization
    def difference(coefficients, signal, base, difference_transformation=lambda x: np.abs(x)):
        return np.sum(difference_transformation(signal - generate_function_from_wavelets(coefficients, base)))

    res = opt.minimize(difference, coefficients, args=(signal, base))
    return res

def generate_wavelet_function(wavelet):
    """
    Expand a wavelet description into its sampled values.

    Every ``(value, length)`` pair from index 3 onward is turned into
    ``length`` copies of ``value``; the pieces are concatenated in order.
    """
    pieces = []
    for level, run in wavelet[3:]:
        pieces.append(np.repeat(level, run))
    return np.concatenate(pieces)

def generate_function_from_wavelets(coefficients, base):
    """
    Sum coefficient-weighted wavelets into a single sampled function.

    The output length is taken from the first base entry
    (``base[0][3][1]``). Each entry must have the five-slot form
    ``[_, _, start, (_, na), (_, nb)]``; its wavelet, scaled by the matching
    coefficient, is added over ``start : start + na + nb``.
    """
    total = np.zeros(base[0][3][1])
    for weight, wavelet in zip(coefficients, base):
        _, _, offset, (_, n_high), (_, n_low) = wavelet
        stop = offset + n_high + n_low
        total[offset:stop] += weight * generate_wavelet_function(wavelet)
    return total

# ------------------ Running the Full Process ------------------ #
# Generate the test data (signal with noise)
signal = test_data()

# Generate the Haar wavelet basis for the signal
haar_basis = generate_haar_basis(signal, p0=0.95, length=20)

# Decompose the signal using the Haar wavelet basis
decomposition_result = decompose(signal, haar_basis)

# Print decomposition results
print("Decomposition coefficients:", decomposition_result.x)

# Reconstruct the signal from the decomposition coefficients
reconstructed_signal = generate_function_from_wavelets(decomposition_result.x, haar_basis)

# Compare the original signal with the reconstructed signal
difference = signal - reconstructed_signal

# Print results
print("Original Signal:", signal)
print("Reconstructed Signal:", reconstructed_signal)
print("Difference between Original and Reconstructed Signal:", difference)

# Plotting the original signal, reconstructed signal, and their difference on the same plot
plt.figure(figsize=(12, 6))

plt.plot(signal, label='Original Signal', color='blue', alpha=0.7)
plt.plot(reconstructed_signal, label='Reconstructed Signal', color='green', alpha=0.3)
plt.plot(difference, label='Difference (Original - Reconstructed)', color='red', alpha=0.3)

plt.title('Original Signal, Reconstructed Signal, and Difference')
plt.legend()

# Display the plot
plt.show()