Piping in threads

Pipes are quite useful in threads. Imagine a scenario, where one thread is continously monitoring for new user input and another thread performs some processing on this data input. This can be achieved with the help of shared memory. But in case of shared memory, one has to think about data synchronization. Pipes can be made use of here. One thread waits and reads the user input and writes it into the pipe. The second thread can read from the pipe and perform processing.

Let’s take a simple example. The program below shows the READER-WRITER threads. The WRITER thread continously writes the alphabets into the pipe. and the READER thread reads it. See clearly that fd[2] is declared globally so that both threads can access it. Also note that the pipe() system call was called before threads creation.

To compile a program with threads, we must specify -lpthread to link with the POSIX threads library
$ gcc program -lpthread
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>

int fd[2];//File descriptor for creating a pipe

//This function continously reads fd[0] for any input data byte
//If available, prints

void *reader()
{
   while(1){
      char    ch;
      int     result;

      result = read (fd[0],&ch,1);
      if (result != 1) {
        perror("read");
        exit(3);
      }

      printf ("Reader: %c\n", ch);   }
}

//This function continously writes Alphabet into fd[1]
//Waits if no more space is available

void *writer()
{
   int     result;
   char    ch='A';

   while(1){
       result = write (fd[1], &ch,1);
       if (result != 1){
           perror ("write");
           exit (2);
       }

       printf ("Writer: %c\n", ch);
       if(ch == 'Z')
         ch = 'A'-1;

       ch++;
   }
}

int main()
{
   pthread_t       tid1,tid2;
   int             result;

   result = pipe (fd);
   if (result < 0){
       perror("pipe ");
       exit(1);
   }

   pthread_create(&tid1,NULL,reader,NULL);
   pthread_create(&tid2,NULL,writer,NULL);

   pthread_join(tid1,NULL);
   pthread_join(tid2,NULL);
}

Output

...
Reader: Y
Reader: Z
Reader: A
Reader: B
Reader: C
Reader: D
Reader: E
Reader: F
Reader: G
Reader: H
Reader: I
Reader: J
Reader: K
Reader: L
Reader: M
Reader: N
Reader: O
Reader: P
Reader: Q
Reader: R
Reader: S
Reader: T
Reader: U
Reader: V
Reader: W
Reader: X
Reader: Y
Reader: Z
Reader: A
Reader: B
Reader: C
...

After doing this, let’s check what’s the capacity of a pipe. Anyway pipe also utilizes memory, so there must be an upper limit to the capacity of a pipe. A writer thread keeps on writing to the pipe hoping that the READER thread is reading it. Let’s put some delay in READER thread (sleep() call). so that write will at some point of time stop writing the data to the pipe.

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>

int fd[2];//File descriptor for creating a pipe

//This function continously reads fd[0] for any input data byte
//If available, prints

void *reader()
{

 int     count = 0;
 sleep (25);
  //Delay in starting the reading from the pipe

  while(1){
      char    ch;
      int     result;

      result = read (fd[0],&ch,1);
      if (result != 1) {
          perror("read");
          exit(3);
      } printf ("Reader: %d %c\n",++count,ch);
  }
}

//This function continously writes Alphabet into fd[1]

void *writer()
{
    int     result;
    char    ch='A'; int     count = 0;

    while(1){

       result = write (fd[1], &ch,1);
       if (result != 1){
           perror ("write");
           exit (2);
       }

       printf ("Writer: %d %c\n", ++count, ch);

       if(ch == 'Z')
          ch = 'A'-1;

        ch++;
   }
}

int main()
{
   pthread_t       tid1,tid2;
   int             result;

   result = pipe (fd);

   if (result < 0){
        perror("pipe ");
        exit(1);
   }

   pthread_create(&tid1,NULL,reader,NULL);
   pthread_create(&tid2,NULL,writer,NULL);

   pthread_join(tid1,NULL);
   pthread_join(tid2,NULL);
}

Output
...
Writer: 65524 D
Writer: 65525 E
Writer: 65526 F
Writer: 65527 G
Writer: 65528 H
Writer: 65529 I
Writer: 65530 J
Writer: 65531 K
Writer: 65532 L
Writer: 65533 M
Writer: 65534 N
Writer: 65535 O
Writer: 65536 P

On executing the above program, the WRITER thread stopped at the count = 65536 waiting for READER thread to read the data already written. After the specified delay (here 25 seconds), the READER thread resumes its operation and starts reading. Then both WRITER and READER thread works normally.So pipe() is useful for implementing QUEUE mechanism of sending the data from one thread(process) to another keeping in mind that the pipe also has a capacity limit. So if the reader is blocked or if there is no reader, the writer will block at some point of time if it reached the capacity of the pipe.

 

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s