
Scientific_mpi.c

/*
 * Low-level MPI interface routines
 *
 * Written by Konrad Hinsen <hinsen@cnrs-orleans.fr>
 *        and Jakob Schiotz <schiotz@fysik.dtu.dk>
 *        and Ciro Cattuto <ciro@prosa.it>
 * last revision: 2004-3-29
 */


#include "Python.h"
#include "assert.h"

#define _MPI_MODULE
#include "Scientific/mpimodule.h"

#define PyMPI_TEST(rc, funcname) if ((rc) != MPI_SUCCESS) \
      return PyMPI_SetError(rc, funcname)

/* Global variables */

PyObject *PyExc_MPIError;
const int max_tag = 32767;  /* Minimum value required by the MPI standard */

/* We need a few forward declarations */
staticforward PyMPIRequestObject *
newPyMPIRequestObject(MPI_Request rq, PyObject *buffer, int operation,
                  MPI_Datatype mpi_type);

staticforward PyMPIOperationObject *
newPyMPIOperationObject(MPI_Op mpi_op, char *op_name);


/* Utility functions */

static MPI_Datatype
mpi_type(int py_type)
{
  switch(py_type) {
  case PyArray_CHAR:
    return MPI_CHAR;
  case PyArray_UBYTE:
  case PyArray_SBYTE:
    return MPI_BYTE;
  case PyArray_SHORT:
    return MPI_SHORT;
  case PyArray_INT:
    return MPI_INT;
  case PyArray_LONG:
    return MPI_LONG;
  case PyArray_FLOAT:
  case PyArray_CFLOAT:
    return MPI_FLOAT;
  case PyArray_DOUBLE:
  case PyArray_CDOUBLE:
    return MPI_DOUBLE;
  }
  return 0;
}

/* ...should probably be a macro */
static MPI_Op
mpi_op(PyMPIOperationObject *op)
{
  return op->mpi_op;
}

/* Doubles the count for sending/receiving complex numbers. */
static int
mpi_count_factor(int py_type)
{
  switch(py_type) {
  case PyArray_CFLOAT:
  case PyArray_CDOUBLE:
    return 2;
  default:
    return 1;
  }
}
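
/*
 * Example (illustrative): a Numeric array of type PyArray_CDOUBLE with 10
 * elements maps to MPI_DOUBLE via mpi_type(), and mpi_count_factor()
 * returns 2, so the message is transmitted as 20 MPI_DOUBLE values;
 * complex types have no direct MPI counterpart here, hence the doubling.
 */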

static PyObject *
PyMPI_SetError(int errcode, char *funcname)
{
  char mpierr[MPI_MAX_ERROR_STRING];
  int errlen;
  
  MPI_Error_string(errcode, mpierr, &errlen);
  PyErr_Format(PyExc_MPIError, "%s failed: %s", funcname, mpierr);
  return NULL;
}


/***************************************/
/* MPI Operation object implementation */
/***************************************/

staticforward PyTypeObject PyMPIOperation_Type;

static PyMPIOperationObject *
newPyMPIOperationObject(MPI_Op mpi_op, char *op_name)
{
  PyMPIOperationObject *self;

  self = PyObject_NEW(PyMPIOperationObject, &PyMPIOperation_Type);
  if (self == NULL)
    return NULL;
  self->mpi_op = mpi_op;
  strcpy(self->op_name, op_name);
  return self;
}

/* Deallocate the object */
static void
PyMPIOperation_dealloc(PyMPIOperationObject *self)
{
  PyMem_DEL(self);
}

/* __repr__ */
/* should check for rbuffer overflow */
static PyObject *
PyMPIOperation_repr(PyMPIOperationObject *self)
{
  char rbuffer[256];

  sprintf(rbuffer,
        "<PyMPIOperation at %lx: operation = %s>",
        (long)self, self->op_name);
  return PyString_FromString(rbuffer);
}

statichere PyTypeObject PyMPIOperation_Type = {
      PyObject_HEAD_INIT(NULL)
      0,                      /*ob_size*/
      "PyMPIOperation",       /*tp_name*/
      sizeof(PyMPIOperationObject), /*tp_basicsize*/
      0,                      /*tp_itemsize*/
      /* methods */
      (destructor) PyMPIOperation_dealloc, /*tp_dealloc*/
      0,                      /*tp_print*/
      0,                      /*tp_getattr*/
      0,                            /*tp_setattr*/
      0,                      /*tp_compare*/
      (reprfunc) PyMPIOperation_repr,     /*tp_repr*/
      0,                      /*tp_as_number*/
      0,                      /*tp_as_sequence*/
      0,                      /*tp_as_mapping*/
      0,                      /*tp_hash*/
      0,                      /*tp_call*/
      0                         /*tp_str*/
};

/******************************************/
/* MPI communicator object implementation */
/******************************************/

staticforward PyTypeObject PyMPICommunicator_Type;

static PyMPICommunicatorObject *
newPyMPICommunicatorObject(MPI_Comm handle)
{
  PyMPICommunicatorObject *self;

  self = PyObject_NEW(PyMPICommunicatorObject, &PyMPICommunicator_Type);
  if (self == NULL)
    return NULL;
  self->handle = handle;
  if (MPI_Comm_rank(handle, &self->rank) != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "couldn't obtain communicator rank");
    PyMem_DEL(self);
    return NULL;
  }
  if (MPI_Comm_size(handle, &self->size) != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "couldn't obtain communicator size");
    PyMem_DEL(self);
    return NULL;
  }
  return self;
}

/*
 * PyMPICommunicator methods
 */

static void
PyMPICommunicator_dealloc(PyMPICommunicatorObject *self)
{
  if (self->handle != MPI_COMM_WORLD)
    MPI_Comm_free(&self->handle);
  PyMem_DEL(self);
}


/* Duplicate */

static PyObject *
PyMPI_DuplicateCommunicator(PyMPICommunicatorObject *comm)
{
  MPI_Comm new;
  if (MPI_Comm_dup(comm->handle, &new) != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "MPI_Comm_dup failed");
    return NULL;
  }
  return (PyObject *)newPyMPICommunicatorObject(new);
}

static PyObject *
PyMPICommunicator_duplicate(PyMPICommunicatorObject *self, PyObject *args)
{
  if (!PyArg_ParseTuple(args, ""))
    return NULL;
  return PyMPI_DuplicateCommunicator(self);
}


/* Subset */

static PyObject *
PyMPI_SubsetCommunicator(PyMPICommunicatorObject *comm, PyArrayObject *array)
{
  MPI_Group group;
  MPI_Group newgroup;
  MPI_Comm new;
  int *ranks;
  int dimension, i1, i2;

  if (MPI_Comm_group(comm->handle, &group) != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "MPI_Comm_group failed");
    return NULL;
  }
  dimension = array->dimensions[0];
  ranks = (int*)(array->data);
  for (i1 = 0; i1 < dimension; i1++) {
    if (ranks[i1] < 0 || ranks[i1] >= comm->size) {
      PyErr_SetString(PyExc_MPIError, "invalid MPI rank");
      return NULL;
    }
    for (i2 = 0; i2 < i1; i2++)
      if (ranks[i1] == ranks[i2]) {
        PyErr_SetString(PyExc_MPIError, "duplicated MPI rank");
        return NULL;
      }
  }
  if (MPI_Group_incl(group, dimension, ranks, &newgroup) != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "MPI_Group_incl failed");
    return NULL;
  }
  if (MPI_Comm_create(comm->handle, newgroup, &new) != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "MPI_Comm_create failed");
    return NULL;
  }
  if (MPI_Group_free(&newgroup) != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "MPI_Group_free failed");
    return NULL;
  }
  if (MPI_Group_free(&group) != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "MPI_Group_free failed");
    return NULL;
  }
  if (new == MPI_COMM_NULL) {
    Py_INCREF(Py_None);
    return Py_None;
  }
  else
    return (PyObject *)newPyMPICommunicatorObject(new);
}

static PyObject *
PyMPICommunicator_subset(PyMPICommunicatorObject *self, PyObject *args)
{
  PyObject *ranks;
  PyObject *new;
  if (!PyArg_ParseTuple(args, "O", &ranks))
    return NULL;
  ranks = PyArray_ContiguousFromObject(ranks, PyArray_INT, 1, 1);
  if (ranks == NULL)
    return NULL;
  new = PyMPI_SubsetCommunicator(self, (PyArrayObject *)ranks);
  Py_DECREF(ranks);
  return new;
}


/* Barrier */

static int
PyMPI_Barrier(PyMPICommunicatorObject *comm)
{
  return MPI_Barrier(comm->handle);
}

static PyObject *
PyMPICommunicator_barrier(PyMPICommunicatorObject *self, PyObject *args)
{
  if (!PyArg_ParseTuple(args, ""))
    return NULL;
  if (PyMPI_Barrier(self) != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "MPI_Barrier failed");
    return NULL;
  }
  Py_INCREF(Py_None);
  return Py_None;
}

/* Send data */

static int
PyMPI_Send(PyMPICommunicatorObject *comm, void *data, int type, int len,
         int dest, int tag)
{
  return MPI_Send(data, len, mpi_type(type), dest, tag, comm->handle);
}

static int
PyMPI_SendArray(PyMPICommunicatorObject *comm, PyArrayObject *array,
            int dest, int tag)
{
  int count;
  int error;
  int i;

  if (tag < 0 || tag > max_tag) {
    PyErr_SetString(PyExc_MPIError, "invalid MPI tag");
    return -1;
  }
  if (dest < 0 || dest >= comm->size) {
    PyErr_SetString(PyExc_MPIError, "invalid MPI destination");
    return -1;
  }
  if (PyArray_ISCONTIGUOUS(array))
    Py_INCREF(array);
  else {
    array = (PyArrayObject *)PyArray_ContiguousFromObject((PyObject *)array,
                                            PyArray_NOTYPE,
                                            0, 0);
    if (array == NULL)
      return -1;
  }
  count = 1;
  for (i = 0; i < array->nd; i++)
    count *= array->dimensions[i];
  count *= mpi_count_factor(array->descr->type_num);
  Py_BEGIN_ALLOW_THREADS;
  error = PyMPI_Send(comm, array->data, array->descr->type_num,
                 count, dest, tag);
  Py_END_ALLOW_THREADS;
  Py_DECREF(array);
  if (error != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "MPI_Send failed");
    return -1;
  }
  return 0;
}

static int
PyMPI_SendString(PyMPICommunicatorObject *comm, PyStringObject *string,
            int dest, int tag)
{
  char *data;
  int error, count;

  if (tag < 0 || tag > max_tag) {
    PyErr_SetString(PyExc_MPIError, "invalid MPI tag");
    return -1;
  }
  if (dest < 0 || dest >= comm->size) {
    PyErr_SetString(PyExc_MPIError, "invalid MPI destination");
    return -1;
  }
  data = PyString_AsString((PyObject *)string);
  count = PyString_GET_SIZE(string);
  Py_BEGIN_ALLOW_THREADS;
  error = PyMPI_Send(comm, data, PyArray_CHAR, count, dest, tag);
  Py_END_ALLOW_THREADS;
  if (error != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "MPI_Send failed");
    return -1;
  }
  return 0;
}

static PyObject *
PyMPICommunicator_send(PyMPICommunicatorObject *self, PyObject *args)
{
  PyObject *data;
  int dest, tag;

  if (!PyArg_ParseTuple(args, "Oii", &data, &dest, &tag))
    return NULL;
  if (PyArray_Check(data)) {
    if (PyMPI_SendArray(self, (PyArrayObject *)data, dest, tag) != 0)
      return NULL;
  }
  else if (PyString_Check(data)) {
    if (PyMPI_SendString(self, (PyStringObject *)data, dest, tag) != 0)
      return NULL;
  }
  else {
    PyErr_SetString(PyExc_MPIError, "can only send an array or a string");
    return NULL;
  }
  Py_INCREF(Py_None);
  return Py_None;
}
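
/*
 * Python-side usage sketch for send(), inferred from the argument parsing
 * above. The module name Scientific_mpi and the `world` communicator come
 * from the module initialization below; `data`, `dest` and `tag` are
 * illustrative:
 *
 *   from Scientific_mpi import world
 *   world.send(data, dest, tag)   # data: Numeric array or string
 */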

/* Nonblocking send data */

static PyObject *
PyMPI_SendNonBlocking(PyMPICommunicatorObject *comm, void *buffer,
                  PyObject* buffer_owner, int type, int len, int dest,
                  int tag)
{
  MPI_Request rq;
  int rc;
  
  if (tag < 0 || tag > max_tag) {
    PyErr_SetString(PyExc_MPIError, "invalid MPI tag");
    return NULL;
  }
  if (dest < 0 || dest >= comm->size) {
    PyErr_SetString(PyExc_MPIError, "invalid MPI destination");
    return NULL;
  }
  Py_BEGIN_ALLOW_THREADS;
  rc = MPI_Isend(buffer, len, mpi_type(type), dest, tag, comm->handle, &rq);
  Py_END_ALLOW_THREADS;
  PyMPI_TEST(rc, "MPI_Isend");
  return (PyObject *) newPyMPIRequestObject(rq, buffer_owner,
                                  PyMPIRequestSend, mpi_type(type));
}

static PyObject *
PyMPI_SendArrayNonBlocking(PyMPICommunicatorObject *comm, PyArrayObject *array,
                     int dest, int tag)
{
  PyObject *request;
  int count;
  int i;
  
  /* Make sure array is contiguous.  Just increase refcount if it is */
  array = (PyArrayObject *) PyArray_ContiguousFromObject((PyObject *) array,
                                           PyArray_NOTYPE, 0, 0);
  if (array == NULL)
    return NULL;
  count = 1;
  for (i = 0; i < array->nd; i++)
    count *= array->dimensions[i];
  count *= mpi_count_factor(array->descr->type_num);
  request = PyMPI_SendNonBlocking(comm, array->data, (PyObject *) array,
                          array->descr->type_num, count,
                          dest, tag);
  Py_DECREF(array);
  return request;
}

static PyObject *
PyMPI_SendStringNonBlocking(PyMPICommunicatorObject *comm,
                      PyStringObject *string, int dest, int tag)
{
  PyObject *request;
  char *data;
  int count;
  
  data = PyString_AS_STRING(string);
  count = PyString_GET_SIZE(string);
  request = PyMPI_SendNonBlocking(comm, data, (PyObject *) string,
                          PyArray_CHAR, count, dest, tag);
  return request;
}

static PyObject *
PyMPICommunicator_nonblocking_send(PyMPICommunicatorObject *self,
                           PyObject *args)
{
  PyObject *data;
  int dest, tag;

  if (!PyArg_ParseTuple(args, "Oii", &data, &dest, &tag))
    return NULL;
  if (PyArray_Check(data))
    return PyMPI_SendArrayNonBlocking(self, (PyArrayObject *) data, dest, tag);
  if (PyString_Check(data))
    return PyMPI_SendStringNonBlocking(self, (PyStringObject *) data, dest,
                               tag);
  PyErr_SetString(PyExc_MPIError, "can only send an array or a string");
  return NULL;
}
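
/*
 * Python-side usage sketch for a nonblocking send (method names are taken
 * from the method table below; the variables are illustrative):
 *
 *   request = world.nonblockingSend(array, dest, tag)
 *   ...                  # do other work while the send is in progress
 *   request.wait()       # returns None once the send has completed
 */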

/* Receive data */

static int
PyMPI_Receive(PyMPICommunicatorObject *comm, void *buffer,
            int type, int len,
            int source, int tag, int *sourcep, int *tagp, int *lenp)
{
  MPI_Status status;
  int error;

  error = MPI_Recv(buffer, len, mpi_type(type), source, tag,
               comm->handle, &status);
  if (error == MPI_SUCCESS) {
    if (sourcep != NULL)
      *sourcep = status.MPI_SOURCE;
    if (tagp != NULL)
      *tagp = status.MPI_TAG;
    if (lenp != NULL)
      error = MPI_Get_count(&status, mpi_type(type), lenp);
  }
  return error;
}

static int
PyMPI_ReceiveArray(PyMPICommunicatorObject *comm, PyArrayObject *array,
               int source, int tag, int *sourcep, int *tagp, int *lenp)
{
  int count, i;
  int error;

  count = 1;
  for (i = 0; i < array->nd; i++)
    count *= array->dimensions[i];
  count *= mpi_count_factor(array->descr->type_num);
  Py_BEGIN_ALLOW_THREADS;
  error = PyMPI_Receive(comm, array->data, array->descr->type_num,
                  count, source, tag, sourcep, tagp, lenp);
  Py_END_ALLOW_THREADS;
  if (error != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "MPI_Recv failed");
    return -1;
  }
  return 0;
}

static PyObject *
PyMPI_ReceiveString(PyMPICommunicatorObject *comm,
                int source, int tag, int *sourcep, int *tagp)
{
  MPI_Status status;
  PyStringObject *string;
  int count;
  int error;

  Py_BEGIN_ALLOW_THREADS;
  error = MPI_Probe(source, tag, comm->handle, &status);
  Py_END_ALLOW_THREADS;
  if (error != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "MPI_Probe failed");
    return NULL;
  }
  source = status.MPI_SOURCE;
  tag = status.MPI_TAG;
  if (MPI_Get_count(&status, MPI_CHAR, &count) != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "MPI_Get_count failed");
    return NULL;
  }
  string = (PyStringObject *)PyString_FromStringAndSize(NULL, count);
  if (string == NULL)
    return NULL;
  Py_BEGIN_ALLOW_THREADS;
  error = PyMPI_Receive(comm, PyString_AsString((PyObject *)string),
                  PyArray_CHAR, count,
                  source, tag, sourcep, tagp, NULL);
  Py_END_ALLOW_THREADS;
  if (error != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "MPI_Recv failed");
    Py_DECREF(string);
    return NULL;
  }
  return (PyObject *)string;
}

static PyObject *
PyMPICommunicator_receive(PyMPICommunicatorObject *self, PyObject *args)
{
  PyArrayObject *array;
  PyObject *buffer_ob, *source_ob, *tag_ob, *return_ob;
  int source, tag, count;
  int error;

  source_ob = Py_None;
  tag_ob = Py_None;
  if (!PyArg_ParseTuple(args, "O|OO", &buffer_ob, &source_ob, &tag_ob))
    return NULL;
  if (source_ob == Py_None)
    source = MPI_ANY_SOURCE;
  else if (!PyInt_Check(source_ob)
         || (source = PyInt_AsLong(source_ob)) < 0
         || source >= self->size) {
    PyErr_SetString(PyExc_TypeError, "invalid MPI source");
    return NULL;
  }
  if (tag_ob == Py_None)
    tag = MPI_ANY_TAG;
  else if (!PyInt_Check(tag_ob)
         || (tag = PyInt_AsLong(tag_ob)) < 0
         || tag >= max_tag) {
    PyErr_SetString(PyExc_TypeError, "invalid MPI tag");
    return NULL;
  }

  if (PyArray_Check(buffer_ob)) {
    array = (PyArrayObject *)buffer_ob;
    if (!PyArray_ISCONTIGUOUS(array)) {
      PyErr_SetString(PyExc_ValueError, "buffer must be contiguous");
      return NULL;
    }
    Py_INCREF(array);
  }
  else if (PyString_Check(buffer_ob)) {
    MPI_Status status;
    char type_code = PyString_AsString(buffer_ob)[0];
    int type = PyArray_DescrFromType(type_code)->type_num;
    int factor;
    Py_BEGIN_ALLOW_THREADS;
    error = MPI_Probe(source, tag, self->handle, &status);
    Py_END_ALLOW_THREADS;
    if (error != MPI_SUCCESS) {
      PyErr_SetString(PyExc_MPIError, "MPI_Probe failed");
      return NULL;
    }
    source = status.MPI_SOURCE;
    tag = status.MPI_TAG;
    if (MPI_Get_count(&status, mpi_type(type), &count) != MPI_SUCCESS) {
      PyErr_SetString(PyExc_MPIError, "MPI_Get_count failed");
      return NULL;
    }
    factor = mpi_count_factor(type);
    if (count == MPI_UNDEFINED || count % factor != 0) {
      PyErr_SetString(PyExc_MPIError,
                  "buffer data type incompatible with message");
      return NULL;
    }
    count /= factor;
    array = (PyArrayObject *)PyArray_FromDims(1, &count, type);
    if (array == NULL)
      return NULL;
  }
  else {
    PyErr_SetString(PyExc_MPIError,
                "buffer must be an array or a typecode string");
    return NULL;
  }

  if (PyMPI_ReceiveArray(self, array, source, tag,
                   &source, &tag, &count) != 0) {
    Py_DECREF(array);
    return NULL;
  }

  return_ob = Py_BuildValue("Oiii", array, source, tag, count);
  Py_DECREF(array);
  return return_ob;
}
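
/*
 * Python-side usage sketch for receive(), inferred from the code above
 * (variable names are illustrative). The buffer argument is either an
 * existing contiguous array or a one-character typecode string, in which
 * case a new array is allocated; omitting source and tag (or passing None)
 * means MPI_ANY_SOURCE and MPI_ANY_TAG:
 *
 *   data, source, tag, count = world.receive(buffer, source, tag)
 *   data, source, tag, count = world.receive('d')
 */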

static PyObject *
PyMPICommunicator_receiveString(PyMPICommunicatorObject *self, PyObject *args)
{
  PyObject *source_ob = Py_None, *tag_ob = Py_None, *return_ob;
  PyObject *string;
  int source, tag;

  if (!PyArg_ParseTuple(args, "|OO", &source_ob, &tag_ob))
    return NULL;
  if (source_ob == Py_None)
    source = MPI_ANY_SOURCE;
  else if (!PyInt_Check(source_ob)
         || (source = PyInt_AsLong(source_ob)) < 0
         || source >= self->size) {
    PyErr_SetString(PyExc_TypeError, "invalid MPI source");
    return NULL;
  }
  if (tag_ob == Py_None)
    tag = MPI_ANY_TAG;
  else if (!PyInt_Check(tag_ob)
         || (tag = PyInt_AsLong(tag_ob)) < 0
         || tag >= max_tag) {
    PyErr_SetString(PyExc_TypeError, "invalid MPI tag");
    return NULL;
  }

  string = PyMPI_ReceiveString(self, source, tag, &source, &tag);
  if (string == NULL)
    return NULL;

  return_ob = Py_BuildValue("Oii", string, source, tag);
  Py_DECREF(string);
  return return_ob;

}
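
/*
 * Python-side usage sketch for receiveString() (illustrative):
 *
 *   string, source, tag = world.receiveString()            # any source/tag
 *   string, source, tag = world.receiveString(source, tag)
 */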

/* Nonblocking receive */

static PyObject *
PyMPI_ReceiveArrayNonBlocking(PyMPICommunicatorObject *comm,
                        PyArrayObject *array, int source, int tag)
{
  int count, i;
  MPI_Request rq;
  int error;
  MPI_Datatype type;

  count = 1;
  for (i = 0; i < array->nd; i++)
    count *= array->dimensions[i];
  count *= mpi_count_factor(array->descr->type_num);
  type = mpi_type(array->descr->type_num);
  Py_BEGIN_ALLOW_THREADS;
  error = MPI_Irecv(array->data, count, type, source, tag, comm->handle, &rq);
  Py_END_ALLOW_THREADS;
  if (error != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "MPI_Irecv failed");
    return NULL;
  }
  return (PyObject *) newPyMPIRequestObject(rq, (PyObject *) array,
                                  PyMPIRequestReceive, type);
}

static PyObject *
PyMPICommunicator_nonblocking_receive(PyMPICommunicatorObject *self,
                              PyObject *args)
{
  PyArrayObject *array;
  PyObject *source_ob, *tag_ob;
  int source, tag;

  source_ob = Py_None;
  tag_ob = Py_None;
  if (!PyArg_ParseTuple(args, "O!|OO", &PyArray_Type, &array,
                  &source_ob, &tag_ob))
    return NULL;
  if (source_ob == Py_None)
    source = MPI_ANY_SOURCE;
  else if (!PyInt_Check(source_ob)
         || (source = PyInt_AsLong(source_ob)) < 0
         || source >= self->size) {
    PyErr_SetString(PyExc_TypeError, "invalid MPI source");
    return NULL;
  }
  if (tag_ob == Py_None)
    tag = MPI_ANY_TAG;
  else if (!PyInt_Check(tag_ob)
         || (tag = PyInt_AsLong(tag_ob)) < 0
         || tag >= max_tag) {
    PyErr_SetString(PyExc_TypeError, "invalid MPI tag");
    return NULL;
  }

  if (!PyArray_ISCONTIGUOUS(array)) {
    PyErr_SetString(PyExc_ValueError, "buffer must be contiguous");
    return NULL;
  }

  return PyMPI_ReceiveArrayNonBlocking(self, array, source, tag);
}

/* Nonblocking Probe */

static int
PyMPI_ProbeNonBlocking(PyMPICommunicatorObject *comm, int source, int tag,
                   int *flagp, int *sourcep, int *tagp)
{
  MPI_Status status;
  int error;

  error = MPI_Iprobe(source, tag, comm->handle, flagp, &status);
  if (error == MPI_SUCCESS) {
    if (sourcep != NULL)
      *sourcep = status.MPI_SOURCE;
    if (tagp != NULL)
      *tagp = status.MPI_TAG;
  }
  return error;
}

static PyObject *
PyMPICommunicator_nonblocking_probe(PyMPICommunicatorObject *self,
                            PyObject *args)
{
  PyObject *source_ob, *tag_ob;
  int source, tag, available;
  int rc;

  source_ob = Py_None;
  tag_ob = Py_None;
  if (!PyArg_ParseTuple(args, "|OO", &source_ob, &tag_ob))
    return NULL;
  if (source_ob == Py_None)
    source = MPI_ANY_SOURCE;
  else if (!PyInt_Check(source_ob)
         || (source = PyInt_AsLong(source_ob)) < 0
         || source >= self->size) {
    PyErr_SetString(PyExc_TypeError, "invalid MPI source");
    return NULL;
  }
  if (tag_ob == Py_None)
    tag = MPI_ANY_TAG;
  else if (!PyInt_Check(tag_ob)
         || (tag = PyInt_AsLong(tag_ob)) < 0
         || tag >= max_tag) {
    PyErr_SetString(PyExc_TypeError, "invalid MPI tag");
    return NULL;
  }

  Py_BEGIN_ALLOW_THREADS;
  rc = PyMPI_ProbeNonBlocking(self, source, tag, &available,
                        &source, &tag);
  Py_END_ALLOW_THREADS;
  PyMPI_TEST(rc, "MPI_Iprobe");

  if (available)
    return Py_BuildValue("ii", source, tag);
  else {
    Py_INCREF(Py_None);
    return Py_None;
  }
}
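
/*
 * Python-side usage sketch for nonblockingProbe(), which returns None when
 * no message is pending and (source, tag) otherwise (the receive call is
 * illustrative):
 *
 *   probe = world.nonblockingProbe(source, tag)
 *   if probe is not None:
 *       source, tag = probe
 *       data, source, tag, count = world.receive('d', source, tag)
 */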


/* Broadcast */

static int
PyMPI_Broadcast(PyMPICommunicatorObject *comm, void *buffer,
            int type, int count, int root)
{
  return MPI_Bcast(buffer, count, mpi_type(type), root, comm->handle);
}

static int
PyMPI_BroadcastArray(PyMPICommunicatorObject *comm,
                 PyArrayObject *array, int root)
{
  int count;
  int error;
  int i;

  if (root < 0 || root >= comm->size) {
    PyErr_SetString(PyExc_MPIError, "invalid MPI rank");
    return -1;
  }
  if (!PyArray_ISCONTIGUOUS(array)) {
    PyErr_SetString(PyExc_ValueError, "array must be contiguous");
    return -1;
  }
  count = 1;
  for (i = 0; i < array->nd; i++)
    count *= array->dimensions[i];
  count *= mpi_count_factor(array->descr->type_num);
  Py_BEGIN_ALLOW_THREADS;
  error = PyMPI_Broadcast(comm, array->data, array->descr->type_num,
                    count, root);
  Py_END_ALLOW_THREADS;
  if (error != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "MPI_Bcast failed");
    return -1;
  }
  return 0;
}

static PyObject *
PyMPICommunicator_broadcast(PyMPICommunicatorObject *self, PyObject *args)
{
  PyArrayObject *array;
  int root;
  if (!PyArg_ParseTuple(args, "O!i",
                  &PyArray_Type, &array, &root))
    return NULL;
  if (PyMPI_BroadcastArray(self, array, root) != 0)
    return NULL;
  Py_INCREF(Py_None);
  return Py_None;
}
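
/*
 * Python-side usage sketch for broadcast(): every process passes the same
 * contiguous array, and the contents on the process with rank `root` are
 * copied to all others (variable names are illustrative):
 *
 *   world.broadcast(array, root)
 */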


/* Share data between all processes (Allgather) */

static int
PyMPI_Share(PyMPICommunicatorObject *comm, void *send, void *receive,
          int type, int count)
{
  return MPI_Allgather(send, count, mpi_type(type), receive, count,
                   mpi_type(type), comm->handle);
}

static int
PyMPI_ShareArray(PyMPICommunicatorObject *comm, PyArrayObject *send,
             PyArrayObject *receive)
{
  int count, error, compatible;
  int i;

  compatible = (receive->nd == send->nd+1);
  compatible = compatible && (receive->dimensions[0] == comm->size);
  if (compatible)
    for (i = 0; i < send->nd; i++)
      if (send->dimensions[i] != receive->dimensions[i+1])
        compatible = 0;
  compatible = compatible &&
               (send->descr->type_num == receive->descr->type_num);
  if (!compatible) {
    PyErr_SetString(PyExc_MPIError,
                "send and receive arrays are not compatible");
    return -1;
  }

  count = 1;
  for (i = 0; i < send->nd; i++)
    count *= send->dimensions[i];
  count *= mpi_count_factor(send->descr->type_num);
  Py_BEGIN_ALLOW_THREADS;
  error = PyMPI_Share(comm, send->data, receive->data, send->descr->type_num, count);
  Py_END_ALLOW_THREADS;
  if (error != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "MPI_Allgather failed");
    return -1;
  }
  return 0;
}

static PyObject *
PyMPICommunicator_share(PyMPICommunicatorObject *self, PyObject *args)
{
  PyArrayObject *send, *receive;
  if (!PyArg_ParseTuple(args, "O!O!",
                  &PyArray_Type, &send,
                  &PyArray_Type, &receive))
    return NULL;
  if (PyMPI_ShareArray(self, send, receive) != 0)
    return NULL;
  Py_INCREF(Py_None);
  return Py_None;
}
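
/*
 * Python-side usage sketch for share(); the shapes follow the compatibility
 * check above (Numeric.zeros and the variable names are illustrative):
 *
 *   send = Numeric.zeros((n,), 'd')
 *   receive = Numeric.zeros((world.size, n), 'd')
 *   world.share(send, receive)   # receive[i] holds the data sent by rank i
 */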

/* Abort - for emergency use only */

static int
PyMPI_Abort(PyMPICommunicatorObject *comm, int err)
{
  return MPI_Abort(comm->handle, err);
}

static PyObject *
PyMPICommunicator_abort(PyMPICommunicatorObject *self, PyObject *args)
{
  int errcode;
    
  if (!PyArg_ParseTuple(args, "i", &errcode))
    return NULL;
  if (PyMPI_Abort(self, errcode) != MPI_SUCCESS) 
  {
    PyErr_SetString(PyExc_MPIError, "MPI_Abort failed");
    return NULL;
  }
    
  Py_INCREF(Py_None);
  return Py_None;
}

/* Reduce and Allreduce */

static int
PyMPI_Reduce(PyMPICommunicatorObject *comm, void *sendbuf, void *recvbuf,
           int count, int datatype, PyMPIOperationObject *op, int root)
{
  return MPI_Reduce(sendbuf, recvbuf, count, mpi_type(datatype), mpi_op(op), root, comm->handle);
}

static int
PyMPI_Allreduce(PyMPICommunicatorObject *comm, void *sendbuf, void *recvbuf,
            int count, int datatype, PyMPIOperationObject *op)
{
  return MPI_Allreduce(sendbuf, recvbuf, count, mpi_type(datatype), mpi_op(op), comm->handle);
}

static int
PyMPI_ReduceArray(PyMPICommunicatorObject *comm, PyArrayObject *send,
              PyArrayObject *receive, PyMPIOperationObject *op, int root)
{
  int compatible, count, error;
  int i;

  compatible = (receive->nd == send->nd);
  count = 1;
  if (compatible) {    
    for (i = 0; i < send->nd; i++) {
      compatible = compatible && (receive->dimensions[i] ==
                          send->dimensions[i]);
      count *= send->dimensions[i];
    }
  }
  compatible = compatible && (receive->descr->type_num ==
                        send->descr->type_num);
  
  if (!compatible) {
    PyErr_SetString(PyExc_MPIError,
                "send and receive arrays are not compatible.");
    return -1;
  }

  count *= mpi_count_factor(send->descr->type_num);
  Py_BEGIN_ALLOW_THREADS;
  error = PyMPI_Reduce(comm, send->data, receive->data, count, send->descr->type_num, op, root);
  Py_END_ALLOW_THREADS;
  if (error != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "MPI_Reduce failed");
    return -1;
  }
  return 0;
}

static int
PyMPI_AllreduceArray(PyMPICommunicatorObject *comm, PyArrayObject *send,
                 PyArrayObject *receive, PyMPIOperationObject *op)
{
  int compatible, count, error;
  int i;

  compatible = (receive->nd == send->nd);
  count = 1;
  if (compatible) {    
    for (i = 0; i < send->nd; i++) {
      compatible = compatible && (receive->dimensions[i] ==
                          send->dimensions[i]);
      count *= send->dimensions[i];
    }
  }
  compatible = compatible && (receive->descr->type_num ==
                        send->descr->type_num);
  
  if (!compatible) {
    PyErr_SetString(PyExc_MPIError,
                "send and receive arrays are not compatible.");
    return -1;
  }

  count *= mpi_count_factor(send->descr->type_num);
  Py_BEGIN_ALLOW_THREADS;
  error = PyMPI_Allreduce(comm, send->data, receive->data, count, send->descr->type_num, op);
  Py_END_ALLOW_THREADS;
  if (error != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "MPI_Allreduce failed");
    return -1;
  }
  return 0;
}

static PyObject *
PyMPICommunicator_reduce(PyMPICommunicatorObject *self, PyObject *args)
{
  PyArrayObject *send, *receive;
  PyMPIOperationObject *operation;
  int root;

  if (!PyArg_ParseTuple(args, "O!O!O!i",
                  &PyArray_Type, &send,
                  &PyArray_Type, &receive,
                  &PyMPIOperation_Type, &operation,
                  &root))
    return NULL;
  if (PyMPI_ReduceArray(self, send, receive, operation, root) != 0)
    return NULL;
  Py_INCREF(Py_None);
  return Py_None;
}

static PyObject *
PyMPICommunicator_allreduce(PyMPICommunicatorObject *self, PyObject *args)
{
  PyArrayObject *send, *receive;
  PyMPIOperationObject *operation;

  if (!PyArg_ParseTuple(args, "O!O!O!",
                  &PyArray_Type, &send,
                  &PyArray_Type, &receive,
                  &PyMPIOperation_Type, &operation))
    return NULL;
  if (PyMPI_AllreduceArray(self, send, receive, operation) != 0)
    return NULL;
  Py_INCREF(Py_None);
  return Py_None;
}
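
/*
 * Python-side usage sketch for reduce()/allreduce(); the operation objects
 * ("sum", "max", ...) are registered in the module dictionary during
 * initialization below (send, receive and root are illustrative):
 *
 *   import Scientific_mpi
 *   world = Scientific_mpi.world
 *   world.reduce(send, receive, Scientific_mpi.sum, root)
 *   world.allreduce(send, receive, Scientific_mpi.max)
 */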

/* Attribute access */

static PyMethodDef PyMPICommunicator_methods[] = {
  {"duplicate", (PyCFunction)PyMPICommunicator_duplicate, 1},
  {"subset", (PyCFunction)PyMPICommunicator_subset, 1},
  {"send", (PyCFunction)PyMPICommunicator_send, 1},
  {"nonblocking_send", (PyCFunction)PyMPICommunicator_nonblocking_send, 1},
  {"nonblockingSend", (PyCFunction)PyMPICommunicator_nonblocking_send, 1},
  {"receive", (PyCFunction)PyMPICommunicator_receive, 1},
  {"receiveString", (PyCFunction)PyMPICommunicator_receiveString, 1},
  {"nonblocking_receive", (PyCFunction)PyMPICommunicator_nonblocking_receive,
   1},
  {"nonblockingReceive", (PyCFunction)PyMPICommunicator_nonblocking_receive,
   1},
  {"nonblockingProbe", (PyCFunction)PyMPICommunicator_nonblocking_probe,
   1},
  {"broadcast", (PyCFunction)PyMPICommunicator_broadcast, 1},
  {"share", (PyCFunction)PyMPICommunicator_share, 1},
  {"barrier", (PyCFunction)PyMPICommunicator_barrier, 1},
  {"abort", (PyCFunction)PyMPICommunicator_abort, 1},
  {"reduce", (PyCFunction)PyMPICommunicator_reduce, 1},
  {"allreduce", (PyCFunction)PyMPICommunicator_allreduce, 1},
  {NULL, NULL}
};

static PyObject *
PyMPICommunicator_getattr(PyMPICommunicatorObject *self, char *name)
{
  if (strcmp(name, "rank") == 0) {
    return PyInt_FromLong((long)self->rank);
  }
  else if (strcmp(name, "size") == 0) {
    return PyInt_FromLong((long)self->size);
  }
  else
    return Py_FindMethod(PyMPICommunicator_methods, (PyObject *)self, name);
}

statichere PyTypeObject PyMPICommunicator_Type = {
      /* The ob_type field must be initialized in the module init function
       * to be portable to Windows without using C++. */
      PyObject_HEAD_INIT(NULL)
      0,                /*ob_size*/
      "PyMPICommunicator",                /*tp_name*/
      sizeof(PyMPICommunicatorObject),    /*tp_basicsize*/
      0,                /*tp_itemsize*/
      /* methods */
      (destructor)PyMPICommunicator_dealloc, /*tp_dealloc*/
      0,                /*tp_print*/
      (getattrfunc)PyMPICommunicator_getattr, /*tp_getattr*/
      0,                      /*tp_setattr*/
      0,                /*tp_compare*/
      0,                /*tp_repr*/
      0,                /*tp_as_number*/
      0,                /*tp_as_sequence*/
      0,                /*tp_as_mapping*/
      0,                /*tp_hash*/
};

/*************************************/
/* MPI Request object implementation */
/*************************************/

staticforward PyTypeObject PyMPIRequest_Type;

static PyMPIRequestObject *
newPyMPIRequestObject(MPI_Request rq, PyObject *buffer, int operation,
                  MPI_Datatype mpi_type)
{
  PyMPIRequestObject *self;

  self = PyObject_NEW(PyMPIRequestObject, &PyMPIRequest_Type);
  if (self == NULL)
    return NULL;
  self->handle[0] = rq;
  self->operation = operation;
  Py_INCREF(buffer);
  self->buffer = buffer;
  self->mpi_type = mpi_type;
  self->active = 1;
  return self;
}

/* Deallocate the object */
static void
PyMPIRequest_dealloc(PyMPIRequestObject *self)
{
  Py_XDECREF(self->buffer);  /* Release any remaining reference to the buffer */
  PyMem_DEL(self);
}

/* __repr__ */

static PyObject *
PyMPIRequest_repr(PyMPIRequestObject *self)
{
  char rbuffer[256];
  char opbuffer[50];

  switch (self->operation) {
  case PyMPIRequestSend:
    strcpy(opbuffer, "send");
    break;
  case PyMPIRequestReceive:
    strcpy(opbuffer, "receive");
    break;
  default:
    sprintf(opbuffer, "*** INVALID (%d) ***", self->operation);
  }
  
  sprintf(rbuffer,
        "<PyMPIRequest at %lx: handle = %ld, operation = %s, status = %s>",
        (long)self, (long)self->handle, opbuffer,
        (self->active ? "active" : "expired"));
  return PyString_FromString(rbuffer);
}

/* Wait for a nonblocking operation */

/* The argument is void * to keep MPI_Status out of the C API, where it
   caused trouble */
static int
PyMPI_Wait(PyMPIRequestObject *self, void *s)
{
  return MPI_Wait(self->handle, (MPI_Status *) s);
}

static PyObject *
PyMPIRequest_wait(PyMPIRequestObject *self, PyObject *args)
{
  PyObject *return_ob;
  MPI_Status stat;
  int source, tag, count;
  
  if (!PyArg_ParseTuple(args, ""))
    return NULL;
  if (!self->active) {
    PyErr_SetString(PyExc_MPIError, "Cannot wait on expired request");
    return NULL;
  }

  if (PyMPI_Wait(self, &stat) != MPI_SUCCESS) {
    PyErr_SetString(PyExc_MPIError, "MPI_Wait failed");
    return NULL;
  }

  assert(self->buffer->ob_refcnt > 0);
  /* fprintf(stderr, "REFCOUNT: %d\n", (int) self->buffer->ob_refcnt); */
  switch (self->operation) {

  case PyMPIRequestSend:
    self->active = 0;      /* The operation has completed */
    Py_DECREF(self->buffer);  /* Discard the buffer */
    self->buffer = NULL;
    
    Py_INCREF(Py_None);
    return Py_None;

  case PyMPIRequestReceive:
    self->active = 0;      /* The operation has completed */
    source = stat.MPI_SOURCE;
    tag = stat.MPI_TAG;
    if (MPI_Get_count(&stat, self->mpi_type, &count) != MPI_SUCCESS) {
      PyErr_SetString(PyExc_MPIError, "MPI_Get_count failed");
      return NULL;
    }
    return_ob = Py_BuildValue("Oiii", self->buffer, source, tag, count);
    Py_DECREF(self->buffer);
    self->buffer = NULL;

    return return_ob;
  }
  PyErr_SetString(PyExc_MPIError, "Invalid operation field in request object");
  return NULL;
}
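
/*
 * Python-side usage sketch: wait() returns None for a send request and the
 * tuple (buffer, source, tag, count) for a receive request; afterwards the
 * request is expired and cannot be waited on again (buffer is illustrative):
 *
 *   request = world.nonblockingReceive(buffer)
 *   data, source, tag, count = request.wait()
 */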

/* Attribute access */

static PyMethodDef PyMPIRequest_methods[] = {
  {"wait", (PyCFunction)PyMPIRequest_wait, METH_VARARGS},
  {NULL, NULL}
};

static PyObject *
PyMPIRequest_getattr(PyMPIRequestObject *self, char *name)
{
  return Py_FindMethod(PyMPIRequest_methods, (PyObject *)self, name);
}

statichere PyTypeObject PyMPIRequest_Type = {
      PyObject_HEAD_INIT(NULL)
      0,                      /*ob_size*/
      "PyMPIRequest",               /*tp_name*/
      sizeof(PyMPIRequestObject),   /*tp_basicsize*/
      0,                      /*tp_itemsize*/
      /* methods */
      (destructor) PyMPIRequest_dealloc, /*tp_dealloc*/
      0,                      /*tp_print*/
      (getattrfunc) PyMPIRequest_getattr, /*tp_getattr*/
      0,                            /*tp_setattr*/
      0,                      /*tp_compare*/
      (reprfunc) PyMPIRequest_repr, /*tp_repr*/
      0,                      /*tp_as_number*/
      0,                      /*tp_as_sequence*/
      0,                      /*tp_as_mapping*/
      0,                      /*tp_hash*/
      0,                      /*tp_call*/
      0                       /*tp_str*/
};

/*****************************/
/* Error object registration */
/*****************************/

static PyObject *
register_error_object(PyObject *dummy, PyObject *args)
{
  if (!PyArg_ParseTuple(args, "O", &PyExc_MPIError))
    return NULL;
  Py_INCREF(PyExc_MPIError);
  Py_INCREF(Py_None);
  return Py_None;
}

/********************************************/
/* Table of functions defined in the module */
/********************************************/

static PyMethodDef mpi_methods[] = {
  {"_registerErrorObject", register_error_object, 1},
  {NULL, NULL}          /* sentinel */
};

/*************************/
/* Module initialization */
/*************************/

/* a handy macro to create/register PyMPIOperation objects */
#define NEW_OP(mpi_op,op_name) do { \
      PyObject *op = (PyObject *) newPyMPIOperationObject(mpi_op,op_name); \
      PyDict_SetItemString(d, op_name, op); \
      } while (0)

DL_EXPORT(void)
initScientific_mpi(void)
{
  PyObject *m, *d, *world;
  static void *PyMPI_API[PyMPI_API_pointers];
  int mpi_init_flag;

  /* Set type of locally created static objects. */
  PyMPIOperation_Type.ob_type    = &PyType_Type;
  PyMPICommunicator_Type.ob_type = &PyType_Type;
  PyMPIRequest_Type.ob_type      = &PyType_Type;

  /* Create the module */
  m = Py_InitModule("Scientific_mpi", mpi_methods);
  d = PyModule_GetDict(m);

  /* Initialize C API pointer array and store in module */
  set_PyMPI_API_pointers();
  PyDict_SetItemString(d, "_C_API", PyCObject_FromVoidPtr(PyMPI_API, NULL));

  /* Import the array module */
#ifdef import_array
  import_array();
  if (PyErr_Occurred()) {
    PyErr_SetString(PyExc_ImportError, "Can't import Numeric.");
    return;
  }
#endif

  /* Check that MPI has been initialized */
  if (MPI_Initialized(&mpi_init_flag) != MPI_SUCCESS || !mpi_init_flag) {
    Py_INCREF(Py_None);
    PyDict_SetItemString(d, "world", Py_None);
#if 0
    fprintf(stderr, "Use mpipython to run this program!\n");
    _exit(1);
#endif
  }
  else {
    /* Create the world communicator object */
    world = (PyObject *)newPyMPICommunicatorObject(MPI_COMM_WORLD);
    PyDict_SetItemString(d, "world", world);

    NEW_OP(MPI_MAX,     "max");
    NEW_OP(MPI_MIN,     "min");
    NEW_OP(MPI_SUM,     "sum");
    NEW_OP(MPI_PROD,    "prod");
    NEW_OP(MPI_LAND,    "land");
    NEW_OP(MPI_BAND,    "band");
    NEW_OP(MPI_LOR,     "lor");
    NEW_OP(MPI_BOR,     "bor");
    NEW_OP(MPI_LXOR,    "lxor");
    NEW_OP(MPI_BXOR,    "bxor");
    NEW_OP(MPI_MAXLOC,  "maxloc");
    NEW_OP(MPI_MINLOC,  "minloc");
#if MPI_VERSION > 1
    NEW_OP(MPI_REPLACE, "replace");
#endif
  }

  /* Check for errors */
  if (PyErr_Occurred())
    PyErr_SetString(PyExc_ImportError, "Can't initialize module.");
}


/* Keep Konrad Hinsen's indentation style when using cc mode in (x)emacs. */
/* Local Variables: */
/* c-basic-offset: 2 */
/* c-hanging-braces-alist: ((brace-list-open) (substatement-open after) (class-open after) (class-close before) (block-close . c-snug-do-while)) */
/* End: */

    
