**[Michael Wehner](mailto:mfwehner@lbl.gov) and [Hari Krishnan](mailto:hkrishnan@lbl.gov), Lawrence Berkeley National Laboratory**

## Introduction

Most climate data analyses have at least one dimension that can be exploited at NERSC in an embarrassingly parallel manner. The most common of these is simply time. The scripts presented here are a general solution for taking advantage of temporal parallelism in a wide variety of lengthy UVCDAT calculations.

Typically, long time series climate data sets are spread across multiple files, usually to keep the file sizes manageable. The UVCDAT script called `cdscan` is normally used to construct an xml file to be read by the `cdms2` module as a single pseudo data file containing the entire time domain. When exploiting temporal parallelism, it is most straightforward to instead keep the files separate and assign a single processor to each file. Hence, to achieve high degrees of parallelism, many files spanning short time intervals are actually better than a few lengthy files. Another often-parallel dimension is found in ensemble simulations, where files are frequently arranged by realization number.
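
For reference, the two access patterns can be contrasted with a short sketch. The file names and the variable `tas` are assumptions for illustration; the xml file would be built with something like `cdscan -x all_files.xml *.nc`.

```python
import cdms2

# Serial pattern: the cdscan-built xml presents many files as one
# pseudo data file spanning the entire time domain.
f = cdms2.open('all_files.xml')
tas_all = f('tas')       # reads across all of the underlying files
f.close()

# Parallel pattern used here: each MPI task opens only its own file,
# so no xml index is needed and tasks do not contend for shared files.
g = cdms2.open('one_short_segment.nc')
tas_piece = g('tas')     # only this file's portion of the time series
g.close()
```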

In order to assign individual files to individual processors, we use the `mpi4py` module. The total number of MPI tasks is set equal to the total number of processors. For instance, if the number of input files is 48, the following simple batch command will work on hopper.nersc.gov or edison.nersc.gov.

```
qsub python_test.pbs
```

where the batch input file that executes the parallel UVCDAT script `python_test.py` (described below) is as follows:

### python_test.pbs

```
#PBS -q debug
#PBS -l mppwidth=48
#PBS -l walltime=00:30:00
#PBS -N python_test
#PBS -e python_test.$PBS_JOBID.err
#PBS -o python_test.$PBS_JOBID.out
#PBS -V

module load cascade
module load mpi4py

cd $SCRATCH/my-directory

aprun -n 48 python python_test.py --parallel *nc
```

In this example, it is assumed that there are 48 netcdf files in $SCRATCH/my-directory. This job would use 2 nodes on hopper or edison, since each node has 24 processors.

**Note: It is highly recommended that you perform all parallel operations in the scratch directories or on another parallel file system. Jobs executed on non-parallel file systems, such as your home directory or /project, can be up to 10 times slower than on a parallel file system such as Lustre.**

This script uses the `cascade` module, but you may replace that line with `module load uvcdat`.

## Memory limits

In many cases, there will not be enough memory per processor to run your script. Batch runs can return many cryptic error messages. When there is not enough memory per processor, the log file `python_test.$PBS_JOBID.out`, where $PBS_JOBID is a number, will contain a message like this:

```
[NID 05718] 2015-03-18 17:07:02 Apid 47875875: OOM killer terminated this process.
```

To correct this error, you will need to idle out some processors. The script below uses only 12 of the 24 processors on each hopper node, so it effectively occupies twice as many processors (96) to run its 48 MPI tasks. The NERSC website can tell you how much memory is available per node. If you know the memory footprint of your code, you can calculate `mppnppn` by dividing the available memory on a node by the code's memory footprint. It is not uncommon to use values of 4 or less for high resolution data sets. Note that the value of `-N` in the `aprun` command is equal to `mppnppn` and should be an integer divisor of the number of processors per node (24 on hopper and edison). Also note that the newly recommended syntax is to continue to set `mppwidth` to the number of MPI tasks actually required, as the script below does.
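
As a worked example of that arithmetic, consider the sketch below. The memory and footprint values are assumptions for illustration only; check the NERSC website for the actual per-node memory.

```python
# Back-of-the-envelope mppnppn calculation; all numbers are assumptions.
mem_per_node_gb = 32.0   # assumed usable memory per node; check the NERSC website
footprint_gb = 5.0       # assumed memory footprint of one task of your code
cores_per_node = 24      # hopper and edison

mppnppn = max(1, int(mem_per_node_gb / footprint_gb))
# Round down to an integer divisor of the processors per node.
while cores_per_node % mppnppn != 0:
    mppnppn -= 1
print "mppnppn =", mppnppn   # 6 in this example, i.e. aprun -n 48 -N 6
```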

### python_test_2xMemory.pbs

```
#PBS -q debug
#PBS -l mppwidth=48
#PBS -l mppnppn=12
#PBS -l walltime=00:30:00
#PBS -N python_test
#PBS -e python_test.$PBS_JOBID.err
#PBS -o python_test.$PBS_JOBID.out
#PBS -V

module load cascade
module load mpi4py

cd /global/project/projectdirs/m1517/tmp/python_test

aprun -n 48 -N 12 python python_test.py --parallel *nc
```

This job would use 4 nodes on hopper or edison (48 tasks at 12 tasks per node).

## UVCDAT example using mpi4py

To robustly enable multiple MPI tasks, see the python script below. Real scripts will be more complex, but this one will serve as a template.

### python_test.py

```python
import sys
import os
import cdms2

# The parallel branch.
if sys.argv[1] == "--parallel":
    # Note that mpi4py has been loaded as a module in the batch script.
    # You may want to import it unconditionally at the top in some cases.
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    # size is the number of tasks, set by -n in the aprun command (48 here).
    size = comm.Get_size()
    # rank is the id of this task and hence of this processor.
    rank = comm.Get_rank()
    # files is the list of input file names produced by the filtered
    # wildcard (*nc) on the aprun line.
    files = sys.argv[2:]
    # file_name is the name of the file to be processed by this task.
    file_name = files[rank]
else:
    # A serial branch that we find useful to test code. The execute line is:
    #   python python_test.py some_netcdf_file.nc
    rank = 0
    file_name = sys.argv[1]

# The main body of the code, likely extracted from a current serial code.
print "processor " + str(rank).zfill(4) + " is starting " + file_name

try:
    f = cdms2.open(file_name)
    # Do some fancy math on file_name with python code here.
    # var = f('tas')  # etc.
    # Or call some other kind of program, such as C, Fortran, ncl, etc.
    os.system('some_serial_code.x ' + file_name)
    print "Rank", rank, " succeeded!!"
except:
    print "Rank", rank, " failed!!!!!!"

sys.exit(0)
# end of python_test.py
```

Print statements will appear in the logfile `python_test.$PBS_JOBID.out`. The try/except structure is an attempt to let parallel jobs with a few bad input files complete the tasks on the good files. In cases with thousands of files, you do not want the job killed because one of those files is corrupt. You can find the bad input files quickly by grepping for "failed" in the logfile, then extracting the rank numbers that failed. The order of print statements will be fairly random, as tasks do not appear to initiate in lock step. Without the try/except structure, any error will cause all tasks to end immediately. With it, if only a few files are corrupted, you can go back and run your script serially on the repaired files afterwards. This will be a lot faster than waiting in the queue again for 10000 processors. This trick appears to work, but there are segmentation errors that it does not capture. A robust exit strategy that captures errors without ending all tasks remains to be developed.
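
As an illustration of that triage, a hypothetical post-mortem helper might map the failed ranks back to file names, assuming the input files were passed to `python_test.py` in the shell's sorted wildcard order:

```python
import sys
import glob

# Hypothetical helper: print the input files whose tasks reported failure.
# Usage: python find_failed.py python_test.<jobid>.out
files = sorted(glob.glob('*nc'))       # the shell expands wildcards in sorted order
for line in open(sys.argv[1]):
    if 'failed' in line:
        rank = int(line.split()[1])    # log lines look like: Rank 17  failed!!!!!!
        print files[rank]
```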

Also, this example can be used as a quick and dirty way to call an external serial program on many files in parallel, as indicated above. This may be a good way to quickly parallelize ncl/nco scripts or compiled programs. Note, however, that it will fail when calling serial python scripts due to conflicts, so for python scripts you will need to modify the code itself per this example.
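
As a sketch of that external-program variant, the body of the try block in `python_test.py` could hand each task's file to an ncl script instead. Here `my_analysis.ncl` and its `fname` argument are hypothetical names:

```python
import os

# Hypothetical replacement for the body of the try block in python_test.py.
# file_name is this task's input file, assigned by its MPI rank as above.
# my_analysis.ncl is assumed to read its input file name from the variable
# fname, passed with ncl's command-line assignment syntax.
status = os.system('ncl \'fname="' + file_name + '"\' my_analysis.ncl')
if status != 0:
    raise RuntimeError('ncl failed on ' + file_name)  # caught by the except branch
```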

## Efficiency issues

As there is no interprocessor communication in this embarrassingly parallel example, you might expect near 100% parallel efficiency. This will likely not be the case. As noted above, efficiency will be much better on parallel file systems. Even then, however, `cdms2` is not designed for parallel input and output, so there is contention for i/o resources. We find that i/o is generally the single biggest computational efficiency issue. Nonetheless, we have reduced turnaround times from weeks to hours in many cases. We do not recommend having many tasks read xml files constructed by `cdscan`, as contention for the scanned files is large.

Furthermore, we find that a large number of shorter duration files, each read by a single processor, is faster than having multiple processors read segments of longer duration files, due to the file contention resulting from the lack of parallel i/o in UVCDAT modules. However, if time is not an embarrassingly parallel dimension, then space often is (as in temporal averaging or similar calendar operations). In that case, you will most likely need to extract data from single files with many processors, and efficiency will suffer. Throughput gains over serial execution may still be possible, however.
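
A minimal sketch of that spatial alternative, assuming a single file `long_run.nc` (a hypothetical name) containing a variable `tas` dimensioned (time, latitude, longitude), where each task reads its own latitude band from the shared file:

```python
import cdms2
import MV2
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

f = cdms2.open('long_run.nc')
var = f['tas']                 # a file variable; no data is read yet
nlat = var.shape[1]            # assumes dimensions (time, latitude, longitude)

# Divide the latitudes among the tasks. Every task reads from the same
# file, so expect contention for i/o resources.
chunk = (nlat + size - 1) // size
lat0 = rank * chunk
lat1 = min((rank + 1) * chunk, nlat)
band = var[:, lat0:lat1, :]    # this task reads only its latitude band

# A temporal average, one case where time is not embarrassingly parallel.
mean_band = MV2.average(band, axis=0)
f.close()
```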

## Limitations of these scripts

These scripts address only one use case: the parallel execution of a serial script that reads one file per task. Although this is often useful, especially when time is an embarrassingly parallel dimension, there can be more parallelism to be had if files contain more than one time step. In this case, code could be written to have multiple tasks read from the same file. However, without a parallel `cdms2`, efficiency is limited. Regarding output, writing multiple files is not too troublesome, as `cdscan` can be used to serially join them afterwards.
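
On the output side, a minimal sketch of per-task writes, assuming each task holds its MPI `rank` as in `python_test.py` and has computed a cdms2 transient variable `result` (a hypothetical name):

```python
import cdms2

# Each task writes its own output file, tagged by its MPI rank; cdms2
# writes the variable's axes and attributes to the netcdf file for us.
out = cdms2.open('out_' + str(rank).zfill(4) + '.nc', 'w')
out.write(result)
out.close()

# Afterwards, join the pieces serially into a single pseudo data file:
#   cdscan -x joined.xml out_*.nc
```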

Finally, depending on your patience, it may not be worth the trouble to run UVCDAT in parallel at all. Queue waiting times can be many hours, even days. We generally do not bother with parallel execution if a serial execution can be run in a day or less. However, when execution time (including queue waiting time) can be reduced from months to days or hours, parallel UVCDAT execution can enable analyses heretofore impractical, if not impossible.

Additionally posted on: https://github.com/UV-CDAT/uvcdat/wiki/How-to-run-UV-CDAT-in-parallel-at-NERSC