"""Genetic algorithm that searches for a subset of ``m`` points out of ``n``.

Individuals are pandas DataFrames with the columns ``point``, ``distance``
and ``fitness``.  Pairwise distances are read from a table (``data``) that is
queried through its ``source``, ``destination`` and ``distance`` columns.
"""

from copy import deepcopy
from functools import partial
from math import ceil
from multiprocessing import Pool

from numpy import append, intersect1d, sum
from numpy.random import choice, randint, shuffle
from pandas import DataFrame, concat

from preprocessing import parse_file

# Column layout shared by every individual / offspring frame.
_COLUMNS = ["point", "distance", "fitness"]


def get_row_distance(source, destination, data):
    """Return the distance stored in ``data`` for the pair of points.

    The pair is looked up in both orientations, so the table only needs to
    contain each pair once.

    Raises:
        IndexError: if ``data`` has no row for the pair.
    """
    row = data.query(
        "(source == @source and destination == @destination) or "
        "(source == @destination and destination == @source)"
    )
    return row["distance"].values[0]


def compute_distance(element, individual, data):
    """Return the summed distance from ``element`` to the other points.

    Rows of ``individual`` whose ``point`` equals ``element`` are skipped.
    """
    others = individual.loc[individual["point"] != element, "point"]
    # NOTE: ``sum`` is numpy's in this module, so accumulate explicitly
    # instead of summing a generator.
    accumulator = 0
    for point in others:
        accumulator += get_row_distance(
            source=element, destination=point, data=data
        )
    return accumulator


def generate_individual(n, m, data):
    """Build a random individual of ``m`` distinct points drawn from ``range(n)``.

    The ``distance`` column holds, for each point, its summed distance to the
    other points of the individual; ``fitness`` is left unset.
    """
    individual = DataFrame(columns=_COLUMNS)
    individual["point"] = choice(n, size=m, replace=False)
    individual["distance"] = individual["point"].apply(
        func=compute_distance, individual=individual, data=data
    )
    return individual


def evaluate_individual(individual, data):
    """Compute the fitness of ``individual`` and store it in its ``fitness`` column.

    For every point except the last one, the largest distance among the
    remaining rows that involve the point is added to the fitness; the rows of
    that point are then removed before the next point is processed.

    Returns:
        The same ``individual`` object, with ``fitness`` filled in.
    """
    fitness = []
    genotype = individual.point.values  # referenced as @genotype in the query
    distances = data.query("source in @genotype and destination in @genotype")
    for item in genotype[:-1]:
        involved = (distances["source"] == item) | (distances["destination"] == item)
        max_distance = distances.loc[involved, "distance"].astype(float).max()
        fitness = append(arr=fitness, values=max_distance)
        distances = distances.loc[~involved]
    individual["fitness"] = sum(fitness)
    return individual


def select_distinct_genes(matching_genes, parents, m):
    """Split the non-shared genes of two parents at a random cutoff.

    Returns:
        ``(first_parent_genes, second_parent_genes)``: the tail of the first
        parent's non-shared genes and the head of the second parent's.
    """
    first_parent = parents[0].query("point not in @matching_genes")
    second_parent = parents[1].query("point not in @matching_genes")
    distinct_count = m - len(matching_genes)
    # randint(0) raises, which happens when both parents share every gene.
    cutoff = randint(distinct_count) if distinct_count > 0 else 0
    first_parent_genes = first_parent.point.values[cutoff:]
    second_parent_genes = second_parent.point.values[:cutoff]
    return first_parent_genes, second_parent_genes


def select_shuffled_genes(matching_genes, parents):
    """Return the non-shared genes of each parent in random order."""
    first_parent = parents[0].query("point not in @matching_genes")
    second_parent = parents[1].query("point not in @matching_genes")
    # Shuffle copies: ``.values`` may be a (possibly read-only) view.
    first_genes = first_parent.point.values.copy()
    second_genes = second_parent.point.values.copy()
    shuffle(first_genes)
    shuffle(second_genes)
    return first_genes, second_genes


def select_random_parent(parents):
    """Pick one of the two parents at random, avoiding an empty one.

    If the chosen parent has no points, the other parent is returned instead
    (this assumes ``parents`` holds exactly two frames).
    """
    random_index = randint(len(parents))
    random_parent = parents[random_index]
    if random_parent.point.empty:
        random_parent = parents[1 - random_index]
    return random_parent


def get_best_point(parents, offspring):
    """Return a parent point with the highest ``distance`` not yet in ``offspring``.

    A parent is picked at random on each attempt; its current best point is
    consumed so a rejected candidate is never offered twice.

    Raises:
        ValueError: if every point of both parents is already in ``offspring``.
    """
    # Work on private copies so candidates can be consumed without touching
    # the caller's frames; unique labels make the ``.at`` lookup a scalar.
    candidates = [parent.reset_index(drop=True) for parent in parents]
    used_points = offspring["point"].values
    while True:
        parent = select_random_parent(candidates)
        if parent.empty:
            raise ValueError("parents have no point that is missing from offspring")
        best_label = parent["distance"].astype(float).idxmax()
        best_point = parent.at[best_label, "point"]
        parent.drop(index=best_label, inplace=True)
        if best_point not in used_points:
            return best_point


def repair_offspring(offspring, parents, m):
    """Return ``offspring`` trimmed or extended to exactly ``m`` points.

    While too long, the row with the highest ``distance`` is removed; while
    too short, a point supplied by ``get_best_point`` is appended with zeroed
    ``distance`` and ``fitness``.  The result has a fresh 0..m-1 index.
    """
    # Unique labels guarantee that each drop removes exactly one row.
    offspring = offspring.reset_index(drop=True)
    while len(offspring) != m:
        if len(offspring) > m:
            drop_label = offspring["distance"].astype(float).idxmax()
            offspring = offspring.drop(index=drop_label)
        else:
            best_point = get_best_point(parents, offspring)
            new_row = DataFrame([{"point": best_point, "distance": 0, "fitness": 0}])
            offspring = concat([offspring, new_row], ignore_index=True)
    return offspring.reset_index(drop=True)


def get_matching_genes(parents):
    """Return the sorted points that appear in both parents."""
    first_parent = parents[0].point.values
    second_parent = parents[1].point.values
    return intersect1d(first_parent, second_parent)


def populate_offspring(values):
    """Build an offspring frame from several gene sequences.

    Args:
        values: iterable of gene sequences, concatenated in order.

    Returns:
        A frame with one row per gene, a unique 0..k-1 index, and ``distance``
        and ``fitness`` set to 0.
    """
    points = [gene for genes in values for gene in genes]
    offspring = DataFrame({"point": points}, columns=_COLUMNS)
    offspring["distance"] = 0
    offspring["fitness"] = 0
    return offspring


def uniform_crossover(parents, m):
    """Create one offspring of ``m`` points from a pair of parents."""
    matching_genes = get_matching_genes(parents)
    first_genes, second_genes = select_distinct_genes(matching_genes, parents, m)
    offspring = populate_offspring(values=[matching_genes, first_genes, second_genes])
    return repair_offspring(offspring, parents, m)


def position_crossover(parents):
    """Create two offspring: shared genes plus each parent's shuffled own genes."""
    matching_genes = get_matching_genes(parents)
    first_genes, second_genes = select_shuffled_genes(matching_genes, parents)
    first_offspring = populate_offspring(values=[matching_genes, first_genes])
    second_offspring = populate_offspring(values=[matching_genes, second_genes])
    return first_offspring, second_offspring


def crossover(mode, parents, m):
    """Cross consecutive pairs of ``parents``; every pair yields two offspring.

    ``mode == "uniform"`` uses ``uniform_crossover`` (called twice per pair);
    any other value uses ``position_crossover``.  A trailing unpaired parent
    is ignored.
    """
    # zip over the same iterator twice groups the parents two by two.
    parent_pairs = list(zip(*[iter(parents)] * 2))
    offspring = []
    for pair in parent_pairs:
        if mode == "uniform":
            offspring.append(uniform_crossover(pair, m))
            offspring.append(uniform_crossover(pair, m))
        else:
            offspring.extend(position_crossover(pair))
    return offspring


def element_in_dataframe(individual, element):
    """Return True if ``element`` is one of the points of ``individual``."""
    return bool((individual["point"] == element).any())


def select_new_gene(individual, n):
    """Return a random point of ``range(n)`` that ``individual`` does not contain.

    Raises:
        ValueError: if every point of ``range(n)`` is already used (the
            original rejection loop would never terminate in that case).
    """
    used = set(individual["point"].tolist())
    free = [gene for gene in range(n) if gene not in used]
    if not free:
        raise ValueError("individual already contains every point in range(n)")
    return int(choice(free))


def mutate(offspring, data, probability=0.001, n=None):
    """Mutate random genes of random individuals in ``offspring`` (in place).

    ``ceil(len(offspring) * n * probability)`` mutations are performed.  Each
    one replaces a random point of a random individual with an unused point
    and recomputes that row's ``distance``.

    Args:
        offspring: list of individual frames.
        data: distance table.
        probability: per-gene mutation probability.
        n: number of available points.  When omitted, the module-level ``n``
            is used, which is what the original implementation always did.

    Returns:
        The same ``offspring`` list.
    """
    if n is None:
        n = globals().get("n")
        if n is None:
            raise ValueError("mutate() needs the number of points 'n'")
    expected_mutations = len(offspring) * n * probability
    for _ in range(ceil(expected_mutations)):
        individual = offspring[randint(len(offspring))]
        position = randint(len(individual))
        new_gene = select_new_gene(individual, n)
        # Positional writes avoid both chained assignment and any mismatch
        # between index labels and row positions.
        individual.iat[position, individual.columns.get_loc("point")] = new_gene
        # Distances may be fractional while the column starts as integer zeros.
        individual["distance"] = individual["distance"].astype(float)
        individual.iat[position, individual.columns.get_loc("distance")] = (
            compute_distance(element=new_gene, individual=individual, data=data)
        )
    return offspring


