MH-P2/src/genetic_algorithm.py

304 lines
11 KiB
Python

from copy import deepcopy
from functools import partial
from itertools import combinations
from math import ceil
from multiprocessing import Pool

from numpy import array_equal, intersect1d
from numpy.random import choice, randint, shuffle
from pandas import DataFrame, concat
def get_row_distance(source, destination, data):
    """Look up the distance stored in ``data`` for an unordered pair of points.

    The pair is matched in either orientation, so ``(source, destination)``
    and ``(destination, source)`` resolve to the same row.

    Args:
        source: Identifier of one endpoint.
        destination: Identifier of the other endpoint.
        data: DataFrame with ``source``, ``destination`` and ``distance``
            columns.

    Returns:
        The ``distance`` value of the first matching row.

    Raises:
        IndexError: If no row of ``data`` links the two points.
    """
    forward = (data["source"] == source) & (data["destination"] == destination)
    backward = (data["source"] == destination) & (data["destination"] == source)
    matches = data.loc[forward | backward, "distance"]
    return matches.values[0]
def compute_distance(element, individual, data):
    """Add up the distances from ``element`` to every other point of an individual.

    Args:
        element: Point whose contribution is being measured.
        individual: DataFrame with a ``point`` column.
        data: Distance table forwarded to ``get_row_distance``.

    Returns:
        The sum of the pairwise distances between ``element`` and each row of
        ``individual`` whose point differs from it (``0`` if there is none).
    """
    others = individual.query(f"point != {element}")
    return sum(
        get_row_distance(source=element, destination=row.point, data=data)
        for _, row in others.iterrows()
    )
def generate_individual(n, m, data):
    """Create a random individual made of ``m`` distinct points.

    Args:
        n: Upper bound passed to ``numpy.random.choice``; points are sampled
            from it without replacement.
        m: Number of points in the individual.
        data: Distance table forwarded to ``compute_distance``.

    Returns:
        A DataFrame with ``point``, ``distance`` and ``fitness`` columns.
        ``distance`` holds, per row, the summed distance from that point to the
        other points of the individual; ``fitness`` is left unfilled.
    """
    genome = DataFrame(columns=["point", "distance", "fitness"])
    genome["point"] = choice(n, size=m, replace=False)
    # The frame must already hold its points here: each distance is measured
    # against the other rows of the same individual.
    genome["distance"] = genome["point"].apply(
        lambda point: compute_distance(point, individual=genome, data=data)
    )
    return genome
def evaluate_individual(individual, data):
    """Compute the fitness of an individual and store it in the frame.

    The fitness is the sum of ``get_row_distance`` over every pair of index
    labels of ``individual``.

    Args:
        individual: DataFrame with a ``point`` column.
        data: Distance table forwarded to ``get_row_distance``.

    Returns:
        The same ``individual`` object, with its ``fitness`` column set to the
        computed total on every row.
    """
    total = 0
    for label_pair in combinations(individual.index, r=2):
        # Select the rows of this pair by label; the first and last selected
        # rows provide the two endpoints.
        pair_points = individual.loc[label_pair, :]["point"].values
        total += get_row_distance(
            source=pair_points[0],
            destination=pair_points[-1],
            data=data,
        )
    individual["fitness"] = total
    return individual
def select_distinct_genes(matching_genes, parents, m):
    """Split the non-shared genes of two parents at a random cut point.

    Args:
        matching_genes: Points present in both parents.
        parents: Sequence whose first two items are parent DataFrames with a
            ``point`` column.
        m: Number of genes per individual.

    Returns:
        A tuple ``(first_parent_genes, second_parent_genes)``: the non-shared
        points of the first parent from the cut point onwards, and the
        non-shared points of the second parent before the cut point.
    """
    first_only = parents[0].loc[~parents[0]["point"].isin(matching_genes), "point"]
    second_only = parents[1].loc[~parents[1]["point"].isin(matching_genes), "point"]
    # The cut point is drawn from 0 .. m - len(matching_genes), both included.
    cutoff = randint(m - len(matching_genes) + 1)
    return first_only.values[cutoff:], second_only.values[:cutoff]
