Message-Passing Interface

This guide assumes you have previous knowledge about C programming and will present you Message-Passing Interface (MPI) by several examples.

What is MPI ?Edit

MPI is a standardized and portable message-passing system. Message-passing systems are used especially on distributed machines with separate memory for executing parallel applications. With this system, each executing process will communicate and share its data with others by sending and receiving messages. MPI is the specification resulting from the MPI-Forum which involved several organizations designing a portable system (that can allow programs to work on a heterogeneous network).

Since the data can only be shared by exchanging messages, this standard is not intended for use on shared-memory systems, like a multiprocessor computer (although it will work, it is not the primary goal and there are more powerful alternatives, for instance, OpenMP). Basically, MPI includes point-to-point communication, collective communication (over a network of processes), process groups, bindings for Fortran and C and other advanced functions. On the other hand, the standard does not specify explicit shared-memory operations, debugging facilities, explicit support for threads, I/O functions.

The current version is 2.0, although 1.1 is still used.

DocumentationEdit

The volume Using MPI: Portable Parallel Programming with the Message-Passing Interface by William Gropp, Ewing Lusk and Anthony Skjellum is recommended as an introduction to MPI. For more complete information, read MPI: The Complete Reference by Snir, Otto, Huss-Lederman, Walker and Dongarra. Also, the standard itself can be found at [1] or [2]. For other Internet references, such as tutorials, see the Open Directory Project or the List of Tutorials at the Argonne National Laboratory.

ImplementationEdit

Currently, there are two principal implementations: MPICH and LAM. The official web-site of the latter ([3]) contains much information about LAM and also about MPI.

Supplementary libraryEdit

Additionally libraries exist for solving numerical problem or for using I/O functions on a distributed machine. We can mention: the ScaLAPACK library, FFTW (a portable C FFT library), the PETSc Scientific Computing Libraries. A more complete list can be found on the site [4]

Developing toolsEdit

CompilersEdit

Each implementation provides a compiler or at least a front-end to an existing compiler.

DebuggingEdit

As MPI doesn't specify debugging facilities, some program can be used to this purpose: XMPI is a run/debug GUI for LAM/MPI; MPI-CHECK is a debugger for Fortran; Etnus, Inc. offers a version of the Totalview debugger that supports MPICH (as well as some other MPI implementations); and, Streamline Computing's Distributed Debugging Tool works with several MPI implementations, include MPICH and LAM/MPI.

BenchmarksEdit

Mpptest, SKaMPI and MPBench measure the performance of MPI implementations. They can be used to decide what implementation to use, how to implement portable and efficient MPI programs and also for predicting the performance of MPI programs.

Famous and popular MPI benchmark includes NASA Parallel Benchmark, SpecMPI 2007 and HPCC benchmark suite.

ApplicationsEdit

It can be used on large heterogeneous parc.

ExamplesEdit

Sum of an arrayEdit

The point of this first simple example is to calculate the sum of all the value stored in an array. In C programming, the sequential version will be as below:

 
 /* Sum of an array - Sequential version */
 #include <stdio.h>
 #define N 100000
 
 int main (void) {
   float array[N];
   double mysum = 0;
   unsigned long long i;
   //Initialization of the array
   for (i = 0; i < N; i++)
     array[i] = i + 1;
   //Calculation of the sum
   for (i = 0; i < N; i++)
     mysum += array[i];
   printf ("%lf\n", mysum);
   return 0;
 }

First version - Basic point-to-point communicationsEdit

MPI implementations allow a user to launch several processes which will each have a defined numerical identifier, its rank. Conventionally, we will consider that the master is the process which has the rank '0'. All other processes are slaves. In this program, the master divides the array into sub-arrays and sends these to each slave. Each slave will then calculate the sum of its sub-array. In the case where the size of the array is not an even multiple of the number of slaves, the master will finish the remaining work by calculating the sum of the last values of the array (see figure below).

Distribution of data from an array to processes

For each process, the source code of the program is identical and thus the executable code is, also. The code therefore must contain both the slave and the master parts. The part that will be executed depends on the rank of the process.

In this first version, the work is divided into just two parts:

 /* Sum of an array - First parallel version */
 
 #include <stdio.h>
 #include <mpi.h>
 
 #define N 100000
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   //Initialization of MPI
   MPI_Init (&argc, &argv);
   //myrank will contain the rank of the process
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   //The part of the program which will be executed is decided
   if (myrank == 0)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 void master (void) {
   float array[N];
   double mysum = 0, tmpsum;
   unsigned long long i;
   MPI_Status status;
   //Initialization of the array
   for (i = 0; i < N; i++)
     array[i] = i + 1;
   //The array is divided by two and each part is sent to the slaves
   MPI_Send (array, N / 2, MPI_FLOAT, 1, MSG_DATA, MPI_COMM_WORLD);
   MPI_Send (array + N / 2, N / 2, MPI_FLOAT, 2, MSG_DATA, MPI_COMM_WORLD);
   //The master receive the result from the slaves
   MPI_Recv (&tmpsum, 1, MPI_DOUBLE, 1, MSG_RESULT, MPI_COMM_WORLD, &status);
   mysum += tmpsum;
   MPI_Recv (&tmpsum, 1, MPI_DOUBLE, 2, MSG_RESULT, MPI_COMM_WORLD, &status);
   mysum += tmpsum;
   printf ("%lf\n", mysum);
 }
 
 void slave (void) {
   float array[N];
   double sum;
   unsigned long long i;
   MPI_Status status;
   //The slave receives the array from the master
   MPI_Recv (array, N / 2, MPI_FLOAT, 0, MSG_DATA, MPI_COMM_WORLD, &status);
   for (i = 0, sum = 0; i < N / 2; i++)
     sum += array[i];
   //The result is send to the master
   MPI_Send (&sum, 1, MPI_DOUBLE, 0, MSG_RESULT, MPI_COMM_WORLD);
 }

This program will be explained step by step.

First, for using MPI functions, we must include the file mpi.h which contains the prototypes of the required functions. The size of the array is next defined by N. MSG_DATA and MSG_RESULT will be used by the MPI_Send and the MPI_RECV functions. Two prototypes of functions are declared. These functions contain the code for the master process and for the slave processes, respectively.

The main function chooses whether it should execute the master part (if the process rank is 0) or the slave part (for any other process rank.) The function begins with the MPI_INIT routine that must be called before any other MPI functions. This routine performs various initialization functions. Next, the rank of the process is obtained by calling the MPI_COMM_RANK function and, depending on the result, the master part or the slave part is executed. At the end of the computation, the final routine MPI_FINALIZE is called. It performs various administrative tasks associated with ending the use of MPI.

Two declarations not found in the sequential version of the program are included in the master: the tmpsum variable, which will store the result in each of the two slaves, and the status variable (whose type is defined by MPI), which is needed by the MPI_RECV function.

The computation part is now replaced by two calls to MPI_SEND and MPI_RECV. The arguments of the MPI_SEND function are the address of the first element to send, the number of elements to send, their type (the more common MPI types are MPI_INT, MPI_CHAR, MPI_FLOAT, MPI_DOUBLE, ...), the rank of the receiver, the tag of the message, and the communicator. In the simplest usage, the communicator would be MPI_COMM_WORLD, which includes all the processes sharing in the execution of the program. The arguments of the MPI_RECV function are the address of the receive buffer, the maximum number of elements to receive, their type, the rank of the sender, the tag of the message, communicator, and the status. For a common message, the type, the tag and the communicator must be the same (see figure below).

Arguments of the MPI_Send and MPI_RECV functions for a same message

