Simple Thoughts on Multiprocessing and Distributed Computing with Functional Languages

In this thread on lispforum.com (which is a good forum for Lisp-related stuff – and the only forum I know of), the topic of multiprocessing and distributed computing came up, and how functional languages could be used for this. The problem is to find good heuristics and a good way to split calculations into multiple threads.

The code I am posting now was a one-night-experiment by me, trying to create something like that, using a thread-pool and a processing-queue. Basically, the threads always read something from the processing queue, calculate it, and then write anything that still has to be calculated into that queue. To do this they are using the atomic instructions gcc provides (hence, afaik this code is not portable).

It is almost like saving continuations into a queue. And in fact, I am wondering whether this would be a good thing to do, i.e. saving the whole stack into one object in the queue.

Anyway. Here is my C code. I split it into two files. The first file is the header file atomic_fifo.h:
#ifndef ATOMIC_FIFO_H
#define ATOMIC_FIFO_H

/*
 * Debug trace helper.
 *
 * When DEBUG is defined, forwards the format string x and the single
 * argument y to printf; otherwise the call compiles to nothing.
 *
 * Declared `static inline` so that every translation unit including this
 * header gets its own definition: a plain C99 `inline` definition provides
 * no external definition, so any call that is not inlined fails to link.
 *
 * x: printf-style format string consuming at most one argument.
 * y: the value handed to printf for that conversion.
 *
 * NOTE(review): y is always a void*, while callers in this file use "%d"
 * conversions; that only prints sensibly where the platform tolerates the
 * mismatch - confirm before relying on DEBUG output.
 */
static inline void debug (const char* x, void* y) {
#ifdef DEBUG
    printf (x, y);
#else
    /* nothing to print; keep the parameters "used" to avoid warnings */
    (void) x;
    (void) y;
#endif
}

/*
 * Ring buffer of void* slots whose bookkeeping is updated with the
 * __sync atomic builtins by write_fifo_nowait / read_fifo_nowait.
 */
typedef struct {
    /* slot storage; not allocated by initialize_atomic_fifo */
    void** content;

    /* number of slots in content; slot index = counter % maxlen */
    int maxlen;

    /* read side: "optimistic" is fetch-and-added to claim the next
       element, "pessimistic" is advanced once that read has finished */
    int read_optimistic;
    int read_pessimistic;

    /* write side: same pair of counters for claiming a slot and for
       marking the store into it as finished */
    int write_optimistic;
    int write_pessimistic;

    /* number of elements in the buffer that are ready to be read */
    int can_be_read;

    /* number of slots that must not be written right now (already
       filled, or currently being written or read). Invariant:
       cannot_be_written >= can_be_read */
    int cannot_be_written;
} atomic_fifo;

/*
 * Reset a fifo to the empty state with room for `length` elements.
 *
 * The content pointer is not touched: the caller MUST point it at storage
 * for `length` void* slots itself.
 */
void initialize_atomic_fifo (atomic_fifo *blubb, int length) {
    blubb->maxlen = length;

    /* all position counters start at slot 0 */
    blubb->read_optimistic = blubb->read_pessimistic = 0;
    blubb->write_optimistic = blubb->write_pessimistic = 0;

    /* nothing stored yet, nothing reserved yet */
    blubb->can_be_read = blubb->cannot_be_written = 0;
}

/*
 * Try to append `data` to `fifo` without waiting for free space.
 *
 * Returns 0 once the element is stored and published to readers, or -1
 * if no slot could be reserved because the buffer is full.
 *
 * The string literals use plain ASCII quotes and "..." here; the
 * typographic characters in the published listing do not compile.
 * Integers handed to debug() are converted via intptr_t, which is the
 * well-defined way to carry an int in a void*.
 *
 * NOTE(review): the position counters are plain ints that only ever grow;
 * nothing here handles their eventual overflow.
 */