def select_shuffled_genes(matching_genes, parents):
    """Return the non-shared genes of each parent in random order.

    Args:
        matching_genes: Points present in both parents.
        parents: Sequence whose first two items are parent DataFrames with a
            ``point`` column.

    Returns:
        A tuple ``(first_genes, second_genes)`` of NumPy arrays holding, in
        shuffled order, the points of each parent that are not in
        ``matching_genes``. The parents themselves are not modified.
    """
    first_mask = ~parents[0]["point"].isin(matching_genes)
    second_mask = ~parents[1]["point"].isin(matching_genes)
    # Shuffle explicit copies: the array exposed by ``.values`` can be a
    # read-only view of the frame (pandas Copy-on-Write), in which case an
    # in-place shuffle raises instead of reordering.
    first_genes = parents[0].loc[first_mask, "point"].to_numpy(copy=True)
    second_genes = parents[1].loc[second_mask, "point"].to_numpy(copy=True)
    shuffle(first_genes)
    shuffle(second_genes)
    return first_genes, second_genes
def select_random_parent(parents):
    """Pick one parent at random, avoiding an empty one when possible.

    Args:
        parents: Sequence of parent DataFrames with a ``point`` column.

    Returns:
        The randomly drawn parent, or the parent at index ``1 - drawn_index``
        when the drawn one has no points.
    """
    chosen = randint(len(parents))
    if parents[chosen].point.empty:
        # Fall back to the other member of the pair.
        chosen = 1 - chosen
    return parents[chosen]
def get_best_point(parents, offspring):
    """Return the highest-distance parent point that ``offspring`` lacks.

    A parent is drawn at random with ``select_random_parent`` and its point
    with the largest ``distance`` is taken. If ``offspring`` already contains
    that point, it is discarded from the candidate pool and the draw repeats.

    Args:
        parents: Pair of parent DataFrames with ``point`` and ``distance``
            columns. They are not modified.
        offspring: DataFrame with a ``point`` column.

    Returns:
        A point taken from one of the parents that is not in ``offspring``.

    Raises:
        ValueError: If no parent point is left to offer.
    """
    # Copy the parents once, outside the loop, so that discarded points stay
    # discarded; copying inside the loop would offer the same point forever.
    # Re-indexing gives unique labels, so the idxmax label identifies one row.
    candidates = [parent.reset_index(drop=True) for parent in parents]
    while True:
        candidate = select_random_parent(candidates)
        if candidate.point.empty:
            raise ValueError("no parent gene is left to add to the offspring")
        best_label = candidate["distance"].idxmax()
        # idxmax returns an index label, so read the point by label.
        best_point = candidate.at[best_label, "point"]
        candidate.drop(index=best_label, inplace=True)
        if best_point not in offspring.point.values:
            return best_point
def repair_offspring(offspring, parents, m):
    """Grow or shrink an offspring until it has exactly ``m`` rows.

    While there are too many rows, the row with the largest ``distance`` is
    removed. While there are too few, a point obtained from
    ``get_best_point`` is added with ``distance`` and ``fitness`` set to 0.

    Args:
        offspring: DataFrame with ``point``, ``distance`` and ``fitness``
            columns. It is not modified; use the return value.
        parents: Parent DataFrames forwarded to ``get_best_point``.
        m: Required number of rows.

    Returns:
        A DataFrame with ``m`` rows. When ``offspring`` already has ``m`` rows
        it is returned as is.
    """
    while len(offspring) != m:
        if len(offspring) > m:
            # Re-label first: with duplicated index labels, dropping the
            # idxmax label would remove several rows at once.
            offspring = offspring.reset_index(drop=True)
            worst_label = offspring["distance"].idxmax()
            offspring = offspring.drop(index=worst_label).reset_index(drop=True)
        else:
            best_point = get_best_point(parents, offspring)
            extra_row = DataFrame(
                [{"point": best_point, "distance": 0, "fitness": 0}]
            )
            # DataFrame.append no longer exists in pandas 2.x; concat is the
            # supported way to add the row.
            offspring = concat([offspring, extra_row], ignore_index=True)
    return offspring
