I'm working on a project accelerating the execution of a Genetic Algorithm both on multiple cores and on a GPU. The algorithm is specifically suited to solving the Traveling Salesman Problem in a 2D symmetric Euclidean space.
I've put all the functions in place, starting with the CUDA version for the GPU and following with a simple single-core implementation. From there, as expected, the speed-up was notable and consistent, scaling really well with the total number of individuals used in natural selection.
I then decided to compare both versions with a multi-core implementation.
Long story short, my multithreaded version runs really slowly on Ubuntu 22.04.4, but on Windows 10 the same code is almost perfect and scales surprisingly well with an increasing number of threads. Here is a reproducible snippet (the 48_cities.txt file is needed):
#include <iostream>
#include <thread>
#include <chrono>
#include <cstdio>   // printf, fopen, fscanf
#include <cstdlib>  // malloc, rand, srand, exit
#include <cmath>    // sqrt, pow, std::ceil
#include <ctime>    // time
#define pathSize 48
#define popSize 64000
#define subPopSize 32
#define threads 4
void random_shuffle(int *population, int sub_pop_num){
for (int i = 0; i < sub_pop_num * subPopSize; i++){
// Prevent the last thread from overrunning the islands (sub-populations)
if (i >= popSize) break;
// shuffle the ith chromosome
int gene;
for (int j = 0; j < pathSize; j++){
int idx1 = (int)((rand()/(RAND_MAX+1.0)) * pathSize);
int idx2 = (int)((rand()/(RAND_MAX+1.0)) * pathSize);
gene = population[i*pathSize + idx1];
population[i*pathSize + idx1] = population[i*pathSize + idx2];
population[i*pathSize + idx2] = gene;
}
}
}
double calculate_distance(int *chromosome, double *distance_matrix){
double distance = 0.0;
for (int i = 0; i < pathSize-1; i++){
distance += distance_matrix[chromosome[i]*pathSize + chromosome[i+1]];
}
distance += distance_matrix[chromosome[pathSize-1]*pathSize + chromosome[0]];
return distance;
}
void calculate_scores(int *population, double *distance_matrix, double *population_fitness, double *population_distances, int sub_pop_num){
for (int i = 0; i < sub_pop_num * subPopSize; i++){
// Prevent the last thread from overrunning the islands (sub-populations)
if (i >= popSize) break;
population_distances[i] = calculate_distance(population + i * pathSize, distance_matrix);
population_fitness[i] = 1000/population_distances[i];
}
}
void load_data(int *coordinates, const char *filename){
// read filename
FILE *file = fopen(filename, "r");
if (file == NULL){
printf("Error opening file %s\n", filename);
exit(1);
}
int i = 0;
while (fscanf(file, "%d %d", &coordinates[i*2], &coordinates[i*2+1]) == 2){
i++;
}
fclose(file);
}
int main(){
auto start = std::chrono::high_resolution_clock::now();
//-------------------------------------------------
// Load the coordinates of the cities from file
//-------------------------------------------------
int *path_coordinates = (int*)malloc(pathSize * 2 * sizeof(int)); //[pathSize][2];
load_data(path_coordinates, "48_cities.txt");
//-----------------------------------------
// Allocate and fill the distance matrix
//-----------------------------------------
double *distance_matrix = (double*)malloc(pathSize*pathSize*sizeof(double)); //[pathSize][pathSize];
for(int i = 0; i < pathSize; i++){
for(int j = 0; j < pathSize; j++){
distance_matrix[i*pathSize+j] = sqrt(pow(path_coordinates[i*2]-path_coordinates[j*2],2) + pow(path_coordinates[i*2+1]-path_coordinates[j*2+1],2));
}
}
int *population = (int*)malloc(popSize * pathSize * sizeof(int)); //[popSize][pathSize]; - This represents the order of the cities for each chromosome in the population
for (int i = 0; i < pathSize * popSize; i++){
population[i] = i % pathSize;
}
auto load_checkpoint = std::chrono::high_resolution_clock::now();
printf("Data loaded in %.2ld ms\n", std::chrono::duration_cast<std::chrono::milliseconds>(load_checkpoint - start).count());
//---------------------------------------------------------------
// Random shuffle the population for the first time
//---------------------------------------------------------------
srand(time(NULL)); // seed the random number generator
// measure time to shuffle the population
auto shuffle_start = std::chrono::high_resolution_clock::now();
std::thread t[threads];
int sub_pop_num = (int)std::ceil((popSize / subPopSize) / (double)threads); // number of subpopulations/islands per thread
//-----------------------------------------------------------------------
// Allocate fitness and distances for each individual and calculate them
//-----------------------------------------------------------------------
double *population_fitness = (double*)malloc(popSize*sizeof(double)); //[popSize];
double *population_distances = (double*)malloc(popSize*sizeof(double)); //[popSize];
for (int i = 0; i < threads; i++){
int *pop_start = population + i * sub_pop_num * subPopSize * pathSize;
double *pop_fit_start = population_fitness + i * sub_pop_num * subPopSize;
double *pop_dist_start = population_distances + i * sub_pop_num * subPopSize;
t[i] = std::thread([pop_start, pop_fit_start, pop_dist_start, distance_matrix, sub_pop_num](){
random_shuffle(pop_start, sub_pop_num);
calculate_scores(pop_start, distance_matrix, pop_fit_start, pop_dist_start, sub_pop_num);
});
}
// join the threads
for (int i = 0; i < threads; i++){
t[i].join();
}
printf("Scores calculated\n");
auto shuffle_end = std::chrono::high_resolution_clock::now();
printf("Population shuffled in %.2ld ms\n", std::chrono::duration_cast<std::chrono::milliseconds>(shuffle_end - shuffle_start).count());
// do the same in a single thread
shuffle_start = std::chrono::high_resolution_clock::now();
random_shuffle(population, popSize/subPopSize);
calculate_scores(population, distance_matrix, population_fitness, population_distances, popSize/subPopSize);
shuffle_end = std::chrono::high_resolution_clock::now();
printf("Population shuffled in %.2ld ms\n", std::chrono::duration_cast<std::chrono::milliseconds>(shuffle_end - shuffle_start).count());
}
And the 48_cities.txt file:
6734 1453
2233 10
5530 1424
401 841
3082 1644
7608 4458
7573 3716
7265 1268
6898 1885
1112 2049
5468 2606
5989 2873
4706 2674
4612 2035
6347 2683
6107 669
7611 5184
7462 3590
7732 4723
5900 3561
4483 3369
6101 1110
5199 2182
1633 2809
4307 2322
675 1006
7555 4819
7541 3981
3177 756
7352 4506
7545 2801
3245 3305
6426 3173
4608 1198
23 2216
7248 3779
7762 4595
7392 2244
3484 2829
6271 2135
4985 140
1916 1569
7280 4899
7509 3239
10 2676
6807 2993
5185 3258
3023 1942
I expected the execution time to decrease as the number of threads increases, up to the maximum available on the system, but the opposite happens.
The next part of the post explains in detail what I tried.
1) My first attempt created a fixed number of std::threads and assigned them tasks in the form of lambda functions on each iteration of the genetic algorithm. (Note that, given the structure of the code and the island model of the genetic algorithm, there is a synchronization point on every iteration, allowing for consistent mutations and, periodically, migrations of individuals.)
Here is the part of the while loop that handles evolution:
int generation = 1;
printf("Starting the GA...\n");
while (generation <= generations){
auto start_gen = std::chrono::high_resolution_clock::now();
// assign the threads to the genetic_step function and mutate function
for (int i = 0; i < threads; i++){
int *pop_start = population + i * sub_pop_num * subPopSize * pathSize;
double *pop_fit_start = population_fitness + i * sub_pop_num * subPopSize;
t[i] = std::thread([pop_start, pop_fit_start, sub_pop_num, distance_matrix](){
genetic_step(pop_start, pop_fit_start, distance_matrix, sub_pop_num);
mutation(pop_start, sub_pop_num);
});
}
// join the threads
for (int i = 0; i < threads; i++){
t[i].join();
}
The performance was poor and the time increased as the number of threads grew, instead of decreasing as I expected. Searching on Stack Overflow and around the web, I concluded this had to be due to thread creation, assignment and joining overheads (a rough way to measure just that overhead is sketched below), so I decided to employ a thread pool.
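To get a feel for that cost in isolation, here is a minimal, self-contained sketch (my own, not part of the GA code) that only creates and joins empty std::threads once per simulated generation:

#include <chrono>
#include <cstdio>
#include <thread>

int main(){
    const int threads = 4, generations = 1000;
    auto t0 = std::chrono::high_resolution_clock::now();
    for (int g = 0; g < generations; g++){
        // same create/join pattern as in the GA loop, but with empty tasks
        std::thread t[threads];
        for (int i = 0; i < threads; i++) t[i] = std::thread([]{});
        for (int i = 0; i < threads; i++) t[i].join();
    }
    auto t1 = std::chrono::high_resolution_clock::now();
    long long us = std::chrono::duration_cast<std::chrono::microseconds>(t1 - t0).count();
    printf("average create+join overhead per generation: %lld us\n", us / generations);
    return 0;
}

This isolates the per-generation thread management cost from the actual GA work.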
2) The thread pool was implemented as follows:
// headers required by the pool (the affinity calls are Linux/pthread-specific)
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
#include <pthread.h>
#include <sched.h>
class ThreadPool {
public:
ThreadPool(int numThreads) : stop(false), completedTasks(0), totalTasks(0) {
for (int i = 0; i < numThreads; ++i) {
workers.emplace_back([this, i] {
set_thread_affinity(i);
//std::cout << "Thread " << i << " started" << std::endl;
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this] { return stop || !tasks.empty(); });
if (stop && tasks.empty())
return;
task = std::move(tasks.front());
tasks.pop();
}
task();
{
std::lock_guard<std::mutex> lock(completedTasksMutex);
++completedTasks;
if (completedTasks == totalTasks) {
condition.notify_all();
}
}
}
});
}
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (std::thread& thread : workers) {
thread.join();
}
}
template<class F>
void enqueue(F&& f) {
{
std::unique_lock<std::mutex> lock(queueMutex);
tasks.emplace(std::forward<F>(f));
}
condition.notify_one();
}
void wait() {
std::unique_lock<std::mutex> lock(completedTasksMutex);
condition.wait(lock, [this] { return tasks.empty() && completedTasks == totalTasks; });
completedTasks = 0; // Reset completed tasks after they have all finished
}
void setTotalTasks(int numTasks) {
totalTasks = numTasks;
}
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queueMutex;
std::mutex completedTasksMutex;
std::condition_variable condition;
bool stop;
int completedTasks;
int totalTasks;
void set_thread_affinity(int core_id) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(core_id, &cpuset);
pthread_t current_thread = pthread_self();
if (pthread_setaffinity_np(current_thread, sizeof(cpu_set_t), &cpuset) != 0) {
std::cerr << "Error setting thread affinity for core " << core_id << std::endl;
}
}
};
And the tasks were enqueued accordingly:
pool.setTotalTasks(threads);
for (int i = 0; i < threads; i++){
int *pop_start = population + i * sub_pop_num * subPopSize * pathSize;
double *pop_fit_start = population_fitness + i * sub_pop_num * subPopSize;
double *pop_dist_start = population_distances + i * sub_pop_num * subPopSize;
pool.enqueue([pop_start, pop_fit_start, pop_dist_start, distance_matrix, sub_pop_num](){
random_shuffle(pop_start, sub_pop_num);
calculate_scores(pop_start, distance_matrix, pop_fit_start, pop_dist_start, sub_pop_num);
});
}
// Wait for the threads to finish
pool.wait();
(This implementation uses counters to keep track of the enqueued tasks so that the master thread can wait for their completion.)
Unfortunately, both with and without the affinity mechanism described in the set_thread_affinity(int core_id) function, performance didn't change much: the scalability issue remained, i.e. increasing the number of threads increased the execution time. For context, the sketch below shows roughly how the pool replaces the per-generation thread creation inside the evolution loop.
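This is a sketch only, reusing the names defined above (ThreadPool, genetic_step, mutation) and assuming the pool is constructed once as ThreadPool pool(threads) before the loop:

// one generation driven by the pool instead of freshly created std::threads
pool.setTotalTasks(threads);
for (int i = 0; i < threads; i++){
    int *pop_start = population + i * sub_pop_num * subPopSize * pathSize;
    double *pop_fit_start = population_fitness + i * sub_pop_num * subPopSize;
    pool.enqueue([pop_start, pop_fit_start, distance_matrix, sub_pop_num](){
        genetic_step(pop_start, pop_fit_start, distance_matrix, sub_pop_num);
        mutation(pop_start, sub_pop_num);
    });
}
pool.wait(); // acts as the per-generation barrier before migration/the next iteration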
3) As a last hope I tried to use OpenMP to parallelize the workload; the same for loop above was then implemented as follows:
#pragma omp parallel for num_threads(threads)
for (int i = 0; i < threads; i++){
int *pop_start = population + i * sub_pop_num * subPopSize * pathSize;
double *pop_fit_start = population_fitness + i * sub_pop_num * subPopSize;
genetic_step(pop_start, pop_fit_start, distance_matrix, sub_pop_num);
mutation(pop_start, sub_pop_num);
}
Luckily enough, the code structure lent itself to this means of parallelization, since each thread works on a well-defined index offset computed on each run. But the performance was still bad.
Digging deeper into the OpenMP directives, I set:
export OMP_PLACES=cores
export OMP_PROC_BIND=master
This mitigated the slowdown as the number of threads grows, but right now the whole code base still does not scale in performance on my Ubuntu machine. A minimal sketch for verifying where the OpenMP threads actually end up running is shown below.
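For reference, a minimal self-contained sketch along these lines (assuming g++ with -fopenmp on Linux; sched_getcpu() is glibc-specific) prints which core each OpenMP thread lands on under the settings above:

// report each OpenMP thread's id and current CPU,
// to verify the effect of OMP_PLACES / OMP_PROC_BIND
#include <cstdio>
#include <omp.h>
#include <sched.h>

int main(){
    #pragma omp parallel num_threads(4)
    {
        printf("OpenMP thread %d of %d running on cpu %d\n",
               omp_get_thread_num(), omp_get_num_threads(), sched_getcpu());
    }
    return 0;
}

Compiled with g++ -fopenmp and run with the two exports above, it shows whether the threads are actually pinned the way the environment variables request.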
Hence the question: I tried the first two versions on another Windows 10 machine and they scaled surprisingly well (note that thread affinity wasn't used in the thread-pool version), reaching up to a 10x speed-up on the workload with a sufficient number of threads. I'm now wondering whether I'm missing some fundamental step in running a multithreaded application on Ubuntu.
The entire codebase (repo still in progress) can be found here.
(Previously posted on Stack Overflow.)