int write_fifo_nowait (void* data, atomic_fifo* fifo) {
    debug ("=====Entering write_fifo_nowait====\n", (void*) 0);

    debug ("data: %d\n", (void*) data);

    /* can we write? reserve a slot first, then check whether it exists */
    int space = __sync_fetch_and_add (&(fifo->cannot_be_written), 1);

    debug ("space: %d\n", (void*) (intptr_t) space);
    debug ("fifo->maxlen: %d\n", (void*) (intptr_t) fifo->maxlen);

    if (space >= fifo->maxlen) {
        /* buffer is full (or almost full): undo the reservation */
        __sync_fetch_and_sub (&(fifo->cannot_be_written), 1);
        debug ("leaving write_fifo_nowait...\n", (void*) 0);
        return -1;
    }

    /* we can write. so fetch a position optimistically. */
    int myelement = __sync_fetch_and_add (&(fifo->write_optimistic), 1);

    debug ("myelement: %d\n", (void*) (intptr_t) myelement);
    debug ("myelement mod maxlen: %d\n",
           (void*) (intptr_t) (myelement % fifo->maxlen));

    (fifo->content)[myelement % fifo->maxlen] = data;

    debug ("fifo->content: %d\n",
           (void*) (fifo->content)[myelement % fifo->maxlen]);

    /* now our element is inside the array. advance the pessimistic
       position as soon as possible: spin (no polling delay, since this
       should only take a very short time) until every writer that claimed
       an earlier position has finished, then move it past our element. */
    while (!__sync_bool_compare_and_swap (&(fifo->write_pessimistic),
                                          myelement, myelement + 1));

    /* publish: one more element is ready for readers */
    __sync_fetch_and_add (&(fifo->can_be_read), 1);

    return 0;
}

/*
 * Store `data` in `fifo`, busy-retrying write_fifo_nowait (no sleeping)
 * until it reports success.
 */
void write_fifo_active (void* data, atomic_fifo* fifo) {
    for (;;) {
        if (write_fifo_nowait (data, fifo) == 0) {
            return;
        }
    }
}

/*void write_fifo_polling (const void* data, atomic_fifo* fifo, int nanosec) {
}*/

/*
 * Try to take the next element out of `fifo` without waiting.
 *
 * On success the element is stored in *data and 0 is returned; if no
 * element is ready, *data is left untouched and -1 is returned.
 *
 * The string literals use plain ASCII quotes and "..." here; the
 * typographic characters in the published listing do not compile.
 * Integers handed to debug() are converted via intptr_t, which is the
 * well-defined way to carry an int in a void*.
 */
int read_fifo_nowait (void** data, atomic_fifo* fifo) {
    debug ("======Entering read_fifo_nowait=======\n", (void*) 0);

    /* can we read? claim one readable element, then check it existed */
    int available = __sync_fetch_and_sub (&(fifo->can_be_read), 1);

    debug ("available: %d\n", (void*) (intptr_t) available);

    if (available <= 0) {
        /* buffer is empty: undo the claim */
        __sync_fetch_and_add (&(fifo->can_be_read), 1);
        debug ("leaving ... none available. \n", (void*) 0);
        return -1;
    }

    /* yes, we can read. fetch our position. */
    int myelement = __sync_fetch_and_add (&(fifo->read_optimistic), 1);

    debug ("myelement: %d\n", (void*) (intptr_t) myelement);
    debug ("myelement mod maxlen: %d\n",
           (void*) (intptr_t) (myelement % fifo->maxlen));

    *data = (fifo->content)[myelement % fifo->maxlen];

    debug ("*data: %d\n", (void*) *data);
    debug ("data: %d\n", (void*) data);

    /* spin until every reader that claimed an earlier position has
       finished, then move the pessimistic position past our element */
    while (!__sync_bool_compare_and_swap (&(fifo->read_pessimistic),
                                          myelement, myelement + 1));

    /* the slot has been copied out, so writers may reuse it */
    __sync_fetch_and_sub (&(fifo->cannot_be_written), 1);

    return 0;
}

