Source code for holoviews.core.data.iris

from __future__ import absolute_import

import datetime
from itertools import product

import iris
from iris.coords import DimCoord
from iris.cube import CubeList
from iris.experimental.equalise_cubes import equalise_attributes
from iris.util import guess_coord_axis

import numpy as np

from .interface import Interface, DataError
from .grid import GridInterface
from ..dimension import Dimension, asdim
from ..element import Element
from ..ndmapping import (NdMapping, item_check, sorted_context)
from ..spaces import HoloMap
from .. import util



def get_date_format(coord):
    """
    Returns a formatter function for values of a time-reference
    coordinate. The formatter converts a numeric value to a date via
    the coordinate's units and renders it with the datetime formatter
    registered on Dimension, falling back to the raw date object.
    """
    def date_formatter(val, pos=None):
        as_date = coord.units.num2date(val)
        fmt = Dimension.type_formatters.get(datetime.datetime, None)
        return as_date.strftime(fmt) if fmt else as_date
    return date_formatter


def coord_to_dimension(coord):
    """
    Converts an iris coordinate to a HoloViews dimension.

    Time-reference coordinates get a date value formatter; all other
    coordinates carry their unit string over to the Dimension.
    """
    if coord.units.is_time_reference():
        dim_opts = {'value_format': get_date_format(coord)}
    else:
        dim_opts = {'unit': str(coord.units)}
    return Dimension(coord.name(), **dim_opts)
def sort_coords(coord):
    """
    Sort key for a list of DimCoords, trying to ensure that dates and
    pressure levels appear first and that longitude and latitude
    appear last, in the correct order.
    """
    axis_priority = {'T': -2, 'Z': -1, 'X': 1, 'Y': 2}
    axis = guess_coord_axis(coord)
    # Unrecognized axes sort between T/Z and X/Y; name breaks ties
    return (axis_priority.get(axis, 0), coord and coord.name())
class CubeInterface(GridInterface):
    """
    The CubeInterface provides allows HoloViews to interact with iris
    Cube data. When passing an iris Cube to a HoloViews Element the
    init method will infer the dimensions of the Cube from its
    coordinates. Currently the interface only provides the basic
    methods required for HoloViews to work with an object.
    """

    types = (iris.cube.Cube,)

    datatype = 'cube'

    @classmethod
    def init(cls, eltype, data, kdims, vdims):
        """
        Coerces the supplied data (ndarray, tuple, dict or Cube) into
        an iris Cube, inferring key and value dimensions from the
        element type and the cube's coordinates.

        Returns a (data, dimensions, extra) tuple as expected by the
        Interface contract. Raises ValueError if array data cannot be
        interpreted or a declared key dimension is missing from the
        cube, and TypeError if the data cannot be converted to a Cube.
        """
        if kdims:
            kdims = [asdim(kd) for kd in kdims]
            kdim_names = [kd.name for kd in kdims]
        else:
            kdims = eltype.kdims
            kdim_names = [kd.name for kd in eltype.kdims]

        if not isinstance(data, iris.cube.Cube):
            if vdims is None:
                vdims = eltype.vdims
            ndims = len(kdim_names)
            vdim = asdim(vdims[0])
            vdims = [vdim]
            if isinstance(data, np.ndarray):
                # Only a two-column array with a single key dimension
                # can be interpreted unambiguously
                if data.ndim != 2 or data.shape[1] != 2 or len(kdims) != 1:
                    raise ValueError('Iris interface could not interpret array data.')
                data = {kdims[0].name: data[:, 0], vdim.name: data[:, 1]}
            elif isinstance(data, tuple):
                value_array = data[-1]
                data = {d: vals for d, vals in zip(kdim_names + [vdim.name], data)}
            elif isinstance(data, list) and data == []:
                # Empty dataset: empty key arrays and an empty value grid
                ndims = len(kdims)
                dimensions = [d.name for d in kdims + vdims]
                data = {d: np.array([]) for d in dimensions[:ndims]}
                data.update({d: np.empty((0,) * ndims)
                             for d in dimensions[ndims:]})

            if isinstance(data, dict):
                value_array = data[vdim.name]
                coords = [(iris.coords.DimCoord(data[kd.name], long_name=kd.name,
                                                units=kd.unit), ndims - n - 1)
                          for n, kd in enumerate(kdims)]
                try:
                    data = iris.cube.Cube(value_array, long_name=vdim.name,
                                          dim_coords_and_dims=coords)
                except Exception:
                    # Cube construction is best-effort; the isinstance
                    # check below reports the failure to the user.
                    # (Was a bare except, which also swallowed
                    # KeyboardInterrupt/SystemExit.)
                    pass
            if not isinstance(data, iris.cube.Cube):
                raise TypeError('Data must be be an iris Cube type.')

        if kdims:
            coords = []
            for kd in kdims:
                coord = data.coords(kd.name)
                if len(coord) == 0:
                    raise ValueError('Key dimension %s not found in '
                                     'Iris cube.' % kd)
                coords.append(kd if isinstance(kd, Dimension) else coord[0])
        else:
            coords = data.dim_coords
            coords = sorted(coords, key=sort_coords)
        kdims = [crd if isinstance(crd, Dimension) else coord_to_dimension(crd)
                 for crd in coords]
        if vdims is None:
            vdims = [Dimension(data.name(), unit=str(data.units))]

        return data, {'kdims': kdims, 'vdims': vdims}, {}

    @classmethod
    def validate(cls, dataset, vdims=True):
        """
        Validates the dataset; iris cubes hold a single data array, so
        at most one value dimension is supported.
        """
        if vdims and len(dataset.vdims) > 1:
            raise DataError("Iris cubes do not support more than one value dimension", cls)
