A guide to using OpenMP and MPI on SeaWulf
This article will discuss how to successfully parallelize jobs using OpenMP, Message Passing Interface (MPI), and a combination of the two.
In part 1, we saw how to run embarrassingly parallel workflows on SeaWulf. In many cases, however, users may find that some jobs require parallelization where different parts of the analysis (e.g., threads or processes) need to communicate with each other to accomplish some task. In those instances, a few different parallelization frameworks can be utilized.
- For single-node, shared memory jobs, code written in C, C++, and Fortran can take advantage of the OpenMP API to tackle a problem using multiple threads.
- Conversely, MPI can be used in distributed memory situations, to enable parallel processing and communication across multiple physically distinct nodes. In fact, MPI is almost always required to successfully use multiple nodes for a single job (one major exception to this being Slurm array jobs).
- Finally, so-called "hybrid" parallelization can be performed, where MPI and OpenMP are used in combination.
We will illustrate OpenMP, MPI, and hybrid parallelization using example C codes that estimate the value of pi using the method discussed here.
All codes used in this article are taken from the pi calculation benchmark repository. Sample codes can be found on SeaWulf in the following directory:
/gpfs/projects/samples/pi
Serial pi calculation
To contrast with the following parallel examples, here is a single-core, serial code that sums 1 billion terms of a series to estimate pi:
#include <stdio.h>
#include <time.h>

#define NPTS 1000000000

void main() {
  int i;
  double a,b,c,pi,dt,mflops;
  struct timespec tstart,tend;

  clock_gettime(CLOCK_REALTIME,&tstart);

  // sum NPTS terms of a series that converges to pi
  a = 0.5; b = 0.75; c = 0.25; pi = 0;
  for (i = 1; i <= NPTS; ++i) pi += a/((i-b)*(i-c));

  clock_gettime(CLOCK_REALTIME,&tend);

  // report elapsed time and estimated floating point performance
  dt = (tend.tv_sec+tend.tv_nsec/1e9)-(tstart.tv_sec+tstart.tv_nsec/1e9);
  mflops = NPTS*5.0/(dt*1e6);
  printf("NPTS = %d, pi = %f\n",NPTS,pi);
  printf("time = %f, estimated MFlops = %f\n",dt,mflops);
}
Let's write and submit a Slurm job script to run the serial calculation:
#!/usr/bin/env bash

#SBATCH --job-name=serial_pi
#SBATCH --output=serial_pi.log
#SBATCH --nodes=1
#SBATCH --time=05:00
#SBATCH -p short-28core

# load a gcc module
module load gcc/12.1.0

# compile the code
gcc /gpfs/projects/samples/pi/pi.c -o pi -lm

# execute the code
./pi
In the above, we load a module to access a recent version of the GCC compilers (although for the serial and OpenMP examples, we could also just use the system default GCC), and then we compile and execute the code.
The output should look something like this:
NPTS = 1000000000, pi = 3.141593
time = 4.218491, estimated MFlops = 1185.257929
We have approximated pi to 6 decimal places in about 4.2 seconds.
OpenMP pi calculation
Now, let's try using OpenMP to enable the code to utilize multiple threads on the same node:
#include <stdio.h>
#include <time.h>
#include <omp.h>
#include <stdint.h>

#define NPTS 1000000000

void main() {
  uint64_t i;
  double a,b,c,pi,dt,mflops;
  struct timespec tstart,tend;

  clock_gettime(CLOCK_REALTIME,&tstart);

  a = 0.5; b = 0.75; c = 0.25; pi = 0;

  // split the loop iterations across OpenMP threads and combine the
  // per-thread partial sums with a reduction
  #pragma omp parallel for reduction(+:pi)
  for (i = 1; i <= NPTS; ++i) pi += a/((i-b)*(i-c));

  clock_gettime(CLOCK_REALTIME,&tend);

  dt = (tend.tv_sec+tend.tv_nsec/1e9)-(tstart.tv_sec+tstart.tv_nsec/1e9);
  mflops = NPTS*5.0/(dt*1e6);
  printf("NPTS = %ld, pi = %f, threads = %d\n",NPTS,pi,omp_get_max_threads());
  printf("time = %f, estimated MFlops = %f\n",dt,mflops);
}
This code is similar to the previous serial version, but the calculations in the for loop are now being parallelized across a user-defined number of different threads. Let's submit another Slurm job to compile and run the code:
#!/usr/bin/env bash

#SBATCH --job-name=openmp_pi
#SBATCH --output=openmp_pi.log
#SBATCH --time=05:00
#SBATCH -p short-28core

# load a gcc module
module load gcc/12.1.0

# Environment variable to set how many threads we'll be using
export OMP_NUM_THREADS=28

# compile the code
gcc /gpfs/projects/samples/pi/openmp_pi.c -o openmp_pi -fopenmp

# execute the code
./openmp_pi
We've done a couple of important things in this script that are worth highlighting.
First, we have set the OMP_NUM_THREADS environment variable. This controls how many threads OpenMP will use when running the code. In this case, we've set the value equal to the number of cores on the node the job is running on. Often, you'll want to do the same for your jobs. However, it is important to note that most codes do not scale well to an ever-increasing number of threads/cores. At some point the performance will level off (or even decrease) as more threads are added. You may want to experiment with your codes to determine the number of threads that maximizes performance.
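For example, a quick way to explore thread scaling is to rerun the compiled binary with several different values of OMP_NUM_THREADS and compare the reported timings. Here is a minimal sketch (the thread counts are arbitrary, and the commands would be run inside a job allocation after compiling):

# rerun the OpenMP pi code with increasing thread counts
for t in 1 2 4 8 14 28; do
    export OMP_NUM_THREADS=$t
    echo "--- $OMP_NUM_THREADS threads ---"
    ./openmp_pi
done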
Second, when compiling the code we added the -fopenmp flag. This is a GCC-specific flag that tells the compiler to use OpenMP when compiling the code.
After running the job, you should get output in the log file that looks something like this:
NPTS = 1000000000, pi = 3.141593, threads = 28
time = 0.375795, estimated MFlops = 13305.130447
By executing the code with 28 threads while keeping the total number of calculations constant, we have achieved an order-of-magnitude speedup compared to the serial execution!
MPI pi calculation
Now let's use MPI to parallelize the pi estimation across all the cores on multiple nodes. Here is the code:
#include <stdio.h>
#include <mpi.h>
#include <sys/time.h>

#define NPTS 1000000000

void main(int argc, char** argv) {
  int rank,nproc,i,istart,iend;
  unsigned long int start_time,end_time;
  struct timeval start,end;
  double sum,pi,mflops;

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &nproc);

  if (rank == 0) {
    // rank 0 times the calculation and prints the results
    gettimeofday(&start, NULL);

    // each rank sums its own contiguous chunk of the series
    istart = 1 + NPTS*((rank+0.0)/nproc);
    iend = NPTS*((rank+1.0)/nproc);
    sum = 0.0;
    for (i = istart; i <= iend; ++i) sum += 0.5/((i-0.75)*(i-0.25));

    // combine the partial sums from all ranks onto rank 0
    MPI_Reduce(&sum,&pi,1,MPI_DOUBLE,MPI_SUM,0,MPI_COMM_WORLD);

    gettimeofday(&end, NULL);
    start_time = start.tv_sec * 1e6 + start.tv_usec;
    end_time = end.tv_sec * 1e6 + end.tv_usec;
    mflops = NPTS*(5.0/(end_time-start_time));
    printf("processes = %d, NPTS = %ld, pi = %f\n",nproc,NPTS,pi);
    printf("time = %f, estimated MFlops = %f\n",(end_time-start_time)/1.0e6,mflops);
  } else {
    // all other ranks compute their chunk and participate in the reduction
    istart = 1 + NPTS*((rank+0.0)/nproc);
    iend = NPTS*((rank+1.0)/nproc);
    sum = 0.0;
    for (i = istart; i <= iend; ++i) sum += 0.5/((i-0.75)*(i-0.25));
    MPI_Reduce(&sum,&pi,1,MPI_DOUBLE,MPI_SUM,0,MPI_COMM_WORLD);
  }

  MPI_Finalize();
}
As we can see above, the addition of MPI functions has increased the complexity of the code significantly compared to the serial version. Luckily, there are many helpful tutorials on the web to help you get up-to-speed with writing MPI code.
Let's submit a 2 node job to compile and run this code:
#!/usr/bin/env bash

#SBATCH --job-name=mpi_pi
#SBATCH --output=mpi_pi.log
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=28
#SBATCH --time=05:00
#SBATCH -p short-28core

# load an MPI module
module load mvapich2/gcc12.1/2.3.7

# set env variables which may help performance
export MV2_HOMOGENEOUS_CLUSTER=1
export MV2_ENABLE_AFFINITY=0

# compile the code with the mpi compiler wrapper
mpicc /gpfs/projects/samples/pi/mpi_pi.c -o mpi_pi

# execute the code with MPI
mpirun ./mpi_pi
We've made quite a few changes to the Slurm script that need to be highlighted.
First, we've requested two nodes using the #SBATCH --nodes=2 flag. In general, you will almost never want to request multiple nodes for your Slurm job unless you're using MPI in the job. In addition, we've told Slurm that each node should utilize 28 tasks with the #SBATCH --ntasks-per-node=28 flag. This will allow us to create as many separate MPI processes (often referred to as "ranks") as there are cores on each node.
Next, we're loading a module to access our desired implementation of MPI. Many different flavors of MPI exist, but none is available by default, so you must load a module to access the MPI executables and libraries.
On SeaWulf the officially supported flavors are Mvapich and Intel MPI, though we also have modules for OpenMPI available. Typically, we will have different versions of each MPI implementation compiled with different compilers or compiler versions (excluding Intel MPI, which uses its own Intel compilers). In nearly all cases, you will only want to have one MPI module loaded at a time.
Here, we've loaded the module for a recent version of Mvapich compiled with a recent version of GCC. Note that in this case loading the mvapich2/gcc12.1/2.3.7 module also loads the associated GCC module.
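Before loading a new MPI module, it can be helpful to check what is already loaded and clear out anything that might conflict. A minimal sketch using standard module commands:

# list the modules currently loaded in your environment
module list

# unload everything to avoid mixing MPI implementations
module purge

# load a single MPI module
module load mvapich2/gcc12.1/2.3.7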
Next, we're setting some Mvapich-specific environment variables, which can sometimes improve performance. Please see the official documentation for more details on these environment variables.
After that, we compile the code using an MPI wrapper script that wraps the GCC compiler commands. For GCC, the Mvapich compiler wrappers are:
Language | GCC command | Mvapich wrapper
---------|-------------|----------------
C        | gcc         | mpicc
C++      | g++         | mpicxx
Fortran  | gfortran    | mpif90
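For example, hand-compiling a source file with each wrapper might look like the following (the file names are placeholders):

# C
mpicc my_code.c -o my_code

# C++
mpicxx my_code.cpp -o my_code

# Fortran
mpif90 my_code.f90 -o my_code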
Finally, we execute the code using the mpirun (or mpiexec) command followed by the path to the compiled binary. The code will be parallelized based on the SBATCH flags that were provided to the Slurm workload manager. Note that there are many useful flags that can be provided to mpirun to customize the job's behavior. Please see the official documentation for your preferred MPI implementation for more details.
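As a sketch, two common ways to launch the binary inside a Slurm allocation are shown below; the -np option is widely supported, but consult your MPI implementation's documentation for the exact launcher behavior:

# let mpirun infer the number of ranks and their placement from the Slurm allocation
mpirun ./mpi_pi

# or request a specific number of ranks explicitly
# (SLURM_NTASKS is set by Slurm based on the job's task request)
mpirun -np $SLURM_NTASKS ./mpi_pi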
Let's look at the output:
processes = 48, NPTS = 1000000000, pi = 3.141593
time = 0.156269, estimated MFlops = 31996.109273
Our pi estimate using 48 processes across 2 nodes finished in roughly half the time of the 28-thread OpenMP execution and ran ~28 times faster than the serial execution. What would happen if we doubled the number of nodes to 4 and the total number of processes to 96?
processes = 96, NPTS = 1000000000, pi = 3.141593
time = 0.092446, estimated MFlops = 54085.628367
In this case, the execution time has dropped further, but it is nowhere close to 96x faster than the serial execution.
While this is a dramatic and helpful speedup, it does illustrate one point of caution: parallelization with MPI (and even OpenMP) often incurs a computational penalty associated with communication between different nodes, processes, or threads. You are unlikely to see a persistent linear increase in job speed as the number of OpenMP threads or MPI processes increases.
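One practical way to see this for yourself is to submit the same job script at several node counts and compare the reported times. A sketch, assuming the MPI job script above was saved as mpi_pi.slurm (the file name is a placeholder; sbatch command-line options override the matching #SBATCH directives in the script):

# simple scaling test: same script, increasing node counts
sbatch --nodes=1 mpi_pi.slurm
sbatch --nodes=2 mpi_pi.slurm
sbatch --nodes=4 mpi_pi.slurm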
Note also that MPI is not limited to C, C++, and Fortran. On SeaWulf, MPI can be used with Python (MPI4py) and R (Rmpi) as well. MPI4py is available via modules (e.g., mpi4py/3.0.3) that control access to custom conda environments. Likewise, Rmpi is implemented as an R library installed with its associated R module (e.g., R/4.2.1).
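As a rough sketch of what the Python workflow might look like (the script name is a placeholder, and the exact launch command may differ depending on the module and MPI implementation):

# load the mpi4py module, which provides a conda environment with mpi4py installed
module load mpi4py/3.0.3

# run the Python script with one interpreter per MPI rank
mpirun python my_mpi_script.py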
Hybrid OpenMP-MPI pi calculation
In some cases, it can be useful to combine OpenMP and MPI, such that MPI is used to parallelize a job across different nodes (or physical CPU sockets within a node) while the intra-node (or intra-socket) parallelization is handled by OpenMP. This can be particularly useful for applications whose performance depends on efficient memory use. By reducing the number of MPI ranks in favor of more OpenMP threads, shared memory can be used more efficiently and the computational penalties associated with communication may be reduced. However, every application is different, and determining whether your code performs better with pure MPI or a hybrid model of parallelization requires experimentation.
Here is the hybrid version of our pi calculation code:
#include <stdio.h>
#include <mpi.h>
#include <omp.h>
#include <sys/time.h>
#include <stdint.h>

#define NPTS 10000000000
#define NLOOP 1

void main(int argc, char** argv) {
  uint32_t rank,nproc;
  uint64_t i,j,istart,iend,start_time,end_time;
  struct timeval start,end;
  double sum,pi,mflops,max;

  MPI_Init(&argc,&argv);
  MPI_Comm_rank(MPI_COMM_WORLD,&rank);
  MPI_Comm_size(MPI_COMM_WORLD,&nproc);
  max = 0;

  if (rank == 0) {
    // each MPI rank handles NPTS points; OpenMP threads share the work within a rank
    istart = 1+rank*NPTS;
    iend = (rank+1)*NPTS;
    for (j = 0; j < NLOOP; ++j) {
      MPI_Barrier(MPI_COMM_WORLD);
      gettimeofday(&start,NULL);
      sum = 0.0;
      #pragma omp parallel for reduction(+:sum)
      for (i = istart; i <= iend; ++i) sum += 0.5/((i-0.75)*(i-0.25));
      // combine the per-rank sums onto rank 0
      MPI_Reduce(&sum,&pi,1,MPI_DOUBLE,MPI_SUM,0,MPI_COMM_WORLD);
      gettimeofday(&end,NULL);
      start_time = start.tv_sec*1e6+start.tv_usec;
      end_time = end.tv_sec*1e6+end.tv_usec;
      mflops = nproc*NPTS*(5.0/(end_time-start_time));
      printf("processes = %d, threads = %d, NPTS = %ld, pi = %f\n",nproc,omp_get_max_threads(),NPTS,pi);
      if (mflops > max) max = mflops;
      printf("time = %f, estimated MFlops = %f, max = %f\n",(end_time-start_time)/1.0e6,mflops,max);
    }
  } else {
    istart = 1+rank*NPTS;
    iend = (rank+1)*NPTS;
    for (j = 0; j < NLOOP; ++j) {
      MPI_Barrier(MPI_COMM_WORLD);
      sum = 0.0;
      #pragma omp parallel for reduction(+:sum)
      for (i = istart; i <= iend; ++i) sum += 0.5/((i-0.75)*(i-0.25));
      MPI_Reduce(&sum,&pi,1,MPI_DOUBLE,MPI_SUM,0,MPI_COMM_WORLD);
    }
  }

  MPI_Finalize();
}
In this case, we generate a certain number of MPI processes, and then for each process spawn a number of OpenMP threads to perform the floating point calculations. Note that this code is designed to be run multiple times in a loop, but in order to simplify the output, the loop size has been reduced to 1.
Let's submit another Slurm job to compile and run the code:
#!/usr/bin/env bash

#SBATCH --job-name=hybrid_pi
#SBATCH --output=hybrid_pi.log
#SBATCH --nodes=4
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=28
#SBATCH --time=05:00
#SBATCH -p short-28core

# load an mpi module
module load mvapich2/gcc12.1/2.3.7

# set env variables which may help performance
export MV2_HOMOGENEOUS_CLUSTER=1
export MV2_ENABLE_AFFINITY=0

# set the number of OpenMP threads to the number of cores per task
export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK

# compile the code with the mpi compiler wrapper
mpicc /gpfs/projects/samples/pi/hybrid_pi.c -o hybrid_pi -fopenmp

# execute the code with MPI
mpirun ./hybrid_pi
With hybrid parallelization, it's even more critical than normal to clearly specify the computational resources for your job. In this case, we're requesting 4 nodes, with a single MPI task per node. However, each task is assigned all the cores on the node with the #SBATCH --cpus-per-task=28 flag. This flag also sets the SLURM_CPUS_PER_TASK environment variable, which we use to specify the number of OpenMP threads.
There are many other ways we could have assigned all the cores for this job, perhaps running a handful of MPI tasks per node (e.g., one per socket or NUMA region) with multiple threads assigned to each MPI task. Again, trial and error will likely be required to determine the optimal configuration for your job.
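For instance, on the 28-core nodes (assuming two 14-core sockets per node), a layout with one MPI rank per socket would look something like this sketch of the relevant Slurm directives:

#SBATCH --nodes=4
#SBATCH --ntasks-per-node=2   # one MPI rank per socket (assumes 2 sockets per node)
#SBATCH --cpus-per-task=14    # 14 cores, and thus 14 OpenMP threads, per rank

# give each rank's OpenMP runtime the cores Slurm assigned to its task
export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK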
In addition, we compile the code with the mpicc MPI wrapper but also include the -fopenmp GCC flag to enable OpenMP.
Let's look at the output:
processes = 4, threads = 28, NPTS = 10000000000, pi = 3.141593
time = 2.078727, estimated MFlops = 96212.730195, max = 96212.730195
In this case, with 4 MPI tasks, each using 28 threads, the performance is around 2x the serial performance, but worse than either the single node OpenMP performance or the pure MPI performance using an equivalent number of CPUs. It seems that for this pi estimation job, pure MPI parallelization is probably the superior option. Be sure to test on multiple types of nodes to determine what type of parallelization provides the best performance.
Hopefully these examples make clear that both OpenMP and MPI are powerful, if complex, tools that can be used to help parallelize your communication-dependent workflows.
For more information on how to handle embarrassingly parallel workflows, please see part 1.