Source code for nlgm.optimizers

import numpy as np
import random


[docs] class RandomWalkOptimizer: """ Random-walk optimizer over a graph of candidate signatures. Parameters ---------- graph : numpy.ndarray Square adjacency matrix. signatures : list Signature values indexed by graph node. start : int Starting node index. criterion : str Optimization criterion label. """ def __init__(self, graph, signatures, start, criterion): if graph.shape[0] != graph.shape[1]: raise ValueError("Graph adjacency matrix dimensions must be square.") self.graph = graph self.start = start self.signatures = signatures self.criterion = criterion
[docs] def optimize(self, objective, max_iters, callback=None): """ Optimize objective with random-neighbor traversal. Parameters ---------- objective : callable Function taking a signature and returning ``(metric, loss)``. max_iters : int Maximum number of walk iterations. callback : callable, optional Callback invoked as ``callback(signature, (metric, loss))``. Returns ------- tuple Best result as ``(signature, metric, loss)``. """ visited = set() current_node = self.start best_node = (None, float("inf"), float("inf")) it = 0 while it < max_iters: print("--------------------") print("Iteration: " + str(it + 1)) print("--------------------") # if current_node in visited: # break visited.add(current_node) # test loss = index 1 metric, loss = objective(self.signatures[current_node]) if loss < best_node[2]: best_node = (self.signatures[current_node], metric, loss) if callback: callback(self.signatures[current_node], (metric, loss)) neighbors = np.where(self.graph[current_node] > 0)[0] if len(neighbors) == 0: break current_node = random.choice(neighbors) it += 1 return best_node
[docs] def normalize_graph(self): """ Normalize transition weights row-wise. """ rowsums = self.graph.sum(axis=1) self.graph = self.graph / rowsums[:, np.newaxis]
[docs] def optimize_with_backtracking(self, objective, max_iters, callback=None): """ Optimize objective using random walk with path backtracking. Parameters ---------- objective : callable Function taking a signature and returning ``(metric, loss)``. max_iters : int Maximum number of walk iterations. callback : callable, optional Callback invoked as ``callback(signature, (metric, loss))``. Returns ------- tuple Best result as ``(signature, metric, loss)``. """ visited = set() current_node = self.start best_node = (None, float("inf"), float("inf")) it = 0 path = [] # to keep track of the path taken while it < max_iters: if current_node in visited: if path: current_node = path.pop() else: break continue print("--------------------") print("Iteration: " + str(it + 1)) print("--------------------") visited.add(current_node) path.append(current_node) metric, loss = objective(self.signatures[current_node]) if loss < best_node[2]: best_node = (self.signatures[current_node], metric, loss) if callback: callback(self.signatures[current_node], (metric, loss)) neighbors = np.where(self.graph[current_node] > 0)[0] # filter out visited neighbors neighbors = [node for node in neighbors if node not in visited] if not neighbors: path.pop() if path: # backtrack to the previous node if there are no unvisited neighbors current_node = path[-1] else: break else: current_node = random.choice(neighbors) it += 1 return best_node