Periodically trigger pthread workers and wait for completion


I would like to create a set of N pthreads under control of the original process. I'd like to control them as in this pseudocode:

create_n_threads();

while (1) {
    main task modifies a global variable "phase" to control the type of work
    trigger the N threads to wake up and do work based on the global "phase" variable
    wait until all threads have completed their tasks
    main task does meta-calculation on the partial results of all the workers
}

I've tried pthread_barrier_wait(). It works well to trigger a compute cycle, but it doesn't give me a way to know when every task has completed. How do I know when all the threads are done so that I can safely do my meta-calculation on the result? I don't want to use pthread_join, because these work cycles will be in a tight loop and I don't want the overhead of killing and recreating the tasks on each cycle.

#include <stdio.h>
#include <stdlib.h> 
#include <pthread.h> 
#include <unistd.h>     // for sleep()

#define NTHREADS 4
pthread_barrier_t b;

typedef struct threadargs {
    int id;             // thread ID 0->N
    int phase;          // either 0 or non zero to set blk/red phase
} THREADARGS;

int phase=0;
int cycle=0;

// worker function
// gets called with a pointer to THREADARGS structure
// which tells worker their id, starting, ending column to relax, and the red/black phase

void *thread_func(void *x)
{
int tid;                    // thread id

    int *retval = (int *) malloc(sizeof(int));   // so it persists across thread death
    tid = ((THREADARGS *) x)->id;
    
    while(1) {                  // wait to be triggered
        printf("%d: %d %d\n", cycle, tid, phase);
        pthread_barrier_wait(&b);
    }
    
    *retval = 2*tid;
    pthread_exit((void *)retval);

}

int main(int argc, char *argv[])
{
pthread_t threadids[NTHREADS];      // store os thread ids
THREADARGS thread_args[NTHREADS];           // arguments to thread
int rc, i;

    // initialize the multiprocess barrier 
    pthread_barrier_init(&b, NULL, NTHREADS+1);
    
    /* spawn the threads */
    for (i = 0; i < NTHREADS; ++i) {
        thread_args[i].id = i;
        printf("spawning thread %d\n", i);
        if((rc=pthread_create(&threadids[i], NULL, thread_func, (void *) &thread_args[i]))!=0) {
            fprintf(stderr, "cannot create thread %d\n",i);
            exit(8);
        };
    }
    
    for (i=0; i<10; i++) {              // do ten iterations
        printf("cycle %d\n", i);
        phase=(phase+1)%3;
        cycle++;
        pthread_barrier_wait(&b);       // trigger all the workers and wait for all to complete
    }
    
    exit(2);    // just kill everything

}

This example produces output like:

!) pthread
spawning thread 0
spawning thread 1
spawning thread 2
spawning thread 3
0: 0 0
0: 1 0
cycle 0
1: 2 1
1: 3 1
1: 3 1
1: 1 1
1: 2 1
1: 0 1
cycle 1
cycle 2
3: 1 0
3: 3 0
3: 2 0
3: 0 0
3: 0 0
3: 2 0
3: 3 0
cycle 3
3: 1 0
4: 1 1
4: 2 1
4: 0 1
4: 3 1

You can see that some workers are running multiple times per cycle and that the "phase" variable doesn't count properly from cycle to cycle. What I want is something like:

cycle 1
1: 0 0
1: 1 0
1: 2 0
1: 3 0
cycle 2
2: 0 1
2: 1 1
2: 2 1
2: 3 1
cycle 3
3: 0 2
...

Of course the print statements from each task will be scrambled but I want to trigger all 4 pthreads to perform a task "0,1,2" and for them all to finish so that I can work with their results and safely change global variables for the next cycle.

There are 2 answers

Erdal Küçük (best answer)

From the documentation of pthread_barrier_wait:

DESCRIPTION

The calling thread shall block until the required number of threads have called pthread_barrier_wait() specifying the barrier.

When the required number of threads have called pthread_barrier_wait() specifying the barrier, the constant PTHREAD_BARRIER_SERIAL_THREAD shall be returned to one unspecified thread and zero shall be returned to each of the remaining threads.

RETURN VALUE

Upon successful completion, the pthread_barrier_wait() function shall return PTHREAD_BARRIER_SERIAL_THREAD for a single (arbitrary) thread synchronized at the barrier and zero for each of the other threads. Otherwise, an error number shall be returned to indicate the error.

Therefore:

int res = pthread_barrier_wait(barrier);
if (res == PTHREAD_BARRIER_SERIAL_THREAD) {
    //only one single (arbitrary) thread will reach this point
    //and only if all other threads have reached the barrier
} else {
   //all others will see this part
}
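
To make the return-value rule concrete, here is a minimal sketch that counts how many threads receive PTHREAD_BARRIER_SERIAL_THREAD from one barrier round. The names count_serial_threads, wait_once, demo_barrier and MAX_THREADS are hypothetical and exist only for this illustration; error handling is reduced to assert() for brevity.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>

#define MAX_THREADS 16               /* hypothetical upper bound for the demo */

static pthread_barrier_t demo_barrier;
static int serial_count;             /* written only by the serial thread */

static void *wait_once(void *arg)
{
    (void)arg;
    int res = pthread_barrier_wait(&demo_barrier);
    if (res == PTHREAD_BARRIER_SERIAL_THREAD)
        serial_count++;              /* exactly one thread takes this branch */
    else
        assert(res == 0);            /* every other thread gets zero */
    return NULL;
}

/* Runs one barrier round with nthreads threads and returns how many
 * of them saw PTHREAD_BARRIER_SERIAL_THREAD. */
int count_serial_threads(int nthreads)
{
    pthread_t tids[MAX_THREADS];
    int rc;

    assert(nthreads >= 1 && nthreads <= MAX_THREADS);
    serial_count = 0;
    rc = pthread_barrier_init(&demo_barrier, NULL, (unsigned)nthreads);
    assert(rc == 0);

    for (int i = 0; i < nthreads; i++) {
        rc = pthread_create(&tids[i], NULL, wait_once, NULL);
        assert(rc == 0);
    }
    for (int i = 0; i < nthreads; i++)
        pthread_join(tids[i], NULL); /* join makes serial_count safe to read */

    pthread_barrier_destroy(&demo_barrier);
    return serial_count;
}
```

Calling count_serial_threads(4) from a main() and compiling with -pthread should report a count of 1, whichever thread happens to be chosen.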

With two barriers, you can do the following:

int done = 0;

void *thread_func(void *arg) {

    while (1) {

        //wait until all threads have the signal to go
        pthread_barrier_wait(start_barrier);

        //check the flag only after the start barrier, so that every
        //thread sees the value main set for this cycle
        if (done)
            break;

        //all green, let's do the work ...
        //as the last operation, store the result in a dedicated global variable
        //as long as only this thread accesses it, there is no need to
        //protect it with a mutex

        //after the work is done, wait until all the others are done too
        pthread_barrier_wait(stop_barrier);

    }

    return NULL;

}

int main()
{
    //all the init stuff (including thread creation)

    while (1) {

        //do preparations before the threads start doing their work
        //set global vars etc.
        //e.g. done = 1;

        //all threads are waiting at the start barrier
        //give the green signal
        pthread_barrier_wait(start_barrier);

        //if done was set, the workers exit instead of working
        if (done)
            break;

        //wait while the threads are doing their job
        pthread_barrier_wait(stop_barrier);

        //at this point, each task has done its job
        //and it is guaranteed that the data will no longer be
        //accessed by any thread, therefore no protection is needed

        //do the main calculation

    }

}
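
A self-contained sketch of the two-barrier scheme, adapted to the question's "phase" variable. The workload (each worker stores (tid + 1) * (phase + 1)), the partial array and the run_cycles function are assumptions made up for this example. Both barriers are initialised with NTHREADS + 1 so that main takes part in each rendezvous, and the workers test the done flag right after the start barrier, so they always see the value main wrote before releasing them.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>

#define NTHREADS 4

static pthread_barrier_t start_barrier, stop_barrier;
static int phase;                 /* written by main only between cycles */
static int done;                  /* written by main only between cycles */
static int partial[NTHREADS];     /* one slot per worker, no mutex needed */

static void *worker(void *arg)
{
    int tid = *(int *)arg;

    for (;;) {
        pthread_barrier_wait(&start_barrier);   /* wait for the go signal */
        if (done)
            break;
        partial[tid] = (tid + 1) * (phase + 1); /* hypothetical workload */
        pthread_barrier_wait(&stop_barrier);    /* report completion */
    }
    return NULL;
}

/* Runs the given number of compute cycles and returns the sum of all
 * partial results collected by main. */
long run_cycles(int cycles)
{
    pthread_t tids[NTHREADS];
    int ids[NTHREADS];
    long total = 0;
    int rc;

    done = 0;
    rc = pthread_barrier_init(&start_barrier, NULL, NTHREADS + 1);
    assert(rc == 0);
    rc = pthread_barrier_init(&stop_barrier, NULL, NTHREADS + 1);
    assert(rc == 0);

    for (int i = 0; i < NTHREADS; i++) {
        ids[i] = i;
        rc = pthread_create(&tids[i], NULL, worker, &ids[i]);
        assert(rc == 0);
    }

    for (int c = 0; c < cycles; c++) {
        phase = c % 3;                          /* safe: workers are parked */
        pthread_barrier_wait(&start_barrier);   /* trigger the workers */
        pthread_barrier_wait(&stop_barrier);    /* wait until all are done */
        for (int i = 0; i < NTHREADS; i++)      /* meta-calculation */
            total += partial[i];
    }

    done = 1;
    pthread_barrier_wait(&start_barrier);       /* release workers to exit */
    for (int i = 0; i < NTHREADS; i++)
        pthread_join(tids[i], NULL);

    pthread_barrier_destroy(&start_barrier);
    pthread_barrier_destroy(&stop_barrier);
    return total;
}
```

With this made-up workload, three cycles use phases 0, 1 and 2, so run_cycles(3) sums 10 + 20 + 30. The threads are created once and joined only at shutdown, which avoids the per-cycle create/join overhead the question wants to avoid.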

ikegami

You could use a condition variable.

int done = 0;
int work = 0;
int working = 0;
int waiting = n;

Worker:

pthread_mutex_lock( &mutex );

while ( 1 ) {
   // Wait for work to arrive.
   while ( !work && !done )
      pthread_cond_wait( &cond, &mutex );

   if ( !work && done )
      break;

   --work;
   --waiting;
   ++working;
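   // Broadcast rather than signal: main and the workers share one
   // condition variable, so a single signal could wake the wrong thread.
   pthread_cond_broadcast( &cond );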

   pthread_mutex_unlock( &mutex );

   // Do work.

   pthread_mutex_lock( &mutex );

   --working;
   ++waiting;
   // Broadcast so that main is guaranteed to be among the woken threads.
   pthread_cond_broadcast( &cond );
}

pthread_mutex_unlock( &mutex );

Main:

pthread_mutex_lock( &mutex );

while ( 1 ) {
   work = n;
   pthread_cond_broadcast( &cond );

   // Wait for the workers to complete the work.
   while ( work || working )
      pthread_cond_wait( &cond, &mutex );

   // Process result.
}

done = 1;
pthread_cond_broadcast( &cond );

pthread_mutex_unlock( &mutex );

// Join worker threads here.

Note that any number of work units can be added. Fewer than n is fine, and more than n also works, because a worker that finishes a unit goes back and claims the next one.
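
A runnable sketch of the same idea, under a few assumptions: the workload (squaring the claimed unit number), the result accumulator, NWORKERS and the run_cond_cycles function are hypothetical, and the unused waiting counter is left out. This version uses pthread_cond_broadcast everywhere, because main and the workers wait on the same condition variable and a single signal could be consumed by a thread that has nothing to do.

```c
#include <assert.h>
#include <pthread.h>

#define NWORKERS 4

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int done;      /* set by main when no more work will arrive */
static int work;      /* units not yet claimed by a worker */
static int working;   /* units currently being processed */
static long result;   /* accumulated under the mutex */

static void *worker(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&mutex);
    for (;;) {
        /* Wait for work to arrive or for shutdown. */
        while (!work && !done)
            pthread_cond_wait(&cond, &mutex);
        if (!work && done)
            break;

        int item = work--;              /* claim one unit */
        ++working;
        pthread_mutex_unlock(&mutex);

        long partial = (long)item * item;   /* hypothetical work, no lock held */

        pthread_mutex_lock(&mutex);
        result += partial;
        --working;
        pthread_cond_broadcast(&cond);  /* main may now be able to proceed */
    }
    pthread_mutex_unlock(&mutex);
    return NULL;
}

/* Runs `cycles` rounds of `units` work items each and returns the total. */
long run_cond_cycles(int cycles, int units)
{
    pthread_t tids[NWORKERS];
    long total = 0;

    done = work = working = 0;
    for (int i = 0; i < NWORKERS; i++) {
        int rc = pthread_create(&tids[i], NULL, worker, NULL);
        assert(rc == 0);
    }

    pthread_mutex_lock(&mutex);
    for (int c = 0; c < cycles; c++) {
        result = 0;
        work = units;
        pthread_cond_broadcast(&cond);  /* wake every waiting worker */

        /* Wait for the workers to complete the work. */
        while (work || working)
            pthread_cond_wait(&cond, &mutex);

        total += result;                /* process result */
    }
    done = 1;
    pthread_cond_broadcast(&cond);
    pthread_mutex_unlock(&mutex);

    for (int i = 0; i < NWORKERS; i++)
        pthread_join(tids[i], NULL);
    return total;
}
```

With five units per cycle the workers compute 1 + 4 + 9 + 16 + 25 in each round, so two cycles total 110. The number of units does not have to match the number of workers.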


See also: Thread-safe queue that supports multiple producers and consumers