Source code for holoviews.core.data.dictionary

from collections import OrderedDict, defaultdict

import numpy as np

from .. import util
from ..dimension import dimension_name
from ..element import Element
from ..ndmapping import NdMapping, item_check, sorted_context
from ..util import isscalar
from .interface import DataError, Interface


class DictInterface(Interface):
    """
    Interface for simple dictionary-based dataset format. The dictionary
    keys correspond to the column (i.e. dimension) names and the values
    are collections representing the values in that column.
    """

    types = (dict, OrderedDict)

    datatype = 'dictionary'

    @classmethod
    def dimension_type(cls, dataset, dim):
        name = dataset.get_dimension(dim, strict=True).name
        values = dataset.data[name]
        return type(values) if isscalar(values) else values.dtype.type

    @classmethod
    def init(cls, eltype, data, kdims, vdims):
        if kdims is None:
            kdims = eltype.kdims
        if vdims is None:
            vdims = eltype.vdims

        dimensions = [dimension_name(d) for d in kdims + vdims]
        if (isinstance(data, list) and all(isinstance(d, dict) for d in data) and
                not all(c in d for d in data for c in dimensions)):
            raise ValueError('DictInterface could not find specified dimensions in the data.')
        elif isinstance(data, tuple):
            data = {d: v for d, v in zip(dimensions, data)}
        elif util.is_dataframe(data) and all(d in data for d in dimensions):
            data = {d: data[d] for d in dimensions}
        elif isinstance(data, np.ndarray):
            if data.ndim == 1:
                if eltype._auto_indexable_1d and len(kdims)+len(vdims) > 1:
                    data = np.column_stack([np.arange(len(data)), data])
                else:
                    data = np.atleast_2d(data).T
            data = {k: data[:, i] for i, k in enumerate(dimensions)}
        elif isinstance(data, list) and data == []:
            data = {d: [] for d in dimensions}
        elif isinstance(data, list) and isscalar(data[0]):
            if eltype._auto_indexable_1d:
                data = {dimensions[0]: np.arange(len(data)), dimensions[1]: data}
            else:
                data = {dimensions[0]: data}
        elif (isinstance(data, list) and isinstance(data[0], tuple) and len(data[0]) == 2
              and any(isinstance(v, tuple) for v in data[0])):
            dict_data = zip(*((util.wrap_tuple(k)+util.wrap_tuple(v))
                              for k, v in data))
            data = {k: np.array(v) for k, v in zip(dimensions, dict_data)}
        # Ensure that interface does not consume data of other types
        # with an iterator interface
        elif not any(isinstance(data, tuple(t for t in interface.types if t is not None))
                     for interface in cls.interfaces.values()):
            data = {k: v for k, v in zip(dimensions, zip(*data))}
        elif (isinstance(data, dict) and
              not any(isinstance(v, np.ndarray) for v in data.values()) and
              not any(d in data or any(d in k for k in data if isinstance(k, tuple))
                      for d in dimensions)):
            # For data where both keys and values are dimension values,
            # e.g. {('A', 'B'): (1, 2)} (should consider deprecating)
            dict_data = sorted(data.items())
            k, v = dict_data[0]
            if len(util.wrap_tuple(k)) != len(kdims) or len(util.wrap_tuple(v)) != len(vdims):
                raise ValueError("Dictionary data not understood, should contain a column "
                                 "per dimension or a mapping between key and value dimension "
                                 "values.")
            dict_data = zip(*((util.wrap_tuple(k)+util.wrap_tuple(v))
                              for k, v in dict_data))
            data = {k: np.array(v) for k, v in zip(dimensions, dict_data)}

        if not isinstance(data, cls.types):
            raise ValueError("DictInterface interface couldn't convert data.")

        unpacked = []
        for d, vals in data.items():
            if isinstance(d, tuple):
                vals = np.asarray(vals)
                if vals.shape == (0,):
                    for sd in d:
                        unpacked.append((sd, np.array([], dtype=vals.dtype)))
                elif not (vals.ndim == 2 and vals.shape[1] == len(d)):
                    raise ValueError("Values for %s dimensions did not have "
                                     "the expected shape." % (d,))
                else:
                    for i, sd in enumerate(d):
                        unpacked.append((sd, vals[:, i]))
            elif d not in dimensions:
                unpacked.append((d, vals))
            else:
                if not isscalar(vals):
                    vals = np.asarray(vals)
                    if not vals.ndim == 1 and d in dimensions:
                        raise ValueError('DictInterface expects data for each column to be flat.')
                unpacked.append((d, vals))

        if not cls.expanded([vs for d, vs in unpacked
                             if d in dimensions and not isscalar(vs)]):
            raise ValueError('DictInterface expects data to be of uniform shape.')

        # OrderedDict can't be replaced with dict: https://github.com/holoviz/holoviews/pull/5925
        if isinstance(data, OrderedDict):
            data.update(unpacked)
        else:
            data = OrderedDict(unpacked)

        return data, {'kdims': kdims, 'vdims': vdims}, {}

    @classmethod
    def validate(cls, dataset, vdims=True):
        dim_types = 'all' if vdims else 'key'
        dimensions = dataset.dimensions(dim_types, label='name')
        not_found = [d for d in dimensions if d not in dataset.data]
        if not_found:
            raise DataError('Following columns specified as dimensions '
                            'but not found in data: %s' % not_found, cls)
        lengths = [(dim, 1 if isscalar(dataset.data[dim]) else len(dataset.data[dim]))
                   for dim in dimensions]
        if len({l for d, l in lengths if l > 1}) > 1:
            lengths = ', '.join(['%s: %d' % l for l in sorted(lengths)])
            raise DataError('Length of columns must be equal or scalar, '
                            'columns have lengths: %s' % lengths, cls)
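    # Illustrative sketch, not part of the original module: ``init`` above
    # accepts several input formats and normalises them all to a dict of
    # columns. Assuming a standard holoviews install, the following
    # constructors should all be routed through this interface and yield
    # equivalent data:
    #
    #   >>> import holoviews as hv
    #   >>> hv.Dataset({'x': [0, 1, 2], 'y': [10, 20, 30]},
    #   ...            kdims=['x'], vdims=['y'], datatype=['dictionary'])
    #   >>> hv.Dataset(([0, 1, 2], [10, 20, 30]),           # tuple of columns
    #   ...            kdims=['x'], vdims=['y'], datatype=['dictionary'])
    #   >>> hv.Dataset([(0, 10), (1, 20), (2, 30)],         # list of rows
    #   ...            kdims=['x'], vdims=['y'], datatype=['dictionary'])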
    @classmethod
    def unpack_scalar(cls, dataset, data):
        """
        Given a dataset object and data in the appropriate format for
        the interface, return a simple scalar.
        """
        if len(data) != 1:
            return data
        key = next(iter(data.keys()))

        if len(data[key]) == 1 and key in dataset.vdims:
            scalar = data[key][0]
            return scalar.compute() if hasattr(scalar, 'compute') else scalar
        return data
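    # Illustrative sketch, not part of the original module: ``unpack_scalar``
    # is what lets single-value results collapse to a plain scalar. With the
    # hypothetical dataset below, a one-row column for the only vdim unpacks
    # to its single element:
    #
    #   >>> ds = hv.Dataset({'x': [0, 1], 'y': [1.0, 2.0]}, kdims=['x'],
    #   ...                 vdims=['y'], datatype=['dictionary'])
    #   >>> DictInterface.unpack_scalar(ds, {'y': [1.5]})
    #   1.5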
    @classmethod
    def isscalar(cls, dataset, dim):
        name = dataset.get_dimension(dim, strict=True).name
        values = dataset.data[name]
        if isscalar(values):
            return True
        if values.dtype.kind == 'O':
            unique = set(values)
        else:
            unique = np.unique(values)
            if (~util.isfinite(unique)).all():
                return True
        return len(unique) == 1

    @classmethod
    def shape(cls, dataset):
        return cls.length(dataset), len(dataset.data)

    @classmethod
    def length(cls, dataset):
        lengths = [len(vals) for d, vals in dataset.data.items()
                   if d in dataset.dimensions() and not isscalar(vals)]
        return max(lengths) if lengths else 1

    @classmethod
    def array(cls, dataset, dimensions):
        if not dimensions:
            dimensions = dataset.dimensions(label='name')
        else:
            dimensions = [dataset.get_dimension(d).name for d in dimensions]
        arrays = [dataset.data[dim] for dim in dimensions]
        return np.column_stack([np.full(len(dataset), arr) if isscalar(arr) else arr
                                for arr in arrays])

    @classmethod
    def add_dimension(cls, dataset, dimension, dim_pos, values, vdim):
        dim = dimension_name(dimension)
        data = list(dataset.data.items())
        data.insert(dim_pos, (dim, values))
        return dict(data)

    @classmethod
    def redim(cls, dataset, dimensions):
        all_dims = dataset.dimensions()
        renamed = []
        for k, v in dataset.data.items():
            if k in dimensions:
                k = dimensions[k].name
            elif k in all_dims:
                k = dataset.get_dimension(k).name
            renamed.append((k, v))
        return dict(renamed)

    @classmethod
    def concat(cls, datasets, dimensions, vdims):
        columns = defaultdict(list)
        for key, ds in datasets:
            for k, vals in ds.data.items():
                columns[k].append(np.atleast_1d(vals))
            for d, k in zip(dimensions, key):
                columns[d.name].append(np.full(len(ds), k))

        template = datasets[0][1]
        dims = dimensions+template.dimensions()
        return {d.name: np.concatenate(columns[d.name]) for d in dims}

    @classmethod
    def mask(cls, dataset, mask, mask_value=np.nan):
        masked = dict(dataset.data)
        for vd in dataset.vdims:
            new_array = np.copy(dataset.data[vd.name])
            new_array[mask] = mask_value
            masked[vd.name] = new_array
        return masked

    @classmethod
    def sort(cls, dataset, by=None, reverse=False):
        if by is None:
            by = []
        by = [dataset.get_dimension(d).name for d in by]
        if len(by) == 1:
            sorting = cls.values(dataset, by[0]).argsort()
        else:
            arrays = [dataset.dimension_values(d) for d in by]
            sorting = util.arglexsort(arrays)
        return {d: v if isscalar(v) else (v[sorting][::-1] if reverse else v[sorting])
                for d, v in dataset.data.items()}

    @classmethod
    def range(cls, dataset, dimension):
        dim = dataset.get_dimension(dimension, strict=True)
        column = dataset.data[dim.name]
        if isscalar(column):
            return column, column
        return Interface.range(dataset, dimension)

    @classmethod
    def values(cls, dataset, dim, expanded=True, flat=True, compute=True,
               keep_index=False):
        dim = dataset.get_dimension(dim, strict=True).name
        values = dataset.data.get(dim)
        if isscalar(values):
            if not expanded:
                return np.array([values])
            values = np.full(len(dataset), values, dtype=np.array(values).dtype)
        else:
            if not expanded:
                return util.unique_array(values)
            values = np.asarray(values)
        return values

    @classmethod
    def assign(cls, dataset, new_data):
        data = dict(dataset.data)
        data.update(new_data)
        return data

    @classmethod
    def reindex(cls, dataset, kdims, vdims):
        dimensions = [dataset.get_dimension(d).name for d in kdims+vdims]
        return {d: dataset.dimension_values(d) for d in dimensions}

    @classmethod
    def groupby(cls, dataset, dimensions, container_type, group_type, **kwargs):
        # Get dimensions information
        dimensions = [dataset.get_dimension(d) for d in dimensions]
        kdims = [kdim for kdim in dataset.kdims if kdim not in dimensions]
        vdims = dataset.vdims

        # Update the kwargs appropriately for Element group types
        group_kwargs = {}
        group_type = dict if group_type == 'raw' else group_type
        if issubclass(group_type, Element):
            group_kwargs.update(util.get_param_values(dataset))
            group_kwargs['kdims'] = kdims
        group_kwargs.update(kwargs)

        # Find all the keys along supplied dimensions
        keys = (tuple(dataset.data[d.name] if isscalar(dataset.data[d.name])
                      else dataset.data[d.name][i] for d in dimensions)
                for i in range(len(dataset)))

        # Iterate over the unique entries applying selection masks
        grouped_data = []
        for unique_key in util.unique_iterator(keys):
            mask = cls.select_mask(dataset, dict(zip(dimensions, unique_key)))
            group_data = {d.name: dataset.data[d.name] if isscalar(dataset.data[d.name])
                          else dataset.data[d.name][mask]
                          for d in kdims+vdims}
            group_data = group_type(group_data, **group_kwargs)
            grouped_data.append((unique_key, group_data))

        if issubclass(container_type, NdMapping):
            with item_check(False), sorted_context(False):
                return container_type(grouped_data, kdims=dimensions)
        else:
            return container_type(grouped_data)

    @classmethod
    def select(cls, dataset, selection_mask=None, **selection):
        if selection_mask is None:
            selection_mask = cls.select_mask(dataset, selection)
        empty = not selection_mask.sum()
        dimensions = dataset.dimensions()
        if empty:
            return {d.name: np.array([], dtype=cls.dtype(dataset, d))
                    for d in dimensions}
        indexed = cls.indexed(dataset, selection)
        data = {}
        for k, v in dataset.data.items():
            if k not in dimensions or isscalar(v):
                data[k] = v
            else:
                data[k] = v[selection_mask]
        if indexed and len(next(iter(data.values()))) == 1 and len(dataset.vdims) == 1:
            value = data[dataset.vdims[0].name]
            return value if isscalar(value) else value[0]
        return data

    @classmethod
    def sample(cls, dataset, samples=None):
        if samples is None:
            samples = []
        mask = False
        for sample in samples:
            sample_mask = True
            if isscalar(sample):
                sample = [sample]
            for i, v in enumerate(sample):
                name = dataset.get_dimension(i).name
                sample_mask &= (dataset.data[name] == v)
            mask |= sample_mask
        return {k: col if isscalar(col) else np.array(col)[mask]
                for k, col in dataset.data.items()}

    @classmethod
    def aggregate(cls, dataset, kdims, function, **kwargs):
        kdims = [dataset.get_dimension(d, strict=True).name for d in kdims]
        vdims = dataset.dimensions('value', label='name')
        groups = cls.groupby(dataset, kdims, list, dict)
        aggregated = {k: [] for k in kdims+vdims}

        dropped = []
        for key, group in groups:
            key = key if isinstance(key, tuple) else (key,)
            for kdim, val in zip(kdims, key):
                aggregated[kdim].append(val)
            for vdim, arr in group.items():
                if vdim in dataset.vdims:
                    if isscalar(arr):
                        aggregated[vdim].append(arr)
                        continue
                    try:
                        if isinstance(function, np.ufunc):
                            reduced = function.reduce(arr, **kwargs)
                        else:
                            reduced = function(arr, **kwargs)
                        aggregated[vdim].append(reduced)
                    except TypeError:
                        dropped.append(vdim)
        return aggregated, list(util.unique_iterator(dropped))

    @classmethod
    def iloc(cls, dataset, index):
        rows, cols = index
        scalar = False
        if isscalar(cols):
            scalar = isscalar(rows)
            cols = [dataset.get_dimension(cols, strict=True)]
        elif isinstance(cols, slice):
            cols = dataset.dimensions()[cols]
        else:
            cols = [dataset.get_dimension(d, strict=True) for d in cols]

        if isscalar(rows):
            rows = [rows]

        new_data = {}
        for d, values in dataset.data.items():
            if d in cols:
                if isscalar(values):
                    new_data[d] = values
                else:
                    new_data[d] = values[rows]

        if scalar:
            arr = new_data[cols[0].name]
            return arr if isscalar(arr) else arr[0]
        return new_data

    @classmethod
    def geom_type(cls, dataset):
        return dataset.data.get('geom_type')

    @classmethod
    def has_holes(cls, dataset):
        from holoviews.element import Polygons
        key = Polygons._hole_key
        return key in dataset.data and isinstance(dataset.data[key], list)

    @classmethod
    def holes(cls, dataset):
        from holoviews.element import Polygons
        key = Polygons._hole_key
        if key in dataset.data:
            holes = []
            for hs in dataset.data[key]:
                subholes = []
                for h in hs:
                    hole = np.asarray(h)
                    if (hole[0, :] != hole[-1, :]).all():
                        hole = np.concatenate([hole, hole[:1]])
                    subholes.append(hole)
                holes.append(subholes)
            return [holes]
        else:
            return super().holes(dataset)
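# Illustrative sketch, not part of the original module: once registered
# below, these classmethods back the high-level Dataset API for dict data,
# e.g. with a hypothetical dataset:
#
#   >>> ds = hv.Dataset({'x': [0, 0, 1], 'y': [1.0, 2.0, 3.0]}, kdims=['x'],
#   ...                 vdims=['y'], datatype=['dictionary'])
#   >>> ds.select(x=0)              # dispatches to DictInterface.select
#   >>> ds.iloc[0, 1]               # dispatches to DictInterface.iloc
#   >>> ds.aggregate('x', np.mean)  # dispatches to DictInterface.aggregate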
Interface.register(DictInterface)
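# Illustrative sketch, not part of the original module: the interface can
# also be exercised directly on a dataset's data, e.g. with the hypothetical
# ``ds`` above:
#
#   >>> DictInterface.shape(ds)        # (rows, columns) -> (3, 2)
#   >>> DictInterface.values(ds, 'y')  # -> array([1., 2., 3.])
#   >>> DictInterface.sort(ds, by=['y'], reverse=True)  # dict of sorted columns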