Evolutionary Algorithm (EA) 2

Solve a small system with an evolutionary algorithm. Decisions are encoded as indices into a list of available places. We use the Python module DEAP; see the DEAP documentation for more information.

For this tutorial we need the following modules:

import copy
import random  # For seed.
import array
import csv
import multiprocessing

import numpy
from deap import base
from deap import creator
from deap import tools

import prp.recorder as recorder
import prp.xy as xy
import prp.utils as utils
from prp.solvers.simple import CostsType
import prp.solvers.evo_simple as evo_simple
import prp.solvers.evolution_only_free as evo
from prp.solvers.optimal_fixed import get_place_costs

Parameters

We define paths to JSON files of a small warehouse system

LAYOUT_FILE = "../data/10-layout.json"
INITIAL_STATE_FILE = "../data/10-initial-state.json"
DEPARTURES_FILE = "../data/10-departures.json"

and store the solution to a JSON file:

SOLUTION_FILE = "../data/solutions/10-evo-fo-solution.json"

We also store intermediate results to a CSV file. Set it to None if you do not want to create the CSV file.

CSV_FILE = "../data/10-evo-fo-intermediate-from-cheapest-place-results.csv"

We set the parameters for the evolutionary algorithm. Evolutionary algorithms are randomized; call random.seed(…) to make the results reproducible.

# DEAP's operators draw from Python's built-in random module, so seed it
# (rather than numpy.random.seed(7)) to get reproducible results.
random.seed(7)
POPULATION_SIZE = 100
MAX_GENERATIONS = 100000
MAX_NO_IMPROVEMENTS = 100

Place Order

Before we can use the evolutionary algorithm, we need to create a fixed order of places. The \(k\)-th element of the genome having value \(i\) means: go to the \(i\)-th place when making the \(k\)-th decision to send a pod to storage. Note that \(k\) is usually smaller than the decision time \(t\), because at the very beginning the system does not send pods to storage until the corresponding queue is full.
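To make the encoding concrete, here is a minimal sketch of decoding a genome against a fixed place order. The place names and the `decode` function are hypothetical and not part of `prp`; the sketch also assumes zero-based indices and ignores whether a place is currently free.

```python
# Hypothetical place names in some fixed order (illustration only).
place_order = ["P3", "P1", "P4", "P2"]


def decode(genome, place_order):
    """Map each gene i (an index) to the i-th place of the fixed order."""
    return [place_order[i] for i in genome]


# The k-th gene gives the destination of the k-th storage decision.
genome = [0, 2, 0, 3]
destinations = decode(genome, place_order)
print(destinations)  # ['P3', 'P4', 'P3', 'P2']
```

Because the genome stores indices rather than place identifiers, changing the place order changes the meaning of every gene without changing the genome itself.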

For this tutorial we sort the places by their average costs, cheapest first.

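def get_place_ordered_by_avg_costs():
    # load_problem() is defined in the "Calculate" section below;
    # it must be defined before this function is called.
    wh = load_problem()
    avg_costs = get_place_costs(wh)
    # Sort the place identifiers by ascending average cost.
    return sorted(avg_costs.keys(), key=lambda place: avg_costs[place])
PLACE_ORDER = get_place_ordered_by_avg_costs()
print("Use place order: {}".format(PLACE_ORDER))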
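As a small illustration of the sorting step, the sketch below orders places by a cost mapping. The place names and cost values are made up; the only assumption carried over from the tutorial is that the costs come as a mapping from place to average cost.

```python
# Hypothetical average costs per place (illustration only).
avg_costs = {"P1": 4.5, "P2": 2.0, "P3": 7.25, "P4": 3.0}

# Sort the place names by ascending average cost, cheapest first.
place_order = sorted(avg_costs, key=avg_costs.get)
print(place_order)  # ['P2', 'P4', 'P1', 'P3']
```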

Calculate

Load a warehouse model from JSON files.

def load_problem():
    """Load a test system with 10 places and 10 pods randomly distributed among them."""
    layout = xy.Layout()
    with open(LAYOUT_FILE, 'r') as infile:
        layout.load_from_json(infile)
        warehouse = layout.get_empty_warehouse()
        costs = layout.get_costs()
        warehouse.set_costs(costs)
    with open(INITIAL_STATE_FILE, 'r') as infile:
        recorder.load_initial_state_from_json(infile, warehouse)
    with open(DEPARTURES_FILE, 'r') as infile:
        departures = recorder.load_departures_from_json(infile)
        warehouse.set_departure_generator(departures)
    return warehouse
warehouse = load_problem()

Use the helper class evo.Helper (from prp.solvers.evolution_only_free) with our place order to translate between the Pod Reposition Problem and an evolutionary problem.

helper = evo.Helper(warehouse, PLACE_ORDER)

An evolutionary algorithm maximizes the fitness of individuals, but we want to minimize their costs. We therefore define the fitness as FitnessMin := (-1)*costs, which is what the weight -1.0 expresses.

creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
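The sign convention can be illustrated without DEAP. The sketch below multiplies hypothetical cost values by the weight -1.0 and shows that maximizing the weighted value picks the cheapest candidate. The candidate names and costs are invented for illustration.

```python
WEIGHT = -1.0  # same sign as in weights=(-1.0,)

# Hypothetical candidates with their costs.
costs = {"a": 12.5, "b": 9.0, "c": 15.0}

# Weighted fitness: larger is better, so the lowest cost wins.
weighted = {name: WEIGHT * cost for name, cost in costs.items()}
best = max(weighted, key=weighted.get)
print(best)  # b
```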

Our individuals are sequences of signed integers (array typecode "l") whose fitness is FitnessMin.

creator.create("Individual", array.array, typecode="l", fitness=creator.FitnessMin)

Setup DEAP solver.

toolbox = base.Toolbox()

Create initial population from a cheapest-place solution.

initial_solution = helper.cheapest_place_solution(CostsType.DECISION)
toolbox.register("initialSolution", copy.deepcopy, initial_solution)
toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.initialSolution)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
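Since every individual starts as a copy of the same solution, the copies must be independent objects, otherwise mutating one individual would change all of them. A minimal stdlib sketch of that idea follows; the list `initial_solution` here is a made-up stand-in, not the helper's output.

```python
import copy

# Hypothetical starting genome (illustration only).
initial_solution = [0, 1, 0, 2]

# Each individual is an independent deep copy of the same starting genome.
population = [copy.deepcopy(initial_solution) for _ in range(5)]

# Changing one individual leaves the others and the template untouched.
population[0][0] = 3
print(population[0], population[1], initial_solution)
```

The initial population therefore has no diversity at all; variation only enters through crossover and mutation in later generations.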

Add rules for mating. Use two-point random crossover.

toolbox.register("mate", tools.cxTwoPoint)
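For readers unfamiliar with two-point crossover, here is a minimal stdlib sketch of the idea. It is not DEAP's implementation: `two_point_crossover` is a hypothetical function that returns new lists, whereas `tools.cxTwoPoint` modifies its arguments in place.

```python
import random


def two_point_crossover(parent1, parent2, rng):
    """Swap the segment between two random cut points; return two new lists."""
    size = min(len(parent1), len(parent2))
    # Two distinct cut points strictly inside the genome (needs size >= 3).
    cut1, cut2 = sorted(rng.sample(range(1, size), 2))
    child1 = parent1[:cut1] + parent2[cut1:cut2] + parent1[cut2:]
    child2 = parent2[:cut1] + parent1[cut1:cut2] + parent2[cut2:]
    return child1, child2


rng = random.Random(7)
p1 = [0, 0, 0, 0, 0, 0]
p2 = [1, 1, 1, 1, 1, 1]
c1, c2 = two_point_crossover(p1, p2, rng)
print(c1, c2)  # the middle segment of each child comes from the other parent
```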

Add rules for mutation. Configure uniform integer mutation so that, on average, 3 genes of an individual are resampled (indpb = 3 / genome length).

toolbox.register("mutate", tools.mutUniformInt, low=helper.min_action(),
                 up=helper.max_action(), indpb=3.0 / len(initial_solution))
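The choice of `indpb` can be checked with a small stdlib sketch. The function `mutate_uniform_int` below is a hypothetical re-implementation of the idea (each gene is resampled independently with probability `indpb`), not DEAP's code, and the genome length and value range are made up.

```python
import random


def mutate_uniform_int(genome, low, up, indpb, rng):
    """Resample each gene uniformly from [low, up] with probability indpb."""
    return [rng.randint(low, up) if rng.random() < indpb else gene
            for gene in genome]


rng = random.Random(7)
genome = [0] * 30
indpb = 3.0 / len(genome)

# Expected number of resampled genes is len(genome) * indpb, i.e. about 3.
expected_resampled = indpb * len(genome)
mutant = mutate_uniform_int(genome, low=0, up=9, indpb=indpb, rng=rng)
print(expected_resampled, mutant)
```

Note that a resampled gene can draw its old value again, so the number of genes that actually change is slightly lower than the number resampled.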

Use helper.evaluate() to calculate the fitness. This method returns the average costs.

toolbox.register("evaluate", helper.evaluate)

Use tournament-based selection. That means: POPULATION_SIZE times (here 100), pick 3 individuals at random and add the best of them to the new population.

toolbox.register("select", tools.selTournament, tournsize=3)
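The selection rule can be sketched in a few lines of plain Python. This is an illustrative re-implementation, not DEAP's code: `tournament_select`, the population labels, and the cost values are all hypothetical, and entrants are drawn with replacement.

```python
import random


def tournament_select(population, costs, k, tournsize, rng):
    """Pick k individuals; each is the cheapest of tournsize random entrants."""
    selected = []
    for _ in range(k):
        # Draw entrant indices with replacement.
        entrants = [rng.randrange(len(population)) for _ in range(tournsize)]
        winner = min(entrants, key=lambda i: costs[i])
        selected.append(population[winner])
    return selected


rng = random.Random(7)
population = ["A", "B", "C", "D"]
costs = [4.0, 1.0, 3.0, 2.0]  # lower is better
chosen = tournament_select(population, costs, k=len(population),
                           tournsize=3, rng=rng)
print(chosen)
```

A larger tournament size increases selection pressure: the worst individual ("A" here) can only win a round if every entrant in that round is "A".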

Before we start optimization, we can print out the initial solution and its fitness (=costs).

ind1 = toolbox.individual()
print(ind1)
print("Initial fitness: {}".format(helper.evaluate(initial_solution)))

Generate initial population with POPULATION_SIZE individuals.

pop = toolbox.population(n=POPULATION_SIZE)

Keep track of the best individual seen so far, updated after each new population. Since its size is 1, the hall of fame will contain only the single individual with the best fitness at the end.

hof = tools.HallOfFame(1)

Store some population statistics.

stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("Avg", numpy.mean)
stats.register("Std", numpy.std)
stats.register("Min", numpy.min)
T = len(initial_solution)
stats.register("MinTotal", lambda x: numpy.min(x) * T)  # Minimal total costs and not average.
stats.register("Max", numpy.max)

Optionally switch on multiprocessing.

pool = multiprocessing.Pool()
toolbox.register("map", pool.map)

Start calculation.

pop, logbook = evo_simple.eaSimple(pop, toolbox, cxpb=0.5, mutpb=0.2, ngen=MAX_GENERATIONS,
                                   maxnoimprovments=MAX_NO_IMPROVEMENTS,
                                   stats=stats, halloffame=hof, verbose=True)
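The exact semantics of the stopping parameter are defined by `evo_simple.eaSimple` and are not shown in this tutorial. As a sketch only, and assuming it means "stop after this many consecutive generations without a new best cost", the criterion could look like the following. The function name and the cost history are hypothetical.

```python
def run_with_early_stopping(best_cost_per_generation, max_generations,
                            max_no_improvements):
    """Return (generations processed, best cost) under an early-stopping rule."""
    best = float("inf")
    stale = 0  # consecutive generations without improvement
    processed = 0
    for cost in best_cost_per_generation[:max_generations]:
        processed += 1
        if cost < best:
            best, stale = cost, 0
        else:
            stale += 1
        if stale >= max_no_improvements:
            break
    return processed, best


# Hypothetical best cost observed in each generation.
history = [10, 9, 9, 8, 8, 8, 8, 7]
result = run_with_early_stopping(history, max_generations=100,
                                 max_no_improvements=3)
print(result)  # (7, 8)
```

With such a rule, MAX_GENERATIONS acts only as an upper bound; in this example the run stops before the later improvement to 7 is ever seen.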

Store intermediate results to CSV_FILE.

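if CSV_FILE is not None:
    # newline='' avoids blank lines between rows on some platforms.
    with open(CSV_FILE, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=logbook.header)
        writer.writeheader()
        # The logbook is a list of dictionaries, one per generation.
        writer.writerows(logbook)
    # The with-statement closes the file; no explicit close() is needed.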
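The CSV step relies only on the records being dictionaries keyed by the header names. A minimal, self-contained sketch with made-up records (not real results) and an in-memory buffer instead of a file:

```python
import csv
import io

# Hypothetical per-generation records (illustration only).
records = [{"gen": 0, "Min": 12.5}, {"gen": 1, "Min": 11.0}]

buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=["gen", "Min"])
writer.writeheader()
writer.writerows(records)

lines = buffer.getvalue().splitlines()
print(lines)  # ['gen,Min', '0,12.5', '1,11.0']
```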

Print out the best solution and save it to a JSON file.

print(hof)
print(hof[0].fitness.valid)
print(hof[0].fitness)
utils.create_missing_directories_of_file(SOLUTION_FILE)
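# NOTE: `solution` is not defined anywhere in this tutorial; it has to be
# derived from the best individual hof[0] before this call.
with open(SOLUTION_FILE, 'w') as outfile:
    recorder.store_solution_to_json(solution, outfile)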