# Logo Search packages:
# Sourcecode: python-scientific version File versions  Download package
#
# IO.py

# Parallel IO
#
# Written by Konrad Hinsen <hinsen@cnrs-orleans.fr>
# last revision: 2002-3-22
#

"""This module provides parallel access to netCDF files. One netCDF dimension
is defined for splitting the data among processors such that each processor
is responsible for one slice of the file along that dimension.

Since netCDF files can be very big, the distribution algorithm gives
priority to memory efficiency over CPU time efficiency. The processor
that handles the file treats only one slice per superstep, which means
that at no time more than one slice must be stored in any processor.
"""

from Scientific.IO.NetCDF import NetCDFFile
from core import ParClass, ParBase, ParInvalid, is_invalid
import Numeric

class _ParNetCDFFile(ParBase):

    """Distributed netCDF file

    Constructor: ParNetCDFFile(|filename|, |split_dimension|, |mode|='r',
                               |local_access| = 0)

    Arguments:

    |filename| -- the name of the netCDF file

    |split_dimension| -- the name of the dimension along which the data
                         is distributed over the processors

    |mode| -- read ('r'), write ('w'), or append ('a'). Default is 'r'.

    |local_access| -- if 0 (default), processor 0 is the only one to access
                      the file, all others communicate with processor 0. If
                      1 (only for reading), each processor accesses the
                      file directly. In the latter case, the file must be
                      accessible on all processors under the same name.
                      A third mode is 'auto', which uses some heuristics
                      to decide if the file is accessible everywhere:
                      it checks for existence of the file, then compares
                      the size on all processors, and finally verifies
                      that the same variables exist everywhere, with
                      identical names, types, and sizes.
                      NOTE(review): the constructor below only tests
                      |local_access| for truth, so 'auto' currently behaves
                      like 1 unless it is handled elsewhere -- confirm.

    A ParNetCDFFile object acts as much as possible like a NetCDFFile object.
    Variables become ParNetCDFVariable objects, which behave like
    distributed sequences. Variables that use the dimension named by
    |split_dimension| are automatically distributed among the processors
    such that each treats only one slice of the whole file.
    """

    def __parinit__(self, pid, nprocs, filename, split_dimension,
                    mode = 'r', local_access = 0):
        """Open the file and set up the distributed variables.

        |pid| and |nprocs| are the number of this processor and the total
        number of processors; the remaining arguments are described in
        the class documentation.
        """
        # Direct access from every processor is only allowed for reading.
        if mode != 'r': local_access = 0
        self.pid = pid
        self.nprocs = nprocs
        self.filename = filename
        self.split = split_dimension
        self.local_access = local_access
        self.read_only = mode == 'r'
        if local_access or pid == 0:
            self.file = NetCDFFile(filename, mode)
            # length: None = split dimension not (yet) defined in the file,
            #         -1   = split dimension is unlimited, size still unknown
            try:
                length = self.file.dimensions[split_dimension]
            except KeyError:
                length = None
            else:
                if length is None:
                    length = -1
            var_info = {}
            for name, var in self.file.variables.items():
                var_info[name] = (name, var.dimensions)
                # While the size is unknown, take it from the shape of the
                # first variable found that uses the split dimension.
                if (length is None or length < 0) \
                   and split_dimension in var.dimensions:
                    index = list(var.dimensions).index(split_dimension)
                    length = var.shape[index]
            if length is not None and length < 0:
                # Unlimited dimension that no variable uses: there are no
                # records, so the data to distribute has length zero.
                length = 0
        else:
            self.file = None
            length = None
            var_info = None
        if not local_access:
            # Only processor 0 has looked at the file; share what it found.
            length = self.broadcast(length)
            var_info = self.broadcast(var_info)
        if length is not None:
            self._divideData(length)
        self.variables = {}
        for name, var in var_info.items():
            self.variables[name] = _ParNetCDFVariable(self, var[0], var[1],
                                                      split_dimension)

    def __repr__(self):
        return repr(self.filename)

    def close(self):
        """Close the underlying netCDF file on every processor that
        opened it (all of them with local access, otherwise processor 0)."""
        if self.local_access or self.pid == 0:
            self.file.close()

    def createDimension(self, name, length):
        """Create the dimension |name| with the given |length|.

        If |name| is the split dimension, the data range is divided among
        the processors; in that case |length| must not be None
        (unlimited), otherwise ValueError is raised. Only processor 0
        modifies the file.
        """
        if name == self.split:
            if length is None:
                raise ValueError("Split dimension cannot be unlimited")
            self._divideData(length)
        if self.pid == 0:
            self.file.createDimension(name, length)

    def createVariable(self, name, typecode, dimensions):
        """Create the variable |name| and return the corresponding
        distributed variable object.

        Processor 0 creates the variable in the file; its name and
        dimensions are then broadcast, so this method takes part in a
        broadcast on every processor that calls it.
        """
        if self.pid == 0:
            var = self.file.createVariable(name, typecode, dimensions)
            dim = var.dimensions
        else:
            # Placeholder, replaced by the value broadcast below.
            dim = 0
        name, dim = self.broadcast((name, dim))
        self.variables[name] = _ParNetCDFVariable(self, name, dim, self.split)
        return self.variables[name]

    def _divideData(self, length):
        """Split the index range [0, |length|) into one contiguous part
        per processor.

        Sets self.first and self.last (this processor's half-open range)
        and, on processor 0 when it is the only one accessing the file,
        self.parts, the list of (first, last) ranges of all processors.
        """
        # Number of indices per processor, rounded up.
        chunk = (length+self.nprocs-1)//self.nprocs
        self.first = min(self.pid*chunk, length)
        self.last = min(self.first+chunk, length)
        if (not self.local_access) and self.pid == 0:
            self.parts = []
            for pid in range(self.nprocs):
                # Clamp both ends so that the table agrees with the
                # first/last values each processor computes for itself
                # (processors beyond the end of the data get an empty
                # range, never first > last).
                first = min(pid*chunk, length)
                last = min(first+chunk, length)
                self.parts.append((first, last))

    def sync(self):
        """Call sync() on the underlying netCDF file (processor 0 only)."""
        if self.pid == 0:
            self.file.sync()
    flush = sync

