

mpi4py parallel IO example

September 23, 2010

For about nine months I have been running Python jobs in parallel using mpi4py and NumPy. I recently had to write a new algorithm with MPI, so I decided to do the IO in parallel as well. Below is a small example of reading data in parallel; mpi4py is short on examples, so hopefully this helps someone. It is not pretty, but it does work.

import mpi4py.MPI as MPI
import numpy as np
class Particle_parallel():
    """ Particle_parallel - distributed reading of x-y-z coordinates.

    Designed to split the vectors as evenly as possible except for rounding
    ont the last processor.

    File format:
        32bit int :Data dimensions which should = 3
        32bit int :n_particles
        64bit float (n_particles) : x-coordinates
        64bit float (n_particles) : y-coordinates
        64bit float (n_particles) : z-coordinates
    """
    def __init__(self, file_name, comm):
        self.comm = comm
        self.rank = self.comm.Get_rank()
        self.size = self.comm.Get_size()
        self.data_type_size = 8
        # Collective open, read-only
        self.mpi_file = MPI.File.Open(self.comm, file_name, MPI.MODE_RDONLY)
        self.data_dim = np.zeros(1, dtype = np.dtype('i4'))
        self.n_particles = np.zeros(1, dtype = np.dtype('i4'))
        self.file_name = file_name
        self.debug = True

    def info(self):
        """ Distrubute the required information for reading to all ranks.

        Every rank must run this funciton.
        Each machine needs data_dim and n_particles.
        """
        # get info on all machines
        self.mpi_file.Read_all([self.data_dim, MPI.INT])
        self.mpi_file.Read_all([self.n_particles, MPI.INT])
        self.data_start = self.mpi_file.Get_position()

    def read(self):
        """ Read data and return the processors part of the coordinates to:
            self.x_proc
            self.y_proc
            self.z_proc
        """
        assert self.data_dim[0] != 0
        n_particles = int(self.n_particles[0])
        # First establish this rank's vector size: an even split, with the
        # last rank taking whatever remains after rounding.
        default_size = int(np.ceil(float(n_particles) / self.size))
        end_size = n_particles - default_size * (self.size - 1)
        # Rounding is only a problem when default_size is very small
        # relative to the number of ranks; the assert catches that case.
        assert end_size >= 1
        if self.rank == (self.size - 1):
            self.proc_vector_size = end_size
        else:
            self.proc_vector_size = default_size
        # Byte offsets of this rank's slice within each coordinate block
        x_start = int(self.data_start +
                      self.rank * default_size * self.data_type_size)
        y_start = x_start + n_particles * self.data_type_size
        z_start = x_start + 2 * n_particles * self.data_type_size
        self.x_proc = np.zeros(self.proc_vector_size)
        self.y_proc = np.zeros(self.proc_vector_size)
        self.z_proc = np.zeros(self.proc_vector_size)
        # Seek to this rank's slice of the x block and read it
        self.mpi_file.Seek(x_start)
        if self.debug:
            print('MPI Read')
        self.mpi_file.Read([self.x_proc, MPI.DOUBLE])
        if self.debug:
            print('MPI Read done')
        self.mpi_file.Seek(y_start)
        self.mpi_file.Read([self.y_proc, MPI.DOUBLE])
        self.mpi_file.Seek(z_start)
        self.mpi_file.Read([self.z_proc, MPI.DOUBLE])
        self.comm.Barrier()
        return self.x_proc, self.y_proc, self.z_proc
    def Close(self):
        self.mpi_file.Close()
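
Here is a rough usage sketch to go with the class. The file name particles.bin, the particle count, and the script itself are just made up for the demo: rank 0 writes a small test file in the binary layout described in the docstring, then every rank opens it collectively and reads its own slice. Run it with something like mpirun -np 4 python demo.py.

import mpi4py.MPI as MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

file_name = 'particles.bin'   # hypothetical test file
n_particles = 1000            # hypothetical particle count

# Rank 0 writes a test file in the format described above:
# data_dim (i4), n_particles (i4), then x, y, z as 64-bit float blocks.
if rank == 0:
    with open(file_name, 'wb') as f:
        np.array([3], dtype='i4').tofile(f)
        np.array([n_particles], dtype='i4').tofile(f)
        for _ in range(3):
            np.random.rand(n_particles).astype('f8').tofile(f)
comm.Barrier()

# Every rank opens the file collectively and reads its own slice.
particles = Particle_parallel(file_name, comm)
particles.info()
x, y, z = particles.read()
print('rank %d read %d particles' % (rank, x.size))
particles.Close()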