import sys
import numpy as np
import pandas as pd
from .. import util
from ..dimension import Dimension
from ..element import Element
from ..ndmapping import NdMapping, item_check, sorted_context
from .interface import Interface
from .pandas import PandasInterface
class DaskInterface(PandasInterface):
"""
    The DaskInterface allows a Dataset object to wrap a dask
    DataFrame object. Using dask allows loading data lazily
    and performing out-of-core operations on the data, making
    it possible to work on datasets larger than memory.

    The DaskInterface covers almost the complete API exposed
    by the PandasInterface with three notable exceptions:

    1) Sorting is not supported and any attempt at sorting will
       be ignored with a warning.
    2) Dask does not easily support adding a new column to an
       existing dataframe unless it is a scalar; add_dimension
       will therefore error when supplied a non-scalar value.
    3) Not all functions can be easily applied to a dask dataframe
       so some functions applied with aggregate and reduce will
       not work.
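
    A minimal usage sketch (assuming dask.dataframe and holoviews are
    installed; the column names below are purely illustrative)::

        import pandas as pd
        import dask.dataframe as dd
        import holoviews as hv

        pdf = pd.DataFrame({'x': [0, 1, 2], 'y': [1, 2, 3]})
        ddf = dd.from_pandas(pdf, npartitions=2)
        ds = hv.Dataset(ddf, kdims=['x'], vdims=['y'])  # wrapped by DaskInterface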
"""
types = ()
datatype = 'dask'
default_partitions = 100
    @classmethod
def loaded(cls):
return 'dask.dataframe' in sys.modules and 'pandas' in sys.modules
    @classmethod
def applies(cls, obj):
if not cls.loaded():
return False
import dask.dataframe as dd
return isinstance(obj, (dd.DataFrame, dd.Series))
@classmethod
def init(cls, eltype, data, kdims, vdims):
import dask.dataframe as dd
data, dims, extra = PandasInterface.init(eltype, data, kdims, vdims)
if not isinstance(data, dd.DataFrame):
data = dd.from_pandas(data, npartitions=cls.default_partitions, sort=False)
kdims = [d.name if isinstance(d, Dimension) else d for d in dims['kdims']]
# If a key dimension can be found, speculatively reset index
# to work around lacking dask support for MultiIndex
        if any(d not in data.columns for d in kdims):
            reset = data.reset_index()
            if all(d in reset.columns for d in kdims):
                data = reset
return data, dims, extra
    @classmethod
def compute(cls, dataset):
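        # Materialize the dask graph, returning a clone backed by an
        # in-memory pandas DataFrame.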
return dataset.clone(dataset.data.compute())
    @classmethod
def persist(cls, dataset):
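        # Trigger computation but keep the result as a dask DataFrame,
        # caching the computed partitions in memory.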
return dataset.clone(dataset.data.persist())
@classmethod
def shape(cls, dataset):
return (len(dataset.data), len(dataset.data.columns))
@classmethod
def range(cls, dataset, dimension):
import dask.dataframe as dd
dimension = dataset.get_dimension(dimension, strict=True)
column = dataset.data[dimension.name]
if column.dtype.kind == 'O':
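            # Object dtypes cannot be reduced lazily with min/max; compute
            # the non-null values and sort them to find the range eagerly.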
column = np.sort(column[column.notnull()].compute())
return (column[0], column[-1]) if len(column) else (None, None)
else:
if dimension.nodata is not None:
column = cls.replace_value(column, dimension.nodata)
return dd.compute(column.min(), column.max())
@classmethod
def sort(cls, dataset, by=None, reverse=False):
if by is None:
by = []
dataset.param.warning('Dask dataframes do not support sorting')
return dataset.data
@classmethod
def values(cls, dataset, dim, expanded=True, flat=True, compute=True, keep_index=False):
dim = dataset.get_dimension(dim)
data = dataset.data[dim.name]
if not expanded:
data = data.unique()
if keep_index:
return data.compute() if compute else data
else:
return data.compute().values if compute else data.values
    @classmethod
def select_mask(cls, dataset, selection):
"""
Given a Dataset object and a dictionary with dimension keys and
        selection keys (i.e. tuple ranges, slices, sets, lists, or literals)
return a boolean mask over the rows in the Dataset object that
have been selected.
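
        For example, ``{'x': (0, 5)}`` is converted to a slice and
        evaluated lazily as ``(0 <= data['x']) & (data['x'] < 5)``.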
"""
select_mask = None
for dim, k in selection.items():
if isinstance(k, tuple):
k = slice(*k)
masks = []
alias = dataset.get_dimension(dim).name
series = dataset.data[alias]
if isinstance(k, slice):
if k.start is not None:
# Workaround for dask issue #3392
kval = util.numpy_scalar_to_python(k.start)
masks.append(kval <= series)
if k.stop is not None:
kval = util.numpy_scalar_to_python(k.stop)
masks.append(series < kval)
elif isinstance(k, (set, list)):
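                # OR together an equality mask for each member of the set/list.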
iter_slc = None
for ik in k:
mask = series == ik
if iter_slc is None:
iter_slc = mask
else:
iter_slc |= mask
masks.append(iter_slc)
elif callable(k):
masks.append(k(series))
else:
masks.append(series == k)
for mask in masks:
if select_mask is not None:
select_mask &= mask
else:
select_mask = mask
return select_mask
@classmethod
def select(cls, dataset, selection_mask=None, **selection):
df = dataset.data
if selection_mask is not None:
return df[selection_mask]
selection_mask = cls.select_mask(dataset, selection)
indexed = cls.indexed(dataset, selection)
df = df if selection_mask is None else df[selection_mask]
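        # When the selection fully indexes the key dimensions and a single
        # value column remains, unpack the one-row frame to a scalar.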
if indexed and len(df) == 1 and len(dataset.vdims) == 1:
return df[dataset.vdims[0].name].compute().iloc[0]
return df
@classmethod
def groupby(cls, dataset, dimensions, container_type, group_type, **kwargs):
index_dims = [dataset.get_dimension(d) for d in dimensions]
element_dims = [kdim for kdim in dataset.kdims
if kdim not in index_dims]
group_kwargs = {}
if group_type != 'raw' and issubclass(group_type, Element):
group_kwargs = dict(util.get_param_values(dataset),
kdims=element_dims)
group_kwargs.update(kwargs)
# Propagate dataset
group_kwargs['dataset'] = dataset.dataset
data = []
group_by = [d.name for d in index_dims]
groupby = dataset.data.groupby(group_by)
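        # Enumerate the group keys up front: categorical columns expose
        # their categories cheaply, otherwise unique values (or unique row
        # tuples) have to be computed.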
if len(group_by) == 1:
column = dataset.data[group_by[0]]
if column.dtype.name == 'category':
try:
indices = ((ind,) for ind in column.cat.categories)
except NotImplementedError:
indices = ((ind,) for ind in column.unique().compute())
else:
indices = ((ind,) for ind in column.unique().compute())
else:
group_tuples = dataset.data[group_by].itertuples()
indices = util.unique_iterator(ind[1:] for ind in group_tuples)
for coord in indices:
if any(isinstance(c, float) and np.isnan(c) for c in coord):
continue
if len(coord) == 1:
coord = coord[0]
group = group_type(groupby.get_group(coord), **group_kwargs)
data.append((coord, group))
if issubclass(container_type, NdMapping):
with item_check(False), sorted_context(False):
return container_type(data, kdims=index_dims)
else:
return container_type(data)
@classmethod
def aggregate(cls, dataset, dimensions, function, **kwargs):
data = dataset.data
cols = [d.name for d in dataset.kdims if d in dimensions]
vdims = dataset.dimensions('value', label='name')
dtypes = data.dtypes
numeric = [c for c, dtype in zip(dtypes.index, dtypes.values)
if dtype.kind in 'iufc' and c in vdims]
reindexed = data[cols+numeric]
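        # Map supported NumPy reduction names to the equivalent dask
        # DataFrame methods; other functions fall back to groupby.apply
        # and are unsupported without a groupby.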
inbuilts = {'amin': 'min', 'amax': 'max', 'mean': 'mean',
'std': 'std', 'sum': 'sum', 'var': 'var'}
if len(dimensions):
groups = reindexed.groupby(cols)
            if function.__name__ in inbuilts:
agg = getattr(groups, inbuilts[function.__name__])()
else:
agg = groups.apply(function)
df = agg.reset_index()
else:
            if function.__name__ in inbuilts:
agg = getattr(reindexed, inbuilts[function.__name__])()
else:
raise NotImplementedError
df = pd.DataFrame(agg.compute()).T
dropped = []
for vd in vdims:
if vd not in df.columns:
dropped.append(vd)
return df, dropped
    @classmethod
def unpack_scalar(cls, dataset, data):
"""
Given a dataset object and data in the appropriate format for
the interface, return a simple scalar.
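
        A 1x1 DataFrame is computed (if it is still a lazy dask frame)
        and the single value at ``.iat[0, 0]`` is returned; anything
        larger is passed through unchanged.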
"""
import dask.dataframe as dd
if len(data.columns) > 1 or len(data) != 1:
return data
if isinstance(data, dd.DataFrame):
data = data.compute()
        return data.iat[0, 0]
@classmethod
def sample(cls, dataset, samples=None):
if samples is None:
samples = []
data = dataset.data
dims = dataset.dimensions('key', label='name')
mask = None
for sample in samples:
            if np.isscalar(sample):
                sample = [sample]
for c, v in zip(dims, sample):
dim_mask = data[c]==v
if mask is None:
mask = dim_mask
else:
mask |= dim_mask
return data[mask]
@classmethod
def add_dimension(cls, dataset, dimension, dim_pos, values, vdim):
data = dataset.data
if dimension.name not in data.columns:
if not np.isscalar(values):
if len(values):
err = ('Dask dataframe does not support assigning '
'non-scalar value.')
raise NotImplementedError(err)
values = None
data = data.assign(**{dimension.name: values})
return data
@classmethod
def concat_fn(cls, dataframes, **kwargs):
import dask.dataframe as dd
return dd.concat(dataframes, **kwargs)
@classmethod
def dframe(cls, dataset, dimensions):
if dimensions:
return dataset.data[dimensions].compute()
else:
return dataset.data.compute()
@classmethod
def nonzero(cls, dataset):
return True
    @classmethod
def iloc(cls, dataset, index):
"""
Dask does not support iloc, therefore iloc will execute
the call graph and lose the laziness of the operation.
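
        For example, ``dataset.iloc[0, 'y']`` computes the ``'y'`` column
        eagerly and returns its first value as a scalar.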
"""
rows, cols = index
scalar = False
if isinstance(cols, slice):
cols = [d.name for d in dataset.dimensions()][cols]
elif np.isscalar(cols):
scalar = np.isscalar(rows)
cols = [dataset.get_dimension(cols).name]
else:
cols = [dataset.get_dimension(d).name for d in index[1]]
if np.isscalar(rows):
rows = [rows]
data = {}
for c in cols:
data[c] = dataset.data[c].compute().iloc[rows].values
if scalar:
return data[cols[0]][0]
return tuple(data.values())
Interface.register(DaskInterface)
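
# A minimal usage sketch (illustrative only; assumes dask.dataframe and an
# installed holoviews package are available, and the column names below are
# made up for the example):
if __name__ == '__main__':
    import dask.dataframe as dd
    import holoviews as hv

    ddf = dd.from_pandas(pd.DataFrame({'x': [0, 0, 1], 'y': [1., 2., 3.]}),
                         npartitions=2)
    ds = hv.Dataset(ddf, kdims=['x'], vdims=['y'])
    print(ds.range('y'))               # lazy min/max, resolved via dd.compute
    print(ds.aggregate('x', np.mean))  # groupby('x').mean() on the dask frame
    print(ds.iloc[0, 'y'])             # computes the 'y' column, then indexes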