Source code for nlgm.optimizers
import numpy as np
import random
class RandomWalkOptimizer:
    """
    Optimizes a given objective function using a random walk over a graph.

    Args:
        graph (numpy.ndarray): The adjacency matrix of the graph.
        signatures (list): List of signatures corresponding to each node in the graph.
        start (int): The starting node for the random walk.
        criterion (str): The criterion for optimization.

    Attributes:
        graph (numpy.ndarray): The adjacency matrix of the graph.
        start (int): The starting node for the random walk.
        signatures (list): List of signatures corresponding to each node in the graph.
        criterion (str): The criterion for optimization.

    Methods:
        optimize(objective, max_iters, callback=None): Optimizes the objective function using the random walk algorithm.
        normalize_graph(): Normalizes the graph by dividing each row by its sum.
        optimize_with_backtracking(objective, max_iters, callback=None): Optimizes the objective function using the random walk algorithm with backtracking.
    """

    def __init__(self, graph, signatures, start, criterion):
        if graph.shape[0] != graph.shape[1]:
            raise ValueError("Graph adjacency matrix dimensions must be square.")
        self.graph = graph
        self.start = start
        self.signatures = signatures
        self.criterion = criterion
    def optimize(self, objective, max_iters, callback=None):
        """
        Optimizes the objective function using the random walk algorithm.

        Args:
            objective (function): The objective function to be optimized. It is called with a
                signature and must return a (metric, loss) pair.
            max_iters (int): The maximum number of iterations for the optimization.
            callback (function, optional): A callback function called after each iteration with
                the current signature and its (metric, loss). Defaults to None.

        Returns:
            tuple: The best signature found, together with its metric and loss.
        """
        visited = set()
        current_node = self.start
        best_node = (None, float("inf"), float("inf"))
        it = 0
        while it < max_iters:
            print("--------------------")
            print("Iteration: " + str(it + 1))
            print("--------------------")
            # if current_node in visited:
            #     break
            visited.add(current_node)
            # The objective returns (metric, test loss); the walk minimizes the test loss.
            metric, loss = objective(self.signatures[current_node])
            if loss < best_node[2]:
                best_node = (self.signatures[current_node], metric, loss)
            if callback:
                callback(self.signatures[current_node], (metric, loss))
            neighbors = np.where(self.graph[current_node] > 0)[0]
            if len(neighbors) == 0:
                break
            current_node = random.choice(neighbors)
            it += 1
        return best_node
    def normalize_graph(self):
        """
        Normalizes the graph by dividing each row by its sum.
        """
        rowsums = self.graph.sum(axis=1)
        self.graph = self.graph / rowsums[:, np.newaxis]
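    # Worked illustration (an assumed 2x2 adjacency matrix, not taken from nlgm):
    #   [[0, 2],                       [[0.0, 1.0],
    #    [1, 1]]   -- normalize -->     [0.5, 0.5]]
    # i.e. each row becomes a probability distribution over outgoing edges.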
    def optimize_with_backtracking(self, objective, max_iters, callback=None):
        """
        Optimizes the objective function using the random walk algorithm with backtracking.

        Args:
            objective (function): The objective function to be optimized. It is called with a
                signature and must return a (metric, loss) pair.
            max_iters (int): The maximum number of iterations for the optimization.
            callback (function, optional): A callback function called after each iteration with
                the current signature and its (metric, loss). Defaults to None.

        Returns:
            tuple: The best signature found, together with its metric and loss.
        """
        visited = set()
        current_node = self.start
        best_node = (None, float("inf"), float("inf"))
        it = 0
        path = []  # keeps track of the path taken, for backtracking
        while it < max_iters:
            if current_node not in visited:
                print("--------------------")
                print("Iteration: " + str(it + 1))
                print("--------------------")
                visited.add(current_node)
                path.append(current_node)
                metric, loss = objective(self.signatures[current_node])
                if loss < best_node[2]:
                    best_node = (self.signatures[current_node], metric, loss)
                if callback:
                    callback(self.signatures[current_node], (metric, loss))
                it += 1
            # Consider only unvisited neighbors of the current node.
            neighbors = np.where(self.graph[current_node] > 0)[0]
            neighbors = [node for node in neighbors if node not in visited]
            if neighbors:
                current_node = random.choice(neighbors)
            else:
                # No unvisited neighbors: backtrack to the previous node on the path,
                # or stop once the path is exhausted.
                path.pop()
                if path:
                    current_node = path[-1]
                else:
                    break
        return best_node
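

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of nlgm): a minimal example of
# driving RandomWalkOptimizer over a small hand-built graph. The names
# `toy_graph`, `toy_signatures`, and `toy_objective` are placeholder
# assumptions; the objective is assumed to return a (metric, loss) pair,
# which is what the methods above expect.
if __name__ == "__main__":
    # 4-node cycle graph given as a dense adjacency matrix.
    toy_graph = np.array(
        [
            [0, 1, 0, 1],
            [1, 0, 1, 0],
            [0, 1, 0, 1],
            [1, 0, 1, 0],
        ],
        dtype=float,
    )

    # Placeholder "signatures": here just one float per node.
    toy_signatures = [0.0, 1.0, 2.0, 3.0]

    def toy_objective(signature):
        # Pretend signatures closer to 2.5 are better; return (metric, loss).
        loss = (signature - 2.5) ** 2
        return -loss, loss

    optimizer = RandomWalkOptimizer(toy_graph, toy_signatures, start=0, criterion="loss")
    optimizer.normalize_graph()  # optional: rows become transition probabilities
    best_signature, best_metric, best_loss = optimizer.optimize(toy_objective, max_iters=5)
    print("best signature:", best_signature, "loss:", best_loss)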