The first half of the array is sent to the first slave and the second half of the array is sent to the second. In each case, the size of the sub-array is N/2. Next, the result is received from the slaves, stored, and added to the final sum.

Before it can receive the computed sums from the slaves, however, each slave must first sum the elements of the sub-array it has received from the master. The slave then sends the result back to the master.

When several messages are sent to the same process in an arbitrary order, the tag argument allow this process to distinguish between these messages and to receive them.

For execute this program with a lam implementation, the lam daemon must be started:

$ lamboot

Next, mpicc and mpirun are used for the compilation and the execution:

$ mpicc ./source.c
$ mpirun -np 3 ./a.out

The option -np specifies the number of processes to launch. In this example, it must be three (one master, two slaves). The program mpirun creates an MPI environment, coppying a.out to all three processors and running each one with the appropriate processor number set.

Second version - Adaptiveness to the number of processesEdit

This basic program has two problems:

  • The program needs two slaves for running correctly and cannot adapt it-self to the number of processes available
  • The master receives the data in a predefined order (the first slave and then the second). However the second slave can finish before the first

This leads to this second version:

 /* Sum of an array - Second parallel version */
 
 #include <stdio.h>
 #include <mpi.h>
 
 #define N 100000
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 void master (void) {
   float array[N];
   double mysum, tmpsum;
   unsigned long long step, i;
   int size;
   MPI_Status status;
   //Initialization of the array
   for (i = 0; i < N; i++)
     array[i] = i + 1;
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   if (size != 1)
     step = N / (size - 1);
   //The array is divided by the number of slaves
   for (i = 0; i < size - 1; i++)
     MPI_Send (array + i * step, step, MPI_FLOAT, i + 1, MSG_DATA, MPI_COMM_WORLD);
   //The master completes the work if necessary
   for (i = (size - 1) * step, mysum = 0; i < N; i++)
     mysum += array[i];
   //The master receives the results in any order
   for (i = 1; i < size; mysum += tmpsum, i++)
     MPI_Recv (&tmpsum, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MSG_RESULT, MPI_COMM_WORLD, &status);
   printf ("%lf\n", mysum);
 }
 
 void slave (void) {
   float array[N];
   double sum;
   unsigned long long i;
   int count;
   MPI_Status status;
   MPI_Recv (array, N, MPI_FLOAT, 0, MSG_DATA, MPI_COMM_WORLD, &status);
   //The slave finds the size of the array
   MPI_Get_count (&status, MPI_FLOAT, &count);
   for (i = 0, sum = 0; i < count; i++)
     sum += array[i];
   MPI_Send (&sum, 1, MPI_DOUBLE, 0, MSG_RESULT, MPI_COMM_WORLD);
 }

For the first issue, the master needs the number of processes to determinate the size of the sub-array (which will be stored in the step variable). The MPI_COMM_SIZE function gives this indication in the size variable, so N is divided by the number of slaves, namely size−1 . The array is next sent to slave and while the slave will compute, the master will finish the work if it is necessary. It will then receive the result and add these to the final sum. For solving the second issue, the master will receive result from any source. This leads to the use of MPI_ANY_SOURCE in the MPI_Recv function in place of the rank of the process.

The slave does not need to know the number of processes but does the size of the sub-array that it has received. For this we will use the status argument. Actually, status is a structure that contains three data: the source of the message (useful if MPI_ANY_SOURCE is used), the tag of the message (if MPI_ANY_TAG is used), and the error code. Additionally, we can access to the real length of the received data by using the MPI_Get_count function and the status argument. The count variable is the length of the sub-array.

This version of the program can be used with any number of processes, even 1. In this special case, the master will do all the calculations.

Unfortunately, this program is not adapted to MPI because, there is a lot of data transfer and a little computation. While MPI is designed for non-shared memory operation, there must be more computation than data transferring for losing the less possible time in message. We could improve this program by sending only the first number and the last to sum (and to add all integer between these), but in this case only geometric sum can be calculated.

Integration by the Simpson methodEdit

The purpose of this program is to calculate the integral of a given function by the Simpson method. This example is more adapting to MPI, because it requires more computation and less data-transferring than previously. The mathematical equation is:

\lim_{n\rightarrow\infty}\int_{a}^{b}f\left(x\right)dx=\frac{b-a}{3n}\left[f\left(x_{0}\right)+4f\left(x_{n-1}\right)+f\left(x_{n}\right)+2\sum_{k=1}^{\frac{n}{2}-1}\left[2f\left(x_{2k-1}\right)+f\left(x_{2k}\right)\right]\right]

In C programming, the sequential version is:

 /* Integration - Simpson method - Sequential version */
 
 #include <stdio.h>
 #include <math.h>
 #define n 10000 //Must be even
 
 double f (double x);
 
 int main (void) {
   double integration,step;
   int i;
   printf ("Enter the domain of integration: ");
   scanf ("%lf %lf", &a, &b);
   step = (b - a) / n;
   for (i = 1, integration = 0; i < n / 2; i++)
     integration += 2*f(a+(2*i-1)*step) + f(a+2*i*step);
   integration *= 2;
   integration += f(a) + f(b) + 4*f(b - step);
   integration *= step / 3;
   printf ("%.15lf\n", integration);
   return 0;
 }
 
 double f (double x) {
   return exp(-1*pow(x,2));
 }

First version - User-defined typeEdit

The work to be done is a sum like in the precedent example (an array with different coefficient to each boxes). The work will be divided as shown in the figure below.

Distribution of the data from a sum to processes.

The sum is divided and calculated by the slave and the master calculate the other values and complete the work if necessary as before.

The domain of the sum will be sent to each slave and they will evaluate several return values of the function and sum these. The information that will be sent are the beginning of the sum (begin), the difference between each value (step) and the number of value for which the function must be evaluated (two double and one integer). However, the data send by the MPI_Send routine must have the same type. Then, for minimizing the number of message to send and so for optimizing the program, a MPI type will be created and used for sending these data.

Generally, the data to send must be grouped and minimized for reducing the time wasted in communication (sending the minimum of message and with the minimum of data). So, two integer must be stored in a array and sent in a unique message rather than sent in two different messages.