def get_matching_genes(parents):
    """Return the points that the first two parents have in common.

    Args:
        parents: Sequence whose first two items are DataFrames with a
            ``point`` column.

    Returns:
        The result of ``numpy.intersect1d`` over both ``point`` columns.
    """
    gene_sets = (parents[0]["point"].values, parents[1]["point"].values)
    return intersect1d(*gene_sets)
def populate_offspring(values):
    """Build an offspring frame from several groups of points.

    Args:
        values: Iterable of point collections (for example NumPy arrays).
            Their contents are concatenated in the given order.

    Returns:
        A DataFrame with ``point``, ``distance`` and ``fitness`` columns, one
        row per point, ``distance`` and ``fitness`` set to 0, and a fresh
        0..k-1 index.
    """
    # Flatten up front and build the frame in one step. This avoids
    # DataFrame.append (gone in pandas 2.x) and the repeated index labels it
    # left behind, which break label-based lookups on the offspring.
    points = [point for genes in values for point in genes]
    return DataFrame(
        {"point": points, "distance": 0, "fitness": 0},
        index=range(len(points)),
        columns=["point", "distance", "fitness"],
    )
def uniform_crossover(parents, m):
    """Produce one repaired child from a pair of parents.

    The child receives the points both parents share plus the slices returned
    by ``select_distinct_genes``, and is then passed through
    ``repair_offspring``.

    Args:
        parents: Pair of parent DataFrames.
        m: Number of genes per individual.

    Returns:
        The DataFrame returned by ``repair_offspring``.
    """
    shared = get_matching_genes(parents)
    from_first, from_second = select_distinct_genes(shared, parents, m)
    child = populate_offspring(values=[shared, from_first, from_second])
    return repair_offspring(child, parents, m)
def position_crossover(parents):
    """Produce two children from a pair of parents.

    Each child is made of the points both parents share followed by the
    shuffled non-shared points of one parent.

    Args:
        parents: Pair of parent DataFrames.

    Returns:
        A tuple ``(first_offspring, second_offspring)``, built from the first
        and second parent's non-shared points respectively.
    """
    shared = get_matching_genes(parents)
    shuffled_genes = select_shuffled_genes(shared, parents)
    first_child, second_child = (
        populate_offspring(values=[shared, genes]) for genes in shuffled_genes
    )
    return first_child, second_child
def group_parents(parents):
    """Pair up consecutive parents.

    When both members of a pair hold the same ``point`` values, the second
    member is exchanged with an entry of ``parents`` drawn at random from
    positions 0 up to the position of the pair's first member; that exchange
    also rewrites the entry in ``parents``.

    Args:
        parents: List of parent DataFrames; consecutive items are paired.

    Returns:
        A list of ``[first, second]`` pairs.

    Raises:
        IndexError: If ``parents`` has an odd number of items.
    """
    pairs = []
    for start in range(0, len(parents), 2):
        first, second = parents[start], parents[start + 1]
        if array_equal(first.point.values, second.point.values):
            swap_with = randint(start + 1)
            replacement = parents[swap_with]
            parents[swap_with] = second
            second = replacement
        pairs.append([first, second])
    return pairs
