Source code for holoviews.plotting.bokeh.util

import calendar
import datetime as dt
import re
import time
from collections import defaultdict
from contextlib import contextmanager, suppress
from itertools import permutations

import bokeh
import numpy as np
import pandas as pd
from bokeh.core.json_encoder import serialize_json  # noqa (API import)
from bokeh.core.property.datetime import Datetime
from bokeh.core.validation import silence
from bokeh.layouts import Column, Row, group_tools
from bokeh.models import (
    CategoricalAxis,
    CopyTool,
    CustomJS,
    DataRange1d,
    DatetimeAxis,
    ExamineTool,
    FactorRange,
    FullscreenTool,
    GridBox,
    GridPlot,
    LayoutDOM,
    LinearAxis,
    LogAxis,
    MercatorAxis,
    Model,
    Plot,
    Range1d,
    SaveTool,
    Spacer,
    Tabs,
    Toolbar,
    tools,
)
from bokeh.models.formatters import PrintfTickFormatter, TickFormatter
from bokeh.models.scales import CategoricalScale, LinearScale, LogScale
from bokeh.models.widgets import DataTable, Div
from bokeh.plotting import figure
from bokeh.themes import built_in_themes
from bokeh.themes.theme import Theme
from packaging.version import Version

from ...core.layout import Layout
from ...core.ndmapping import NdMapping
from ...core.overlay import NdOverlay, Overlay
from ...core.spaces import DynamicMap, get_nested_dmaps
from ...core.util import (
    arraylike_types,
    callable_name,
    cftime_to_timestamp,
    cftime_types,
    isnumeric,
    unique_array,
)
from ...util.warnings import warn
from ..util import dim_axis_label

bokeh_version = Version(bokeh.__version__)
bokeh32 = bokeh_version >= Version("3.2")
bokeh33 = bokeh_version >= Version("3.3")
bokeh34 = bokeh_version >= Version("3.4")

TOOL_TYPES = {
    'pan': tools.PanTool,
    'xpan': tools.PanTool,
    'ypan': tools.PanTool,
    'xwheel_pan': tools.WheelPanTool,
    'ywheel_pan': tools.WheelPanTool,
    'wheel_zoom': tools.WheelZoomTool,
    'xwheel_zoom': tools.WheelZoomTool,
    'ywheel_zoom': tools.WheelZoomTool,
    'zoom_in': tools.ZoomInTool,
    'xzoom_in': tools.ZoomInTool,
    'yzoom_in': tools.ZoomInTool,
    'zoom_out': tools.ZoomOutTool,
    'xzoom_out': tools.ZoomOutTool,
    'yzoom_out': tools.ZoomOutTool,
    'click': tools.TapTool,
    'tap': tools.TapTool,
    'crosshair': tools.CrosshairTool,
    'box_select': tools.BoxSelectTool,
    'xbox_select': tools.BoxSelectTool,
    'ybox_select': tools.BoxSelectTool,
    'poly_select': tools.PolySelectTool,
    'lasso_select': tools.LassoSelectTool,
    'box_zoom': tools.BoxZoomTool,
    'xbox_zoom': tools.BoxZoomTool,
    'ybox_zoom': tools.BoxZoomTool,
    'hover': tools.HoverTool,
    'save': tools.SaveTool,
    'undo': tools.UndoTool,
    'redo': tools.RedoTool,
    'reset': tools.ResetTool,
    'help': tools.HelpTool,
    'box_edit': tools.BoxEditTool,
    'point_draw': tools.PointDrawTool,
    'poly_draw': tools.PolyDrawTool,
    'poly_edit': tools.PolyEditTool,
    'freehand_draw': tools.FreehandDrawTool
}
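
# Example usage (illustrative, not part of the module source): TOOL_TYPES maps
# the string aliases accepted by the `tools` plot option onto bokeh tool
# classes, e.g.
#
#     TOOL_TYPES['tap'] is tools.TapTool           # -> True
#     TOOL_TYPES['box_zoom'] is tools.BoxZoomTool  # -> True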


def convert_timestamp(timestamp):
    """
    Converts bokehJS timestamp to datetime64.
    """
    datetime = dt.datetime.fromtimestamp(timestamp/1000, tz=dt.timezone.utc)
    return np.datetime64(datetime.replace(tzinfo=None))
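
# Example (illustrative, not part of the module source): BokehJS reports times
# as milliseconds since the Unix epoch, so e.g.
#
#     convert_timestamp(1577836800000)
#     # -> numpy.datetime64('2020-01-01T00:00:00.000000')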

def prop_is_none(value):
    """
    Checks if property value is None.
    """
    return (value is None or
            (isinstance(value, dict) and 'value' in value
             and value['value'] is None))

def decode_bytes(array):
    """
    Decodes an array, list or tuple of bytestrings to avoid
    python 3 bokeh serialization errors
    """
    if (not len(array) or (isinstance(array, arraylike_types)
                           and array.dtype.kind != 'O')):
        return array
    decoded = [v.decode('utf-8') if isinstance(v, bytes) else v for v in array]
    if isinstance(array, np.ndarray):
        return np.asarray(decoded)
    elif isinstance(array, tuple):
        return tuple(decoded)
    return decoded
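
# Example (illustrative, not part of the module source):
#
#     decode_bytes([b'a', b'b', 'c'])   # -> ['a', 'b', 'c']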

def layout_padding(plots, renderer):
    """
    Pads Nones in a list of lists of plots with empty plots.
    """
    widths, heights = defaultdict(int), defaultdict(int)
    for r, row in enumerate(plots):
        for c, p in enumerate(row):
            if p is not None:
                width, height = renderer.get_size(p)
                widths[c] = max(widths[c], width)
                heights[r] = max(heights[r], height)

    expanded_plots = []
    for r, row in enumerate(plots):
        expanded_plots.append([])
        for c, p in enumerate(row):
            if p is None:
                p = empty_plot(widths[c], heights[r])
            elif hasattr(p, 'width') and p.width == 0 and p.height == 0:
                p.width = widths[c]
                p.height = heights[r]
            expanded_plots[r].append(p)
    return expanded_plots

def compute_plot_size(plot):
    """
    Computes the size of bokeh models that make up a layout such as
    figures, rows, columns, and Plot.
    """
    if isinstance(plot, (GridBox, GridPlot)):
        ndmapping = NdMapping({(x, y): fig for fig, y, x in plot.children}, kdims=['x', 'y'])
        cols = ndmapping.groupby('x')
        rows = ndmapping.groupby('y')
        width = sum([max([compute_plot_size(f)[0] for f in col]) for col in cols])
        height = sum([max([compute_plot_size(f)[1] for f in row]) for row in rows])
        return width, height
    elif isinstance(plot, (Div, Toolbar)):
        # Cannot compute size for Div or Toolbar
        return 0, 0
    elif isinstance(plot, (Row, Column, Tabs)):
        if not plot.children:
            return 0, 0
        if isinstance(plot, Row) or (isinstance(plot, Toolbar)
                                     and plot.toolbar_location not in ['right', 'left']):
            w_agg, h_agg = (np.sum, np.max)
        elif isinstance(plot, Tabs):
            w_agg, h_agg = (np.max, np.max)
        else:
            w_agg, h_agg = (np.max, np.sum)
        widths, heights = zip(*[compute_plot_size(child) for child in plot.children])
        return w_agg(widths), h_agg(heights)
    elif isinstance(plot, figure):
        if plot.width:
            width = plot.width
        else:
            width = plot.frame_width + plot.min_border_right + plot.min_border_left
        if plot.height:
            height = plot.height
        else:
            height = plot.frame_height + plot.min_border_bottom + plot.min_border_top
        return width, height
    elif isinstance(plot, (Plot, DataTable, Spacer)):
        return plot.width, plot.height
    else:
        return 0, 0

def compute_layout_properties(
        width, height, frame_width, frame_height, explicit_width,
        explicit_height, aspect, data_aspect, responsive, size_multiplier,
        logger=None):
    """
    Utility to compute the aspect, plot width/height and sizing_mode
    behavior.

    Args:
      width (int): Plot width
      height (int): Plot height
      frame_width (int): Plot frame width
      frame_height (int): Plot frame height
      explicit_width (list): List of user supplied widths
      explicit_height (list): List of user supplied heights
      aspect (float): Plot aspect
      data_aspect (float): Scaling between x-axis and y-axis ranges
      responsive (boolean): Whether the plot should resize responsively
      size_multiplier (float): Multiplier for supplied plot dimensions
      logger (param.Parameters): Parameters object to issue warnings on

    Returns:
      Returns two dictionaries, one for the aspect and sizing modes,
      and another for the plot dimensions.
    """
    fixed_width = (explicit_width or frame_width)
    fixed_height = (explicit_height or frame_height)
    fixed_aspect = aspect or data_aspect
    if aspect == 'square':
        aspect = 1
    elif aspect == 'equal':
        data_aspect = 1

    # Plot dimensions
    height = None if height is None else int(height*size_multiplier)
    width = None if width is None else int(width*size_multiplier)
    frame_height = None if frame_height is None else int(frame_height*size_multiplier)
    frame_width = None if frame_width is None else int(frame_width*size_multiplier)
    actual_width = frame_width or width
    actual_height = frame_height or height

    if frame_width is not None:
        width = None
    if frame_height is not None:
        height = None

    sizing_mode = 'fixed'
    if responsive:
        if fixed_height and fixed_width:
            responsive = False
            if logger:
                logger.warning("responsive mode could not be enabled "
                               "because fixed width and height were "
                               "specified.")
        elif fixed_width:
            height = None
            sizing_mode = 'fixed' if fixed_aspect else 'stretch_height'
        elif fixed_height:
            width = None
            sizing_mode = 'fixed' if fixed_aspect else 'stretch_width'
        else:
            width, height = None, None
            if fixed_aspect:
                if responsive == 'width':
                    sizing_mode = 'scale_width'
                elif responsive == 'height':
                    sizing_mode = 'scale_height'
                else:
                    sizing_mode = 'scale_both'
            elif responsive == 'width':
                sizing_mode = 'stretch_both'
            elif responsive == 'height':
                sizing_mode = 'stretch_height'
            else:
                sizing_mode = 'stretch_both'

    if fixed_aspect:
        if ((explicit_width and not frame_width) !=
            (explicit_height and not frame_height)) and logger:
            logger.warning('Due to internal constraints, when aspect and '
                           'width/height is set, the bokeh backend uses '
                           'those values as frame_width/frame_height instead. '
                           'This ensures the aspect is respected, but means '
                           'that the plot might be slightly larger than '
                           'anticipated. Set the frame_width/frame_height '
                           'explicitly to suppress this warning.')

        aspect_type = 'data_aspect' if data_aspect else 'aspect'
        if fixed_width and fixed_height and aspect:
            if aspect == 'equal':
                data_aspect = 1
            elif not data_aspect:
                aspect = None
                if logger:
                    logger.warning(
                        "%s value was ignored because absolute width and "
                        "height values were provided. Either supply "
                        "explicit frame_width and frame_height to achieve "
                        "desired aspect OR supply a combination of width "
                        "or height and an aspect value." % aspect_type)
        elif fixed_width and responsive:
            height = None
            responsive = False
            if logger:
                logger.warning("responsive mode could not be enabled "
                               "because fixed width and aspect were "
                               "specified.")
        elif fixed_height and responsive:
            width = None
            responsive = False
            if logger:
                logger.warning("responsive mode could not be enabled "
                               "because fixed height and aspect were "
                               "specified.")
        elif responsive == 'width':
            sizing_mode = 'scale_width'
        elif responsive == 'height':
            sizing_mode = 'scale_height'

    if responsive == 'width' and fixed_width:
        responsive = False
        if logger:
            logger.warning("responsive width mode could not be enabled "
                           "because a fixed width was defined.")
    if responsive == 'height' and fixed_height:
        responsive = False
        if logger:
            logger.warning("responsive height mode could not be enabled "
                           "because a fixed height was defined.")

    match_aspect = False
    aspect_scale = 1
    aspect_ratio = None
    if data_aspect:
        match_aspect = True
        if (fixed_width and fixed_height):
            frame_width, frame_height = frame_width or width, frame_height or height
        elif fixed_width or not fixed_height:
            height = None
        elif fixed_height or not fixed_width:
            width = None

        aspect_scale = data_aspect
        if aspect == 'equal':
            aspect_scale = 1
    elif responsive:
        aspect_ratio = aspect
    elif (fixed_width and fixed_height):
        pass
    elif isnumeric(aspect):
        if responsive:
            aspect_ratio = aspect
        elif fixed_width:
            frame_width = actual_width
            frame_height = int(actual_width/aspect)
            width, height = None, None
        else:
            frame_width = int(actual_height*aspect)
            frame_height = actual_height
            width, height = None, None
    elif aspect is not None and logger:
        logger.warning('aspect value of type %s not recognized, '
                       'provide a numeric value, \'equal\' or '
                       '\'square\'.' % type(aspect).__name__)

    aspect_info = {
        'aspect_ratio': aspect_ratio,
        'aspect_scale': aspect_scale,
        'match_aspect': match_aspect,
        'sizing_mode' : sizing_mode
    }
    dimension_info = {
        'frame_width' : frame_width,
        'frame_height': frame_height,
        'height' : height,
        'width' : width
    }
    return aspect_info, dimension_info

def merge_tools(plot_grid, disambiguation_properties=None):
    """
    Merges tools defined on a grid of plots into a single toolbar.
    All tools of the same type are merged unless they define one
    of the disambiguation properties. By default `name`, `icon`, `tags`
    and `description` can be used to prevent tools from being merged.
    """
    tools = []
    for row in plot_grid:
        for item in row:
            if isinstance(item, LayoutDOM):
                for p in item.select(dict(type=Plot)):
                    tools.extend(p.toolbar.tools)
            if isinstance(item, GridPlot):
                item.toolbar_location = None

    def merge(tool, group):
        if issubclass(tool, (SaveTool, CopyTool, ExamineTool, FullscreenTool)):
            return tool()
        else:
            return None

    if not disambiguation_properties:
        disambiguation_properties = {'name', 'icon', 'tags', 'description'}

    ignore = set()
    for tool in tools:
        for p in tool.properties_with_values():
            if p not in disambiguation_properties:
                ignore.add(p)

    return Toolbar(tools=group_tools(tools, merge=merge, ignore=ignore) if merge_tools else tools)

def sync_legends(bokeh_layout):
    """This syncs the legends of all plots in a grid based on their name.

    Parameters
    ----------
    bokeh_layout : bokeh.models.{GridPlot, Row, Column}
        Gridplot to sync legends of.
    """
    if len(bokeh_layout.children) < 2:
        return

    # Collect all glyphs with names
    items = defaultdict(list)
    click_policies = set()
    for fig in bokeh_layout.children:
        if isinstance(fig, tuple):  # GridPlot
            fig = fig[0]
        if not isinstance(fig, figure):
            continue
        for r in fig.renderers:
            if r.name:
                items[r.name].append(r)
        if fig.legend:
            click_policies.add(fig.legend[0].click_policy)

    click_policies.discard("none")  # If legend is not visible, click_policy is "none"
    if len(click_policies) > 1:
        warn("Click policy of legends are not the same, no syncing will happen.")
        return
    elif not click_policies:
        return

    # Link all glyphs with the same name
    mapping = {"mute": "muted", "hide": "visible"}
    policy = mapping.get(next(iter(click_policies)))
    code = f"dst.{policy} = src.{policy}"
    for item in items.values():
        for src, dst in permutations(item, 2):
            src.js_on_change(
                policy,
                CustomJS(code=code, args=dict(src=src, dst=dst)),
            )

def select_legends(holoviews_layout, figure_index=None, legend_position="top_right"):
    """
    Only displays selected legends in plot layout.

    Parameters
    ----------
    holoviews_layout : Holoviews Layout
        Holoviews Layout with legends.
    figure_index : list[int] | bool | int | None
        Index of the figures whose legends to show.
        If None, only the first figure's legend is shown.
        If True, all legends are shown.
    legend_position : str
        Position of the legend(s).
    """
    if figure_index is None:
        figure_index = [0]
    elif isinstance(figure_index, bool):
        figure_index = range(len(holoviews_layout)) if figure_index else []
    elif isinstance(figure_index, int):
        figure_index = [figure_index]
    if not isinstance(holoviews_layout, Layout):
        holoviews_layout = [holoviews_layout]

    for i, plot in enumerate(holoviews_layout):
        if not isinstance(plot, (NdOverlay, Overlay)):
            continue
        if i in figure_index:
            plot.opts(show_legend=True, legend_position=legend_position)
        else:
            plot.opts(show_legend=False)

    if isinstance(holoviews_layout, list):
        return holoviews_layout[0]
    return holoviews_layout

@contextmanager
def silence_warnings(*warnings):
    """
    Context manager for silencing bokeh validation warnings.
    """
    for warning in warnings:
        silence(warning)
    try:
        yield
    finally:
        for warning in warnings:
            silence(warning, False)
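
# Example usage (illustrative sketch): temporarily silence a specific bokeh
# validation warning, e.g. while constructing an intentionally empty figure:
#
#     from bokeh.core.validation.warnings import MISSING_RENDERERS
#     with silence_warnings(MISSING_RENDERERS):
#         fig = figure()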

def empty_plot(width, height):
    """
    Creates an empty and invisible plot of the specified size.
    """
    return Spacer(width=width, height=height)

def remove_legend(plot, legend):
    """
    Removes a legend from a bokeh plot.
    """
    valid_places = ['left', 'right', 'above', 'below', 'center']
    plot.legend[:] = [l for l in plot.legend if l is not legend]
    for place in valid_places:
        place = getattr(plot, place)
        if legend in place:
            place.remove(legend)

def font_size_to_pixels(size):
    """
    Convert a fontsize to a pixel value
    """
    if size is None or not isinstance(size, str):
        return
    conversions = {'em': 16, 'pt': 16/12.}
    val = re.findall(r'\d+', size)
    unit = re.findall('[a-z]+', size)
    if (val and not unit) or (val and unit[0] == 'px'):
        return int(val[0])
    elif val and unit[0] in conversions:
        return (int(int(val[0]) * conversions[unit[0]]))
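
# Example (illustrative, not part of the module source):
#
#     font_size_to_pixels('12px')   # -> 12
#     font_size_to_pixels('1em')    # -> 16
#     font_size_to_pixels('10pt')   # -> 13 (using the 16px-per-12pt approximation)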

def make_axis(axis, size, factors, dim, flip=False, rotation=0,
              label_size=None, tick_size=None, axis_height=35):
    factors = list(map(dim.pprint_value, factors))
    nchars = np.max([len(f) for f in factors])
    ranges = FactorRange(factors=factors)
    ranges2 = Range1d(start=0, end=1)
    axis_label = dim_axis_label(dim)
    reset = "range.setv({start: 0, end: range.factors.length})"
    customjs = CustomJS(args=dict(range=ranges), code=reset)
    ranges.js_on_change('start', customjs)

    axis_props = {}
    if label_size:
        axis_props['axis_label_text_font_size'] = label_size
    if tick_size:
        axis_props['major_label_text_font_size'] = tick_size

    tick_px = font_size_to_pixels(tick_size)
    if tick_px is None:
        tick_px = 8
    label_px = font_size_to_pixels(label_size)
    if label_px is None:
        label_px = 10

    rotation = np.radians(rotation)
    if axis == 'x':
        align = 'center'
        # Adjust height to compensate for label rotation
        height = int(axis_height + np.abs(np.sin(rotation)) *
                     ((nchars*tick_px)*0.82)) + tick_px + label_px
        opts = dict(x_axis_type='auto', x_axis_label=axis_label,
                    x_range=ranges, y_range=ranges2, height=height,
                    width=size)
    else:
        # Adjust width to compensate for label rotation
        align = 'left' if flip else 'right'
        width = int(axis_height + np.abs(np.cos(rotation)) *
                    ((nchars*tick_px)*0.82)) + tick_px + label_px
        opts = dict(y_axis_label=axis_label, x_range=ranges2,
                    y_range=ranges, height=size, width=width)

    p = figure(toolbar_location=None, tools=[], **opts)
    p.outline_line_alpha = 0
    p.grid.grid_line_alpha = 0

    if axis == 'x':
        p.align = 'end'
        p.yaxis.visible = False
        axis = p.xaxis[0]
        if flip:
            p.above = p.below
            p.below = []
            p.xaxis[:] = p.above
    else:
        p.xaxis.visible = False
        axis = p.yaxis[0]
        if flip:
            p.right = p.left
            p.left = []
            p.yaxis[:] = p.right
    axis.major_label_orientation = rotation
    axis.major_label_text_align = align
    axis.major_label_text_baseline = 'middle'
    axis.update(**axis_props)
    return p

def hsv_to_rgb(hsv):
    """
    Vectorized HSV to RGB conversion, adapted from:
    http://stackoverflow.com/questions/24852345/hsv-to-rgb-color-conversion
    """
    h, s, v = (hsv[..., i] for i in range(3))
    shape = h.shape
    i = np.int_(h*6.)
    f = h*6.-i

    q = f
    t = 1.-f
    i = np.ravel(i)
    f = np.ravel(f)
    i %= 6

    t = np.ravel(t)
    q = np.ravel(q)
    s = np.ravel(s)
    v = np.ravel(v)

    clist = (1-s*np.vstack([np.zeros_like(f), np.ones_like(f), q, t]))*v

    # 0:v 1:p 2:q 3:t
    order = np.array([[0, 3, 1], [2, 0, 1], [1, 0, 3], [1, 2, 0], [3, 1, 0], [0, 1, 2]])
    rgb = clist[order[i], np.arange(np.prod(shape))[:, None]]

    return rgb.reshape(shape+(3,))

def pad_width(model, table_padding=0.85, tabs_padding=1.2):
    """
    Computes the width of a model and sets up appropriate padding
    for Tabs and DataTable types.
    """
    if isinstance(model, Row):
        vals = [pad_width(child) for child in model.children]
        width = np.max([v for v in vals if v is not None])
    elif isinstance(model, Column):
        vals = [pad_width(child) for child in model.children]
        width = np.sum([v for v in vals if v is not None])
    elif isinstance(model, Tabs):
        vals = [pad_width(t) for t in model.tabs]
        width = np.max([v for v in vals if v is not None])
        for submodel in model.tabs:
            submodel.width = width
            width = int(tabs_padding*width)
    elif isinstance(model, DataTable):
        width = model.width
        model.width = int(table_padding*width)
    elif isinstance(model, Div):
        width = model.width
    elif model:
        width = model.width
    else:
        width = 0
    return width

def pad_plots(plots):
    """
    Accepts a grid of bokeh plots in form of a list of lists and
    wraps any DataTable or Tabs in a Column with appropriate
    padding. Required to avoid overlap in gridplot.
    """
    widths = []
    for row in plots:
        row_widths = []
        for p in row:
            width = pad_width(p)
            row_widths.append(width)
        widths.append(row_widths)
    plots = [[Column(p, width=w) if isinstance(p, (DataTable, Tabs)) else p
              for p, w in zip(row, ws)] for row, ws in zip(plots, widths)]
    return plots

def filter_toolboxes(plots):
    """
    Filters toolboxes out of a list of plots so that they can be
    composed into a larger plot.
    """
    if isinstance(plots, list):
        plots = [filter_toolboxes(plot) for plot in plots]
    elif hasattr(plots, 'toolbar'):
        plots.toolbar_location = None
    elif hasattr(plots, 'children'):
        plots.children = [filter_toolboxes(child) for child in plots.children
                          if not isinstance(child, Toolbar)]
    return plots

def get_tab_title(key, frame, overlay):
    """
    Computes a title for bokeh tabs from the key in the overlay, the
    element and the containing (Nd)Overlay.
    """
    if isinstance(overlay, Overlay):
        if frame is not None:
            title = []
            if frame.label:
                title.append(frame.label)
                if frame.group != frame.param.objects('existing')['group'].default:
                    title.append(frame.group)
            else:
                title.append(frame.group)
        else:
            title = key
        title = ' '.join(title)
    else:
        title = ' | '.join([d.pprint_value_string(k) for d, k in
                            zip(overlay.kdims, key)])
    return title

def get_default(model, name, theme=None):
    """
    Looks up the default value for a bokeh model property.
    """
    overrides = None
    if theme is not None:
        if isinstance(theme, str):
            theme = built_in_themes[theme]
        overrides = theme._for_class(model)
    descriptor = model.lookup(name)
    return descriptor.property.themed_default(model, name, overrides)

def filter_batched_data(data, mapping):
    """
    Iterates over the data and mapping for a ColumnDataSource and
    replaces columns with repeating values with a scalar. This is
    purely an optimization for scalar types.
    """
    for k, v in list(mapping.items()):
        if isinstance(v, dict) and 'field' in v:
            if 'transform' in v:
                continue
            v = v['field']
        elif not isinstance(v, str):
            continue
        values = data[v]
        try:
            if len(unique_array(values)) == 1:
                mapping[k] = values[0]
                del data[v]
        except Exception:
            pass
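
# Example (illustrative, not part of the module source): a column holding a
# single repeated value is collapsed to a scalar and dropped from the data:
#
#     data = {'color': ['red', 'red', 'red']}
#     mapping = {'fill_color': {'field': 'color'}}
#     filter_batched_data(data, mapping)
#     # mapping == {'fill_color': 'red'}; data == {}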

def cds_column_replace(source, data):
    """
    Determine if the CDS.data requires a full replacement or simply
    needs to be updated. A replacement is required if untouched
    columns are not the same length as the columns being updated.
    """
    current_length = [len(v) for v in source.data.values()
                      if isinstance(v, (list,)+arraylike_types)]
    new_length = [len(v) for v in data.values() if isinstance(v, (list, np.ndarray))]
    untouched = [k for k in source.data if k not in data]
    return bool(untouched and current_length and new_length and
                current_length[0] != new_length[0])

@contextmanager
def hold_policy(document, policy, server=False):
    """
    Context manager to temporarily override the hold policy.
    """
    old_policy = document.callbacks.hold_value
    document.callbacks._hold = policy
    try:
        yield
    finally:
        if server and not old_policy:
            document.unhold()
        else:
            document.callbacks._hold = old_policy

def recursive_model_update(model, props):
    """
    Recursively updates attributes on a model including other
    models. If the type of the new model matches the old model
    properties are simply updated, otherwise the model is replaced.
    """
    updates = {}
    valid_properties = model.properties_with_values()
    for k, v in props.items():
        if isinstance(v, Model):
            nested_model = getattr(model, k)
            if type(v) is type(nested_model):
                nested_props = v.properties_with_values(include_defaults=False)
                recursive_model_update(nested_model, nested_props)
            else:
                try:
                    setattr(model, k, v)
                except Exception as e:
                    if isinstance(v, dict) and 'value' in v:
                        setattr(model, k, v['value'])
                    else:
                        raise e
        elif k in valid_properties and v != valid_properties[k]:
            if isinstance(v, dict) and 'value' in v:
                v = v['value']
            updates[k] = v
    model.update(**updates)

def update_shared_sources(f):
    """
    Decorator which ensures data sources shared between multiple
    plots are cleared and updated appropriately, avoiding warnings
    and allowing empty frames on subplots. Expects a list of
    shared_sources and a mapping of the expected columns for each
    source in the plot's handles.
    """
    def wrapper(self, *args, **kwargs):
        source_cols = self.handles.get('source_cols', {})
        shared_sources = self.handles.get('shared_sources', [])
        doc = self.document
        for source in shared_sources:
            source.data.clear()
            if doc:
                event_obj = doc.callbacks
                event_obj._held_events = event_obj._held_events[:-1]

        ret = f(self, *args, **kwargs)

        for source in shared_sources:
            expected = source_cols[id(source)]
            found = [c for c in expected if c in source.data]
            empty = np.full_like(source.data[found[0]], np.nan) if found else []
            patch = {c: empty for c in expected if c not in source.data}
            source.data.update(patch)
        return ret
    return wrapper

def categorize_array(array, dim):
    """
    Uses a Dimension instance to convert an array of values to categorical
    (i.e. string) values and applies escaping for colons, which bokeh
    treats as a categorical suffix.
    """
    return np.array([dim.pprint_value(x) for x in array])
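
# Example (illustrative sketch, assuming a default holoviews Dimension):
#
#     from holoviews import Dimension
#     categorize_array(np.array([0, 1, 2]), Dimension('x'))
#     # -> array(['0', '1', '2'], dtype=...)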

class periodic:
    """
    Mocks the API of periodic Thread in hv.core.util, allowing a smooth
    API transition on bokeh server.
    """

    def __init__(self, document):
        self.document = document
        self.callback = None
        self.period = None
        self.count = None
        self.counter = None
        self._start_time = None
        self.timeout = None
        self._pcb = None

    @property
    def completed(self):
        return self.counter is None

    def start(self):
        self._start_time = time.time()
        if self.document is None:
            raise RuntimeError('periodic was registered to be run on bokeh '
                               'server but no document was found.')
        self._pcb = self.document.add_periodic_callback(self._periodic_callback, self.period)

    def __call__(self, period, count, callback, timeout=None, block=False):
        if isinstance(count, int):
            if count < 0:
                raise ValueError('Count value must be positive')
        elif type(count) is not type(None):
            raise ValueError('Count value must be a positive integer or None')

        self.callback = callback
        self.period = period*1000.
        self.timeout = timeout
        self.count = count
        self.counter = 0
        return self

    def _periodic_callback(self):
        self.callback(self.counter)
        self.counter += 1

        if self.timeout is not None:
            dt = (time.time() - self._start_time)
            if dt > self.timeout:
                self.stop()
        if self.counter == self.count:
            self.stop()

    def stop(self):
        self.counter = None
        self.timeout = None
        try:
            self.document.remove_periodic_callback(self._pcb)
        except ValueError:
            # Already stopped
            pass
        self._pcb = None

    def __repr__(self):
        return f'periodic({self.period}, {self.count}, {callable_name(self.callback)})'

    def __str__(self):
        return repr(self)

def attach_periodic(plot):
    """
    Attaches plot refresh to all streams on the object.
    """
    def append_refresh(dmap):
        for subdmap in get_nested_dmaps(dmap):
            subdmap.periodic._periodic_util = periodic(plot.document)
    return plot.hmap.traverse(append_refresh, [DynamicMap])

def date_to_integer(date):
    """Converts supported date types to milliseconds since epoch

    Attempts highest precision conversion of different datetime
    formats to milliseconds since the epoch (1970-01-01 00:00:00).
    If datetime is a cftime with a non-standard calendar the
    caveats described in hv.core.util.cftime_to_timestamp apply.

    Args:
        date: Date- or datetime-like object

    Returns:
        Milliseconds since 1970-01-01 00:00:00
    """
    if isinstance(date, pd.Timestamp):
        try:
            date = date.to_datetime64()
        except Exception:
            date = date.to_datetime()

    if isinstance(date, np.datetime64):
        return date.astype('datetime64[ms]').astype(float)
    elif isinstance(date, cftime_types):
        return cftime_to_timestamp(date, 'ms')

    if hasattr(date, 'timetuple'):
        dt_int = calendar.timegm(date.timetuple())*1000
    else:
        raise ValueError('Datetime type not recognized')
    return dt_int
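
# Example (illustrative, not part of the module source):
#
#     date_to_integer(np.datetime64('1970-01-02'))        # -> 86400000.0
#     date_to_integer(dt.datetime(1970, 1, 1, 0, 0, 1))   # -> 1000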

def glyph_order(keys, draw_order=None):
    """
    Orders a set of glyph handles using regular sort and an explicit
    sort order. The explicit draw order must take the form of a list
    of glyph names while the keys should be glyph names with a custom
    suffix. The draw order may only match a subset of the keys and any
    matched items will take precedence over other entries.
    """
    if draw_order is None:
        draw_order = []
    keys = sorted(keys)

    def order_fn(glyph):
        matches = [item for item in draw_order if glyph.startswith(item)]
        return ((draw_order.index(matches[0]), glyph) if matches else
                (1e9+keys.index(glyph), glyph))

    return sorted(keys, key=order_fn)
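
# Example (illustrative, not part of the module source): explicitly ordered
# prefixes come first, everything else keeps its sorted position:
#
#     glyph_order(['scatter_1', 'patch_1', 'annotation_1'],
#                 draw_order=['annotation', 'patch'])
#     # -> ['annotation_1', 'patch_1', 'scatter_1']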

def colormesh(X, Y):
    """
    Generates line paths for a quadmesh given 2D arrays of X and Y
    coordinates.
    """
    X1 = X[0:-1, 0:-1].ravel()
    Y1 = Y[0:-1, 0:-1].ravel()
    X2 = X[1:, 0:-1].ravel()
    Y2 = Y[1:, 0:-1].ravel()
    X3 = X[1:, 1:].ravel()
    Y3 = Y[1:, 1:].ravel()
    X4 = X[0:-1, 1:].ravel()
    Y4 = Y[0:-1, 1:].ravel()

    X = np.column_stack([X1, X2, X3, X4, X1])
    Y = np.column_stack([Y1, Y2, Y3, Y4, Y1])
    return X, Y
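
# Example (illustrative, not part of the module source): a 2x2 coordinate grid
# describes a single cell, which becomes one closed path over its four corners:
#
#     X, Y = np.meshgrid([0, 1], [0, 1])
#     colormesh(X, Y)
#     # -> (array([[0, 0, 1, 1, 0]]), array([[0, 1, 1, 0, 0]]))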

def theme_attr_json(theme, attr):
    if isinstance(theme, str) and theme in built_in_themes:
        return built_in_themes[theme]._json['attrs'].get(attr, {})
    elif isinstance(theme, Theme):
        return theme._json['attrs'].get(attr, {})
    else:
        return {}

def multi_polygons_data(element):
    """
    Expands polygon data which contains holes to a bokeh multi_polygons
    representation. Multi-polygons split by nans are expanded and the
    correct list of holes is assigned to each sub-polygon.
    """
    xs, ys = (element.dimension_values(kd, expanded=False) for kd in element.kdims)
    holes = element.holes()
    xsh, ysh = [], []
    for x, y, multi_hole in zip(xs, ys, holes):
        xhs = [[h[:, 0] for h in hole] for hole in multi_hole]
        yhs = [[h[:, 1] for h in hole] for hole in multi_hole]
        array = np.column_stack([x, y])
        splits = np.where(np.isnan(array[:, :2].astype('float')).sum(axis=1))[0]
        arrays = np.split(array, splits+1) if len(splits) else [array]

        multi_xs, multi_ys = [], []
        for i, (path, hx, hy) in enumerate(zip(arrays, xhs, yhs)):
            if i != (len(arrays)-1):
                path = path[:-1]
            multi_xs.append([path[:, 0]]+hx)
            multi_ys.append([path[:, 1]]+hy)
        xsh.append(multi_xs)
        ysh.append(multi_ys)
    return xsh, ysh

def match_dim_specs(specs1, specs2):
    """Matches dimension specs used to link axes.

    Axis dimension specs consists of a list of tuples corresponding
    to each dimension, each tuple spec has the form (name, label, unit).
    The name and label must match exactly while the unit only has to
    match if both specs define one.
    """
    if (specs1 is None or specs2 is None) or (len(specs1) != len(specs2)):
        return False
    for spec1, spec2 in zip(specs1, specs2):
        for s1, s2 in zip(spec1, spec2):
            if s1 is None or s2 is None:
                continue
            if s1 != s2:
                return False
    return True
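
# Example (illustrative, not part of the module source):
#
#     match_dim_specs([('x', 'x', None)], [('x', 'x', 'm')])   # -> True
#     match_dim_specs([('x', 'x', None)], [('y', 'y', None)])  # -> False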

def get_scale(range_input, axis_type):
    if isinstance(range_input, (DataRange1d, Range1d)) and axis_type in ["linear", "datetime", "mercator", "auto", None]:
        return LinearScale()
    elif isinstance(range_input, (DataRange1d, Range1d)) and axis_type == "log":
        return LogScale()
    elif isinstance(range_input, FactorRange):
        return CategoricalScale()
    else:
        raise ValueError(f"Unable to determine proper scale for: '{range_input}'")


def get_axis_class(axis_type, range_input, dim):
    # Copied from bokeh
    if axis_type is None:
        return None, {}
    elif axis_type == "linear":
        return LinearAxis, {}
    elif axis_type == "log":
        return LogAxis, {}
    elif axis_type == "datetime":
        return DatetimeAxis, {}
    elif axis_type == "mercator":
        return MercatorAxis, dict(dimension='lon' if dim == 0 else 'lat')
    elif axis_type == "auto":
        if isinstance(range_input, FactorRange):
            return CategoricalAxis, {}
        elif isinstance(range_input, Range1d):
            try:
                value = range_input.start
                # Datetime accepts ints/floats as timestamps, but we don't want
                # to assume that implies a datetime axis
                if Datetime.is_timestamp(value):
                    return LinearAxis, {}
                Datetime.validate(Datetime(), value)
                return DatetimeAxis, {}
            except ValueError:
                pass
        return LinearAxis, {}
    else:
        raise ValueError(f"Unrecognized axis_type: '{axis_type!r}'")
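
# Example (illustrative, not part of the module source): these helpers mirror
# bokeh's own range/axis inference, e.g.
#
#     get_scale(Range1d(start=1, end=10), "log")                       # -> LogScale()
#     get_axis_class("auto", FactorRange(factors=["A", "B"]), dim=0)   # -> (CategoricalAxis, {})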

def match_ax_type(ax, range_type):
    """
    Ensure the range_type matches the axis model being matched.
    """
    if isinstance(ax, CategoricalAxis):
        return range_type == 'categorical'
    elif isinstance(ax, DatetimeAxis):
        return range_type == 'datetime'
    else:
        return range_type in ('auto', 'log')

def match_yaxis_type_to_range(yax, range_type, range_name):
    "Apply match_ax_type to the y-axis found by the given range name"
    for axis in yax:
        if axis.y_range_name == range_name:
            return match_ax_type(axis, range_type)
    raise ValueError('No axis with given range found')

def wrap_formatter(formatter, axis):
    """
    Wraps formatting function or string in appropriate bokeh formatter type.
    """
    if not isinstance(formatter, TickFormatter):
        formatter = PrintfTickFormatter(format=formatter)
    return formatter
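
# Example (illustrative, not part of the module source):
#
#     wrap_formatter('%.2f', 'x')   # -> PrintfTickFormatter(format='%.2f')
#     wrap_formatter(PrintfTickFormatter(format='%d'), 'x')   # returned unchanged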

def property_to_dict(x):
    """
    Convert Bokeh's property Field and Value to a dictionary
    """
    try:
        from bokeh.core.property.vectorization import Field, Unspecified, Value
        if isinstance(x, (Field, Value)):
            x = {k: v for k, v in x.__dict__.items() if v != Unspecified}
    except ImportError:
        pass

    return x

def dtype_fix_hook(plot, element):
    # Work-around for problems seen in:
    # https://github.com/holoviz/holoviews/issues/5722
    # https://github.com/holoviz/holoviews/issues/5726
    # https://github.com/holoviz/holoviews/issues/5941
    # Should be fixed in Bokeh:
    # https://github.com/bokeh/bokeh/issues/13155
    with suppress(Exception):
        renderers = plot.handles["plot"].renderers
        for renderer in renderers:
            with suppress(Exception):
                data = renderer.data_source.data
                for k, v in data.items():
                    if hasattr(v, "dtype") and v.dtype.kind == "U":
                        data[k] = v.tolist()