Since the division of the work is faster than the work it-self, when a few processes are used, the master will wait a relatively long time for the result of the slaves. In this program, when there is less than a defined number of processes (LIMIT_PROC), the master will participate to the computation (namely, the work will be divided by the number of processes rather than the number of slave).

 /* Integration - Simpson method - First parallel version */
 
 #include <stdio.h>
 #include <math.h>
 #include <mpi.h>
 #define n 10000 //Must be even
 #define LIMIT_PROC 4 //Number of process for which the master stop to work and only divide the work
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 struct domain_of_sum {
   double begin;
   double step;
   int number; //Number of iteration
 };
 
 double f (double x);
 MPI_Datatype Init_Type_Domain (void); //Declaration of a MPI type which will contain the domain
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 MPI_Datatype Init_Type_Domain (void) {
   MPI_Datatype Domain;
   MPI_Datatype type[3] = { MPI_DOUBLE, MPI_DOUBLE, MPI_INT };
   int blocklen[3] = { 1, 1, 1 };
   MPI_Aint ext;
   MPI_Type_extent (MPI_DOUBLE, &ext);
   MPI_Aint disp[3] = { 0, ext, 2 * ext };
   MPI_Type_struct (3, blocklen, disp, type, &Domain);
   return Domain;
 }
 
 void master (void) {
   double integration, a, b, sum;
   int i, size;
   struct domain_of_sum mydomain; //The C structure which will contain the domain
   MPI_Status status;
   MPI_Datatype Domain = Init_Type_Domain (); //Creation of the MPI type
   MPI_Type_commit (&Domain);
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   printf ("Enter the domain of integration: ");
   scanf ("%lf %lf", &a, &b);
   mydomain.step = (b - a) / n; //step of the integration
   //The work is divided
   if (size > 1) {
     if (size > LIMIT_PROC)
       mydomain.number = (n / 2 - 1) / (size - 1);
     else
       mydomain.number = (n / 2 - 1) / size;
   }
   mydomain.begin = a + mydomain.step;
   //Each domain are sent
   for (i = 0; i < size - 1; i++) {
     mydomain.begin = a + (2 * i * mydomain.number + 1) * mydomain.step;
     MPI_Send (&mydomain, 1, Domain, i + 1, MSG_DATA, MPI_COMM_WORLD);
   }
   //The master complete the work
   for (i = (size - 1) * mydomain.number + 1, integration = 0; i < n / 2; i++)
     integration += 2 * f (a + (2 * i - 1) * mydomain.step) + f (a + 2 * i * mydomain.step);
   //The master receive the result
   for (i = 1; i < size; i++) {
     MPI_Recv (&sum, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MSG_RESULT,
     MPI_COMM_WORLD, &status);
     integration += sum;
   }
   integration *= 2;
   integration += f (a) + f (b) + 4 * f (b - mydomain.step);
   integration *= mydomain.step / 3;
   printf ("%.15lf\n", integration);
 }
 
 void slave (void) {
   double sum;
   int i;
   struct domain_of_sum mydomain;
   MPI_Status status;
   MPI_Datatype Domain = Init_Type_Domain (); //Creation of the MPI type 
   MPI_Type_commit (&Domain);
   MPI_Recv (&mydomain, 1, Domain, 0, MSG_DATA, MPI_COMM_WORLD, &status);
   for (i = 0, sum = 0; i < mydomain.number; i++)
     sum += 2*f(mydomain.begin+(2*i)*mydomain.step) + f(mydomain.begin+(2*i+1)*mydomain.step);
   MPI_Send (&sum, 1, MPI_LONG_LONG_INT, 0, MSG_RESULT, MPI_COMM_WORLD);
 }

All the routine used for creating the MPI type are regrouped in the function Init_Type_Domain for simplifying the code. The function MPI_TYPE_STRUCT is used for creating the new type and store it in Domain. This function need 5 arguments: the number of blocks that the new type will contain (in this case two doubles and one integer); the number of each elements of these blocks in an array (one double, another double and one integer); an array of the displacement of each block in the message; an array of these types; and, the address of the variable which will contain the structure. The displacement of each blocks is obtained by the use of the MPI_TYPE_EXTENT function which give the length of a MPI_DOUBLE. It is the equivalent to the sizeof() function in C. The length of a MPI type is store in a MPI_Aint variable.

An other possibility would be to declare a structure that contains a array of two doubles and a integer:

 struct domain_of_sum {
   double array[2]; //Contains begin and step
   int number;
 };
 
 //The new MPI type
 MPI_Datatype Init_Type_Domain (void) {
   MPI_Datatype Domain;
   MPI_Datatype type[2] = { MPI_DOUBLE, MPI_INT };
   int blocklen[2] = { 2, 1 };
   MPI_Aint ext;
   MPI_Type_extent (MPI_DOUBLE, &ext);
   MPI_Aint disp[2] = { 0, 2 * ext };
   MPI_Type_struct (2, blocklen, disp, type, &Domain);
   return Domain;
 }

Second versionEdit

Two different methods have been used for determining the number of operations that the slave must realize. In the first example, the slave use the status argument of the MPI_Recv function. In the second, this number was directly sent with the data. An other way is to obtain this by the same way that the master fixed this data, namely with the number of processes. For minimizing the quantity of data-transfer, we will use this last method (this version is just a optimization):

 /* Integration - Simpson method - Second parallel version*/
 
 #include <stdio.h>
 #include <math.h>
 #include <mpi.h>
 #define n 10000
 #define LIMIT_PROC 4
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 double f (double x);
 int Get_the_number (int size);  // Function which gives the number of iterations
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 int Get_the_number (int size) {
   int number;
   if (size > 1) {
     if (size > LIMIT_PROC)
       number = (n / 2 - 1) / (size - 1);
     else
       number = (n / 2 - 1) / size;
   }
   return number;
 }
 
 void master (void) {
   double integration, a, b, sum;
   int i, size, number;
   double mydomain[2];
   MPI_Status status;
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   number = Get_the_number (size);
   printf ("Enter the domain of integration: ");
   scanf ("%lf %lf", &a, &b);
   mydomain[1] = (b - a) / n;
   //The work is divided
   mydomain[0] = a + mydomain[1];
   for (i = 0; i < size - 1; i++) {
     mydomain[0] = a + (2 * i * number + 1) * mydomain[1];
     MPI_Send (mydomain, 2, MPI_DOUBLE, i + 1, MSG_DATA, MPI_COMM_WORLD);
   }
   //The master complete the work
   for (i = (size - 1) * number + 1, integration = 0; i < n / 2; i++)
     integration += 2 * f (a + (2 * i - 1) * mydomain[1]) + f (a + 2 * i * mydomain[1]);
   //The master receive the result
   for (i = 1; i < size; i++) {
     MPI_Recv (&sum, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MSG_RESULT, MPI_COMM_WORLD, &status);
     integration += sum;
   }
   integration *= 2;
   integration += f (a) + f (b) + 4 * f (b - mydomain[1]);
   integration *= mydomain[1] / 3;
   printf ("%.15lf\n", integration);
 }
 
 void slave (void) {
   double sum;
   int i, size, number;
   double mydomain[2];
   MPI_Status status;
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   number = Get_the_number (size);
   MPI_Recv (mydomain, 2, MPI_DOUBLE, 0, MSG_DATA, MPI_COMM_WORLD, &status);
   for (i = 0, sum = 0; i < number; i++)
     sum+=2*f(mydomain[0]+(2*i)*mydomain[1])+f(mydomain[0]+(2*i+1)*mydomain[1]);
   MPI_Send (&sum, 1, MPI_DOUBLE, 0, MSG_RESULT, MPI_COMM_WORLD);
 }

Product of a vector and a matrixEdit

The point of this program is to calculate the product of a vector and a matrix. In C programming, the sequential version is:

 /* Product of a vector by a matrix - Sequential version */
 
 #include <stdio.h>
 #include <mpi.h>
 #define N 10000
 
 void writecolumn (float *A);
 
 int main (int argc, char **argv) {
   float *A = (float *) malloc (N * N * sizeof (float)), B[N], C[N];
   unsigned int i, j, step;
   int size;
   //Initialization
   for (i = 0; i < N; i++) {
     for (j = 0; j < N; j++)
       *(A + i * N + j) = i + j;
     B[i] = 1;
   }
   //Product of the matrix and the vector
   for (i = 0; i < N; ++i)
     for (j = 0, C[i] = 0; j < N; j++)
       C[i] += *(A + i * N + j) * B[j];
   writecolumn(C);
   free (A);
   return 0;
 }
 
 void writecolumn (float *A) {
   int i;
   for (i = 0; i < N; i++)
   printf ("%f\n", *(A + i));
 }

First version - BroadcastEdit

For parallelizing this program, the matrix will be divide by groups of lines and each part will be sent to a different slave while the vector will be sent to every slave (see figure below).

Distribution of the data from a matrix to processes