/*
 * Take the next element out of `fifo`, busy-retrying read_fifo_nowait
 * (no sleeping) until it reports success. `data` is forwarded unchanged
 * as the output location expected by read_fifo_nowait.
 */
void read_fifo_active (void* data, atomic_fifo* fifo) {
    int status;

    do {
        status = read_fifo_nowait (data, fifo);
    } while (status != 0);
}

#endif

The second file is spamqueue.c:

#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#define GC_THREADS
#include <gc/gc.h>

#include "atomic_fifo.h"

#define NUM_THREADS (2)
#define NUM_QUEUE (65536)

/* A queued calculation that may still be waiting for other calculations. */
typedef struct {

    /* Number of other waiters that still have to finish before this one
       is ready to be calculated; it is picked for execution at 0. */
    int remainingNeededCalculations;

    /* Counter to decrement once this waiter has been calculated. Should
       point at some other waiter's remainingNeededCalculations. It is
       handed to calculatorFunction as its first argument, which must
       decrement it (using safe_decf). */
    int* decfWhenCalculated;

    /* The one and only data argument handed to calculatorFunction; it
       should carry the return values, too. */
    void* argumentPointer;

    /* The function that performs the calculation. It receives
       decfWhenCalculated and argumentPointer, in that order. */
    void (*calculatorFunction) (int* counterToDecrement, void* argument);

} waiter;

/* Atomically subtract one from the integer `bla` points to. */
void safe_decf (int* bla) {
    (void) __sync_sub_and_fetch (bla, 1);
}

/* Atomically add one to the integer `bla` points to. */
void safe_incf (int* bla) {
    (void) __sync_add_and_fetch (bla, 1);
}

/* our waiting queue: backing storage for fifo.content.
   The fifo stores one void* per slot, so the array must consist of
   pointer-sized elements; an int array is too small wherever pointers are
   wider than int, and writes to the upper slots would run past its end. */
void* waiters[NUM_QUEUE];
atomic_fifo fifo;

/* our worker-threads */
pthread_t threads[NUM_THREADS];

/*int incf0r = 0;

void *blafasel() {
while (1) {
printf („%d\n“, __sync_fetch_and_add (&incf0r, 1));}
}*/

/*
 * Allocate a waiter with GC_MALLOC, fill it in from the arguments and put
 * it into the global fifo. While the fifo has no room, sleeps 100000
 * microseconds between attempts.
 *
 * Returns the new waiter, so that other waiters can point their
 * decfWhenCalculated at its remainingNeededCalculations.
 */
waiter* add_waiter (void (*calculatorFunction) (int* decf, void* T),
                    void* argumentPointer, int neededCalculations,
                    int* decfWhenCalculated) {

    waiter* w = GC_MALLOC (sizeof *w);

    w->remainingNeededCalculations = neededCalculations;
    w->decfWhenCalculated = decfWhenCalculated;
    w->argumentPointer = argumentPointer;
    w->calculatorFunction = calculatorFunction;

    for (;;) {
        if (write_fifo_nowait ((void*) w, &fifo) == 0) {
            break;
        }
        usleep (100000);
    }

    return w;
}

/*
 * Fetch waiters from the global fifo until one is found whose
 * remainingNeededCalculations is 0, and return it. Waiters that are not
 * ready yet are put back into the fifo. Whenever the fifo cannot be read
 * or written, sleeps 100000 microseconds before retrying.
 */
waiter* get_next_waiter () {

    waiter* candidate;

    for (;;) {
        while (read_fifo_nowait ((void*) &candidate, &fifo) != 0) {
            usleep (100000);
        }

        /* compare-and-swap of 0 with 0 changes nothing; it is used as an
           atomic "is the counter 0?" test */
        if (__sync_bool_compare_and_swap (
                &(candidate->remainingNeededCalculations), 0, 0)) {
            return candidate;
        }

        /* still needs calculations: add it to the queue again */
        while (write_fifo_nowait ((void*) candidate, &fifo) != 0) {
            usleep (100000);
        }
    }
}

