Monday 14 October 2013

Demonstrate Dispatcher Worker model using pthread

Leave a Comment
        The dispatcher worker model is also called the boss worker model. In this model there is a server process which contains one dispatcher (or boss) thread and multiple worker threads. The dispatcher thread continuously checks the status of the worker threads and assigns a job to a thread which is free or idle. The working of this model is shown in the figure below.
Dispatcher Worker Model

        Here we demonstrate the working of the dispatcher worker model using pthreads (POSIX threads). As shown in the figure above, we create 4 worker threads with the pthread_create function, and the main thread acts as the dispatcher thread which assigns the jobs. In the pthread example explained below there are three jobs named job1(), job2() and job3(), each of which works (sleeps) for 40 seconds. This is sufficient time for the user to assign jobs to other threads while one or two threads are still working, which is helpful for testing purposes; you can put your own code in these jobs. A job is assigned when the main thread signals a worker thread; this is done using the pthread_cond_wait() and pthread_cond_signal() functions. For details you can visit here.
       As explained above, the example pthread program dispatcher_worker.c is shown below.
You can compile this program using command below
gcc dispatcher_worker.c -lpthread
You can run this program using command below
./a.out

#include"stdio.h"
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
/* Handles of the four worker threads; main starts th1..th4 running
   fun0..fun3 respectively (workers 0-3). */
pthread_t th1, th2, th3, th4;
/* One condition variable per worker: main signals conN to wake worker N. */
pthread_cond_t con0 = PTHREAD_COND_INITIALIZER;
pthread_cond_t con1 = PTHREAD_COND_INITIALIZER;
pthread_cond_t con2 = PTHREAD_COND_INITIALIZER;
pthread_cond_t con3 = PTHREAD_COND_INITIALIZER;
/* One mutex per worker, used together with that worker's condition variable.
   Note the numbering is offset by one:
   mutex -> worker 0 (con0), mutex2 -> worker 1 (con1),
   mutex3 -> worker 2 (con2), mutex4 -> worker 3 (con3). */
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex2 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex3 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex4 = PTHREAD_MUTEX_INITIALIZER;
/* Shared array used to submit jobs: jobs[N] is the job number (1-3) handed to
   worker N, and 0 means worker N has no job (it is listed as free by main). */
int jobs[4];   //shared array used to submit job
/* Forward declarations of worker entry points (fun0 is defined before its
   first use and therefore needs none). */
void *fun1 (void *);
void *fun2 (void *);
void *fun3 (void *);

/*
 * Job 1: report which worker thread (x) is running the job, then simulate
 * work by sleeping for 40 seconds.  The long sleep leaves sufficient time to
 * submit jobs to the other threads while testing.  Always returns 0.
 */
int job1 (int x)
{
 const unsigned int work_seconds = 40;

 printf ("This is job1 from thread %d\n", x);
 sleep (work_seconds);
 return 0;
}

/*
 * Job 2: report which worker thread (x) is running the job, then simulate
 * work by sleeping for 40 seconds.  The long sleep leaves sufficient time to
 * submit jobs to the other threads while testing.  Always returns 0.
 */
int job2 (int x)
{
 const unsigned int work_seconds = 40;

 printf ("This is job2 from thread %d\n", x);
 sleep (work_seconds);
 return 0;
}

/*
 * Job 3: report which worker thread (x) is running the job, then simulate
 * work by sleeping for 40 seconds.  The long sleep leaves sufficient time to
 * submit jobs to the other threads while testing.  Always returns 0.
 */
int job3 (int x)
{
 const unsigned int work_seconds = 40;

 printf ("This is job3 from thread %d\n", x);
 sleep (work_seconds);
 return 0;
}

/*
 * Entry point of worker thread 0.
 *
 * Waits on con0 (protected by mutex) until the dispatcher stores a job number
 * in jobs[0], runs the matching job (1 -> job1, 2 -> job2, 3 -> job3) and
 * then resets jobs[0] to 0 to mark the worker as free again.
 *
 * arg is unused.  The function loops forever and never returns.
 *
 * The mutex stays locked while the job runs, so a dispatcher that tries to
 * hand this worker another job blocks until the current one has finished.
 */