Sending the same data to each slave can be done by broadcasting these data with the MPI_Bcast function which must be called by all the processes.

 /* Product of a vector by a matrix - First parallel version */
 
 #include <stdio.h>
 #include <mpi.h>
 #define N 10000
 #define LIMIT_PROC 4
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 void writecolumn (float *A);
 int Get_the_number (int size);
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 int Get_the_number (int size) {
   int number;
   if (size > 1) {
     if (size > LIMIT_PROC)
       number = N / (size - 1);
     else
       number = N / size;
   }
   return number;
 }
 
 void master (void) {
   float *A = (float *) malloc (N * N * sizeof (float)), B[N], C[N], *buff;
   unsigned int i, j, step;
   int size;
   MPI_Status status;
   //Initialization
   for (i = 0; i < N; i++) {
     for (j = 0; j < N; j++)
       *(A + i * N + j) = i + j;
     B[i] = 1;
   }
   //Division of work
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   step = Get_the_number (size);
   //Broadcast of the vector
   MPI_Bcast (B, N, MPI_FLOAT, 0, MPI_COMM_WORLD);
   for (i = 1; i < size; i++)
     MPI_Send (A + (i - 1) * step * N, N * step, MPI_FLOAT, i, MSG_DATA, MPI_COMM_WORLD);
   //Finishing the work
   for (i = (size - 1) * step; i < N; ++i)
     for (j = 0, C[i] = 0; j < N; j++)
       C[i] += *(A + i * N + j) * B[j];
   buff = (float *) malloc (step * sizeof (float));
   //Receive and reorder
   for (i = 1; i < size; i++) {
     MPI_Recv (buff, step, MPI_FLOAT, MPI_ANY_SOURCE, MSG_RESULT,
     MPI_COMM_WORLD, &status);
     for (j = 0; j < step; j++)
       C[j + (status.MPI_SOURCE - 1) * step] = buff[j];
   }
   writecolumn(C);
   free (A);
   free (buff);
 }
 
 void slave (void) {
   float Vect[N];
   unsigned int i, j, step;
   int size;
   MPI_Status status;
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   step = Get_the_number (size);
   float *result = (float *) malloc (step * sizeof (float));
   float *partmat = (float *) malloc (step * N * sizeof (float));
   MPI_Bcast (Vect, N, MPI_FLOAT, 0, MPI_COMM_WORLD);
   MPI_Recv (partmat, N * step, MPI_FLOAT, 0, MSG_DATA, MPI_COMM_WORLD, &status);
   for (i = 0; i < step; ++i)
     for (j = 0, result[i] = 0; j < N; j++)
       result[i] += Vect[j] ** (partmat + i * N + j);
   MPI_Send (result, step, MPI_FLOAT, 0, MSG_RESULT, MPI_COMM_WORLD);
   free (partmat);
   free (result);
 }

The MPI_Bcast function requires fewer arguments than the MPI_Send and the MPI_Recv functions. The MPI_Bcast function requires the address of the buffer that contains or will contain the broadcasting data, the number of entries in the buffer, these type, the rank of the broadcast root (the process that will send the data) and the communicator. There are neither tag nor status arguments because they serve to differentiate messages and to give informations about the receiving data and in this case, all processes are involved in the broadcast message (so, there is no need to differentiate message) and the information stored in status are known (the source is the broadcast root, the length is the number of entries and there is no tag).

When the master receive the data, it must reorder it because the order of receiving is not specified (MPI_ANY_SOURCE). For this purpose, the status structure contains the source of the message which is used for reordering and composing the vector.

In this example, the matrix is stored in a one-dimensional array in place of a conventional two-dimensinal array. It is possible to send a multi-dimensional array with MPI, but it must be carefully allocated. Namely, the array must be stored contiguously because the first argument of sent functions is the address of the starting buffer, and the second argument specifies the number of the following values that will be sent. Then the static and dynamic allocation are:

 /* Static allocation */
 int array[rows][columns];
 
 /* Dynamic allocation */
 int **array;
 array = (int**) malloc (rows * sizeof(int*));
 if (array == NULL) exit (-1);
 
 array[0] = (int*) malloc (rows * columns * sizeof(int));
 if (array[0] == NULL) exit (-1);
 
 for (i=1; i<rows; i++)
   array[i] = array[0] + i * columns;
 
 /* Send either kind of array */
 MPI_Send (array[0],rows*columns,MPI_INT,0,0,MPI_COMM_WORLD);

Second version - Scatter and gatherEdit

The MPI specification provides collective communication functions, and the MPI_Bcast function is one of these. The next version of this program will use the "scatter" and "gather" functions which, respectively, send different data to everyone and receive different data from everyone (see figure below).

Principle of the scatter and gather functions

Thus, the work will be divided a little differently, namely, the master will also participate in the computation.

 /* Product of a vector by a matrix - Second parallel version */
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <mpi.h>
 #define N 5000
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 void writecolumn (float *A);
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 void master (void) {
   float *A = (float *) malloc (N * N * sizeof (float)), B[N], C[N];
   unsigned int i, j, step;
   int size;
   MPI_Status status;
   //Initialization
   for (i = 0; i < N; i++) {
     for (j = 0; j < N; j++)
       *(A + i * N + j) = i + j;
     B[i] = 1;
   }
   //Division of the work
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   step = N / size;
   float *result = (float *) malloc (step * sizeof (float));
   float *partmat = (float *) malloc (step * N * sizeof (float));
   //Sending the data
   MPI_Bcast (B, N, MPI_FLOAT, 0, MPI_COMM_WORLD);
   //Each processes will receive a different part of the matrix
   MPI_Scatter (A, N * step, MPI_FLOAT, partmat, N * step, MPI_FLOAT, 0, MPI_COMM_WORLD);
   //The master make its part (the same code is used for the slaves)
   for (i = 0; i < step; ++i)
     for (j = 0, result[i] = 0; j < N; j++)
       result[i] += *(partmat + i * N + j) * B[j];
   //It finish the work
   for (i = size * step; i < N; ++i)
     for (j = 0, C[i] = 0; j < N; j++)
       C[i] += *(A + i * N + j) * B[j];
   //Receiving the data in the good order
   MPI_Gather (result, step, MPI_FLOAT, C, step, MPI_FLOAT, 0, MPI_COMM_WORLD);
   writecolumn(C);
   free (partmat);
   free (result);
   free (A);
 }
 
 void slave (void) {
   float Vect[N], *A, *C; //C and A are not used by slaves but must be declared
   unsigned int i, j, step;
   int size;
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   step = N / size;
   float *result = (float *) malloc (step * sizeof (float));
   float *partmat = (float *) malloc (step * N * sizeof (float));
   //Receiving the data
   MPI_Bcast (Vect, N, MPI_FLOAT, 0, MPI_COMM_WORLD);
   MPI_Scatter (A, N * step, MPI_FLOAT, partmat, N * step, MPI_FLOAT, 0, MPI_COMM_WORLD);
   //Computation
   for (i = 0; i < step; ++i)
     for (j = 0, result[i] = 0; j < N; j++)
       result[i] += Vect[j] ** (partmat + i * N + j);
   //Sending the result
   MPI_Gather (result, step, MPI_FLOAT, C, step, MPI_FLOAT, 0, MPI_COMM_WORLD);
   free (partmat);
   free (result);
 }
 
 void writecolumn (float *A) {
   int i;
   for (i = 0; i < N; i++)
   printf ("%f\n", *(A + i));
 }

The call to the functions MPI_SCATTER and MPI_GATHER is the same for the master as for the slaves (emphasized arguments are ignored by the slaves). The arguments are:

MPI_Scatter MPI_Gather
address of send buffer address of send buffer
number of elements sent in each process number of elements in send buffer
data type of send buffer elements data type of send buffer elements
address of receive buffer address of receive buffer
number of elements sent in receive buffer number of elements for any single receive
data type of receive buffer elements data type of receive buffer elements
rank of sending process rank of receiving process
communicator communicator

