MH-P3/src/local_search.py

111 lines
3.6 KiB
Python

from numpy.random import choice, seed, randint
from pandas import DataFrame
from multiprocessing import Pool
from functools import partial
from itertools import combinations
def get_row_distance(source, destination, data):
    """Look up the distance between two points in the distance table.

    The table is treated as undirected: a row matching either
    (source, destination) or (destination, source) is accepted.
    Assumes exactly one such row exists (IndexError otherwise).
    """
    forward = (data["source"] == source) & (data["destination"] == destination)
    backward = (data["source"] == destination) & (data["destination"] == source)
    matching = data[forward | backward]
    return matching["distance"].values[0]
def compute_distance(element, solution, data):
    """Sum the distances from *element* to every other point in *solution*.

    Rows whose point equals *element* are excluded before summing, so an
    element never contributes its (undefined) distance to itself.
    """
    others = solution.query(f"point != {element}")
    return sum(
        get_row_distance(source=element, destination=row.point, data=data)
        for _, row in others.iterrows()
    )
def get_first_random_solution(placeholder, n, m, data):
    """Build one random initial solution of m distinct points drawn from range(n).

    :param placeholder: index of this solution (supplied by pool.map over
        range(number_solutions)); used as the RNG seed so every parallel
        worker yields a distinct yet reproducible solution.
    :param n: total number of candidate points (points are 0..n-1).
    :param m: number of points to select (without replacement).
    :param data: pairwise-distance table consumed by compute_distance.
    :return: DataFrame with "point" and "distance" columns, where each
        distance is the point's summed distance to the other selected points.
    """
    solution = DataFrame(columns=["point", "distance"])
    # Seed per solution index: the previous constant seed(42) made every
    # worker in the pool generate the SAME solution, defeating the purpose
    # of running multiple random restarts.
    seed(placeholder)
    solution["point"] = choice(n, size=m, replace=False)
    solution["distance"] = solution["point"].apply(
        func=compute_distance, solution=solution, data=data
    )
    return solution
def element_in_dataframe(solution, element):
    """Return True when *element* already appears in the solution's points."""
    return bool((solution["point"] == element).any())
def replace_worst_element(previous, n, data):
    """Return a copy of *previous* with its worst element swapped for a new point.

    The "worst" element is the one with the smallest total distance (least
    contribution to diversity). It is replaced by a uniformly random point in
    range(n) that is not already in the solution, and the replacement's
    distance is recomputed against the updated solution.

    :param previous: current solution DataFrame ("point", "distance"); not mutated.
    :param n: total number of candidate points.
    :param data: pairwise-distance table consumed by compute_distance.
    :return: a new solution DataFrame.
    """
    solution = previous.copy()
    worst_index = solution["distance"].astype(float).idxmin()
    # Rejection-sample until we draw a point not already selected.
    random_element = randint(n)
    while element_in_dataframe(solution=solution, element=random_element):
        random_element = randint(n)
    # Single .loc[row, col] assignments: the original chained form
    # solution["point"].loc[worst_index] = ... raises SettingWithCopyWarning
    # and silently fails to write under pandas copy-on-write.
    solution.loc[worst_index, "point"] = random_element
    solution.loc[worst_index, "distance"] = compute_distance(
        element=random_element, solution=solution, data=data
    )
    # NOTE(review): the distances of the OTHER rows still reference the
    # removed point and are not refreshed here — confirm this lazy update
    # is intended by the surrounding search loop.
    return solution
def get_random_solution(previous, n, data):
    """Repeatedly swap out the worst element until the candidate beats *previous*.

    Keeps refining the candidate (each pass replaces its worst element) until
    its total distance strictly exceeds that of the starting solution.
    """
    # replace_worst_element never mutates its input, so the target score is fixed.
    target_score = previous["distance"].sum()
    candidate = replace_worst_element(previous, n, data)
    while candidate["distance"].sum() <= target_score:
        candidate = replace_worst_element(previous=candidate, n=n, data=data)
    return candidate
def explore_neighbourhood(element, n, data, max_iterations=100000):
    """Take *max_iterations* strictly-improving random steps from *element*.

    Each step asks get_random_solution for a neighbour better than the
    current solution, so the final solution is the best one visited.

    :param element: starting solution DataFrame.
    :param n: total number of candidate points.
    :param data: pairwise-distance table.
    :param max_iterations: number of improvement steps to take.
    :return: the last (best) solution reached; *element* itself when
        max_iterations is 0 (the original raised NameError in that case).

    Only the current solution is kept — the original accumulated every
    visited solution in a list it never read except for the last entry.
    """
    current = element
    for _ in range(max_iterations):
        current = get_random_solution(previous=current, n=n, data=data)
    return current
def evaluate_solution(solution, data):
    """Return the total pairwise distance over every pair of points in *solution*.

    This is the diversity fitness: higher means the selected points are
    more spread out.
    """
    points = solution["point"]
    fitness = 0
    for first, second in combinations(solution.index, r=2):
        fitness += get_row_distance(
            source=points.loc[first],
            destination=points.loc[second],
            data=data,
        )
    return fitness
def generate_initial_solutions(n, m, data, number_solutions, cores=4):
    """Produce *number_solutions* random starting solutions using a worker pool.

    Each worker receives its solution index via pool.map over
    range(number_solutions).
    """
    build = partial(get_first_random_solution, n=n, m=m, data=data)
    with Pool(cores) as pool:
        return pool.map(build, range(number_solutions))
def evaluate_all_solutions(solutions, data, cores=4):
    """Score every solution in parallel; returns fitnesses in input order."""
    score = partial(evaluate_solution, data=data)
    with Pool(cores) as pool:
        return pool.map(score, solutions)
def local_search(n, m, data, number_solutions=10):
    """Multi-start local search: refine several random starts, return the fittest.

    :param n: total number of candidate points.
    :param m: number of points each solution selects.
    :param data: pairwise-distance table.
    :param number_solutions: how many random restarts to run.
    :return: the locally-optimized solution with the highest total pairwise
        distance (diversity is maximized).
    """
    starts = generate_initial_solutions(n, m, data, number_solutions)
    refined = [
        explore_neighbourhood(element=start, n=n, data=data, max_iterations=100)
        for start in starts
    ]
    scores = evaluate_all_solutions(refined, data)
    best_index = scores.index(max(scores))
    return refined[best_index]