void * fun0 (void *arg)
{
 const int thread_no = 0;   /* number reported by the jobs */

 (void) arg;
 pthread_mutex_lock (&mutex);
 for (;;)
 {
  /* Wait on the predicate, not just on the signal: a job stored before this
     thread started waiting must still be picked up, and spurious wakeups
     must be ignored. */
  while (jobs[0] == 0)
  {
   pthread_cond_wait (&con0, &mutex);
  }
  switch (jobs[0])
  {
  case 1:
   job1 (thread_no);
   break;
  case 2:
   job2 (thread_no);
   break;
  case 3:
   job3 (thread_no);
   break;
  default:
   /* Unknown job number: nothing to run, just discard it below. */
   break;
  }
  /* Clearing the slot also for unknown numbers keeps the worker from
     looking busy forever. */
  jobs[0] = 0;
 }
}

/*
 * Entry point of worker thread 1.
 *
 * Waits on con1 (protected by mutex2) until the dispatcher stores a job
 * number in jobs[1], runs the matching job (1 -> job1, 2 -> job2, 3 -> job3)
 * and then resets jobs[1] to 0 to mark the worker as free again.
 *
 * arg is unused.  The function loops forever and never returns.
 *
 * The mutex stays locked while the job runs, so a dispatcher that tries to
 * hand this worker another job blocks until the current one has finished.
 */
void * fun1 (void *arg)
{
 const int thread_no = 1;   /* number reported by the jobs */

 (void) arg;
 pthread_mutex_lock (&mutex2);
 for (;;)
 {
  /* Wait on the predicate, not just on the signal: a job stored before this
     thread started waiting must still be picked up, and spurious wakeups
     must be ignored. */
  while (jobs[1] == 0)
  {
   pthread_cond_wait (&con1, &mutex2);
  }
  switch (jobs[1])
  {
  case 1:
   job1 (thread_no);
   break;
  case 2:
   job2 (thread_no);
   break;
  case 3:
   job3 (thread_no);
   break;
  default:
   /* Unknown job number: nothing to run, just discard it below. */
   break;
  }
  /* Clearing the slot also for unknown numbers keeps the worker from
     looking busy forever. */
  jobs[1] = 0;
 }
}

/*
 * Entry point of worker thread 2.
 *
 * Waits on con2 (protected by mutex3) until the dispatcher stores a job
 * number in jobs[2], runs the matching job (1 -> job1, 2 -> job2, 3 -> job3)
 * and then resets jobs[2] to 0 to mark the worker as free again.
 *
 * arg is unused.  The function loops forever and never returns.
 *
 * The mutex stays locked while the job runs, so a dispatcher that tries to
 * hand this worker another job blocks until the current one has finished.
 */
void *
fun2 (void *arg)
{
 const int thread_no = 2;   /* number reported by the jobs */

 (void) arg;
 pthread_mutex_lock (&mutex3);
 for (;;)
 {
  /* Wait on the predicate, not just on the signal: a job stored before this
     thread started waiting must still be picked up, and spurious wakeups
     must be ignored. */
  while (jobs[2] == 0)
  {
   pthread_cond_wait (&con2, &mutex3);
  }
  switch (jobs[2])
  {
  case 1:
   job1 (thread_no);
   break;
  case 2:
   job2 (thread_no);
   break;
  case 3:
   job3 (thread_no);
   break;
  default:
   /* Unknown job number: nothing to run, just discard it below. */
   break;
  }
  /* Clearing the slot also for unknown numbers keeps the worker from
     looking busy forever. */
  jobs[2] = 0;
 }
}

/*
 * Entry point of worker thread 3.
 *
 * Waits on con3 (protected by mutex4) until the dispatcher stores a job
 * number in jobs[3], runs the matching job (1 -> job1, 2 -> job2, 3 -> job3)
 * and then resets jobs[3] to 0 to mark the worker as free again.
 *
 * arg is unused.  The function loops forever and never returns.
 *
 * The mutex stays locked while the job runs, so a dispatcher that tries to
 * hand this worker another job blocks until the current one has finished.
 */
void * fun3 (void *arg)
{
 const int thread_no = 3;   /* number reported by the jobs */

 (void) arg;
 pthread_mutex_lock (&mutex4);
 for (;;)
 {
  /* Wait on the predicate, not just on the signal: a job stored before this
     thread started waiting must still be picked up, and spurious wakeups
     must be ignored. */
  while (jobs[3] == 0)
  {
   pthread_cond_wait (&con3, &mutex4);
  }
  switch (jobs[3])
  {
  case 1:
   job1 (thread_no);
   break;
  case 2:
   job2 (thread_no);
   break;
  case 3:
   job3 (thread_no);
   break;
  default:
   /* Unknown job number: nothing to run, just discard it below. */
   break;
  }
  /* Clearing the slot also for unknown numbers keeps the worker from
     looking busy forever. */
  jobs[3] = 0;
 }
}

