Piping in threads

Pipes are quite useful in threads. Imagine a scenario where one thread continuously monitors for new user input and another thread processes that input. This could be done with shared memory, but then the programmer has to take care of data synchronization. A pipe avoids this: one thread waits for the user input and writes it into the pipe, and the second thread reads from the pipe and processes the data.

Let’s take a simple example. The program below shows READER and WRITER threads. The WRITER thread continuously writes the letters of the alphabet into the pipe, and the READER thread reads them. Note that fd[2] is declared globally so that both threads can access it, and that pipe() is called before the threads are created.

To compile a program that uses threads, we must pass -lpthread to link it with the POSIX threads library:
$ gcc program.c -o program -lpthread
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>

int fd[2];  //File descriptors for the pipe: fd[0] is the read end, fd[1] the write end

//This function continuously reads one byte at a time from fd[0]
//and prints it; read() blocks while the pipe is empty

void *reader(void *arg)
{
   while(1){
      char    ch;
      ssize_t result;

      result = read (fd[0],&ch,1);
      if (result != 1) {
         perror("read");
         exit(3);
      }

      printf ("Reader: %c\n", ch);
   }
}

//This function continuously writes the alphabet into fd[1]
//write() blocks if no more space is available in the pipe

void *writer(void *arg)
{
   ssize_t result;
   char    ch='A';

   while(1){
       result = write (fd[1], &ch,1);
       if (result != 1){
           perror ("write");
           exit (2);
       }

       printf ("Writer: %c\n", ch);
       if(ch == 'Z')
          ch = 'A'-1;   //wrap around so that the next ch++ gives 'A'

       ch++;
   }
}

int main(void)
{
   pthread_t       tid1,tid2;
   int             result;

   //Create the pipe before the threads so that both can use fd[]
   result = pipe (fd);
   if (result < 0){
       perror("pipe");
       exit(1);
   }

   pthread_create(&tid1,NULL,reader,NULL);
   pthread_create(&tid2,NULL,writer,NULL);

   pthread_join(tid1,NULL);
   pthread_join(tid2,NULL);
   return 0;
}

Output

...
Reader: Y
Reader: Z
Reader: A
Reader: B
Reader: C
Reader: D
Reader: E
Reader: F
Reader: G
Reader: H
Reader: I
Reader: J
Reader: K
Reader: L
Reader: M
Reader: N
Reader: O
Reader: P
Reader: Q
Reader: R
Reader: S
Reader: T
Reader: U
Reader: V
Reader: W
Reader: X
Reader: Y
Reader: Z
Reader: A
Reader: B
Reader: C
...

Next, let’s check the capacity of a pipe. A pipe is backed by kernel memory, so there must be an upper limit on how much unread data it can hold. The WRITER thread keeps writing to the pipe on the assumption that the READER thread is draining it. Let’s delay the READER thread with a sleep() call, so that at some point the pipe fills up and the WRITER can no longer write.

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>

int fd[2];  //File descriptors for the pipe: fd[0] is the read end, fd[1] the write end

//This function waits 25 seconds, then continuously reads fd[0]
//one byte at a time and prints each byte with a running count

void *reader(void *arg)
{
   int     count = 0;

   sleep (25);   //Delay before starting to read from the pipe

   while(1){
      char    ch;
      ssize_t result;

      result = read (fd[0],&ch,1);
      if (result != 1) {
         perror("read");
         exit(3);
      }
      printf ("Reader: %d %c\n",++count,ch);
   }
}

//This function continuously writes the alphabet into fd[1]
//and prints a running count; write() blocks once the pipe is full

void *writer(void *arg)
{
   ssize_t result;
   char    ch='A';
   int     count = 0;

   while(1){
      result = write (fd[1], &ch,1);
      if (result != 1){
         perror ("write");
         exit (2);
      }

      printf ("Writer: %d %c\n", ++count, ch);

      if(ch == 'Z')
         ch = 'A'-1;   //wrap around so that the next ch++ gives 'A'

      ch++;
   }
}

int main(void)
{
   pthread_t       tid1,tid2;
   int             result;

   result = pipe (fd);
   if (result < 0){
      perror("pipe");
      exit(1);
   }

   pthread_create(&tid1,NULL,reader,NULL);
   pthread_create(&tid2,NULL,writer,NULL);

   pthread_join(tid1,NULL);
   pthread_join(tid2,NULL);
   return 0;
}

Output
...
Writer: 65524 D
Writer: 65525 E
Writer: 65526 F
Writer: 65527 G
Writer: 65528 H
Writer: 65529 I
Writer: 65530 J
Writer: 65531 K
Writer: 65532 L
Writer: 65533 M
Writer: 65534 N
Writer: 65535 O
Writer: 65536 P

On executing the above program, the WRITER thread stopped at count = 65536, waiting for the READER thread to read the data already written. This matches the default pipe capacity on Linux since kernel 2.6.11: 16 pages, i.e. 65,536 bytes with 4096-byte pages. After the delay (here 25 seconds), the READER thread starts reading, and from then on both threads run normally. So pipe() is useful for implementing a queue that carries data from one thread (or process) to another, keeping in mind that the pipe has a limited capacity: if nobody is reading from it, or the reader is blocked, the writer will block once the pipe is full.

 


Threads Programming in Linux: Examples

One of the most important purposes of threads is to achieve concurrency. A program may contain many independent tasks that can run in parallel without affecting one another. The first step in using threads is to decide whether the program needs them at all; otherwise threads serve no purpose. For example, if you are designing a program that reads a file and displays it, you might use several threads: one to read the file, a second to update the display, and others to monitor user input such as keystrokes or mouse movements. Sometimes threads depend on each other. In the example above, if the application is an editor, a key press must be passed quickly to the thread that updates the display, so that it can open the menu for that keyboard shortcut or take some other appropriate action.

Example 1:
Two threads displaying the strings “Hello!!” and “How are you?” independently of each other.

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>

void * thread1(void *arg)
{
        while(1){
                printf("Hello!!\n");
        }
}

void * thread2(void *arg)
{
        while(1){
                printf("How are you?\n");
        }
}

int main(void)
{
        pthread_t tid1,tid2;

        pthread_create(&tid1,NULL,thread1,NULL);
        pthread_create(&tid2,NULL,thread2,NULL);
        pthread_join(tid1,NULL);
        pthread_join(tid2,NULL);
        return 0;
}

Now compile this program (the -lpthread option links it against the POSIX threads library):

$ gcc thread.c -o thread -lpthread

On running, you can see many interleaved “Hello!!” and “How are you?” messages (both threads loop forever, so press Ctrl+C to stop the program).

Example 2
This example involves a reader and a writer thread. The reader thread reads a string from the user and the writer thread displays it. The program uses a semaphore to synchronize the two threads.

#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h> 

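char n[1024];
sem_t len;   //counts strings read but not yet displayed

//Reads a string from the user and signals write1
void * read1(void *arg)
{
        while(1){
                printf("Enter a string: ");
                fflush(stdout);                 //the prompt has no newline
                if (scanf("%1023s",n) != 1)     //limit input to the buffer size
                        exit(0);                //EOF or input error
                sem_post(&len);
        }
}

//Waits until read1 has posted, then displays the string
void * write1(void *arg)
{
        while(1){
                sem_wait(&len);
                printf("The string entered is :");
                printf("==== %s\n",n);
        }
}

int main(void)
{
        pthread_t tr, tw;

        //The semaphore must be initialized before use:
        //not shared between processes (0), initial count 0
        if (sem_init(&len, 0, 0) != 0) {
                perror("sem_init");
                exit(1);
        }

        pthread_create(&tr,NULL,read1,NULL);
        pthread_create(&tw,NULL,write1,NULL);

        pthread_join(tr,NULL);
        pthread_join(tw,NULL);
        return 0;
}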

On running, in most cases we get a serial read and write (Thread 1 reads a string and Thread 2 displays that same string). But suppose we insert a sleep() call in write1:

//Requires #include <unistd.h> for sleep()
void * write1(void *arg)
{
         while(1){
                 sleep(5);
                 sem_wait(&len);
                 printf("The string entered is :");
                 printf("==== %s\n",n);
         }
}

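While write1 sleeps, read1 may read another string into n, overwriting the first one before it was displayed. write1 then displays only the latest string, and since the semaphore counted two posts, it may display that string twice. The semaphore only says "a string is available"; nothing makes the reader wait until the writer has consumed it, so serial read and write is no longer achieved.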

A common way to achieve serial read and write is to combine a mutex with a condition variable, as the next example does.

Example 3
This example involves a reader and a writer thread. The reader thread reads a string from the user and the writer thread displays it. This program uses a condition variable to synchronize the threads so that reading and displaying strictly alternate.

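#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>

#define TRUE 1
#define FALSE 0

char n[1024];
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int string_read = FALSE;   //TRUE while n holds a string not yet displayed

//Reads a string, then lets write1 display it before reading the next one
void * read1(void *arg)
{
        while(1){
                pthread_mutex_lock(&lock);
                //Sleep (instead of busy-waiting) until the last string was displayed
                while(string_read)
                        pthread_cond_wait(&cond,&lock);
                printf("Enter a string: ");
                fflush(stdout);
                if (scanf("%1023s",n) != 1)
                        exit(0);                //EOF or input error
                string_read=TRUE;
                pthread_cond_signal(&cond);     //wake write1
                pthread_mutex_unlock(&lock);
        }
}

//Waits for a new string, displays it, then lets read1 continue
void * write1(void *arg)
{
        while(1){
                pthread_mutex_lock(&lock);
                while(!string_read)
                        pthread_cond_wait(&cond,&lock);
                printf("The string entered is %s\n",n);

                string_read=FALSE;
                pthread_cond_signal(&cond);     //wake read1
                pthread_mutex_unlock(&lock);
        }
}

int main(void)
{
        pthread_t tr, tw;

        pthread_create(&tr,NULL,read1,NULL);
        pthread_create(&tw,NULL,write1,NULL);

        pthread_join(tr,NULL);
        pthread_join(tw,NULL);
        return 0;
}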

The output is now a serial read and write: each string is displayed exactly once, before the next string is read.

I began this discussion by saying that threads are used to achieve concurrency, yet the above examples could just as easily be written with a plain scanf() followed by printf():
scanf("%s",n);
printf("%s",n);

These examples were meant to demonstrate semaphores and condition variables. Example 3 can be extended into a fuller reader/writer application. For example, the boolean string_read can be replaced by a string_count variable that counts how many strings are waiting, so that the reader can queue several strings before the writer displays them.

 

Linux Kernel Support for Threads (Light Weight Processes)

Yes, Linux supports threads. A thread, like a process, is a context of execution, and we can write programs that use threads. The following discussion refers to the Linux 2.6 kernel. The Linux kernel supports multiple threads, and in user space we use a threads library such as POSIX threads (pthreads). The way POSIX threads are implemented in Linux differs from other operating systems, because Linux has no separate system call for creating a thread as opposed to a process.

So the question arises: if Linux has no system call dedicated to creating threads, how are POSIX threads created? If POSIX threads were implemented purely in user space, many issues would have to be dealt with. Suppose, for instance, that POSIX threads were implemented entirely in user space; then the threads library itself would have to schedule the threads.

Threads must be lightweight in the sense that all the threads of a process share the same address space. A pure user-space implementation can satisfy this, because the threads are then just an abstraction offered to the programmer, who enjoys some of their benefits. But if one of the threads blocks in a system call such as read(), the whole process blocks, because the kernel is unaware of the threads. So not all the benefits of threads can be realized.

So how is this issue solved in Linux? It is done by implementing so-called ‘lightweight processes’. To see how, recall what fork() does. A fork() system call creates a new process. As long as the child does not call execve() or a similar system call, the child and the parent share the same text (program code). The data pages are marked ‘copy-on-write’: at first, parent and child share the same physical data pages, and when either of them writes to a page, the kernel gives the writer its own private copy of that page.

Now that we understand how fork() works, we can think about implementing threads. Threads in the same process share the same address space for both text and data, so no copy-on-write is needed. The data synchronization issues that arise when two threads access the same variable must be handled by the programmer.

There is a system call named clone() (clone2() on IA-64) that, like fork(), creates a child process but gives much more control over the child: we can choose whether the child shares the parent’s filesystem information, file descriptor table, signal handler table, or memory space. In fact, the threads library uses clone() to create new threads.

So threads in Linux are really processes, better called ‘lightweight processes’ because they share their address space and other resources with one another.

This implementation has an advantage: the kernel sees everything as a process, so the scheduler needs no separate scheduling techniques for threads and processes. This keeps the implementation simple and also solves the blocking problem of a pure user-space implementation described above, since a thread blocked in a system call blocks only its own lightweight process.

Let’s check a small snippet of code

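#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

void * function1(void *arg)
{
   pthread_t tid=pthread_self();
   //pthread_t is opaque; on Linux/glibc it is an unsigned long
   printf("In thread %lu and process %d\n",(unsigned long)tid,(int)getpid());
   return NULL;
}

void * function2(void *arg)
{
   pthread_t tid=pthread_self();
   printf("In thread %lu and process %d\n",(unsigned long)tid,(int)getpid());
   return NULL;
}

int main(void)
{
   pthread_t tid1,tid2;
   int err;

   //pthread_create() returns an error number instead of setting errno
   if((err = pthread_create(&tid1,NULL,function1,NULL)) != 0){
       fprintf(stderr,"pthread_create: %s\n",strerror(err));
       exit(1);
   }

   if((err = pthread_create(&tid2,NULL,function2,NULL)) != 0){
       fprintf(stderr,"pthread_create: %s\n",strerror(err));
       exit(2);
   }

   pthread_join(tid1,NULL);
   pthread_join(tid2,NULL);
   printf("In main thread %lu and process %d\n",(unsigned long)pthread_self(),(int)getpid());
   return 0;
}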

To compile this program:

$ gcc thread.c -o thread -lpthread

On executing it, the output looks like this (the thread IDs and the PID will differ from run to run):

In thread 3086625680 and process 5480
In thread 3076135824 and process 5480
In main thread 3086628544 and process 5480

How is this possible? From the discussion above, threads are lightweight processes, yet getpid(), which returns the process ID, gives the same PID in every thread. This is because POSIX requires all the threads of a process to share the same process ID. To handle this, Linux introduced a tgid (thread group ID) field. The tgid equals the PID of the first lightweight process in the thread group (the group of threads created by a process, including the process itself), i.e. the thread group leader. System calls such as getpid() return the tgid instead of the PID, so all threads in the same thread group get the same value from getpid().