class _ParNetCDFVariable(ParBase):

    """Distributed netCDF variable

    Objects of this class are created by the distributed netCDF file
    object, one per variable in the file. Indexing reads or writes the
    part of the variable that belongs to the calling processor; if the
    file is not accessed locally, the data is exchanged with processor 0,
    one processor's part per communication step.
    """

    def __init__(self, file, name, dimensions, split_dimension):
        """|file| is the distributed file object that owns the variable,
        |name| the variable name, |dimensions| the sequence of its
        dimension names, and |split_dimension| the name of the dimension
        along which data is distributed."""
        self.file = file
        self.pid = file.pid
        self.nprocs = file.nprocs
        self.name = name
        self.dimensions = dimensions
        # NOTE(review): presumably required by the ParBase protocol in
        # core -- confirm.
        self.value = self
        # Cache for attribute values obtained from processor 0
        # (filled by __getattr__ for read-only files).
        self.attributes = {}
        # Position of the split dimension among the variable's dimensions,
        # or None if the variable does not use it.
        try:
            self.index = list(dimensions).index(split_dimension)
        except ValueError:
            self.index = None

    def __repr__(self):
        return repr(self.name)

    def __getitem__(self, item):
        """Return this processor's part of the data selected by |item|,
        or ParInvalid if there is none."""
        item = self._prepareIndices(item)
        # Defined up front so that a processor which receives no message
        # returns ParInvalid instead of failing on an unbound name.
        data = None
        if self.file.local_access:
            data = self._readData(item, self.file.first, self.file.last)
        elif self.pid == 0:
            # One communication step per remote processor: read its part
            # and send it, so only one part is held in memory at a time.
            for pid in range(1, self.nprocs):
                first, last = self.file.parts[pid]
                data = self._readData(item, first, last)
                self.put(data, [pid])
            data = self._readData(item, self.file.first, self.file.last)
        else:
            # Take part in every step; keep the message addressed to us.
            for pid in range(1, self.nprocs):
                messages = self.put(None, [])
                if messages:
                    data = messages[0]
        if data is None:
            return ParInvalid
        return data

    def __getslice__(self, first, last):
        return self.__getitem__(slice(first, last))

    def __setitem__(self, item, value):
        """Write |value|, this processor's part of the data, to the
        positions selected by |item|.

        Processor 0 writes its own part directly. If the variable uses
        the split dimension, the other processors send their parts to
        processor 0, one per communication step; otherwise only the value
        given on processor 0 is written.
        """
        item = self._prepareIndices(item)
        if is_invalid(value):
            value = None
        if self.pid == 0:
            if value is not None:
                self._writeData(item, value, self.file.first, self.file.last)
            if self.index is not None:
                for pid in range(1, self.nprocs):
                    first, last = self.file.parts[pid]
                    received = self.put(None, [])
                    if received and received[0] is not None:
                        self._writeData(item, received[0], first, last)
        elif self.index is not None:
            for pid in range(1, self.nprocs):
                # Send only in the step reserved for this processor.
                if pid == self.pid:
                    self.put(value, [0])
                else:
                    self.put(None, [])

    def __setslice__(self, first, last, value):
        self.__setitem__(slice(first, last), value)

    def _prepareIndices(self, item):
        """Return |item| as a tuple with one entry per dimension, padded
        with full slices. Objects that have an is_parindex attribute are
        returned unchanged."""
        if not hasattr(item, 'is_parindex'):
            if not isinstance(item, tuple):
                item = (item,)
            item = item + (len(self.dimensions)-len(item))*(slice(None),)
        return item

    def _readData(self, item, part_first, part_last):
        """Read from the file the data selected by |item| within the part
        [|part_first|, |part_last|) of the split dimension. Returns None
        if _indices() yields no index for that part."""
        item = self._indices(item, part_first, part_last)
        if item is None:
            return None
        return self.file.file.variables[self.name][item]

    def _writeData(self, item, data, part_first, part_last):
        """Write |data| to the positions selected by |item| within the
        part [|part_first|, |part_last|) of the split dimension. Empty
        data is ignored."""
        try:
            if len(data) == 0:
                return
        except TypeError:
            # Data without a length (e.g. a plain number) is written as is.
            pass
        item = self._indices(item, part_first, part_last)
        if item is not None:
            try:
                self.file.file.variables[self.name][item] = Numeric.array(data)
            except Exception:
                # Show what did not fit together, then let the error
                # propagate unchanged.
                print(self.file.file.variables[self.name].shape)
                print(item)
                print(Numeric.array(data).shape)
                raise

    def _indices(self, item, part_first, part_last):
        """Translate |item| into an index for the netCDF variable that is
        restricted to the part [|part_first|, |part_last|) of the split
        dimension.

        Returns None for an object with an is_parindex attribute whose
        valid flag is false. Raises ValueError if an integer is used
        along the split dimension.
        """
        if hasattr(item, 'is_parindex'):
            # NOTE(review): presumably a parallel index object from core
            # whose start/stop are relative to the processor's part --
            # confirm.
            if not item.valid:
                return None
            if item.skip == 0:
                return item.start+part_first
            return slice(item.start+part_first, item.stop+part_first,
                         item.skip)
        if self.index is not None:
            split = item[self.index]
            if isinstance(split, int):
                raise ValueError("Must use slice along split dimension")
            first, last, skip = split.start, split.stop, split.step
            if first is None: first = 0
            if skip is None: skip = 1
            # Number of steps needed to reach the first selected index
            # that is not below part_first (integer division, rounded up).
            n1 = max(0, (part_first-first+skip-1)//skip)
            first = first + n1*skip
            if last is None:
                last = part_last
            last = min(last, part_last)
            item = item[:self.index] + (slice(first, last, skip),) + \
                   item[self.index+1:]
        return item

    def __getattr__(self, attr):
        """Return the attribute |attr| of the underlying netCDF variable.

        With local file access the attribute is read directly. Otherwise
        processor 0 reads it and broadcasts the result; for read-only
        files the value is cached. AttributeError is raised if the netCDF
        variable has no such attribute.
        """
        # This method runs only after normal lookup has failed. If one of
        # the instance attributes used below is itself missing (object
        # created without __init__), looking it up here would recurse
        # without end, so fail immediately.
        if attr in ('file', 'attributes', 'pid', 'name'):
            raise AttributeError(attr)
        if self.file.local_access:
            return getattr(self.file.file.variables[self.name], attr)
        try:
            return self.attributes[attr]
        except KeyError:
            pass
        if self.pid == 0:
            # Broadcast a (found, value) pair so that a missing attribute
            # is reported on every processor, instead of processor 0
            # raising before the broadcast the others are taking part in.
            try:
                result = (1, getattr(self.file.file.variables[self.name],
                                     attr))
            except AttributeError:
                result = (0, None)
        else:
            result = None
        found, value = self.broadcast(result)
        if not found:
            raise AttributeError(attr)
        if self.file.read_only:
            self.attributes[attr] = value
        return value

    def __len__(self):
        """Length of this processor's part of the split dimension."""
        return self.file.last - self.file.first

# Public names of the module: each is the result of passing one of the
# private implementation classes above to ParClass (imported from core).
# NOTE(review): presumably ParClass produces the class that user code
# instantiates and arranges for __parinit__ to receive pid and nprocs --
# confirm in core.
ParNetCDFVariable = ParClass(_ParNetCDFVariable)
ParNetCDFFile = ParClass(_ParNetCDFFile)

# Generated by  Doxygen 1.6.0   Back to index