Note that in the first case, the master sends data to itself and in the second, it receives data from itself. In this example, the matrix is divided by the number of processes and the master sends the first part to itself, the second part to the first slave, etc. Afterwards the master receives the first part of the resulting vector from itself, the second from the first slave, etc.

Cellular automataEdit

In this example, we will code a cellular automata. A cellular automata is the evolution of a matrix on which we execute a defined function until there is convergence or cyclic phenomena. The function return a value for each point of the matrix depending of the value of his four neighbors. The border of the matrix will have the same value than the border of there opposite side. The resulting matrix will be a torus. In C programming, the sequential version is:

 /* Cellular automata - Sequential version */
 
 #include <stdio.h>
 #include <stdlib.h>
 #define N 62
 #define MAX_ITERATION N //Number of iteration
 
 void init_mat (int *A, int length);
 void writemat (int *A);
 int f (int x1,int x2,int x3,int x4);
 
 int main (int argc, char **argv) {
   int* A=(int*)malloc(N*N*sizeof(int));
   int* B=(int*)malloc((N-2)*(N-2)*sizeof(int));
   int i,j,k=0;
   init_mat (A,N);
   while(k++<MAX_ITERATION) {
   //Calculation of the new matrix in a temporary matrix
     for (i=0;i<N-2;i++)
       for (j=0;j<N-2;j++)
         *(B+i*N+j)=f(*(A+(i+1)*N+j),*(A+(i+1)*N+j+2),*(A+i*N+j+1),*(A+(i+2)*N+j+1));
     //Copy of the new matrix
     for (i=1;i<N-1;i++)
       for (j=1;j<N-1;j++)
         *(A+i*N+j)=*(B+(i-1)*N+j-1);
     //Update of the border
     for (i=1;i<N-1;i++) {
       *(A+i)=*(A+(N-2)*N+i);
       *(A+(N-1)*N+i)=*(A+N+i);
       *(A+i*N)=*(A+i*N+N-2);
       *(A+i*N+N-1)=*(A+i*N+1);
     }
   }
   writemat(A);
   return 0;
 }
 
 void init_mat (int *A, int length) {
   int i,j;
   for (i=0;i<length;i++)
     for (j=0;j<length;j++)
       *(A+i*length+j)=i+j>length/2&&i+j<length*3/2;
   //Each border of the matrix will have the same value than the border of his opposite side
   for (i=1;i<length-1;i++) {
     *(A+i)=*(A+(length-2)*length+i);
     *(A+(length-1)*length+i)=*(A+length+i);
     *(A+i*length)=*(A+i*length+length-2);
     *(A+i*length+length-1)=*(A+i*length+1);
   }
 }
 
 void writemat (int *A) {
   int i,j;
   for (i = 0; i < N; i++) {
     for (j = 0; j < N; j++)
       printf ("%d ", *(A + i*N +j));
     printf("\n");
   }
 }
 
 int f (int x1,int x2,int x3,int x4) {
   return x1+x2+x3+x4>2;
 }

First version - DeadlockEdit

The matrix will be divided by group of lines like in the figure [fig:vect]. For calculating the new part of the matrix, the functions applied need the values of points that belong to other parts of the matrix and then, each process will calculate the new matrix and exchange the first line and the last line with its two neighbors (see figure below).

Partitioning of the matrix and communication between processes

While the processes share their data, a dead-lock can occur. A deadlock is a blocking state of a program due to the fact that one process is waiting for a specific action to be taken by another process, but the latter process is waiting for a specific action to be taken by the former process. This phenomena leads to a permanent waiting state (see figure below).

Example of deadlock

A deadlock can occur in this code because the MPI_SEND and MPI_RECV functions block the program until they are completed, namely, until the receiver receives the message sent or the sender sends the message to be received. If all the processes perform the same code (excluding the master), process one will send data to process two, process two will send data to process three, etc. Process one will wait until process two receives its data, but this will never happen because process two is also waiting until process three receives its data. Because the communication is cyclic, the program is blocked. As a first solution, we can change the order of sending and receiving on each even process.

 /* Cellular automata - First parallel version */
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <mpi.h>
 #define N 62 //Must be multiple of number of processes (which must be > 1)
 #define MAX_ITERATION N
 #define MSG_DATA 100
 
 void init_mat (int *A, int length);
 void writemat (int *A, int length);
 int f (int x1,int x2,int x3,int x4);
 
 int main (int argc, char **argv) {
   int* result=(int*)malloc(N*N*sizeof(int));
   int* A=(int*)malloc(N*N*sizeof(int));
   int* tmp=(int*)malloc((N-2)*(N-2)*sizeof(int));
   int step;
   int myrank,size,left,right;
   MPI_Status status;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   int i,j,k=0;
   step = (N-2)/size;
   //Initialization by the master
   if (!myrank) init_mat(result,N);
   MPI_Scatter (result+N,step*N,MPI_INT,A+N,step*N,MPI_INT,0,MPI_COMM_WORLD);
   //The next and the previous processes are determined
   left=(myrank-1+size)%size; right=(myrank+1)%size;
   while(k++<MAX_ITERATION) {
     //The order of exchanging the data is decided
     if (myrank%2) {
       MPI_Send (A+N,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD);
       MPI_Recv (A+(step+1)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD,&status);
       MPI_Send (A+(step)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD);
       MPI_Recv (A,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,&status);
     } else {
       MPI_Recv (A+(step+1)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD,&status);
       MPI_Send (A+N,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD);
       MPI_Recv (A,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,&status);
       MPI_Send (A+(step)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD);
     }
     for (i=1;i<step-1;i++) {
       *(A+i*N)=*(A+i*N+N-2);
       *(A+i*N+N-1)=*(A+i*N+1);
     }
     for (i=0;i<step;i++)
       for (j=0;j<N-2;j++)
         *(tmp+i*N+j)=f(*(A+(i+1)*N+j),*(A+(i+1)*N+j+2),*(A+i*N+j+1),*(A+(i+2)*N+j+1));
     for (i=1;i<step+1;i++)
       for (j=1;j<N-1;j++)
         *(A+i*N+j)=*(tmp+(i-1)*N+j-1);
   }
   MPI_Gather (A+N,step*N,MPI_INT,result+N,step*N,MPI_INT,0,MPI_COMM_WORLD);
   //The master recompose the matrix and print it
   if (!myrank) {
     for (i=1;i<N-1;i++) {
       *(result+i)=*(result+(N-2)*N+i);
       *(result+(N-1)*N+i)=*(result+N+i);
       *(result+i*N)=*(result+i*N+N-2);
       *(result+i*N+N-1)=*(result+i*N+1);
     }
     writemat(result,N);
   }
   MPI_Finalize ();
   return 0;
 }

In this example, the master and the slave have almost the same code, so it is not necessary to divide the code in two functions (the code specific to the master is simply be preceded by a condition on the rank).

While the master wait for data from the process 1, the process 1 send data to the master. Then it wait for data from the master while this one send data to its. The code will have the same behavior for all processes and thus is safe.

Second version - Non-blocking send and receiveEdit

A second solution is to use non-blocking send and receive: these functions are called and give immediately the hand to the process. Then, the process can make some computation and when the data of the message need to be accessed, it can completed it by calling some appropriate functions. Program using non-blocking communication functions can be faster. In this second version, all the functions will be completed just after that all send and all receive are begun.