def crossover(mode, parents, m, probability=0.7):
    """Generate the offspring of a list of parents.

    Args:
        mode: ``"uniform"`` selects ``uniform_crossover``; any other value
            selects ``position_crossover``.
        parents: List of parent DataFrames, paired by ``group_parents``.
        m: Number of genes per individual (used in uniform mode).
        probability: Fraction of the parents expected to cross in uniform
            mode.

    Returns:
        A list of offspring. In uniform mode the first
        ``int(len(parents) * probability) // 2`` pairs contribute two crossed
        children each and the remaining pairs are passed through unchanged; in
        the other mode every pair contributes its two position-crossover
        children.
    """
    pairs = group_parents(parents)

    if mode != "uniform":
        children = []
        for pair in pairs:
            children.extend(position_crossover(pair))
        return children

    crossed_pairs = int(len(parents) * probability) // 2
    children = []
    for pair in pairs[:crossed_pairs]:
        # Each crossing pair is crossed twice to yield two children.
        children.extend(uniform_crossover(pair, m) for _ in range(2))
    for pair in pairs[crossed_pairs:]:
        children.extend(pair[:2])
    return children
def element_in_dataframe(individual, element):
    """Tell whether ``element`` appears in the ``point`` column.

    Args:
        individual: DataFrame with a ``point`` column.
        element: Point to look for.

    Returns:
        ``True`` if at least one row holds ``element``, ``False`` otherwise.
    """
    matches = individual.query(f"point == {element}")
    return len(matches.index) > 0
def select_new_gene(individual, n):
    """Draw a random point that the individual does not contain yet.

    Args:
        individual: DataFrame with a ``point`` column.
        n: Exclusive upper bound of the random draw.

    Returns:
        A value from ``randint(n)`` that is absent from ``individual``. Draws
        are repeated until such a value comes up.
    """
    candidate = randint(n)
    while element_in_dataframe(individual=individual, element=candidate):
        candidate = randint(n)
    return candidate
def mutate(offspring, n, data, probability=0.001):
    """Randomly replace genes across the offspring population.

    ``ceil(len(offspring) * n * probability)`` mutations are applied. Each one
    picks a random individual and a random row of it, swaps that row's point
    for one the individual does not contain yet (``select_new_gene``) and
    recomputes that row's ``distance`` with ``compute_distance``.

    Args:
        offspring: List of DataFrames with ``point`` and ``distance`` columns.
        n: Upper bound for new points; also a factor of the mutation count.
        data: Distance table forwarded to ``compute_distance``.
        probability: Per-gene mutation probability.

    Returns:
        The same ``offspring`` list; the chosen individuals are modified in
        place.
    """
    expected_mutations = len(offspring) * n * probability
    for _ in range(ceil(expected_mutations)):
        individual = offspring[randint(len(offspring))]
        # Pick the row by position: index labels are not guaranteed to match
        # positions, so a sampled label is not a safe positional index.
        position = randint(len(individual))
        new_gene = select_new_gene(individual, n)

        # Assign whole columns rather than ``individual[col].iloc[...] = x``:
        # that chained form may write to a temporary copy and leave the frame
        # untouched.
        points = individual["point"].tolist()
        points[position] = new_gene
        individual["point"] = points

        # The distance is computed after the point swap so that it is measured
        # against the individual's updated set of points.
        distances = individual["distance"].tolist()
        distances[position] = compute_distance(
            element=new_gene,
            individual=individual,
            data=data,
        )
        individual["distance"] = distances
    return offspring
def get_individual_index(element, population):
    """Find the position of the first individual with the same fitness.

    Args:
        element: DataFrame with a ``fitness`` column.
        population: Sequence of DataFrames with a ``fitness`` column.

    Returns:
        The index of the first member of ``population`` whose first fitness
        value equals that of ``element``, or ``None`` if there is none.
    """
    for position, candidate in enumerate(population):
        if candidate.fitness.values[0] == element.fitness.values[0]:
            return position
    return None
