from numpy import sum, append, arange, delete, intersect1d from numpy.random import randint, choice, shuffle from pandas import DataFrame from math import ceil from functools import partial from multiprocessing import Pool from preprocessing import parse_file def get_row_distance(source, destination, data): row = data.query( """(source == @source and destination == @destination) or \ (source == @destination and destination == @source)""" ) return row["distance"].values[0] def compute_distance(element, individual, data): accumulator = 0 distinct_elements = individual.query(f"point != {element}") for _, item in distinct_elements.iterrows(): accumulator += get_row_distance( source=element, destination=item.point, data=data ) return accumulator def generate_individual(n, m, data): individual = DataFrame(columns=["point", "distance", "fitness"]) individual["point"] = choice(n, size=m, replace=False) individual["distance"] = individual["point"].apply( func=compute_distance, individual=individual, data=data ) return individual def evaluate_individual(individual, data): fitness = [] genotype = individual.point.values distances = data.query(f"source in @genotype and destination in @genotype") for item in genotype[:-1]: element_df = distances.query(f"source == {item} or destination == {item}") max_distance = element_df["distance"].astype(float).max() fitness = append(arr=fitness, values=max_distance) distances = distances.query(f"source != {item} and destination != {item}") individual["fitness"] = sum(fitness) return individual def select_distinct_genes(matching_genes, parents, m): first_parent = parents[0].query("point not in @matching_genes") second_parent = parents[1].query("point not in @matching_genes") cutoff = randint(m - len(matching_genes)) first_parent_genes = first_parent.point.values[cutoff:] second_parent_genes = second_parent.point.values[:cutoff] return first_parent_genes, second_parent_genes def select_random_genes(matching_genes, parents, m): random_parent = parents[randint(len(parents))] distinct_indexes = delete(arange(m), matching_genes) genes = random_parent.point.iloc[distinct_indexes].values shuffle(genes) return genes def select_random_parent(parents): random_index = randint(len(parents)) random_parent = parents[random_index] if random_parent.point.empty: opposite_index = 1 - random_index random_parent = parents[opposite_index] return random_parent def repair_offspring(offspring, parents, m): while len(offspring) != m: if len(offspring) > m: best_index = offspring["distance"].idxmax() offspring.drop(index=best_index, inplace=True) elif len(offspring) < m: # NOTE Refactor into its own function while True: random_parent = select_random_parent(parents) best_index = random_parent["distance"].idxmax() best_point = random_parent["point"].loc[best_index] random_parent.drop(index=best_index, inplace=True) if best_point not in offspring.point.values: break offspring = offspring.append( {"point": best_point, "distance": 0, "fitness": 0}, ignore_index=True ) return offspring def get_matching_genes(parents): first_parent = parents[0].point.values second_parent = parents[1].point.values return intersect1d(first_parent, second_parent) def populate_offspring(values): offspring = DataFrame(columns=["point", "distance", "fitness"]) for element in values: aux = DataFrame(columns=["point", "distance", "fitness"]) aux["point"] = element offspring = offspring.append(aux) offspring["distance"] = 0 offspring["fitness"] = 0 offspring = offspring[1:] return offspring def uniform_crossover(parents, m): matching_genes = get_matching_genes(parents) first_genes, second_genes = select_distinct_genes(matching_genes, parents, m) offspring = populate_offspring(values=[matching_genes, first_genes, second_genes]) viable_offspring = repair_offspring(offspring, parents, m) return viable_offspring def position_crossover(parents, m): matching_genes = get_matching_genes(parents) shuffled_genes = select_random_genes(matching_genes, parents, m) first_offspring = populate_offspring(values=[matching_genes, shuffled_genes]) second_offspring = populate_offspring(values=[matching_genes, shuffled_genes]) return [first_offspring, second_offspring] def crossover(mode, parents, m): split_parents = zip(*[iter(parents)] * 2) if mode == "uniform": crossover_func = partial(uniform_crossover, m=m) else: crossover_func = partial(position_crossover, m=m) offspring = [*map(crossover_func, split_parents)] return offspring def element_in_dataframe(individual, element): duplicates = individual.query(f"point == {element}") return not duplicates.empty def select_new_gene(individual, n): while True: new_gene = randint(n) if not element_in_dataframe(individual=individual, element=new_gene): return new_gene def mutate(offspring, data, probability=0.001): expected_mutations = len(offspring) * n * probability individuals = [] genes = [] for _ in range(ceil(expected_mutations)): individuals.append(randint(len(offspring))) current_individual = individuals[-1] genes.append(offspring[current_individual].sample().index) for ind, gen in zip(individuals, genes): individual = offspring[ind] individual["point"].iloc[gen] = select_new_gene(individual, n) individual["distance"].iloc[gen] = compute_distance( element=individual["point"].iloc[gen].values[0], individual=individual, data=data, ) return offspring def get_individual_index(population, element): for index in range(len(population)): if population[index].fitness.values[0] == element.fitness.values[0]: return index def tournament_selection(population): individuals = [population[randint(len(population))] for _ in range(2)] best_element = max(individuals, key=lambda x: x.fitness.values[0]) population_index = get_individual_index(population, best_element) return best_element, population_index def generational_replacement(previous_population, current_population): new_population = current_population best_previous_individual = max(previous_population, key=lambda x: all(x.fitness)) if best_previous_individual not in new_population: worst_index = new_population.index( min(new_population, key=lambda x: all(x.fitness)) ) new_population[worst_index] = best_previous_individual return new_population def get_best_elements(population): first_index = population.index(max(population, key=lambda x: all(x.fitness))) population.pop(first_index) second_index = population.index(max(population, key=lambda x: all(x.fitness))) return first_index, second_index def get_worst_elements(population): first_index = population.index(min(population, key=lambda x: all(x.fitness))) population.pop(first_index) second_index = population.index(min(population, key=lambda x: all(x.fitness))) return first_index, second_index def stationary_replacement(prev_population, current_population): new_population = prev_population worst_indexes = get_worst_elements(prev_population) best_indexes = get_best_elements(current_population) for worst, best in zip(worst_indexes, best_indexes): if current_population[best].fitness > prev_population[worst].fitness: new_population[worst] = current_population[best] return new_population def replace_population(prev_population, current_population, mode): if mode == "generational": return generational_replacement(prev_population, current_population) return stationary_replacement(prev_population, current_population) def evaluate_population(population, data, cores=4): fitness_func = partial(evaluate_individual, data=data) with Pool(cores) as pool: evaluated_population = pool.map(fitness_func, population) return evaluated_population def select_parents(population, n, mode): select_population = population parents = [] if mode == "generational": for _ in range(n): element, index = tournament_selection(population=select_population) parents.append(element) select_population.pop(index) else: for _ in range(2): element, index = tournament_selection(population=select_population) parents.append(element) select_population.pop(index) return parents def genetic_algorithm(n, m, data, select_mode, crossover_mode, max_iterations=100000): population = [generate_individual(n, m, data) for _ in range(n)] population = evaluate_population(population, data) for _ in range(max_iterations): parents = select_parents(population, n, select_mode) offspring = crossover(crossover_mode, parents, m) offspring = mutate(offspring, data) population = replace_population(population, offspring, select_mode) population = evaluate_population(population, data) best_solution, _ = get_best_elements(population) return best_solution n, m, data = parse_file("data/GKD-c_11_n500_m50.txt") genetic_algorithm( n=10, m=4, data=data, select_mode="generational", crossover_mode="uniform", max_iterations=1, )