Source code for astroML.dimensionality.iterative_pca

import sys

import numpy as np

from scipy.linalg import solve


[docs]def iterative_pca(X, M, n_ev=5, n_iter=15, norm=None, full_output=False): """ Parameters ---------- X: ndarray, shape = (n_samples, n_features) input data M: ndarray, bool, shape = (n_samples, n_features) mask for input data. where mask == True, the spectrum is unconstrained n_ev: int number of eigenvectors to use in reconstructing masked regions n_iter: int number of iterations to find eigenvectors norm: string what type of normalization to use on the data. Options are - None : no normalization - 'L1' : L1-norm - 'L2' : L2-norm full_output: boolean (optional) if False (default) return only the reconstructed data X_recons if True, return the full information (see below) Returns ------- X_recons: ndarray, shape = (n_samples, n_features) data with masked regions reconstructed mu: ndarray, shape = (n_features,) mean of data evecs: ndarray, shape = (min(n_samples, n_features), n_features) eigenvectors of the reconstructed data evals: ndarray, size = min(n_samples, n_features) eigenvalues of the reconstructed data norms: ndarray, size = n_samples normalization of each input coeffs: ndarray, size = (n_samples, n_ev) coefficients used to reconstruct X """ X = np.asarray(X, dtype=float) M = np.asarray(M, dtype=bool) if X.shape != M.shape: raise ValueError('X and M must have the same shape') n_samples, n_features = X.shape if np.any(M.sum(0) == n_samples): raise ValueError('Some features are masked in all samples') if type(norm) == str: norm = norm.upper() if norm not in (None, 'none', 'L1', 'L2'): raise ValueError('unrecognized norm: %s' % norm) notM = (~M) X_recons = X.copy() X_recons[M] = 0 # as an initial guess, we'll fill-in masked regions with the mean # of the rest of the sample if norm is None: mu = (X_recons * notM).sum(0) / notM.sum(0) mu = mu * np.ones([n_samples, 1]) X_recons[M] = mu[M] else: # since we're normalizing each spectrum, and the norm depends on # the filled-in values, we need to iterate a few times to make # sure things are consistent. for i in range(n_iter): # normalize if norm == 'L1': X_recons /= np.sum(X_recons, 1)[:, None] else: X_recons /= np.sqrt(np.sum(X_recons ** 2, 1))[:, None] # find the mean mu = (X_recons * notM).sum(0) / notM.sum(0) mu = mu * np.ones([n_samples, 1]) X_recons[M] = mu[M] # Matrix of coefficients coeffs = np.zeros((n_samples, n_ev)) # Now we iterate through, using the principal components to reconstruct # these regions. for i in range(n_iter): sys.stdout.write(' PCA iteration %i / %i\r' % (i + 1, n_iter)) sys.stdout.flush() # normalize the data if norm == 'L1': X_recons /= np.sum(X_recons, 1)[:, None] else: X_recons /= np.sqrt(np.sum(X_recons ** 2, 1))[:, None] # now compute the principal components mu = X_recons.mean(0) X_centered = X_recons - mu U, S, VT = np.linalg.svd(X_centered, full_matrices=False) # perform a least-squares fit to estimate the coefficients of the # first n_ev eigenvectors for each data point. # The eigenvectors are in the rows of the matrix VT. # The coefficients are given by # a_n = [V_n^T W V_n]^(-1) V_n W x # Such that x can be reconstructed via # x_n = V_n a_n # Variables here are: # x : vector length n_features. This is a data point to be # reconstructed # a_n : vector of length n. These are the reconstruction weights # V_n : eigenvector matrix of size (n_features, n). # W : diagonal weight matrix of size (n_features, n_features) # such that W[i,i] = M[i] # x_n : vector of length n_features which approximates x VWx = np.dot(VT[:n_ev], (notM * X_centered).T) for i in range(n_samples): VWV = np.dot(VT[:n_ev], (notM[i] * VT[:n_ev]).T) coeffs[i] = solve(VWV, VWx[:, i], sym_pos=True, overwrite_a=True) X_fill = mu + np.dot(coeffs, VT[:n_ev]) X_recons[M] = X_fill[M] sys.stdout.write('\n') # un-normalize X_recons norms = np.zeros(n_samples) for i in range(n_samples): ratio_i = X[i][notM[i]] / X_recons[i][notM[i]] norms[i] = ratio_i[~np.isnan(ratio_i)][0] X_recons[i] *= norms[i] if full_output: return X_recons, mu, VT, S, norms, coeffs else: return X_recons