/*
 * Take the next ready waiter from the queue and run its
 * calculatorFunction with its decfWhenCalculated and argumentPointer.
 *
 * `static inline` rather than plain `inline`: a plain C99 inline
 * definition provides no external definition, so a call that the compiler
 * does not inline would fail to link.
 */
static inline void execute_some_waiter (void) {
    waiter* w = get_next_waiter ();
    w->calculatorFunction (w->decfWhenCalculated, w->argumentPointer);
}

/*
 * Thread entry point for the workers: executes waiters forever and never
 * returns. The argument is unused.
 *
 * `static` for the same reason as above; it matters even more here,
 * because the function's address is what gets passed to pthread_create.
 */
static inline void *executionLoop (void *bla) {
    (void) bla;
    while (1) {
        execute_some_waiter ();
    }
}

/* experiment: global state shared by the demo calculations below */

/* Set to 1 by someFunc2 and printed by someFunc1. */
int numberToPrint = 0;

/* Completion counter of the top-level calculation: main passes its address
   as decfWhenCalculated for the first waiter and polls until it is no
   longer 1. */
int someNil = 1;

void someFunc1 (int* decf, void* T) {
(void) T;
printf („jippee: number is: %d\n“, numberToPrint);
safe_decf (decf);
}

/*
 * Demo calculation: sets numberToPrint to 1, reports that, and then
 * decrements *decf to signal completion. T is unused.
 *
 * The string literal uses plain ASCII quotes; the typographic quotes in
 * the published listing do not compile.
 */
void someFunc2 (int* decf, void* T) {
    (void) T;
    numberToPrint = 1;
    printf ("jippee. Setting number.\n");
    safe_decf (decf);
}

/*
 * Argument block for calcSum and calcFib. Everything is held by pointer
 * so that the result cell of one calculation can serve directly as an
 * operand cell of another.
 */
typedef struct {
    int* x1;   /* first operand (for calcFib: the index n) */
    int* x2;   /* second operand (read by calcSum) */
    int* ret;  /* where the result is written */
} arithmetic_struct;

/*
 * Calculation step: stores *x1 + *x2 into *ret of the arithmetic_struct
 * passed as `fact`, then decrements *decf to signal completion and prints
 * a trace of the result.
 *
 * The format strings use plain ASCII quotes; the typographic quotes in
 * the published listing do not compile.
 */
void calcSum (int* decf, void* fact) {
    arithmetic_struct* f = (arithmetic_struct*) fact;

    *(f->ret) = (*(f->x1)) + (*(f->x2));

    /* the result is stored before the dependent waiter is released */
    safe_decf (decf);

    printf ("sum(%d %d)=%d;\n", *(f->x1), *(f->x2), *(f->ret));
    printf ("decf: %d\n", *decf);
    printf ("someNil: %d\n", someNil);
}