[docs] @classmethod def irregular(cls, dataset, dim): "CubeInterface does not support irregular data" return False
@classmethod def shape(cls, dataset, gridded=False): if gridded: return dataset.data.shape else: return (cls.length(dataset), len(dataset.dimensions())) @classmethod def coords(cls, dataset, dim, ordered=False, expanded=False): dim = dataset.get_dimension(dim, strict=True) if expanded: return util.expand_grid_coords(dataset, dim.name) data = dataset.data.coords(dim.name)[0].points if ordered and np.all(data[1:] < data[:-1]): data = data[::-1] return data
[docs] @classmethod def values(cls, dataset, dim, expanded=True, flat=True, compute=True): """ Returns an array of the values along the supplied dimension. """ dim = dataset.get_dimension(dim, strict=True) if dim in dataset.vdims: coord_names = [c.name() for c in dataset.data.dim_coords] data = dataset.data.copy().data data = cls.canonicalize(dataset, data, coord_names) return data.T.flatten() if flat else data elif expanded: data = cls.coords(dataset, dim.name, expanded=True) return data.T.flatten() if flat else data else: return cls.coords(dataset, dim.name, ordered=True)
@classmethod def reindex(cls, dataset, kdims=None, vdims=None): dropped_kdims = [kd for kd in dataset.kdims if kd not in kdims] constant = {} for kd in dropped_kdims: vals = cls.values(dataset, kd.name, expanded=False) if len(vals) == 1: constant[kd.name] = vals[0] if len(constant) == len(dropped_kdims): constraints = iris.Constraint(**constant) return dataset.data.extract(constraints) elif dropped_kdims: return tuple(dataset.columns(kdims+vdims).values()) return dataset.data
    @classmethod
    def groupby(cls, dataset, dims, container_type=HoloMap, group_type=None,
                **kwargs):
        """
        Groups the data by one or more dimensions returning a container
        indexed by the grouped dimensions containing slices of the
        cube wrapped in the group_type. This makes it very easy to
        break up a high-dimensional dataset into smaller viewable chunks.
        """
        if not isinstance(dims, list):
            dims = [dims]
        dims = [dataset.get_dimension(d, strict=True) for d in dims]
        # Names used to build an iris Constraint for each group key
        constraints = [d.name for d in dims]
        # Dimensions remaining on each slice after grouping
        slice_dims = [d for d in dataset.kdims if d not in dims]

        # Update the kwargs appropriately for Element group types
        group_kwargs = {}
        group_type = dict if group_type == 'raw' else group_type
        if issubclass(group_type, Element):
            group_kwargs.update(util.get_param_values(dataset))
            group_kwargs['kdims'] = slice_dims
        group_kwargs.update(kwargs)

        # If the caller's kdims omit any slice dimension, the extracted
        # cube must be converted to columns before regrouping
        drop_dim = any(d not in group_kwargs['kdims'] for d in slice_dims)

        # Cartesian product of the coordinate values of the grouped dims
        unique_coords = product(*[cls.values(dataset, d, expanded=False)
                                  for d in dims])
        data = []
        for key in unique_coords:
            constraint = iris.Constraint(**dict(zip(constraints, key)))
            extracted = dataset.data.extract(constraint)
            if drop_dim:
                extracted = group_type(extracted, kdims=slice_dims,
                                       vdims=dataset.vdims).columns()
            cube = group_type(extracted, **group_kwargs)
            data.append((key, cube))
        if issubclass(container_type, NdMapping):
            # Skip validation and sorting; keys are already unique and
            # generated in product order
            with item_check(False), sorted_context(False):
                return container_type(data, kdims=dims)
        else:
            return container_type(data)