In the previous example, the dimension of the matrix should be a multiple of the number of processes because the remaining lines were not send to any process. In this version, the master send these lines to the last process and receive the resulting line separately.

 /* Cellular automata - Second parallel version */
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <mpi.h>
 #define N 62 //Must be multiple of number of processes
 #define MAX_ITERATION N
 #define MSG_DATA 100
 
 void init_mat (int *A, int length);
 void writemat (int *A, int length);
 int f (int x1,int x2,int x3,int x4);
 
 int main (int argc, char **argv) {
   int* result=(int*)malloc(N*N*sizeof(int));
   int* A=(int*)malloc(N*N*sizeof(int));
   int* tmp=(int*)malloc((N-2)*(N-2)*sizeof(int));
   int step;
   int myrank,size,left,right;
   MPI_Status status;
   MPI_Request request[4];
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   int i,j,k=0;
   step = (N-2)/size;
   if (!myrank) init_mat(result,N);
   MPI_Scatter (result+N,step*N,MPI_INT,A+N,step*N,MPI_INT,0,MPI_COMM_WORLD);
   //Code allowing any dimension of the matrix
   if (myrank==0&&(N-2)%size) MPI_Send (result+N*(step*size+1),(N-2-step*size)*N,MPI_INT,size-1,MSG_DATA,MPI_COMM_WORLD);
   else if (myrank==size-1&&(N-2)%size) {
     MPI_Recv (A+N*(step+1),(N-2-step*size)*N,MPI_INT,0,MSG_DATA,MPI_COMM_WORLD,&status);
     step+=N-2-step*size;
   }
   left=(myrank-1+size)%size; right=(myrank+1)%size;
   while(k++<MAX_ITERATION) {
     //The data begin to be shared
     MPI_Isend (A+N,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,request);
     MPI_Irecv (A+(step+1)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD,request+1);
     MPI_Isend (A+(step)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD,request+2);
     MPI_Irecv (A,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,request+3);
     //The transfer of data are completed
     for (i=0;i<4;i++)
       MPI_Wait(request+i,&status);
     for (i=1;i<step-1;i++) {
       *(A+i*N)=*(A+i*N+N-2);
       *(A+i*N+N-1)=*(A+i*N+1);
     }
     for (i=0;i<step;i++)
       for (j=0;j<N-2;j++)
         *(tmp+i*N+j)=f(*(A+(i+1)*N+j),*(A+(i+1)*N+j+2),*(A+i*N+j+1),*(A+(i+2)*N+j+1));
     for (i=1;i<step+1;i++)
       for (j=1;j<N-1;j++)
       *(A+i*N+j)=*(tmp+(i-1)*N+j-1);
   }
   if (myrank==size-1&&(N-2)%size) {
     step=(N-2)/size;
     MPI_Send (A+N*(step+1),(N-2-step*size)*N,MPI_INT,0,MSG_RESULT,MPI_COMM_WORLD);
   } else if (myrank==0&&(N-2)%size)
     MPI_Recv (result+N*(step*size+1),(N-2-step*size)*N,MPI_INT,size-1,MSG_RESULT,MPI_COMM_WORLD,&status);
   MPI_Gather (A+N,step*N,MPI_INT,result+N,step*N,MPI_INT,0,MPI_COMM_WORLD);
   if (!myrank) {
     for (i=1;i<N-1;i++) {
       *(result+i)=*(result+(N-2)*N+i);
       *(result+(N-1)*N+i)=*(result+N+i);
       *(result+i*N)=*(result+i*N+N-2);
       *(result+i*N+N-1)=*(result+i*N+1);
     }
     writemat(result,N);
   }
   MPI_Finalize ();
   return 0;
 }

The MPI_Isend and the MPI_Irecv (I means immediate) functions need a supplementary argument: the address of a MPI_Request variable which will identify the request for a later completion. The status argument will be given to the completion functions. MPI_WAIT is used for complete the call to the previous functions (identified by the first argument) and will wait until the functions are effectively completed.

Third version - SendrecvEdit

The next version use a send-receive function: it is equivalent to call a send and a receive function in parallel. Only the while loop differ:

 while(k++<MAX_ITERATION) {
   //The transfers are simultaneous
   MPI_Sendrecv(A+N,N,MPI_INT,left,MSG_DATA,A+(step+1)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD,&status);
   MPI_Sendrecv(A+(step)*N,N,MPI_INT,right,MSG_DATA,A,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,&status);
   for (i=1;i<step-1;i++) {
     *(A+i*N)=*(A+i*N+N-2);
     *(A+i*N+N-1)=*(A+i*N+1);
   }
   for (i=0;i<step;i++)
     for (j=0;j<N-2;j++)
       *(tmp+i*N+j)=f(*(A+(i+1)*N+j),*(A+(i+1)*N+j+2),*(A+i*N+j+1),*(A+(i+2)*N+j+1));
   for (i=1;i<step+1;i++)
     for (j=1;j<N-1;j++)
       *(A+i*N+j)=*(tmp+(i-1)*N+j-1);
 }

The MPI_SENDRECV function need arguments which are essentially the same that are required by the MPI_SEND and MPI_RECV functions.

Fourth version - Ring topologyEdit

This last version is radically different, in which that the topology used is a ring topology. A new matrix is calculated on the process 0 and then is sent to the process 1. After one iteration, this process send it to the process 2, which calculate the new matrix and send it again to the next process. The last process will return the matrix to the master and the cycle will continue until the number of iteration reach a defined constant (see figure below).

Interaction in a ring topology

With this topology, several different data can be computed in the same time. Although in this example the operation applied on the data are the same, the processes can carry out different kinds of operation. Only one matrix will be sent in the ring. It is however possible (and it is the advantage of this structure) to send a second matrix just after.

 /* Cellular automata - Parallel version using a ring topology */
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <mpi.h>
 #define N 62
 #define MAX_ITERATION N
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 void init_mat (int *A, int length);
 void writemat (int *A, int length);
 int f (int x1,int x2,int x3,int x4);
 
 int main (int argc, char **argv) {
   int* A=(int*)malloc((N*N+1)*sizeof(int));
   int* B=(int*)malloc((N-2)*(N-2)*sizeof(int));
   int myrank,size,left,right;
   MPI_Status status;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   int i,j,k=0;
   right=(myrank+1)%size; left=(myrank-1+size)%size;
   if (!myrank) init_mat(A,N);
   else MPI_Recv (A,N*N+1,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,&status);
   while (*(A+N*N)<MAX_ITERATION) {
     for (i=0;i<N-2;i++)
       for (j=0;j<N-2;j++)
         *(B+i*N+j)=f(*(A+(i+1)*N+j),*(A+(i+1)*N+j+2),*(A+i*N+j+1),*(A+(i+2)*N+j+1));
     for (i=1;i<N-1;i++)
       for (j=1;j<N-1;j++)
         *(A+i*N+j)=*(B+(i-1)*N+j-1);
     for (i=1;i<N-1;i++) {
       *(A+i)=*(A+(N-2)*N+i);
       *(A+(N-1)*N+i)=*(A+N+i);
       *(A+i*N)=*(A+i*N+N-2);
       *(A+i*N+N-1)=*(A+i*N+1);
     }
     //The number of iteration is incremented
     (*(A+N*N))++;
     //The matrix is sent and another is receive
     MPI_Sendrecv_replace(A,N*N+1,MPI_INT,right,MSG_DATA,left,MSG_DATA,MPI_COMM_WORLD,&status);
   }
   //When the number of iteration reach the maximum, the program stop
   if ((*(A+N*N))++==MAX_ITERATION) {
     MPI_Sendrecv_replace(A,N*N+1,MPI_INT,right,MSG_DATA,left,MSG_DATA,MPI_COMM_WORLD,&status);
     writemat(A,N);
   } else MPI_Send (A,N*N+1,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD);
     MPI_Finalize ();
   return 0;
 }