void calcFib (int* decf, void* f) {
arithmetic_struct* fib = (arithmetic_struct*) f;

if (*(fib->x1) == 0) {
*(fib->ret) = 0;
safe_decf (decf);
printf(„fib(0)=0;\n“);
} else if (*(fib->x1) == 1) {
*(fib->ret) = 1;
safe_decf (decf);
printf(„fib(1)=1;\n“);
} else {
// calc the sum of partial fibonacci-numbers
// reuse fib    arithmetic_struct* cSum = (arithmetic_struct*) GC_MALLOC(sizeof(arithmetic_struct));

arithmetic_struct* fib1 = (arithmetic_struct*) GC_MALLOC(sizeof(arithmetic_struct));
arithmetic_struct* fib2 = (arithmetic_struct*) GC_MALLOC(sizeof(arithmetic_struct));

fib1->x1 = (int*) GC_MALLOC(sizeof(int));
fib2->x1 = (int*) GC_MALLOC(sizeof(int));
fib1->ret = (int*) GC_MALLOC(sizeof(int));
fib2->ret = (int*) GC_MALLOC(sizeof(int));

*(fib1->x1) = *(fib->x1) – 1;
*(fib2->x1) = *(fib->x1) – 2;
fib->x1 = fib1->ret;
fib->x2 = fib2->ret;

printf („fib(%d)=sum(fib(%d) fib(%d));\n“, *(fib->x1), *(fib1->x1), *(fib2->x1));

printf(„decf: %d\n“, *decf);
printf(„someNil: %d\n“, someNil);

waiter* summe = add_waiter (calcSum, fib, 2, decf);
add_waiter (calcFib, fib1, 0, &(summe->remainingNeededCalculations));
add_waiter (calcFib, fib2, 0, &(summe->remainingNeededCalculations));
}
}

/*
 * Experiment driver: sets up the fifo and the worker threads, queues the
 * calculation of fib(20), polls once per second until someNil has been
 * decremented, then cancels the workers and prints the result.
 *
 * Returns 0 on success, 1 if a worker thread could not be created.
 *
 * Changes against the published listing: ASCII quotes in the string
 * literals (the typographic ones do not compile), the result of
 * pthread_create is checked, and the exit status is explicit.
 */
int main (void) {

    int i = 0;
    GC_INIT();

    initialize_atomic_fifo (&fifo, NUM_QUEUE);

    /* initialize_atomic_fifo leaves content alone; use the global array */
    fifo.content = (void*) waiters;

    for (i = 0; i < NUM_THREADS; ++i) {
        if (pthread_create (&threads[i], NULL, executionLoop, NULL) != 0) {
            fprintf (stderr, "could not create worker thread %d\n", i);
            return 1;
        }
    }

    arithmetic_struct* as = (arithmetic_struct*) GC_MALLOC (sizeof (arithmetic_struct));
    as->x1 = (int*) GC_MALLOC (sizeof (int));
    as->ret = (int*) GC_MALLOC (sizeof (int));

    *(as->x1) = 20;

    /* someNil starts at 1 and is decremented when the whole calculation
       is finished */
    add_waiter (calcFib, as, 0, &someNil);

    /* compare-and-swap of 1 with 1 changes nothing; it is used as an
       atomic "is it still 1?" test */
    while (__sync_bool_compare_and_swap (&someNil, 1, 1)) {
        /* wait till we finish */
        usleep (1000000);
    }

    for (i = 0; i < NUM_THREADS; ++i) {
        printf ("cancelling %d\n", i);
        pthread_cancel (threads[i]);
    }

    printf ("\naftermath: %d\n", *(as->ret));

    return 0;
}

This calculates the Fibonacci number 20 recursively, distributed across two threads. Calculating Fibonacci numbers recursively is not efficient at all, but I couldn't find a comparably easy-to-check algorithm with such high complexity.

On my 2-processor-architecture, the 2-threaded version is even slower than the 1-threaded one – but I think the threads are always executed on the same processor.

Anyway. I am sure there are already Compilers working in a similar way. If somebody knows, I would like to know.

Schreibe einen Kommentar

Trage deine Daten unten ein oder klicke ein Icon um dich einzuloggen:

WordPress.com-Logo

Du kommentierst mit Deinem WordPress.com-Konto. Abmelden / Ändern )

Twitter-Bild

Du kommentierst mit Deinem Twitter-Konto. Abmelden / Ändern )

Facebook-Foto

Du kommentierst mit Deinem Facebook-Konto. Abmelden / Ändern )

Google+ Foto

Du kommentierst mit Deinem Google+-Konto. Abmelden / Ändern )

Verbinde mit %s

%d Bloggern gefällt das: