"""Genetic algorithm that selects ``m`` out of ``n`` points using pairwise distances.

Individuals are pandas DataFrames with ``point``, ``distance`` and ``fitness``
columns.  ``data`` is a DataFrame with ``source``, ``destination`` and
``distance`` columns holding the distance between pairs of points.
"""

from copy import deepcopy
from functools import partial
from math import ceil
from multiprocessing import Pool

from numpy import sum, append, intersect1d, array_equal
from numpy.random import randint, choice, shuffle
from pandas import DataFrame, concat

_COLUMNS = ["point", "distance", "fitness"]


def get_row_distance(source, destination, data):
    """Return the distance stored in ``data`` for the pair, in either direction.

    Raises ``IndexError`` when ``data`` has no row for the pair.
    """
    row = data.query(
        "(source == @source and destination == @destination) or "
        "(source == @destination and destination == @source)"
    )
    return row["distance"].values[0]


def compute_distance(element, individual, data):
    """Sum the distances from ``element`` to every other point of ``individual``."""
    accumulator = 0
    distinct_elements = individual.query(f"point != {element}")
    for _, item in distinct_elements.iterrows():
        accumulator += get_row_distance(
            source=element, destination=item.point, data=data
        )
    return accumulator


def generate_individual(n, m, data):
    """Build a random individual of ``m`` distinct points drawn from ``range(n)``.

    The ``distance`` column is filled with ``compute_distance`` for each point;
    ``fitness`` is left unset until the individual is evaluated.
    """
    individual = DataFrame(columns=_COLUMNS)
    individual["point"] = choice(n, size=m, replace=False)
    individual["distance"] = individual["point"].apply(
        func=compute_distance, individual=individual, data=data
    )
    return individual


def evaluate_individual(individual, data):
    """Compute the fitness of ``individual`` and store it in its ``fitness`` column.

    For every point but the last, the largest distance to the points not yet
    processed is taken; the fitness is the sum of those values.  The individual
    is modified in place and also returned.
    """
    genotype = individual.point.values
    distances = data.query("source in @genotype and destination in @genotype")
    contributions = []
    for item in genotype[:-1]:
        element_df = distances.query(f"source == {item} or destination == {item}")
        contributions.append(element_df["distance"].astype(float).max())
        # Remove the processed point so each pair is considered only once.
        distances = distances.query(f"source != {item} and destination != {item}")
    individual["fitness"] = sum(contributions)
    return individual


def select_distinct_genes(matching_genes, parents, m):
    """Split the non-shared genes of two parents at a random cutoff.

    Returns the genes of the first parent from the cutoff onwards and the genes
    of the second parent up to the cutoff.  When the parents share every gene
    both results are empty (``randint(0)`` would raise otherwise).
    """
    first_parent = parents[0].query("point not in @matching_genes")
    second_parent = parents[1].query("point not in @matching_genes")
    distinct_count = m - len(matching_genes)
    cutoff = randint(distinct_count) if distinct_count > 0 else 0
    first_parent_genes = first_parent.point.values[cutoff:]
    second_parent_genes = second_parent.point.values[:cutoff]
    return first_parent_genes, second_parent_genes


def select_shuffled_genes(matching_genes, parents):
    """Return the non-shared genes of each parent in random order."""
    first_parent = parents[0].query("point not in @matching_genes")
    second_parent = parents[1].query("point not in @matching_genes")
    # Shuffle private copies: the arrays backing a DataFrame may be read-only.
    first_genes = first_parent.point.to_numpy().copy()
    second_genes = second_parent.point.to_numpy().copy()
    shuffle(first_genes)
    shuffle(second_genes)
    return first_genes, second_genes


def select_random_parent(parents):
    """Pick one of the two parents at random, falling back to the other if empty."""
    random_index = randint(len(parents))
    random_parent = parents[random_index]
    if random_parent.point.empty:
        opposite_index = 1 - random_index
        random_parent = parents[opposite_index]
    return random_parent


def get_best_point(parents, offspring):
    """Return the highest-distance parent point that is not yet in ``offspring``.

    A parent is chosen at random among those that still hold an unused point.

    Raises:
        ValueError: if every parent point already belongs to ``offspring``.
    """
    taken = set(offspring["point"].tolist())
    remaining = [parent[~parent["point"].isin(taken)] for parent in parents]
    if all(candidate.empty for candidate in remaining):
        raise ValueError("parents hold no point that is missing from the offspring")
    chosen = select_random_parent(remaining)
    # Positional lookup: index labels are not guaranteed to match positions.
    best_position = chosen["distance"].astype(float).argmax()
    return chosen["point"].iloc[best_position]


def repair_offspring(offspring, parents, m):
    """Return ``offspring`` trimmed or extended until it has exactly ``m`` rows.

    Surplus rows are removed starting from the one with the largest distance;
    missing rows are filled with points taken from the parents.
    """
    while len(offspring) != m:
        if len(offspring) > m:
            best_index = offspring["distance"].idxmax()
            offspring = offspring.drop(index=best_index)
        else:
            best_point = get_best_point(parents, offspring)
            new_row = DataFrame([{"point": best_point, "distance": 0, "fitness": 0}])
            offspring = concat([offspring, new_row], ignore_index=True)
    return offspring


def get_matching_genes(parents):
    """Return the sorted points that both parents have in common."""
    first_parent = parents[0].point.values
    second_parent = parents[1].point.values
    return intersect1d(first_parent, second_parent)


def populate_offspring(values):
    """Build an offspring whose points are the concatenation of ``values``.

    ``distance`` and ``fitness`` are initialised to 0 and the index is a plain
    0..k-1 range, so labels and positions coincide.
    """
    points = [point for chunk in values for point in chunk]
    offspring = DataFrame({"point": points}, columns=_COLUMNS)
    offspring["distance"] = 0
    offspring["fitness"] = 0
    return offspring


def uniform_crossover(parents, m):
    """Create one offspring from the shared genes plus a mix of the distinct ones."""
    matching_genes = get_matching_genes(parents)
    first_genes, second_genes = select_distinct_genes(matching_genes, parents, m)
    offspring = populate_offspring(values=[matching_genes, first_genes, second_genes])
    return repair_offspring(offspring, parents, m)


def position_crossover(parents):
    """Create two offspring: shared genes plus each parent's shuffled distinct genes."""
    matching_genes = get_matching_genes(parents)
    first_genes, second_genes = select_shuffled_genes(matching_genes, parents)
    first_offspring = populate_offspring(values=[matching_genes, first_genes])
    second_offspring = populate_offspring(values=[matching_genes, second_genes])
    return first_offspring, second_offspring


def group_parents(parents):
    """Pair consecutive parents.

    When both members of a pair hold identical point arrays, the second one is
    exchanged with ``parents[i - 2]`` (this mutates ``parents``).  The number of
    parents must be even; an odd count raises ``IndexError``.
    """
    parent_pairs = []
    for i in range(0, len(parents), 2):
        first = parents[i]
        second = parents[i + 1]
        if array_equal(first.point.values, second.point.values):
            second, parents[i - 2] = parents[i - 2], second
        parent_pairs.append([first, second])
    return parent_pairs


def crossover(mode, parents, m, probability=0.7):
    """Produce the offspring of ``parents``.

    In ``"uniform"`` mode the first ``int(len(parents) * probability) // 2``
    pairs yield two uniform-crossover children each and the remaining pairs are
    passed through unchanged.  Any other mode applies position crossover to
    every pair.
    """
    parent_groups = group_parents(parents)
    offspring = []
    if mode == "uniform":
        expected_crossovers = int(len(parents) * probability)
        cutoff = expected_crossovers // 2
        for pair in parent_groups[:cutoff]:
            offspring.append(uniform_crossover(pair, m))
            offspring.append(uniform_crossover(pair, m))
        for pair in parent_groups[cutoff:]:
            offspring.extend([pair[0], pair[1]])
    else:
        for pair in parent_groups:
            offspring.extend(position_crossover(pair))
    return offspring


def element_in_dataframe(individual, element):
    """Return True when ``element`` is one of the points of ``individual``."""
    duplicates = individual.query(f"point == {element}")
    return not duplicates.empty


def select_new_gene(individual, n):
    """Draw random points from ``range(n)`` until one is not in ``individual``."""
    while True:
        new_gene = randint(n)
        if not element_in_dataframe(individual=individual, element=new_gene):
            return new_gene


def mutate(offspring, n, data, probability=0.001):
    """Replace randomly chosen genes of randomly chosen individuals.

    ``ceil(len(offspring) * n * probability)`` mutations are applied.  Each one
    swaps a gene for a point the individual does not contain and recomputes the
    ``distance`` of that gene.  Individuals are modified in place.
    """
    expected_mutations = ceil(len(offspring) * n * probability)
    targets = []
    for _ in range(expected_mutations):
        individual_index = randint(len(offspring))
        # Row positions, not index labels: labels need not be 0..m-1.
        gene_position = randint(len(offspring[individual_index]))
        targets.append((individual_index, gene_position))
    for individual_index, gene_position in targets:
        individual = offspring[individual_index]
        point_column = individual.columns.get_loc("point")
        distance_column = individual.columns.get_loc("distance")
        new_gene = select_new_gene(individual, n)
        # Single-step positional assignment; chained ``df[col].iloc[i] = x``
        # may write to a temporary copy and leave the frame untouched.
        individual.iloc[gene_position, point_column] = new_gene
        # Distances may be fractional while the column starts out as integers.
        individual["distance"] = individual["distance"].astype(float)
        individual.iloc[gene_position, distance_column] = compute_distance(
            element=new_gene, individual=individual, data=data
        )
    return offspring


