Message Passing Interface

CS 441/641 Lecture, Dr. Lawlor

MPI is a standardized communication library that actually *is* a standard: if you build a supercomputer, you put MPI on it.  Period.  MPI is implemented as a library, with no compiler support, which makes it more portable but less convenient than OpenMP.

First, read this basic MPI tutorial, which has a good survey of all the MPI routines.  The gory details are in the MPI 1.1 standard, which includes some examples and is fairly readable for a standard (that's not saying much: most standards are hideously unreadable). 

Pay particular attention to the "big eight" MPI functions:
Those are really the only functions you need to learn in MPI 1.1, the rest are just small variations on those themes.

For example, here's an idiomatic MPI program where the first process sends one integer to the last process:
#include <mpi.h> /* for MPI_ functions */
#include <stdio.h> /* for printf */
#include <stdlib.h> /* for atexit */

void call_finalize(void) {MPI_Finalize();}
int main(int argc,char *argv[]) {
MPI_Init(&argc,&argv);
atexit(call_finalize); /*<- important to avoid weird errors! */

int rank=0,size=1;
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
MPI_Comm_size(MPI_COMM_WORLD,&size);

int tag=17; /*<- random integer ID for this message exchange */
if (rank==0) {
int val=1234;
MPI_Send(&val,1,MPI_INT, size-1,tag,MPI_COMM_WORLD);
printf("Rank %d sent value %d\n",rank,val);
}
if (rank==size-1) {
MPI_Status sts;
int val=0;
MPI_Recv(&val,1,MPI_INT, MPI_ANY_SOURCE,tag,MPI_COMM_WORLD,&sts);
printf("Rank %d received value %d\n",rank,val);
}
return 0;
}
(Try this in NetRun now!)

Here's a simple parameter sweep program:
#include <mpi.h>
#include <vector>

extern "C" void call_mpi_finalize(void) {MPI_Finalize();}
int main(int argc,char **argv)
{
MPI_Init(&argc,&argv);
atexit(call_mpi_finalize); // call MPI_Finalize before exiting
int rank=0, size=1; // my processor, n processors
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
MPI_Comm_size(MPI_COMM_WORLD,&size);
std::cout<<"I am rank "<<rank<<" of "<<size<<"\n";

// Do local work
int n=1000000; // problem size across whole machine
int first=rank*n/size; // start of my piece of the problem
int last=(rank+1)*n/size;
std::vector<int> good;
for (int i=first;i<last;i++) {
if ((i%17==0) && (i%451==13)) good.push_back(i);
}

// Sum local pieces across processors
int total=0;
int mygood=good.size();
MPI_Reduce(&mygood,&total,1,MPI_INT, MPI_SUM, 0,MPI_COMM_WORLD);
// rank 0 now has the total, so send it out to everybody
MPI_Bcast(&total,1,MPI_INT, 0,MPI_COMM_WORLD);
std::cout<<"Total n good: "<<total<<"\n";

return 0;
}

(Try this in NetRun now!)


Here's a simple MPI integration program:
#include "mpi.h"
#include <math.h>

int main(int argc,char *argv[]) {
MPI_Init(&argc,&argv); /* <- *sets* argc and argv! */
int rank=0, size=1;
MPI_Comm_size(MPI_COMM_WORLD,&size);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
if (rank==0) std::cout<<"Started up: I am "<<rank<<" of "<<size<<"\n";
double start=MPI_Wtime();

int total=10000000; // iterations for all processors
int per=total/size; // iterations per processor
int first=per*rank; // my first iteration
int last=first+per; // my last iteration

double t=0.0;
for (int i=first;i<last;i++) t+=sin(log(1.0+i));

// Sum up "t" across all processors
double sumt=0.0;
MPI_Reduce(&t,&sumt,1,MPI_DOUBLE,MPI_SUM,0, MPI_COMM_WORLD);

if (rank==0) {
double elapsed=MPI_Wtime()-start;
std::cout<<"t="<<sumt<<" in "<<elapsed<<"s on "<<size<<" cpus\n";
}
MPI_Finalize();
return 0;
}

(Try this in NetRun now!)


Here's a more complex program that renders parts of the mandelbrot set on each MPI process, and assembles the pieces on rank 0:
/* 
Mandelbrot renderer in MPI
Dr. Orion Lawlor, 2010-11-30 (Public Domain)
*/
#include <mpi.h>
#include <iostream>
#include <fstream>
#include <complex>

/**
A linear function in 2 dimensions: returns a double as a function of (x,y).
*/
class linear2d_function {
public:
double a,b,c;
void set(double a_,double b_,double c_) {a=a_;b=b_;c=c_;}
linear2d_function(double a_,double b_,double c_) {set(a_,b_,c_);}
double evaluate(double x,double y) const {return x*a+y*b+c;}
};

const int wid=1000, ht=1000;
// Set up coordinate system to render the Mandelbrot Set:
double scale=3.0/wid;
linear2d_function fx(scale,0.0,-1.0); // returns c given pixels
linear2d_function fy(0.0,scale,-1.0);

char render_mset(int x,int y) {
/* Walk this Mandelbrot Set pixel */
typedef std::complex<double> COMPLEX;
COMPLEX c(fx.evaluate(x,y),fy.evaluate(x,y));
COMPLEX z(0.0);
int count;
enum {max_count=256};
for (count=0;count<max_count;count++) {
z=z*z+c;
if ((z.real()*z.real()+z.imag()*z.imag())>4.0) break;
}

return count;
}

class row {
public:
char data[wid];
};


int main(int argc,char *argv[]) {
/* MPI's args, MPI's random working directory */
MPI_Init(&argc,&argv);
/* Your command line args, your working directory */

int size,rank;
MPI_Comm_size(MPI_COMM_WORLD,&size);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);

std::cout<<"I am "<<rank<<" of "<<size<<"\n";


int procpiece=ht/size; int gystart=rank*procpiece;
row limg[procpiece]; /* local copy of the final image */

double start=MPI_Wtime();

/* Render our piece of the image */
for (int y=0;y<procpiece;y++)
{
for (int x=0;x<wid;x++) limg[y].data[x]=render_mset(x,gystart+y);
}
double elapsed_compute=MPI_Wtime()-start;

int tag=12378;
if (rank>0)
{ /* send our partial piece to rank 0 */
//skt_sendN(s[0],&limg[0].data[0],sizeof(row)*procpiece);
MPI_Send(&limg[0].data[0],sizeof(row)*procpiece,MPI_CHAR,
0,tag,MPI_COMM_WORLD);
}
else
{ /* rank 0: receive partial pieces from other ranks */
row gimg[ht];
for (int r=0;r<size;r++)
if (r==0) {
memcpy(gimg,limg,sizeof(row)*procpiece);
} else {
//skt_recvN(s[r],&gimg[r*procpiece].data[0],
// sizeof(row)*procpiece);
MPI_Status status;
MPI_Recv(&gimg[r*procpiece].data[0],sizeof(row)*procpiece,MPI_CHAR,
r,tag,MPI_COMM_WORLD,&status);
}
/* Print out assembled image */
std::ofstream of("out.ppm",std::ios_base::binary);
of<<"P5\n"; // greyscale, binary
of<<wid<<" "<<ht<<"\n"; // image size
of<<"255\n"; // byte image
of.write(&gimg[0].data[0],sizeof(row)*ht);
}

double elapsed_send=MPI_Wtime()-start;
std::cout<<"Rank "<<rank<<": "<<1000.0*elapsed_compute<<"ms compute, "<<
1000.0*elapsed_send<<"ms total\n";

MPI_Finalize();
return 0;
}
(Try this in NetRun now!)