/*
 * Throw away the rest of the current stdin line so that a malformed answer
 * cannot be re-read by the next scanf call.
 * Returns EOF when the input ended, 0 otherwise.
 */
static int discard_input_line (void)
{
 int c;

 do
 {
  c = getchar ();
 }
 while (c != '\n' && c != EOF);
 return (c == EOF) ? EOF : 0;
}

/*
 * Dispatcher: starts the four worker threads, then repeatedly lists the free
 * workers and asks the user whether to hand a job (1-3) to a worker (0-3).
 *
 * Returns 0 when the user chooses "Terminate" or the input ends, and 1 if a
 * worker thread cannot be created.  The workers never finish on their own,
 * so leaving main ends the whole process together with them.
 */
int main (void)
{
 enum { NUM_WORKERS = 4, MIN_JOB = 1, MAX_JOB = 3 };
 /* Per-worker lookup tables, indexed by worker number. */
 pthread_t *const threads[NUM_WORKERS] = { &th1, &th2, &th3, &th4 };
 void *(*const entry[NUM_WORKERS]) (void *) = { fun0, fun1, fun2, fun3 };
 pthread_mutex_t *const locks[NUM_WORKERS] =
  { &mutex, &mutex2, &mutex3, &mutex4 };
 pthread_cond_t *const conds[NUM_WORKERS] = { &con0, &con1, &con2, &con3 };
 int i, k, items, job_no = 0, th_no = 0;
 char choice, answer[16];

 for (i = 0; i < NUM_WORKERS; i++)
 {
  jobs[i] = 0;
 }
 for (i = 0; i < NUM_WORKERS; i++)
 {
  int err = pthread_create (threads[i], NULL, entry[i], NULL);
  if (err != 0)
  {
   fprintf (stderr, "pthread_create failed for thread %d (error %d)\n",
            i, err);
   return 1;
  }
 }

 for (;;)
 {
  k = 0;   /* number of free workers found in this round */
  for (i = 0; i < NUM_WORKERS; i++)
  {
   int is_free;

   /* A worker keeps its mutex locked for as long as it runs a job, so a
      failed trylock means "busy".  Reading jobs[i] with the mutex held
      avoids racing with the worker that resets it. */
   if (pthread_mutex_trylock (locks[i]) != 0)
   {
    continue;
   }
   is_free = (jobs[i] == 0);
   pthread_mutex_unlock (locks[i]);
   if (is_free)
   {
    printf ("%d\t", i);
    k++;
   }
  }

  if (k == 0)
  {
   printf ("All threads are busy\n");
   sleep (2);
   continue;
  }

  printf ("numbered threads are free\n");
  printf ("-------------------------------------------------------");
  printf ("\nWant to give job Y:yes N:no T:\"Terminate\" \n");
  /* Read the answer as a bounded word; only its first letter is used. */
  if (scanf ("%15s", answer) != 1)
  {
   return 0;   /* end of input: nothing more to dispatch */
  }
  choice = answer[0];
  if (choice == 'T' || choice == 't')
  {
   exit (0);
  }
  if (choice != 'Y' && choice != 'y')
  {
   continue;
  }

  printf ("Enter job number (1-3) and thread number (0-3)\n");
  items = scanf ("%d %d", &job_no, &th_no);
  if (items == EOF)
  {
   return 0;
  }
  if (items != 2)
  {
   /* Non-numeric text would otherwise stay in the buffer and make every
      following scanf fail in the same way. */
   printf ("Invalid input: two numbers expected\n");
   if (discard_input_line () == EOF)
   {
    return 0;
   }
   continue;
  }
  if (job_no < MIN_JOB || job_no > MAX_JOB
      || th_no < 0 || th_no >= NUM_WORKERS)
  {
   printf ("Invalid job number or thread number\n");
   continue;
  }

  /* Hand the job over and wake the worker waiting on its condition
     variable.  This blocks while the chosen worker is still running a job,
     because the worker holds its mutex until the job is done. */
  pthread_mutex_lock (locks[th_no]);
  jobs[th_no] = job_no;
  pthread_cond_signal (conds[th_no]);
  pthread_mutex_unlock (locks[th_no]);
 }
}
Output:

      Since multiple threads are running in parallel, the sequence of the output (that is, of the printed statements) may vary. As shown in the figure above, line number 12 is the output of the job previously given to thread number 0, but this statement appears after line number 11, which is an output line of the main thread. So the program is waiting for input at line number 13, not at line number 12.

0 comments:

Post a Comment