Source code for markov_clustering.mcl

import numpy as np
from scipy.sparse import isspmatrix, dok_matrix, csc_matrix
import sklearn.preprocessing
from .utils import MessagePrinter


[docs]def sparse_allclose(a, b, rtol=1e-5, atol=1e-8): """ Version of np.allclose for use with sparse matrices """ c = np.abs(a - b) - rtol * np.abs(b) # noinspection PyUnresolvedReferences return c.max() <= atol
[docs]def normalize(matrix): """ Normalize the columns of the given matrix :param matrix: The matrix to be normalized :returns: The normalized matrix """ return sklearn.preprocessing.normalize(matrix, norm="l1", axis=0)
[docs]def inflate(matrix, power): """ Apply cluster inflation to the given matrix by raising each element to the given power. :param matrix: The matrix to be inflated :param power: Cluster inflation parameter :returns: The inflated matrix """ if isspmatrix(matrix): return normalize(matrix.power(power)) return normalize(np.power(matrix, power))
[docs]def expand(matrix, power): """ Apply cluster expansion to the given matrix by raising the matrix to the given power. :param matrix: The matrix to be expanded :param power: Cluster expansion parameter :returns: The expanded matrix """ if isspmatrix(matrix): return matrix ** power return np.linalg.matrix_power(matrix, power)
[docs]def add_self_loops(matrix, loop_value): """ Add self-loops to the matrix by setting the diagonal to loop_value :param matrix: The matrix to add loops to :param loop_value: Value to use for self-loops :returns: The matrix with self-loops """ shape = matrix.shape assert shape[0] == shape[1], "Error, matrix is not square" if isspmatrix(matrix): new_matrix = matrix.todok() else: new_matrix = matrix.copy() for i in range(shape[0]): new_matrix[i, i] = loop_value if isspmatrix(matrix): return new_matrix.tocsc() return new_matrix
[docs]def prune(matrix, threshold): """ Prune the matrix so that very small edges are removed. The maximum value in each column is never pruned. :param matrix: The matrix to be pruned :param threshold: The value below which edges will be removed :returns: The pruned matrix """ if isspmatrix(matrix): pruned = dok_matrix(matrix.shape) pruned[matrix >= threshold] = matrix[matrix >= threshold] pruned = pruned.tocsc() else: pruned = matrix.copy() pruned[pruned < threshold] = 0 # keep max value in each column. same behaviour for dense/sparse num_cols = matrix.shape[1] row_indices = matrix.argmax(axis=0).reshape((num_cols,)) col_indices = np.arange(num_cols) pruned[row_indices, col_indices] = matrix[row_indices, col_indices] return pruned
[docs]def converged(matrix1, matrix2): """ Check for convergence by determining if matrix1 and matrix2 are approximately equal. :param matrix1: The matrix to compare with matrix2 :param matrix2: The matrix to compare with matrix1 :returns: True if matrix1 and matrix2 approximately equal """ if isspmatrix(matrix1) or isspmatrix(matrix2): return sparse_allclose(matrix1, matrix2) return np.allclose(matrix1, matrix2)
[docs]def iterate(matrix, expansion, inflation): """ Run a single iteration (expansion + inflation) of the mcl algorithm :param matrix: The matrix to perform the iteration on :param expansion: Cluster expansion factor :param inflation: Cluster inflation factor """ # Expansion matrix = expand(matrix, expansion) # Inflation matrix = inflate(matrix, inflation) return matrix
[docs]def get_clusters(matrix): """ Retrieve the clusters from the matrix :param matrix: The matrix produced by the MCL algorithm :returns: A list of tuples where each tuple represents a cluster and contains the indices of the nodes belonging to the cluster """ if not isspmatrix(matrix): # cast to sparse so that we don't need to handle different # matrix types matrix = csc_matrix(matrix) # get the attractors - non-zero elements of the matrix diagonal attractors = matrix.diagonal().nonzero()[0] # somewhere to put the clusters clusters = set() # the nodes in the same row as each attractor form a cluster for attractor in attractors: cluster = tuple(matrix.getrow(attractor).nonzero()[1].tolist()) clusters.add(cluster) return sorted(list(clusters))
[docs]def run_mcl(matrix, expansion=2, inflation=2, loop_value=1, iterations=100, pruning_threshold=0.001, pruning_frequency=1, convergence_check_frequency=1, verbose=False): """ Perform MCL on the given similarity matrix :param matrix: The similarity matrix to cluster :param expansion: The cluster expansion factor :param inflation: The cluster inflation factor :param loop_value: Initialization value for self-loops :param iterations: Maximum number of iterations (actual number of iterations will be less if convergence is reached) :param pruning_threshold: Threshold below which matrix elements will be set set to 0 :param pruning_frequency: Perform pruning every 'pruning_frequency' iterations. :param convergence_check_frequency: Perform the check for convergence every convergence_check_frequency iterations :param verbose: Print extra information to the console :returns: The final matrix """ assert expansion > 1, "Invalid expansion parameter" assert inflation > 1, "Invalid inflation parameter" assert loop_value >= 0, "Invalid loop_value" assert iterations > 0, "Invalid number of iterations" assert pruning_threshold >= 0, "Invalid pruning_threshold" assert pruning_frequency > 0, "Invalid pruning_frequency" assert convergence_check_frequency > 0, "Invalid convergence_check_frequency" printer = MessagePrinter(verbose) printer.print("-" * 50) printer.print("MCL Parameters") printer.print("Expansion: {}".format(expansion)) printer.print("Inflation: {}".format(inflation)) if pruning_threshold > 0: printer.print("Pruning threshold: {}, frequency: {} iteration{}".format( pruning_threshold, pruning_frequency, "s" if pruning_frequency > 1 else "")) else: printer.print("No pruning") printer.print("Convergence check: {} iteration{}".format( convergence_check_frequency, "s" if convergence_check_frequency > 1 else "")) printer.print("Maximum iterations: {}".format(iterations)) printer.print("{} matrix mode".format("Sparse" if isspmatrix(matrix) else "Dense")) printer.print("-" * 50) # Initialize self-loops if loop_value > 0: matrix = add_self_loops(matrix, loop_value) # Normalize matrix = normalize(matrix) # iterations for i in range(iterations): printer.print("Iteration {}".format(i + 1)) # store current matrix for convergence checking last_mat = matrix.copy() # perform MCL expansion and inflation matrix = iterate(matrix, expansion, inflation) # prune if pruning_threshold > 0 and i % pruning_frequency == pruning_frequency - 1: printer.print("Pruning") matrix = prune(matrix, pruning_threshold) # Check for convergence if i % convergence_check_frequency == convergence_check_frequency - 1: printer.print("Checking for convergence") if converged(matrix, last_mat): printer.print("Converged after {} iteration{}".format(i + 1, "s" if i > 0 else "")) break printer.print("-" * 50) return matrix