Source code for nlgm.optimizers
import numpy as np
import random
[docs]
class RandomWalkOptimizer:
    """
    RandomWalkOptimizer class for optimizing a given objective function using random walk algorithm.
    Args:
        graph (numpy.ndarray): The adjacency matrix of the graph.
        signatures (list): List of signatures corresponding to each node in the graph.
        start (int): The starting node for the random walk.
        criterion (str): The criterion for optimization.
    Attributes:
        graph (numpy.ndarray): The adjacency matrix of the graph.
        start (int): The starting node for the random walk.
        signatures (list): List of signatures corresponding to each node in the graph.
        criterion (str): The criterion for optimization.
    Methods:
        optimize(objective, max_iters, callback=None): Optimizes the objective function using random walk algorithm.
        normalize_graph(): Normalizes the graph by dividing each row by its sum.
        optimize_with_backtracking(objective, max_iters, callback=None): Optimizes the objective function using random walk algorithm with backtracking.
    """
    def __init__(self, graph, signatures, start, criterion):
        if graph.shape[0] != graph.shape[1]:
            raise ValueError("Graph adjacency matrix dimensions must be square.")
        self.graph = graph
        self.start = start
        self.signatures = signatures
        self.criterion = criterion
[docs]
    def optimize(self, objective, max_iters, callback=None):
        """
        Optimizes the objective function using random walk algorithm.
        Args:
            objective (function): The objective function to be optimized.
            max_iters (int): The maximum number of iterations for the optimization.
            callback (function, optional): A callback function to be called after each iteration. Defaults to None.
        Returns:
            tuple: A tuple containing the best node, metric, and loss.
        """
        visited = set()
        current_node = self.start
        best_node = (None, float("inf"), float("inf"))
        it = 0
        while it < max_iters:
            print("--------------------")
            print("Iteration: " + str(it + 1))
            print("--------------------")
            # if current_node in visited:
            #     break
            visited.add(current_node)
            # test loss = index 1
            metric, loss = objective(self.signatures[current_node])
            if loss < best_node[2]:
                best_node = (self.signatures[current_node], metric, loss)
            if callback:
                callback(self.signatures[current_node], (metric, loss))
            neighbors = np.where(self.graph[current_node] > 0)[0]
            if len(neighbors) == 0:
                break
            current_node = random.choice(neighbors)
            it += 1
        return best_node 
[docs]
    def normalize_graph(self):
        """
        Normalizes the graph by dividing each row by its sum.
        """
        rowsums = self.graph.sum(axis=1)
        self.graph = self.graph / rowsums[:, np.newaxis] 
[docs]
    def optimize_with_backtracking(self, objective, max_iters, callback=None):
        """
        Optimizes the objective function using random walk algorithm with backtracking.
        Args:
            objective (function): The objective function to be optimized.
            max_iters (int): The maximum number of iterations for the optimization.
            callback (function, optional): A callback function to be called after each iteration. Defaults to None.
        Returns:
            tuple: A tuple containing the best node, metric, and loss.
        """
        visited = set()
        current_node = self.start
        best_node = (None, float("inf"), float("inf"))
        it = 0
        path = []  # to keep track of the path taken
        while it < max_iters:
            if current_node in visited:
                if path:
                    current_node = path.pop()
                else:
                    break
                continue
            print("--------------------")
            print("Iteration: " + str(it + 1))
            print("--------------------")
            visited.add(current_node)
            path.append(current_node)
            metric, loss = objective(self.signatures[current_node])
            if loss < best_node[2]:
                best_node = (self.signatures[current_node], metric, loss)
            if callback:
                callback(self.signatures[current_node], (metric, loss))
            neighbors = np.where(self.graph[current_node] > 0)[0]
            # filter out visited neighbors
            neighbors = [node for node in neighbors if node not in visited]
            if not neighbors:
                path.pop()
                if path:
                    # backtrack to the previous node if there are no unvisited neighbors
                    current_node = path[-1]
                else:
                    break
            else:
                current_node = random.choice(neighbors)
            it += 1
        return best_node