[docs] @classmethod def concat_dim(cls, datasets, dim, vdims): """ Concatenates datasets along one dimension """ cubes = [] for c, cube in datasets.items(): cube = cube.copy() cube.add_aux_coord(DimCoord([c], var_name=dim.name)) cubes.append(cube) cubes = CubeList(cubes) equalise_attributes(cubes) return cubes.merge_cube()
[docs] @classmethod def range(cls, dataset, dimension): """ Computes the range along a particular dimension. """ dim = dataset.get_dimension(dimension, strict=True) values = dataset.dimension_values(dim.name, False) return (np.nanmin(values), np.nanmax(values))
[docs] @classmethod def redim(cls, dataset, dimensions): """ Rename coords on the Cube. """ new_dataset = dataset.data.copy() for name, new_dim in dimensions.items(): if name == new_dataset.name(): new_dataset.rename(new_dim.name) for coord in new_dataset.dim_coords: if name == coord.name(): coord.rename(new_dim.name) return new_dataset
[docs] @classmethod def length(cls, dataset): """ Returns the total number of samples in the dataset. """ return np.product([len(d.points) for d in dataset.data.coords(dim_coords=True)], dtype=np.intp)
    @classmethod
    def sort(cls, columns, by=[], reverse=False):
        """
        Cubes are assumed to be sorted by default; returns the data
        unchanged regardless of the requested sort order.
        """
        return columns
    @classmethod
    def aggregate(cls, columns, kdims, function, **kwargs):
        """
        Aggregation currently not implemented.

        Raises NotImplementedError unconditionally.
        """
        raise NotImplementedError
    @classmethod
    def sample(cls, dataset, samples=[]):
        """
        Sampling currently not implemented.

        Raises NotImplementedError unconditionally.
        """
        raise NotImplementedError
[docs] @classmethod def add_dimension(cls, columns, dimension, dim_pos, values, vdim): """ Adding value dimensions not currently supported by iris interface. Adding key dimensions not possible on dense interfaces. """ if not vdim: raise Exception("Cannot add key dimension to a dense representation.") raise NotImplementedError
[docs] @classmethod def select_to_constraint(cls, dataset, selection): """ Transform a selection dictionary to an iris Constraint. """ def get_slicer(start, end): def slicer(cell): return start <= cell.point < end return slicer constraint_kwargs = {} for dim, constraint in selection.items(): if isinstance(constraint, slice): constraint = (constraint.start, constraint.stop) if isinstance(constraint, tuple): if constraint == (None, None): continue constraint = get_slicer(*constraint) dim = dataset.get_dimension(dim, strict=True) constraint_kwargs[dim.name] = constraint return iris.Constraint(**constraint_kwargs)
[docs] @classmethod def select(cls, dataset, selection_mask=None, **selection): """ Apply a selection to the data. """ constraint = cls.select_to_constraint(dataset, selection) pre_dim_coords = [c.name() for c in dataset.data.dim_coords] indexed = cls.indexed(dataset, selection) extracted = dataset.data.extract(constraint) if indexed and not extracted.dim_coords: return extracted.data.item() post_dim_coords = [c.name() for c in extracted.dim_coords] dropped = [c for c in pre_dim_coords if c not in post_dim_coords] for d in dropped: extracted = iris.util.new_axis(extracted, d) return extracted
# Make the Cube interface available to Dataset constructors
Interface.register(CubeInterface)