def get_individual_index(element, population):
    """Return the index of the first individual whose fitness equals ``element``'s.

    Returns ``None`` when no individual matches.
    """
    target = element.fitness.values[0]
    for index, candidate in enumerate(population):
        if candidate.fitness.values[0] == target:
            return index
    return None


def tournament_selection(population):
    """Draw two random individuals and return the fitter one with its index."""
    # Track indexes directly: looking the winner up by fitness afterwards
    # could return a different individual with the same fitness.
    contenders = [randint(len(population)) for _ in range(2)]
    best_index = max(contenders, key=lambda i: population[i].fitness.values[0])
    return population[best_index], best_index


def check_element_population(element, population):
    """Return True if some individual has exactly the same set of points."""
    target = set(element.point.values.tolist())
    return any(set(item.point.values.tolist()) == target for item in population)


def generational_replacement(prev_population, current_population):
    """Return the new generation, keeping the previous best (elitism).

    If the best individual of ``prev_population`` is not present in
    ``current_population``, it replaces the worst individual of the latter.
    """
    new_population = list(current_population)
    best_previous_individual = max(prev_population, key=lambda x: x.fitness.values[0])
    if not check_element_population(best_previous_individual, new_population):
        fitness_values = [item.fitness.values[0] for item in new_population]
        worst_index = fitness_values.index(min(fitness_values))
        new_population[worst_index] = best_previous_individual
    return new_population


def _rank_by_fitness(population, reverse):
    """Return population indexes ordered by fitness (stable for ties)."""
    if len(population) < 2:
        raise ValueError("population must contain at least two individuals")
    return sorted(
        range(len(population)),
        key=lambda i: population[i].fitness.values[0],
        reverse=reverse,
    )


def get_best_elements(population):
    """Return the indexes (into ``population``) of its two fittest individuals."""
    first_index, second_index = _rank_by_fitness(population, reverse=True)[:2]
    return first_index, second_index


def get_worst_elements(population):
    """Return the indexes (into ``population``) of its two least fit individuals."""
    first_index, second_index = _rank_by_fitness(population, reverse=False)[:2]
    return first_index, second_index


def stationary_replacement(prev_population, current_population):
    """Return ``prev_population`` with its two worst individuals challenged.

    The worst is compared against the best of ``current_population`` and the
    second worst against the second best; a challenger only takes the slot
    when its fitness is strictly higher.
    """
    new_population = list(prev_population)
    worst_indexes = get_worst_elements(prev_population)
    best_indexes = get_best_elements(current_population)
    for worst, best in zip(worst_indexes, best_indexes):
        challenger = current_population[best]
        if challenger.fitness.values[0] > prev_population[worst].fitness.values[0]:
            new_population[worst] = challenger
    return new_population


def replace_population(prev_population, current_population, mode):
    """Dispatch to generational (``mode == "generational"``) or stationary replacement."""
    if mode == "generational":
        return generational_replacement(prev_population, current_population)
    return stationary_replacement(prev_population, current_population)


def evaluate_population(population, data, cores=4):
    """Evaluate every individual in a pool of ``cores`` worker processes.

    Returns:
        A new list with the evaluated individuals, in the same order.
    """
    fitness_func = partial(evaluate_individual, data=data)
    with Pool(cores) as pool:
        evaluated_population = pool.map(fitness_func, population)
    return evaluated_population


def select_parents(population, n, mode):
    """Select parents by tournament without replacement.

    ``n`` parents are selected when ``mode == "generational"``, two otherwise.
    ``population`` itself is left untouched.
    """
    select_population = deepcopy(population)
    parent_count = n if mode == "generational" else 2
    parents = []
    for _ in range(parent_count):
        element, index = tournament_selection(population=select_population)
        parents.append(element)
        select_population.pop(index)
    return parents


def genetic_algorithm(n, m, data, select_mode, crossover_mode, max_iterations=100000):
    """Run the genetic algorithm and return the fittest individual found.

    Args:
        n: number of available points; also used as the population size.
        m: number of points per individual.
        data: distance table.
        select_mode: ``"generational"`` or anything else for stationary.
        crossover_mode: ``"uniform"`` or anything else for position crossover.
        max_iterations: number of generations to run.
    """
    population = [generate_individual(n, m, data) for _ in range(n)]
    population = evaluate_population(population, data)
    for _ in range(max_iterations):
        parents = select_parents(population, n, select_mode)
        offspring = crossover(crossover_mode, parents, m)
        offspring = mutate(offspring, data, n=n)
        # Offspring must be scored before replacement; otherwise their
        # fitness is still the placeholder 0 when it is compared.
        offspring = evaluate_population(offspring, data)
        population = replace_population(population, offspring, select_mode)
    best_index, _ = get_best_elements(population)
    return population[best_index]


# The guard is required by multiprocessing on platforms that spawn workers by
# re-importing the main module, and keeps an import from running the search.
if __name__ == "__main__":
    n, m, data = parse_file("data/GKD-c_11_n500_m50.txt")
    genetic_algorithm(
        n=10,
        m=4,
        data=data,
        select_mode="generational",
        crossover_mode="uniform",
        max_iterations=10,
    )