The number of iterations is stored with the matrix, in an additional element. It will serve to determine when to stop the loop. When one process will reach the maximum number of iterations, the matrix will then be transferred on all the other processes. It is indeed necessary for stopping these properly because else, they will continue their loop and never stop.

The MPI_SENDRECV_REPLACE function is similar to the MPI_SENDRECV function, though it use the same buffer for sending and receiving the data.

DifferentiationEdit

This program will calculate the numerical value of the derivate of a functions in multiple points by using the finite elements method. The equation is: f''\left(x\right)=\lim_{h\rightarrow0}\frac{f\left(x+h\right)-f\left(x-h\right)}{2h}

However, it is numerically difficult to divide by 0 and h must not be chosen arbitrary (see figure below).

Schematic evolution of the precision of the finite element methods in function of h

The optimal value of h is unknown and thus, the algorithm will decrease a started value of h until that the precision began to decrease (the difference between two derivate calculated with different h must decrease). In C programming, the sequential version is:

 /* Derivate - Sequential version */
 
 #include <stdio.h>
 #include <math.h>
 #define H 1 //first h
 #define DEC 1.4 //decrease
 #define N 500
 
 double f (double x);
 
 int main (int argc, char **argv) {
   double x,deriv,diffnew,diffold,h;
   double result;
   int i;
   for (i=0,x=i;i<N;x=++i) {
     h=H;
     deriv=(f(x+h)-f(x-h))/(2*h);
     diffnew=100;
     diffold=2*diffnew;
     while (fabs(diffnew)<fabs(diffold)) {
       h/=DEC;
       result=deriv;
       diffold=diffnew;
       deriv=(f(x+h)-f(x-h))/(2*h);
       diffnew=deriv-result;
     }
   printf ("%lf: %.15lf\n",x,result);
   }
   return 0;
 }
 
 double f (double x) {
   return x*x*x+x*x;
 }

First version - Farming topologyEdit

Since the number of loop is variable, the time of execution cannot be predicted. This program send a derivate to calculate to each processes and when one have finish (not necessarily the first), he resend immediately data to this one. These data and the result are gathered into two arrays (deriv[] and result[]).

 /* Derivate - Parallel version using farming topology */
 
 #include <stdio.h>
 #include <mpi.h>
 #include <math.h>
 
 #define H 1 //first h
 #define DEC 1.4 //decrease (1.4)
 #define N 500
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 double f (double x);
 void init_deriv(double *deriv, double x);
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 void init_deriv(double *deriv, double x) {
   deriv[0]=x;
   deriv[4]=H;
   deriv[1]=(f(deriv[0]+deriv[4])-f(deriv[0]-deriv[4]))/(2*deriv[4]);
   deriv[2]=100;
   deriv[3]=2*deriv[2];
 }
 
 void master(void) {
   double deriv[5],result[2]; //x,deriv,diffnew,diffold,h
   int i;
   int size;
   MPI_Status status;
   MPI_Comm_size(MPI_COMM_WORLD,&size);
   //Sending data to all processes
   for (i=1;i<size;i++) {
     init_deriv(deriv,1);
     MPI_Send(deriv,5,MPI_DOUBLE,i,MSG_DATA,MPI_COMM_WORLD);
   }
   //When a result is received, another task is sent
   for (;i<N;++i) {
     init_deriv(deriv,1);
     MPI_Recv(result,2,MPI_DOUBLE,MPI_ANY_SOURCE,MSG_RESULT,MPI_COMM_WORLD,&status);
     MPI_Send(deriv,5,MPI_DOUBLE,status.MPI_SOURCE,MSG_DATA,MPI_COMM_WORLD);
     printf ("%lf: %.15lf\n",result[0],result[1]);
   }
   deriv[4]=10*H;
   //The current task are completed
   for (i=1;i<size;i++) {
     MPI_Recv(result,2,MPI_DOUBLE,MPI_ANY_SOURCE,MSG_RESULT,MPI_COMM_WORLD,&status);
     MPI_Send(deriv,5,MPI_DOUBLE,status.MPI_SOURCE,MSG_DATA,MPI_COMM_WORLD);
     printf ("%lf: %.15lf\n",result[0],result[1]);
   }
 }
 
 void slave(void) {
   double deriv[5],result[2]; //x,deriv,diffnew,diffold,h
   MPI_Status status;
   MPI_Recv(deriv,5,MPI_DOUBLE,0,MSG_DATA,MPI_COMM_WORLD,&status);
   while (deriv[4]<5*H) {
     while (fabs(deriv[2])<fabs(deriv[3])) {
       deriv[4]/=DEC;
       result[1]=deriv[1];
       deriv[3]=deriv[2];
       deriv[1]=(f(deriv[0]+deriv[4])-f(deriv[0]-deriv[4]))/(2*deriv[4]);
       deriv[2]=deriv[1]-result[1];
     }
   result[0]=deriv[0];
   MPI_Send (result,2,MPI_DOUBLE,0,MSG_RESULT,MPI_COMM_WORLD);
   MPI_Recv (deriv,5,MPI_DOUBLE,0,MSG_DATA,MPI_COMM_WORLD,&status);
   }
 }

Second version - Completion of send and receiveEdit

The second topology illustrated looks like to a ring topology and introduce a new routine: MPI_REQUEST_FREE. The master will always send data to the process 1 and receive the result from any others processes (see figure below).

Schema of the used topology

Since the code of the slave is similar to that of the cellular automata (in the version using a ring topology), only the master function will be described. For avoiding deadlock, only non-blocking send will be used (the non-blocking receive is used here for improving performance).

 void master(void) {
   double deriv[5],result[2]; //x,deriv,diffnew,diffold,h
   int i;
   int size;
   MPI_Status status;
   MPI_Request request[N],req;
   MPI_Comm_size(MPI_COMM_WORLD,&size);
   //The master fill the ring by sending enough data
   for (i=1;i<size;i++) {
     init_deriv(deriv,i);
     MPI_Isend(deriv,5,MPI_DOUBLE,1,MSG_DATA,MPI_COMM_WORLD,request+i);
   }
   //When a result is received, another data is sent in the ring
   for (;i<N;++i) {
     MPI_Irecv(result,2,MPI_DOUBLE,MPI_ANY_SOURCE,MSG_RESULT,MPI_COMM_WORLD,&req);
     init_deriv(deriv,i);
     MPI_Wait (&req,&status);
     MPI_Request_free (request+(int)result[0]);
     MPI_Isend(deriv,5,MPI_DOUBLE,1,MSG_DATA,MPI_COMM_WORLD,request+i);
     printf ("%lf: %.15lf\n",result[0],result[1]);
   }
   //The current task are completed
   for (i=1;i<size;i++) {
     MPI_Recv(result,2,MPI_DOUBLE,MPI_ANY_SOURCE,MSG_RESULT,MPI_COMM_WORLD,&status);
     MPI_Request_free (request+(int)result[0]);
     printf ("%lf: %.15lf\n",result[0],result[1]);
   }
 }

When the master finish to send enough data (for all processes), it execute a new loop. It receive a result and initialize a new data simultaneously and next, it complete the send of the data that lead to the result that it just receive and he resend in the ring a new data. Actually, if the master receive the result of a data, this means that the send of this data is completed and there is no need to complete the send. The MPI_REQUEST_FREE function allow to free a buffered request without checking when this request is logically completed.

Communicators and groupsEdit

The collective communications (like MPI_BCAST, MPI_SCATTER and MPI_GATHER) send data on the processes that belong to a same group. MPI provide functions for managing groups and communicators. This allows to use collective communications on specified processes and thus to divide a network into groups which can execute different task.