def tournament_selection(population):
    """Run a two-contender tournament and return its winner.

    Two individuals are drawn at random (the same one may be drawn twice) and
    the one with the larger fitness wins; on a tie the first draw wins.

    Args:
        population: Sequence of DataFrames with a ``fitness`` column.

    Returns:
        A tuple ``(winner, index)`` where ``index`` comes from
        ``get_individual_index(winner, population)``.
    """
    contenders = []
    for _ in range(2):
        contenders.append(population[randint(len(population))])

    winner = contenders[0]
    if contenders[1].fitness.values[0] > winner.fitness.values[0]:
        winner = contenders[1]

    return winner, get_individual_index(winner, population)
def check_element_population(element, population):
    """Tell whether the population holds an individual with the same points.

    Points are compared with ``numpy.array_equal``, so both the values and
    their order must match.

    Args:
        element: DataFrame with a ``point`` column.
        population: Iterable of DataFrames with a ``point`` column.

    Returns:
        ``True`` if any member matches ``element``, ``False`` otherwise.
    """
    return any(
        array_equal(element.point.values, member.point.values)
        for member in population
    )
def generational_replacement(prev_population, current_population):
    """Replace the population by the new generation, keeping the previous best.

    If the best individual of ``prev_population`` is not found in
    ``current_population`` (see ``check_element_population``), it overwrites
    the lowest-fitness member of ``current_population``.

    Args:
        prev_population: List of DataFrames with a ``fitness`` column.
        current_population: List of DataFrames with a ``fitness`` column. It
            is modified in place when the previous best is re-inserted.

    Returns:
        ``current_population`` itself.
    """
    new_population = current_population
    best_previous = max(prev_population, key=lambda ind: ind.fitness.values[0])
    # Elitism: re-insert the previous best only when it did NOT survive.
    # Testing the opposite would overwrite an individual while the best is
    # already present and lose the best whenever it is missing.
    if not check_element_population(best_previous, new_population):
        # Locate the worst member by position so that individuals sharing a
        # fitness value cannot be confused with one another.
        worst_index = min(
            range(len(new_population)),
            key=lambda position: new_population[position].fitness.values[0],
        )
        new_population[worst_index] = best_previous
    return new_population
def get_best_elements(population):
    """Return the positions of the two highest-fitness individuals.

    Args:
        population: Sequence of DataFrames with a ``fitness`` column.

    Returns:
        A tuple ``(first_index, second_index)`` of positions in ``population``:
        the best individual and the runner-up. Ties are resolved in favour of
        the lower position.

    Raises:
        ValueError: If ``population`` has fewer than two individuals.
    """
    if len(population) < 2:
        raise ValueError("population must contain at least two individuals")
    # Rank positions of the original list. Looking up the runner-up in a list
    # with the best already removed yields an index shifted by one whenever
    # the runner-up sits after the best.
    ranking = sorted(
        range(len(population)),
        key=lambda position: population[position].fitness.values[0],
        reverse=True,  # sorted() stays stable, so ties keep list order
    )
    return ranking[0], ranking[1]
def get_worst_elements(population):
    """Return the positions of the two lowest-fitness individuals.

    Args:
        population: Sequence of DataFrames with a ``fitness`` column.

    Returns:
        A tuple ``(first_index, second_index)`` of positions in ``population``:
        the worst individual and the second worst. Ties are resolved in favour
        of the lower position.

    Raises:
        ValueError: If ``population`` has fewer than two individuals.
    """
    if len(population) < 2:
        raise ValueError("population must contain at least two individuals")
    # Rank positions of the original list. Looking up the second worst in a
    # list with the worst already removed yields an index shifted by one
    # whenever the second worst sits after the worst.
    ranking = sorted(
        range(len(population)),
        key=lambda position: population[position].fitness.values[0],
    )
    return ranking[0], ranking[1]
