#####################################################################
# Wavelet filtering in 1 dimension
# v1.0, copyright Markus Rexroth, EPFL, 2016
# License: BSD 3-clause license
# Based with permission on a script by Remy Joseph, EPFL
#####################################################################
# We use the "a trous" method with B3 spline scaling function
# and the resulting h coefficients
# For details, see e.g. the paper Rexroth et al. 2017
# or the appendix A in the book "Astronomical
# image and data analysis" by Starck and Murtagh
# Note that we use a slightly modified algorithm: We calculate the
# wavelets by using 2 convolutions, not one.
# We account for this in the back transformation from wavelet to image space.
# The chosen B3 scaling function is well suited for isotropic signals
# (e.g. Gaussian emission lines). For anisotropic signals, it might
# be better to pick another scaling function.
import warnings

import numpy as np
from scipy.ndimage import convolve1d

__all__ = ('wavelet_transform', 'wavelet_backTransform', 'cleanSignal')

# See the book by Starck & Murtagh
H_COEFFICIENTS_LIST = np.array([1 / 16.0, 1 / 4.0, 3 / 8.0, 1 / 4.0, 1 / 16.0])
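# Note: the five coefficients above come from the cubic B3 spline scaling
# function; they are symmetric and sum to 1 (1/16 + 4/16 + 6/16 + 4/16 +
# 1/16), so the smoothing convolution conserves the total signal.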


def test_levels(signal, levels):
    """Test if the chosen number of levels is too large for our signal
    (sampling condition).
    """
    signal = np.asarray(signal)
    h_length = len(H_COEFFICIENTS_LIST)
    signalSize = signal.size
    max_level = np.log2((signalSize - 1.0) / (h_length - 1.0))
    if levels > max_level:
        # If the (level+1)-th h array is larger than the signal array, the
        # final smoothing function for `levels` wavelets is wider than the
        # signal range itself, which has to be avoided. Thus
        # h_length + (2**level - 1) * (h_length - 1) > signalSize must be
        # avoided (the second term = the number of 0s in the h array).
        levels = int(np.floor(max_level))
        warnings.warn("Attention: The chosen number of levels exceeds the "
                      "number allowed (sampling condition). It was therefore "
                      "automatically set to the maximum number allowed = {}"
                      .format(levels))
    return levels
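
# Worked example of the sampling condition above (illustrative numbers, not
# from the original source): for a signal of 40 samples and h_length = 5,
# max_level = log2((40 - 1) / (5 - 1)) = log2(9.75) ~ 3.3, so at most 3
# wavelet levels are allowed.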


def get_h_coefficients(levels):
    """Build the list of h coefficient arrays.

    The spacing between the values is 2**i_level - 1 zeros (see the Starck
    book).
    """
    h_length = len(H_COEFFICIENTS_LIST)
    h_coefficients = []
    for i in range(levels):
        # Array long enough to store the spacing and the values
        max_length = ((2**i) - 1) * (h_length - 1) + h_length
        h_array = np.zeros(max_length)
        counter = 0
        for index in range(len(h_array)):
            # Now we enter the values and ensure that the spacing between
            # them is correct
            if index % (2**i) == 0:
                h_array[index] = H_COEFFICIENTS_LIST[counter]
                counter += 1
        h_coefficients.append(h_array)
    return h_coefficients
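

# For illustration, get_h_coefficients(2)[1] places one zero between the B3
# coefficients: [1/16, 0, 1/4, 0, 3/8, 0, 1/4, 0, 1/16].
#
# The two transforms below are listed in __all__ and used by cleanSignal but
# are missing from this listing. What follows is a minimal sketch of the
# two-convolution "a trous" scheme described in the header, not necessarily
# the original implementation; in particular the periodic boundary handling
# (mode='wrap') is an assumption, chosen to match the noise convolution in
# cleanSignal.


def wavelet_transform(signal, levels):
    """Transform a signal into wavelet space (sketch).

    Returns an array of shape (levels + 1, signal.size): one row per wavelet
    level plus a final row with the remaining smoothed signal.
    """
    signal = np.asarray(signal, dtype=float)
    levels = test_levels(signal, levels)
    h_coefficients = get_h_coefficients(levels)
    coefficients = []
    smooth = signal
    for i in range(levels):
        # First convolution: the next, coarser smoothing of the signal
        smooth_new = convolve1d(smooth, h_coefficients[i], mode='wrap')
        # Second convolution: the wavelet is the difference between the
        # current smoothing and the re-smoothed coarser version (this is the
        # "2 convolutions" modification mentioned in the header)
        wavelet = smooth - convolve1d(smooth_new, h_coefficients[i],
                                      mode='wrap')
        coefficients.append(wavelet)
        smooth = smooth_new
    # Last row: the remaining smoothed signal
    coefficients.append(smooth)
    return np.array(coefficients)


def wavelet_backTransform(coefficients):
    """Transform wavelet coefficients back into real space (sketch)."""
    coefficients = np.asarray(coefficients)
    levels = coefficients.shape[0] - 1
    h_coefficients = get_h_coefficients(levels)
    # Start from the coarsest smoothed signal and add back one wavelet scale
    # at a time. The extra convolution mirrors the second convolution used in
    # wavelet_transform, so the reconstruction is exact by construction.
    signal = coefficients[-1]
    for i in range(levels - 1, -1, -1):
        signal = (convolve1d(signal, h_coefficients[i], mode='wrap')
                  + coefficients[i])
    return signal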


def cleanSignal(signal, noise, levels, sigmaCutoff=5.0, epsilon=0.05):
    """Filter an input signal using wavelets and the 1 standard deviation
    noise estimate per pixel.

    `epsilon` is the iteration-stop parameter for extracting signal from the
    residual signal.
    """
    signal = np.array(signal, dtype=float)
    noise = np.array(noise, dtype=float)
    # If we have missing values in the signal or the noise, set the signal to
    # 0 and the noise to a very large finite value ("100000 standard
    # deviations") to downweigh these pixels without propagating NaN or inf
    # through the convolutions below
    nans = np.isnan(signal) | np.isnan(noise)
    if nans.any():
        signal[nans] = 0.0
        noise[nans] = 1.0e5 * np.nanmax(noise)
    sigmaCutoff = float(sigmaCutoff)
    epsilon = float(epsilon)
    signalSize = signal.size
    # Test if the chosen number of levels is too large for our signal
    levels = test_levels(signal, levels)
    # We use a dirac function, 0 everywhere except 1 in the central pixel
    # (pixel value = integral over the signal in the pixel area = 1 for a
    # dirac function)
    diracSignal = np.zeros(signalSize)
    diracSignal[signalSize // 2] = 1.0
    # We calculate the wavelet coefficients of the dirac. We do not need the
    # coefficients of the final smoothing function, so we drop the last row
    dirac_coefficients = wavelet_transform(diracSignal, levels)[:-1]
    # The noise array holds the 1 sigma standard deviation per pixel.
    # Convolving each row of squared dirac coefficients with the noise
    # variance (noise**2) gives the variance of each wavelet coefficient
    variance_coefficients = convolve1d(dirac_coefficients**2.0, noise**2.0,
                                       mode='wrap')
    # Get the standard deviation array of arrays
    noise_coef = variance_coefficients**(1 / 2.0)
    # Create the multiresolution support
    signal_coef = np.abs(wavelet_transform(signal, levels))
    # The support is 0 for every non-significant wavelet and 1 for every
    # significant wavelet/smoothing function coefficient. A coefficient below
    # sigmaCutoff * sigma (sigmaCutoff + 1 for level 0) is considered noise.
    # We increase the threshold for i = 0 = high frequencies, as noise
    # typically dominates at high frequencies while the signal lives at lower
    # frequencies.
    M_support = np.vstack([
        signal_coef[0] >= ((sigmaCutoff + 1.0) * noise_coef[0]),
        signal_coef[1:-1] >= (sigmaCutoff * noise_coef[1:]),
        # Last row: the smoothing function coefficients are always considered
        # significant
        np.ones(signalSize, dtype=bool)
    ])
    # Do the cleaning
    cleaned_signal = np.zeros(signalSize)
    # Initialize the standard deviation of the residual signal
    residual_signal_sigma_old = 0.0
    # Initialize the residual signal
    residual_signal = signal
    residual_signal_sigma = np.std(residual_signal)
    # Iteratively extract the significant signal that is still present in the
    # residual and add it to the already extracted signal, until the relative
    # change of the residual's standard deviation drops below epsilon
    while np.abs((residual_signal_sigma_old - residual_signal_sigma) /
                 residual_signal_sigma) > epsilon:
        residual_coefficients = wavelet_transform(residual_signal, levels)
        # Zero the non-significant wavelet coefficients
        residual_coefficients[~M_support] = 0.0
        cleaned_signal += wavelet_backTransform(residual_coefficients)
        residual_signal = signal - cleaned_signal
        residual_signal_sigma_old = residual_signal_sigma
        residual_signal_sigma = np.std(residual_signal)
    return cleaned_signal
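
# Usage sketch for cleanSignal (illustrative values only, not from the
# original source; relies on the wavelet_transform/wavelet_backTransform
# sketches above):
#
#     x = np.arange(200)
#     spectrum = 30.0 * np.exp(-(x - 100.0)**2 / (2.0 * 4.0**2))
#     noisy = spectrum + np.random.normal(0, 3.0, x.size)
#     noisy[50:55] = np.nan               # missing pixels are handled
#     sigma = 3.0 * np.ones(x.size)
#     sigma[50:55] = np.nan
#     cleaned = cleanSignal(noisy, sigma, levels=4, sigmaCutoff=5.0)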


def test(stdDev=5.0, random='yes', levels=3, sigmaCutoff=5.0, epsilon=0.05):
    """Test function for the functions defined above."""

    def gauss(x, amplitude, mu, sigma):
        return amplitude * np.exp(-(x - mu)**2 / (2.0 * sigma**2))

    x = np.arange(-20, 20)
    signal = gauss(x, 50.0, 0.0, 5.0)
    if random == 'yes':
        noise = np.random.normal(0, stdDev, np.size(signal))
    elif random == 'no':
        noise = np.array([1.57439344, 3.25840322, -2.5442794, 0.22431039,
2.66510799, -4.31975032, -2.38717285, -4.00639705,
-3.73459199, 7.390269, -6.76915999, 0.49809766,
9.03401679, -6.20273076, -1.08940482, 1.40441121,
10.27251409, -0.77355778, -4.31925802, -2.38433115,
5.43225019, 3.12188464, -2.53535334, -8.8736281,
-2.88545833, 2.2022186, 2.6992551, 4.13129327,
-1.13410893, -2.58064627, 1.01763015, 4.97684862,
-0.58867364, -1.41797943, -3.36680655, -7.25776942,
1.70606578, 2.10790981, 0.12066381, -0.16763699])
    else:
        raise ValueError("Error: the random keyword must be 'yes' or 'no'!")
    stddevlist = stdDev * np.ones_like(x)
    signal_final = signal + noise
    wavelet_signal = wavelet_transform(signal_final, levels)
    reconstructed = wavelet_backTransform(wavelet_signal)
    denoised = cleanSignal(signal_final, stddevlist, levels,
                           sigmaCutoff=sigmaCutoff, epsilon=epsilon)
    import matplotlib.pyplot as plt
    plt.plot(x, signal_final, color='blue', label='signal+noise')
    plt.plot(x, reconstructed, 'r--', label='reconstructed')
    plt.plot(x, signal, 'b--', label='signal')
    plt.plot(x, denoised, color='black', ls='dashed', label='denoised')
    plt.plot(x, signal - denoised, 'k--', label='residual')
    plt.legend()
    plt.show()
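

if __name__ == '__main__':
    # Convenience entry point (not part of the original listing): run the
    # test with the fixed noise realization so the output is reproducible.
    test(random='no')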