For example, different operations can occurred on a matrix (trace, transpose, inversion, diagonalizing). For each operations, a group will be defined and each processes will communicate with the other processes of its groups by using collective communication without disturbing other groups.

A communicator is a object which specifies a communication domain. It is associated to a group. A group is created from existing groups and the initial group is the group of the communicator MPI_COMM_WORLD which contains all processes.

First version - Creation of communicator from a groupEdit

The first method for creating new communicators is to create groups from this first group and then to create the communicators of these groups.

 /* Example of use of group */
 
 #include <stdio.h>
 #include <mpi.h>
 
 int main (int argc, char *argv[]) {
   int rank[50],size,i,myrank;
   char data[10]="";
   MPI_Group group,newgroup;
   MPI_Comm newcomm;
   MPI_Init (&argc,&argv);
   //group of MPI_COMM_WORLD which contains all the processes
   MPI_Comm_group (MPI_COMM_WORLD,&group);
   MPI_Group_size (group,&size);
   //Creation of the communicator containing even-ranked processes
   for (i=0;i<size/2;i++)
     rank[i]=2*i; if (size%2) rank[size/2]=size-1;
   MPI_Group_incl (group,(size+1)/2,rank,&newgroup);
   MPI_Comm_create (MPI_COMM_WORLD,newgroup,&newcomm);
   //Only the processes belonging to the group of newcomm will participate to the broadcast on this communicator
   MPI_Comm_rank (MPI_COMM_WORLD,&myrank);
   if (!(myrank%2)) MPI_Bcast (data,10,MPI_INT,0,newcomm);
   MPI_Group_free (&group);
   MPI_Finalize ();
   return 0;
 }

In this program, data are broadcast only on the even-ranked processes. This is achieved by creating a communicator which will contain the group of the even-ranked processes. First, the initial group of MPI_COMM_WORLD is given by the MPI_COMM_GROUP routine. The new created group contains the \frac{\left(size+1\right)}{2} processes that have a even rank in the initial group. The MPI_GROUP_INCL function required an array of the ranks of the processes of the first group that will belong to the new group. Then, MPI_COMM_CREATE create the communicator of the new group and if the process have a odd rank, it participates to the broadcast. The group is free by MPI_GROUP_FREE.

The MPI_GROUP_SIZE and the MPI_COMM_RANK functions are similar to MPI_COMM_SIZE and MPI_GROUP_RANK. These last are just shortcut:

MPI_Comm_size (comm, &size); MPI_Comm_rank (comm, &rank);
MPI_Comm_group(comm, &group); MPI_Comm_group(comm, &group);
MPI_Group_size(group, &size); MPI_Group_rank(group, &rank);
MPI_Group_free(&group); MPI_Group_free(&group);

However, if the process that call MPI_GROUP_RANK don't belong to the group specified in argument to the function, the value return in rank will be MPI_UNDEFINED whereas a call to MPI_COMM_RANK will not work and will stop the program.

Second version - Creation of communicator from a communicatorEdit

One communicator can also be creating without using group, directly by splitting a existing communicator.

 int main (int argc, char *argv[]) {
   int color,i,myrank;
   char data[10]="";
   MPI_Comm newcomm;
   MPI_Init (&argc,&argv);
   MPI_Comm_rank (MPI_COMM_WORLD,&myrank);
   //Creation of the communicator containing even-ranked processes
   if (!(myrank%2)) color=0;
   else color=1;
   MPI_Comm_split (MPI_COMM_WORLD,color,1,&newcomm);
   //Only the processes having even rank will participate to the broadcast on this communicator
   if (!(myrank%2)) MPI_Bcast (data,10,MPI_INT,0,newcomm);
   MPI_Finalize ();
   return 0;
 }

All the processes will be classified in a new communicator, depending of his color. Namely, if two processes have a color equal to 1 and one process have a color equal to 2, two communicators will be created, one for each groups. The third argument required by MPI_COMM_SPLIT is used for ordering the processes in the new communicator (this will change the rank of the new processes). Here, the same value is given for all processes.

DiversEdit

Environmental managementEdit

Finally, environmental management functions exist for getting the name of the node on which a process run or for measuring the time taken by a part of a program (for example).

 /* Example of use of environmental function */
 
 #include <stdio.h>
 #include <mpi.h>
 
 int main (int argc, char *argv[]) {
   double starttime,endtime;
   int namelen;
   char processor_name[MPI_MAX_PROCESSOR_NAME];
 
   MPI_Init(&argc,&argv);
   MPI_Get_processor_name(processor_name,&namelen);
   printf ("Process %s\n", processor_name);
 
   starttime = MPI_Wtime();
   //Here some computation or communication
   endtime = MPI_Wtime();
   printf ("Time: %lf\n",endtime-starttime);
 
   MPI_Finalize ();
   return 0;
 }

Other possibilitiesEdit

There is many other functions that are not mentioned: there is different kinds of point-to-point communication functions; MPI allow user to define datatype by a lot of different functions; there is also other collective communication and reduction functions; and, more functions for using communicators and groups.

MPI defines many useful constant, among those MPI_ANY_SOURCE, MPI_INT, MPI_UNDEFINED, ...

MPI can also be used in an advanced way for managing process topologies. Namely, the MPI implementation can exploit the specificity of a physical network (for heterogeneous network) by privileging for example the transfer of message on the fastest connection.

Prototype of basic functionsEdit

For more information on these MPI functions, report to the man page:

int MPI_Init(int *argc, char ***argv);
int MPI_Finalize(void);
int MPI_Comm_rank(MPI_Comm comm, int *rank);
int MPI_Comm_size(MPI_Comm comm, int *size);
int MPI_Send(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm);
int MPI_Recv(void* buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status);
int MPI_Get_count(MPI_Status *status, MPI_Datatype datatype, int *count);
int MPI_Type_extent(MPI_Datatype datatype, MPI_Aint *extent);
int MPI_Type_struct(int count, int *array_of_blocklengths, MPI_Aint *array_of_displacements, MPI_Datatype *array_of_types, MPI_Datatype *newtype);
int MPI_Bcast(void* buffer, int count, MPI_Datatype datatype, int root, MPI_Comm comm);
int MPI_Scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvcount, int root, MPI_Comm comm);
int MPI_Gather(void* sendbuf, int sendcount, MPI_Datatype sendype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm);
int MPI_ISend(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request);
int MPI_Irecv(void* buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Request *request);
int MPI_Sendrecv(void* sendbuf, int sendcount, MPI_Datatype datatype, int dest, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int source, int recvtag, MPI_Comm comm, MPI_Status *status);
int MPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype, int dest, int sendtag, int source, int recvtag, MPI_Comm comm, MPI_Status *status);
int MPI_Wait(MPI_Request *request, MPI_Status *status);
int MPI_Request_free(MPI_Request *request);
int MPI_Group_rank(MPI_Group group, int *rank);
int MPI_Group_size(MPI_Group group, int *size);
int MPI_Comm_group(MPI_Comm comm, MPI_Group *group);
int MPI_Group_free(MPI_Group *group);
int MPI_Group_incl(MPI_Group *group, int n, int *ranks, MPI_Group *newgroup);
int MPI_Comm_create(MPI_Comm comm, MPI_Group group, MPI_Comm *newgroup);
int MPI_Comm_split(MPI_Comm comm, int color, int key, MPI_Comm *newcomm);
int MPI_Wtime(void);
int MPI_Get_processor_name(char *name, int *resultlen);

As usual in the C programming language, the "address of an array" is really the address of the first element of the array.

PrerequisitesEdit

Further readingEdit

Last modified on 20 November 2013, at 23:10