def stationary_replacement(prev_population, current_population):
    """Let the two best newcomers challenge the two worst incumbents.

    The best individual of ``current_population`` is compared with the worst
    of ``prev_population`` and the second best with the second worst; a
    newcomer takes the incumbent's slot only when its fitness is strictly
    greater.

    Args:
        prev_population: List of DataFrames with a ``fitness`` column. It is
            modified in place.
        current_population: List of DataFrames with a ``fitness`` column.

    Returns:
        ``prev_population`` itself, after any replacements.
    """
    worst_slots = get_worst_elements(prev_population)
    best_slots = get_best_elements(current_population)
    for slot, challenger in zip(worst_slots, best_slots):
        challenger_fitness = current_population[challenger].fitness.values[0]
        incumbent_fitness = prev_population[slot].fitness.values[0]
        if challenger_fitness > incumbent_fitness:
            prev_population[slot] = current_population[challenger]
    return prev_population
def replace_population(prev_population, current_population, mode):
    """Dispatch to the replacement scheme selected by ``mode``.

    Args:
        prev_population: Population before this generation.
        current_population: Newly produced individuals.
        mode: ``"generational"`` selects ``generational_replacement``; any
            other value selects ``stationary_replacement``.

    Returns:
        The population returned by the selected scheme.
    """
    if mode == "generational":
        strategy = generational_replacement
    else:
        strategy = stationary_replacement
    return strategy(prev_population, current_population)
def evaluate_population(population, data, cores=4):
    """Evaluate every individual of a population in a process pool.

    Args:
        population: Iterable of individuals accepted by
            ``evaluate_individual``.
        data: Distance table forwarded to ``evaluate_individual``.
        cores: Number of worker processes.

    Returns:
        The list produced by mapping ``evaluate_individual`` over
        ``population``, in the same order.
    """
    score = partial(evaluate_individual, data=data)
    with Pool(processes=cores) as workers:
        return workers.map(score, population)
def select_parents(population, n, mode):
    """Select parents by repeated tournaments without replacement.

    Tournaments run on a deep copy of ``population``; each winner is removed
    from that copy before the next tournament.

    Args:
        population: List of DataFrames with a ``fitness`` column. It is not
            modified.
        n: Number of parents to select in generational mode.
        mode: ``"generational"`` selects ``n`` parents; any other value
            selects two.

    Returns:
        The list of selected parents (copies of the population members).
    """
    candidates = deepcopy(population)
    rounds = n if mode == "generational" else 2
    parents = []
    for _ in range(rounds):
        winner, winner_index = tournament_selection(population=candidates)
        parents.append(winner)
        candidates.pop(winner_index)
    return parents
def genetic_algorithm(n, m, data, select_mode, crossover_mode, max_iterations=100000):
    """Run the genetic algorithm and return the best individual found.

    Each iteration selects parents, crosses them, mutates the offspring,
    evaluates the offspring and merges them into the population.

    Args:
        n: Upper bound of the point identifiers; also used as the population
            size and as the number of parents in generational mode.
        m: Number of points per individual.
        data: Distance table used to evaluate individuals.
        select_mode: ``"generational"`` or any other value for the stationary
            scheme; forwarded to ``select_parents`` and ``replace_population``.
        crossover_mode: ``"uniform"`` or any other value for position
            crossover; forwarded to ``crossover``.
        max_iterations: Number of generations to run.

    Returns:
        The individual of the final population with the highest fitness.
    """
    population = [generate_individual(n, m, data) for _ in range(n)]
    population = evaluate_population(population, data)
    for _ in range(max_iterations):
        parents = select_parents(population, n, select_mode)
        offspring = crossover(crossover_mode, parents, m)
        offspring = mutate(offspring, n, data)
        # Score the offspring BEFORE replacement: both replacement schemes
        # compare fitness values, and freshly built children carry a
        # placeholder fitness of 0 (mutated ones a stale value) until they are
        # evaluated. Every survivor is therefore already evaluated afterwards.
        offspring = evaluate_population(offspring, data)
        population = replace_population(population, offspring, select_mode)
    return max(population, key=lambda individual: individual.fitness.values[0])