MPI Performance on the Powerwall

The UAF Bioinformatics Powerwall is a fairly typical 2006-era cluster, with gigabit ethernet connecting ten dual-core nodes.  It's running OpenMPI 1.3.  Here's how it performs:
Test send/recv_int took 47.113us per iteration (10000 iterations)
Sending even a small, one-int message takes a *long* time (high alpha cost; about 50 us).  This is mostly a function of your network hardware.

Test sandbag (100) took 49.522us per iteration (1000 iterations)
The "sandbag" is my function that does CPU work.

Test send/recv_int + sandbag(100) took 96.148us per iteration (10000 iterations)
If we both compute, and communicate, the time should add close to linearly if the CPU and network are not running at the same time.
Test isend/irecv_int + sandbag(100) took 53.740us per iteration (100 iterations)
For better communication performance, use Isend and Irecv, which are "nonblocking": the CPU can keep working while the network talks.
Test send/recv_zero length took 46.548us per iteration (10000 iterations)
Test send/recv_1k took 72.320us per iteration (100 iterations)
Test send/recv_1meg took 9169.580us per iteration (100 iterations)
Short messages, even zero length, are expensive due to alpha cost.  A 1-kilobyte message costs only 30% more than a one-byte message!  Communication costs under 10ns/byte for long messages, since the startup overhead amortizes away.  OpenMPI really can deliver over 100MB/sec on gigabit ethernet.
Test send/recv_int_overlap took 2.792us per iteration (1000 iterations)
Here's another curiousity--if I do repeated one-int sends to the same destination, MPI is smart enough to start bundling the ints together into longer messages.  Of course, it's still far more expensive than just bundling them yourself.
Test send/recv_1meg + sandbag(10000) took 14111.791us per iteration (100 iterations)
Test isend/irecv_1meg + sandbag(10000) took 14018.261us per iteration (100 iterations)
However, MPI doesn't seem to overlap long messages very well--this is clearly first compute, then communicate.  (I might need to add an "MPI_Test" call or something inside my sandbag function.)  Isend doesn't help much here either, for some reason.
(2 cpus) Test barrier took 54.131us per iteration (1000 iterations)
(10 cpus)Test barrier took 242.433us per iteration (1000 iterations)
A barrier requires only one synchronizing message on two CPUs.
(2 cpus) Test bcast_int + barrier took 100.409us per iteration (1000 iterations)
(10 cpus)Test bcast_int + barrier took 390.508us per iteration (1000 iterations)
A broadcast costs about the same as one network message on two processors.  On more processors, it's more expensive, but not tenfold more like the naive "process zero sends once to everybody" algorithm.  On modern hardware, broadcast is very fast.
(2 cpus) Test reduce_int + barrier took 100.939us per iteration (1000 iterations)
(10 cpus)Test reduce_int + barrier took 328.141us per iteration (1000 iterations)
Similarly, a reduction costs basically one message (on two processors).