class cuDFInterface(PandasInterface):
    """
    The cuDFInterface allows Dataset objects to wrap a cuDF
    DataFrame object. Using cuDF allows working with columnar
    data on a GPU. Most operations leave the data in GPU memory,
    however to plot the data it has to be loaded into memory.

    The cuDFInterface covers almost the complete API exposed
    by the PandasInterface with two notable exceptions:

    1) Aggregation and groupby do not have a consistent sort order
       (see https://github.com/rapidsai/cudf/issues/4237)
    2) Not all functions can be easily applied to a cuDF so
       some functions applied with aggregate and reduce will not work.
    """

    datatype = 'cuDF'

    # No types are declared statically; cudf is imported lazily inside the
    # methods that need it.
    types = ()

    @classmethod
    def init(cls, eltype, data, kdims, vdims):
        """
        Coerce ``data`` to a cudf DataFrame and resolve the key and value
        dimensions.

        Series (cudf or pandas) are converted to frames; anything that is
        not already a cudf DataFrame is first passed through
        ``PandasInterface.init`` and then moved to cudf with
        ``cudf.from_pandas``. Missing ``kdims``/``vdims`` are inferred from
        the column order and the bounds declared on ``eltype``.

        Returns a ``(data, {'kdims': ..., 'vdims': ...}, {})`` tuple.

        Raises DataError if a dimension is an integer or if a dimension
        references a duplicated column.
        """
        import cudf

        element_params = eltype.param.objects()
        kdim_param = element_params['kdims']
        vdim_param = element_params['vdims']

        if isinstance(data, (cudf.Series, pd.Series)):
            data = data.to_frame()

        if not isinstance(data, cudf.DataFrame):
            data, _, _ = PandasInterface.init(eltype, data, kdims, vdims)
            data = cudf.from_pandas(data)

        columns = list(data.columns)
        ncols = len(columns)
        index_names = [data.index.name]
        if index_names == [None]:
            index_names = ['index']
        if eltype._auto_indexable_1d and ncols == 1 and kdims is None:
            kdims = list(index_names)

        # Upper limits on how many columns may be assigned automatically
        if isinstance(kdim_param.bounds[1], int):
            ndim = min([kdim_param.bounds[1], len(kdim_param.default)])
        else:
            ndim = None
        nvdim = vdim_param.bounds[1] if isinstance(vdim_param.bounds[1], int) else None

        if kdims and vdims is None:
            vdims = [c for c in columns if c not in kdims]
        elif vdims and kdims is None:
            kdims = [c for c in columns if c not in vdims][:ndim]
        elif kdims is None:
            kdims = list(columns[:ndim])
            if vdims is None:
                vdims = [d for d in columns[ndim:((ndim+nvdim) if nvdim else None)]
                         if d not in kdims]
        elif kdims == [] and vdims is None:
            vdims = list(columns[:nvdim if nvdim else None])

        # Handle reset of index if kdims reference index by name
        for kd in kdims:
            kd = dimension_name(kd)
            if kd in columns:
                continue
            if any(kd == ('index' if name is None else name)
                   for name in index_names):
                data = data.reset_index()
                break

        if any(isinstance(d, (np.int64, int)) for d in kdims+vdims):
            raise DataError("cudf DataFrame column names used as dimensions "
                            "must be strings not integers.", cls)

        if kdims:
            kdim = dimension_name(kdims[0])
            if eltype._auto_indexable_1d and ncols == 1 and kdim not in columns:
                # Single-column data: synthesize an integer key column
                data = data.copy()
                data.insert(0, kdim, np.arange(len(data)))

        for d in kdims+vdims:
            d = dimension_name(d)
            if len([c for c in columns if c == d]) > 1:
                raise DataError('Dimensions may not reference duplicated DataFrame '
                                'columns (found duplicate %r columns). If you want to plot '
                                'a column against itself simply declare two dimensions '
                                'with the same name. ' % d, cls)
        return data, {'kdims': kdims, 'vdims': vdims}, {}

    @classmethod
    def range(cls, dataset, dimension):
        """
        Return the (min, max) of the column backing ``dimension``.

        The dimension's ``nodata`` value, if set, is replaced via
        ``cls.replace_value`` first. Object-dtype columns yield
        ``(nan, nan)``.
        """
        dimension = dataset.get_dimension(dimension, strict=True)
        column = dataset.data[dimension.name]
        if dimension.nodata is not None:
            column = cls.replace_value(column, dimension.nodata)
        if column.dtype.kind == 'O':
            return np.nan, np.nan
        else:
            return finite_range(column, column.min(), column.max())

    @classmethod
    def values(cls, dataset, dim, expanded=True, flat=True, compute=True,
               keep_index=False):
        """
        Return the values of the column backing ``dim``.

        With ``expanded=False`` only the unique values are returned. With
        ``keep_index=True`` the column object itself is returned. With
        ``compute=True`` the ``values_host`` attribute is returned,
        otherwise ``values`` is tried and ``values_host`` is used as a
        fallback. ``flat`` is accepted but not used here.
        """
        dim = dataset.get_dimension(dim, strict=True)
        data = dataset.data[dim.name]
        if not expanded:
            data = data.unique()
            return data.values_host if compute else data.values
        elif keep_index:
            return data
        elif compute:
            return data.values_host
        try:
            return data.values
        except Exception:
            # Best-effort fallback when ``.values`` cannot be produced
            return data.values_host

    @classmethod
    def groupby(cls, dataset, dimensions, container_type, group_type, **kwargs):
        """
        Group ``dataset`` along ``dimensions``.

        Every combination of the unique values of the grouping columns is
        used as a selection; empty selections are skipped. Each non-empty
        group is wrapped in ``group_type`` (``dict`` when ``'raw'`` is
        given) and the ``(key, group)`` pairs are collected into
        ``container_type``.
        """
        # Get dimensions information
        dimensions = [dataset.get_dimension(d).name for d in dimensions]
        kdims = [kdim for kdim in dataset.kdims if kdim not in dimensions]

        # Update the kwargs appropriately for Element group types
        group_kwargs = {}
        group_type = dict if group_type == 'raw' else group_type
        if issubclass(group_type, Element):
            group_kwargs.update(util.get_param_values(dataset))
            group_kwargs['kdims'] = kdims
        group_kwargs.update(kwargs)

        # Propagate dataset
        group_kwargs['dataset'] = dataset.dataset

        # Find all the keys along supplied dimensions. Each dimension must
        # contribute its *own* unique values; indexing with dimensions[0]
        # for every dimension would miss groups in multi-dimension groupbys.
        keys = product(*(dataset.data[d].unique().values_host
                         for d in dimensions))

        # Iterate over the unique entries applying selection masks
        grouped_data = []
        for unique_key in util.unique_iterator(keys):
            group_data = dataset.select(**dict(zip(dimensions, unique_key)))
            if not len(group_data):
                continue
            group_data = group_type(group_data, **group_kwargs)
            grouped_data.append((unique_key, group_data))

        if issubclass(container_type, NdMapping):
            with item_check(False), sorted_context(False):
                kdims = [dataset.get_dimension(d) for d in dimensions]
                return container_type(grouped_data, kdims=kdims)
        else:
            return container_type(grouped_data)

    @classmethod
    def select_mask(cls, dataset, selection):
        """
        Given a Dataset object and a dictionary with dimension keys and
        selection keys (i.e. tuple ranges, slices, sets, lists, or literals)
        return a boolean mask over the rows in the Dataset object that
        have been selected.

        Returns None when no selection produced a mask.
        """
        mask = None
        for dim, sel in selection.items():
            if isinstance(sel, tuple):
                sel = slice(*sel)
            arr = cls.values(dataset, dim, keep_index=True)
            if util.isdatetime(arr):
                try:
                    sel = util.parse_datetime_selection(sel)
                except Exception:
                    # Best effort: keep the selection as supplied
                    pass

            new_masks = []
            if isinstance(sel, slice):
                with warnings.catch_warnings():
                    warnings.filterwarnings('ignore', r'invalid value encountered')
                    # Half-open interval: start <= value < stop
                    if sel.start is not None:
                        new_masks.append(sel.start <= arr)
                    if sel.stop is not None:
                        new_masks.append(arr < sel.stop)
                if not new_masks:
                    continue
                new_mask = new_masks[0]
                for imask in new_masks[1:]:
                    new_mask &= imask
            elif isinstance(sel, (set, list)):
                for v in sel:
                    new_masks.append(arr == v)
                if not new_masks:
                    continue
                new_mask = new_masks[0]
                for imask in new_masks[1:]:
                    new_mask |= imask
            elif callable(sel):
                new_mask = sel(arr)
            else:
                new_mask = arr == sel

            # Masks for different dimensions are combined with AND
            if mask is None:
                mask = new_mask
            else:
                mask &= new_mask
        return mask

    @classmethod
    def select(cls, dataset, selection_mask=None, **selection):
        """
        Apply a selection to ``dataset`` and return the selected rows.

        If no ``selection_mask`` is supplied it is computed with
        ``select_mask``. When the selection is indexed, yields exactly one
        row and the dataset has a single value dimension, the scalar value
        is returned instead of a frame.
        """
        df = dataset.data
        if selection_mask is None:
            selection_mask = cls.select_mask(dataset, selection)

        indexed = cls.indexed(dataset, selection)
        if selection_mask is not None:
            df = df.iloc[selection_mask]
        if indexed and len(df) == 1 and len(dataset.vdims) == 1:
            return df[dataset.vdims[0].name].iloc[0]
        return df

    @classmethod
    def concat_fn(cls, dataframes, **kwargs):
        """Concatenate ``dataframes`` with ``cudf.concat``."""
        import cudf
        return cudf.concat(dataframes, **kwargs)

    @classmethod
    def add_dimension(cls, dataset, dimension, dim_pos, values, vdim):
        """
        Return a copy of the data with ``values`` assigned to a column named
        after ``dimension``, unless such a column already exists.

        ``dim_pos`` and ``vdim`` are accepted but not used here.
        """
        data = dataset.data.copy()
        if dimension.name not in data:
            data[dimension.name] = values
        return data

    @classmethod
    def aggregate(cls, dataset, dimensions, function, **kwargs):
        """
        Aggregate the value dimensions with the method named after
        ``function``.

        With grouping ``dimensions`` the aggregation is applied per group;
        otherwise it is applied over the whole frame and a one-row pandas
        DataFrame is built from the result.

        Returns ``(df, dropped)`` where ``dropped`` lists the value
        dimensions that are missing from the aggregated columns.

        Raises ValueError if the aggregation method is not available.
        """
        data = dataset.data
        cols = [d.name for d in dataset.kdims if d in dimensions]
        vdims = dataset.dimensions('value', label='name')
        reindexed = data[cols+vdims]
        agg = function.__name__
        if len(dimensions):
            agg_map = {'amin': 'min', 'amax': 'max'}
            agg = agg_map.get(agg, agg)
            grouped = reindexed.groupby(cols, sort=False)
            if not hasattr(grouped, agg):
                raise ValueError(f'{agg} aggregation is not supported on cudf DataFrame.')
            df = getattr(grouped, agg)().reset_index()
        else:
            agg_map = {'amin': 'min', 'amax': 'max', 'size': 'count'}
            agg = agg_map.get(agg, agg)
            if not hasattr(reindexed, agg):
                raise ValueError(f'{agg} aggregation is not supported on cudf DataFrame.')
            agg = getattr(reindexed, agg)()
            try:
                data = {col: [v] for col, v in zip(agg.index.values_host, agg.to_numpy())}
            except Exception:
                # Give FutureWarning: 'The to_array method will be removed in a future cuDF release.
                # Consider using `to_numpy` instead.'
                # Seen in cudf=21.12.01
                data = {col: [v] for col, v in zip(agg.index.values_host, agg.to_array())}
            df = pd.DataFrame(data, columns=list(agg.index.values_host))

        dropped = [vd for vd in vdims if vd not in df.columns]
        return df, dropped

    @classmethod
    def iloc(cls, dataset, index):
        """
        Positionally index the data with a ``(rows, cols)`` tuple.

        ``cols`` may be a slice over the dataset dimensions, a single
        dimension or an iterable of dimensions. When both ``rows`` and
        ``cols`` are scalars the single value is returned.
        """
        import cudf

        rows, cols = index
        scalar = False
        columns = list(dataset.data.columns)
        if isinstance(cols, slice):
            cols = [d.name for d in dataset.dimensions()][cols]
        elif np.isscalar(cols):
            scalar = np.isscalar(rows)
            cols = [dataset.get_dimension(cols).name]
        else:
            cols = [dataset.get_dimension(d).name for d in index[1]]
        col_index = [columns.index(c) for c in cols]
        if np.isscalar(rows):
            rows = [rows]

        if scalar:
            return dataset.data[cols[0]].iloc[rows[0]]
        result = dataset.data.iloc[rows, col_index]

        # cuDF does not handle single rows and cols indexing correctly
        # as of cudf=0.10.0 so we have to convert Series back to DataFrame
        if isinstance(result, cudf.Series):
            if len(cols) == 1:
                result = result.to_frame(cols[0])
            else:
                result = result.to_frame().T
        return result

    @classmethod
    def sort(cls, dataset, by=None, reverse=False):
        """
        Return the data sorted by the ``by`` dimensions, descending when
        ``reverse`` is true.
        """
        if by is None:
            by = []
        cols = [dataset.get_dimension(d, strict=True).name for d in by]
        return dataset.data.sort_values(by=cols, ascending=not reverse)

    @classmethod
    def dframe(cls, dataset, dimensions):
        """
        Return the data converted with ``to_pandas``, restricted to
        ``dimensions`` when any are given.
        """
        if dimensions:
            return dataset.data[dimensions].to_pandas()
        else:
            return dataset.data.to_pandas()