def _fitness_of(individual):
    """Return the scalar fitness stored in the first row of ``individual``."""
    return individual.fitness.values[0]


def get_individual_index(element, population):
    """Return the index of the first individual whose fitness equals ``element``'s.

    Returns ``None`` when no individual matches.
    """
    target = _fitness_of(element)
    for index, candidate in enumerate(population):
        if _fitness_of(candidate) == target:
            return index
    return None


def tournament_selection(population):
    """Draw two individuals at random and return the fitter one with its index."""
    contenders = [randint(len(population)) for _ in range(2)]
    # Keep the drawn index: searching by fitness afterwards can point at a
    # different individual when several share the same fitness.
    best_index = max(contenders, key=lambda index: _fitness_of(population[index]))
    return population[best_index], best_index


def check_element_population(element, population):
    """Return True when some individual has exactly the same point array."""
    return any(
        array_equal(element.point.values, item.point.values) for item in population
    )


def generational_replacement(prev_population, current_population):
    """Return the new generation, keeping the best previous individual (elitism).

    If the best individual of ``prev_population`` is missing from
    ``current_population`` it takes the place of the worst current individual.
    """
    new_population = list(current_population)
    best_previous_individual = max(prev_population, key=_fitness_of)
    if not check_element_population(best_previous_individual, new_population):
        worst_index = min(
            range(len(new_population)),
            key=lambda index: _fitness_of(new_population[index]),
        )
        new_population[worst_index] = best_previous_individual
    return new_population


def _two_extreme_indexes(population, best):
    """Return the indexes of the two fittest (or least fit) individuals."""
    if len(population) < 2:
        raise ValueError("population must contain at least two individuals")
    # ``sorted`` is stable (also with ``reverse=True``), so ties keep the
    # original population order.
    ranked = sorted(
        range(len(population)),
        key=lambda index: _fitness_of(population[index]),
        reverse=best,
    )
    return ranked[0], ranked[1]


def get_best_elements(population):
    """Return the indexes, within ``population``, of its two fittest individuals.

    Raises:
        ValueError: if ``population`` has fewer than two individuals.
    """
    return _two_extreme_indexes(population, best=True)


def get_worst_elements(population):
    """Return the indexes, within ``population``, of its two least fit individuals.

    Raises:
        ValueError: if ``population`` has fewer than two individuals.
    """
    return _two_extreme_indexes(population, best=False)


def stationary_replacement(prev_population, current_population):
    """Replace the two worst previous individuals with better offspring.

    The worst previous individual is compared with the best offspring and the
    second worst with the second best; a replacement happens only when the
    offspring has strictly higher fitness.
    """
    new_population = list(prev_population)
    worst_indexes = get_worst_elements(prev_population)
    best_indexes = get_best_elements(current_population)
    for worst, best in zip(worst_indexes, best_indexes):
        if _fitness_of(current_population[best]) > _fitness_of(prev_population[worst]):
            new_population[worst] = current_population[best]
    return new_population


def replace_population(prev_population, current_population, mode):
    """Dispatch to generational or (for any other mode) stationary replacement."""
    if mode == "generational":
        return generational_replacement(prev_population, current_population)
    return stationary_replacement(prev_population, current_population)


def evaluate_population(population, data, cores=4):
    """Evaluate every individual in a pool of ``cores`` worker processes."""
    fitness_func = partial(evaluate_individual, data=data)
    with Pool(cores) as pool:
        evaluated_population = pool.map(fitness_func, population)
    return evaluated_population


def select_parents(population, n, mode):
    """Select parents by tournament without replacement.

    ``n`` parents are selected in ``"generational"`` mode and two otherwise.
    Selection works on a deep copy, so ``population`` is left untouched.
    """
    select_population = deepcopy(population)
    rounds = n if mode == "generational" else 2
    parents = []
    for _ in range(rounds):
        element, index = tournament_selection(population=select_population)
        parents.append(element)
        select_population.pop(index)
    return parents


def genetic_algorithm(n, m, data, select_mode, crossover_mode, max_iterations=100000):
    """Run the genetic algorithm and return the fittest individual found.

    Args:
        n: number of available points; also used as the population size.
        m: number of points held by each individual.
        data: DataFrame of pairwise distances.
        select_mode: ``"generational"`` or any other value for stationary.
        crossover_mode: ``"uniform"`` or any other value for position crossover.
        max_iterations: number of generations to run.
    """
    population = [generate_individual(n, m, data) for _ in range(n)]
    population = evaluate_population(population, data)
    for _ in range(max_iterations):
        parents = select_parents(population, n, select_mode)
        offspring = crossover(crossover_mode, parents, m)
        offspring = mutate(offspring, n, data)
        # Replacement compares fitness values, so the offspring must be
        # evaluated first; afterwards every survivor is already evaluated.
        offspring = evaluate_population(offspring, data)
        population = replace_population(population, offspring, select_mode)
    best_index, _ = get_best_elements(population)
    return population[best_index]