Source code for holoviews.plotting.bokeh.callbacks

import asyncio
import base64
import time
from collections import defaultdict

import numpy as np
from bokeh.models import (
    BoxEditTool,
    CustomJS,
    DataRange1d,
    DatetimeAxis,
    FactorRange,
    FreehandDrawTool,
    PointDrawTool,
    PolyDrawTool,
    PolyEditTool,
    Range1d,
)
from panel.io.state import set_curdoc, state

from ...core.options import CallbackError
from ...core.util import datetime_types, dimension_sanitizer, dt64_to_dt, isequal
from ...element import Table
from ...streams import (
    BoundsX,
    BoundsXY,
    BoundsY,
    BoxEdit,
    CDSStream,
    CurveEdit,
    DoubleTap,
    Draw,
    FreehandDraw,
    Lasso,
    MouseEnter,
    MouseLeave,
    PanEnd,
    PlotReset,
    PlotSize,
    PointDraw,
    PointerX,
    PointerXY,
    PointerY,
    PolyDraw,
    PolyEdit,
    PressUp,
    RangeX,
    RangeXY,
    RangeY,
    Selection1D,
    SelectionXY,
    SelectMode,
    SingleTap,
    Stream,
    Tap,
)
from ...util.warnings import warn
from .util import bokeh33, convert_timestamp


[docs]class Callback: """ Provides a baseclass to define callbacks, which return data from bokeh model callbacks, events and attribute changes. The callback then makes this data available to any streams attached to it. The definition of a callback consists of a number of components: * models : Defines which bokeh models the callback will be attached on referencing the model by its key in the plots handles, e.g. this could be the x_range, y_range, plot, a plotting tool or any other bokeh mode. * attributes : The attributes define which attributes to send back to Python. They are defined as a dictionary mapping between the name under which the variable is made available to Python and the specification of the attribute. The specification should start with the variable name that is to be accessed and the location of the attribute separated by periods. All models defined by the models and can be addressed in this way, e.g. to get the start of the x_range as 'x' you can supply {'x': 'x_range.attributes.start'}. Additionally certain handles additionally make the cb_obj variables available containing additional information about the event. * on_events : If the Callback should listen to bokeh events this should declare the types of event as a list (optional) * on_changes : If the Callback should listen to model attribute changes on the defined ``models`` (optional) If either on_events or on_changes are declared the Callback will be registered using the on_event or on_change machinery, otherwise it will be treated as a regular callback on the model. The callback can also define a _process_msg method, which can modify the data sent by the callback before it is passed to the streams. A callback supports three different throttling modes: - adaptive (default): The callback adapts the throttling timeout depending on the rolling mean of the time taken to process each message. The rolling window is controlled by the `adaptive_window` value. - throttle: Uses the fixed `throttle_timeout` as the minimum amount of time between events. - debounce: Processes the message only when no new event has been received within the `throttle_timeout` duration. """ # Attributes to sync attributes = {} # The plotting handle(s) to attach the JS callback on models = [] # Additional handles to hash on for uniqueness extra_handles = [] # Conditions when callback should be skipped skip_events = [] skip_changes = [] # Callback will listen to events of the supplied type on the models on_events = [] # List of change events on the models to listen to on_changes = [] # Internal state _callbacks = {} _transforms = [] # Asyncio background task _background_task = set() def __init__(self, plot, streams, source, **params): self.plot = plot self.streams = streams self.source = source self.handle_ids = defaultdict(dict) self.reset() self._active = False self._prev_msg = None def _transform(self, msg): for transform in self._transforms: msg = transform(msg, self) return msg def _process_msg(self, msg): """ Subclassable method to preprocess JSON message in callback before passing to stream. """ return self._transform(msg) def cleanup(self): self.reset() self.handle_ids = None self.plot = None self.source = None self.streams = [] Callback._callbacks = {k: cb for k, cb in Callback._callbacks.items() if cb is not self} def reset(self): if self.handle_ids: handles = self._init_plot_handles() for handle_name in self.models: if handle_name not in handles: continue handle = handles[handle_name] cb_hash = (id(handle), id(type(self))) self._callbacks.pop(cb_hash, None) self.plot_handles = {} self._queue = [] def _filter_msg(self, msg, ids): """ Filter event values that do not originate from the plotting handles associated with a particular stream using their ids to match them. """ filtered_msg = {} for k, v in msg.items(): if isinstance(v, dict) and 'id' in v: if v['id'] in ids: filtered_msg[k] = v['value'] else: filtered_msg[k] = v return filtered_msg def on_msg(self, msg): streams = [] for stream in self.streams: handle_ids = self.handle_ids[stream] ids = list(handle_ids.values()) filtered_msg = self._filter_msg(msg, ids) processed_msg = self._process_msg(filtered_msg) if not processed_msg: continue stream.update(**processed_msg) stream._metadata = {h: {'id': hid, 'events': self.on_events} for h, hid in handle_ids.items()} streams.append(stream) try: with set_curdoc(self.plot.document): Stream.trigger(streams) except CallbackError as e: if self.plot.root and self.plot.root.ref['id'] in state._handles: handle, _ = state._handles[self.plot.root.ref['id']] handle.update({'text/html': str(e)}, raw=True) else: raise e except Exception as e: raise e finally: for stream in streams: stream._metadata = {} def _init_plot_handles(self): """ Find all requested plotting handles and cache them along with the IDs of the models the callbacks will be attached to. """ plots = [self.plot] if self.plot.subplots: plots += list(self.plot.subplots.values()) handles = {} for plot in plots: for k, v in plot.handles.items(): handles[k] = v self.plot_handles = handles requested = {} for h in self.models+self.extra_handles: if h in self.plot_handles: requested[h] = handles[h] self.handle_ids.update(self._get_stream_handle_ids(requested)) return requested def _get_stream_handle_ids(self, handles): """ Gather the ids of the plotting handles attached to this callback This allows checking that a stream is not given the state of a plotting handle it wasn't attached to """ stream_handle_ids = defaultdict(dict) for stream in self.streams: for h in self.models+self.extra_handles: if h in handles: handle_id = handles[h].ref['id'] stream_handle_ids[stream][h] = handle_id return stream_handle_ids
[docs] @classmethod def resolve_attr_spec(cls, spec, cb_obj, model=None): """ Resolves a Callback attribute specification looking the corresponding attribute up on the cb_obj, which should be a bokeh model. If not model is supplied cb_obj is assumed to be the same as the model. """ if not cb_obj: raise AttributeError(f'Bokeh plot attribute {spec} could not be found') if model is None: model = cb_obj spec = spec.split('.') resolved = cb_obj for p in spec[1:]: if p == 'attributes': continue if isinstance(resolved, dict): resolved = resolved.get(p) else: resolved = getattr(resolved, p, None) return {'id': model.ref['id'], 'value': resolved}
def skip_event(self, event): return any(skip(event) for skip in self.skip_events) def skip_change(self, msg): return any(skip(msg) for skip in self.skip_changes) def _set_busy(self, busy): """ Sets panel.state to busy if available. """ if 'busy' not in state.param: return # Check if busy state is supported from panel.util import edit_readonly with edit_readonly(state): state.busy = busy
[docs] async def on_change(self, attr, old, new): """ Process change events adding timeout to process multiple concerted value change at once rather than firing off multiple plot updates. """ self._queue.append((attr, old, new, time.time())) if not self._active and self.plot.document: self._active = True self._set_busy(True) task = asyncio.create_task(self.process_on_change()) self._background_task.add(task) task.add_done_callback(self._background_task.discard)
[docs] async def on_event(self, event): """ Process bokeh UIEvents adding timeout to process multiple concerted value change at once rather than firing off multiple plot updates. """ self._queue.append((event, time.time())) if not self._active and self.plot.document: self._active = True self._set_busy(True) task = asyncio.create_task(self.process_on_event()) self._background_task.add(task) task.add_done_callback(self._background_task.discard)
[docs] async def process_on_event(self, timeout=None): """ Trigger callback change event and triggering corresponding streams. """ await asyncio.sleep(0.01) if not self._queue: self._active = False self._set_busy(False) return # Get unique event types in the queue events = list(dict([(event.event_name, event) for event, dt in self._queue]).values()) self._queue = [] # Process event types for event in events: if self.skip_event(event): continue msg = {} for attr, path in self.attributes.items(): model_obj = self.plot_handles.get(self.models[0]) msg[attr] = self.resolve_attr_spec(path, event, model_obj) self.on_msg(msg) task = asyncio.create_task(self.process_on_event()) self._background_task.add(task) task.add_done_callback(self._background_task.discard)
async def process_on_change(self): # Give on_change time to process new events await asyncio.sleep(0.01) if not self._queue: self._active = False self._set_busy(False) return self._queue = [] msg = {} for attr, path in self.attributes.items(): attr_path = path.split('.') if attr_path[0] == 'cb_obj': obj_handle = self.models[0] path = '.'.join(self.models[:1]+attr_path[1:]) else: obj_handle = attr_path[0] cb_obj = self.plot_handles.get(obj_handle) try: msg[attr] = self.resolve_attr_spec(path, cb_obj) except Exception: # To give BokehJS a chance to update the model # https://github.com/holoviz/holoviews/issues/5746 await asyncio.sleep(0.05) msg[attr] = self.resolve_attr_spec(path, cb_obj) if self.skip_change(msg): equal = True else: equal = isequal(msg, self._prev_msg) if not equal or any(s.transient for s in self.streams): self.on_msg(msg) self._prev_msg = msg task = asyncio.create_task(self.process_on_change()) self._background_task.add(task) task.add_done_callback(self._background_task.discard)
[docs] def set_callback(self, handle): """ Set up on_change events for bokeh server interactions. """ if self.on_events: event_handler = lambda event: ( asyncio.create_task(self.on_event(event)) ) for event in self.on_events: handle.on_event(event, event_handler) if self.on_changes: change_handler = lambda attr, old, new: ( asyncio.create_task(self.on_change(attr, old, new)) ) for change in self.on_changes: if change in ['patching', 'streaming']: # Patch and stream events do not need handling on server continue handle.on_change(change, change_handler)
def initialize(self, plot_id=None): handles = self._init_plot_handles() hash_handles, cb_handles = [], [] for handle_name in self.models+self.extra_handles: if handle_name not in handles: warn_args = (handle_name, type(self.plot).__name__, type(self).__name__) print('{} handle not found on {}, cannot ' 'attach {} callback'.format(*warn_args)) continue handle = handles[handle_name] if handle_name not in self.extra_handles: cb_handles.append(handle) hash_handles.append(handle) # Hash the plot handle with Callback type allowing multiple # callbacks on one handle to be merged hash_ids = [id(h) for h in hash_handles] cb_hash = tuple(hash_ids)+(id(type(self)),) if cb_hash in self._callbacks: # Merge callbacks if another callback has already been attached cb = self._callbacks[cb_hash] cb.streams = list(set(cb.streams+self.streams)) for k, v in self.handle_ids.items(): cb.handle_ids[k].update(v) self.cleanup() return for handle in cb_handles: self.set_callback(handle) self._callbacks[cb_hash] = self
[docs]class PointerXYCallback(Callback): """ Returns the mouse x/y-position on mousemove event. """ attributes = {'x': 'cb_obj.x', 'y': 'cb_obj.y'} models = ['plot'] on_events = ['mousemove'] def _process_out_of_bounds(self, value, start, end): "Clips out of bounds values" if isinstance(value, np.datetime64): v = dt64_to_dt(value) if isinstance(start, (int, float)): start = convert_timestamp(start) if isinstance(end, (int, float)): end = convert_timestamp(end) s, e = start, end if isinstance(s, np.datetime64): s = dt64_to_dt(s) if isinstance(e, np.datetime64): e = dt64_to_dt(e) else: v, s, e = value, start, end if v < s: value = start elif v > e: value = end return value def _process_msg(self, msg): x_range = self.plot.handles.get('x_range') y_range = self.plot.handles.get('y_range') xaxis = self.plot.handles.get('xaxis') yaxis = self.plot.handles.get('yaxis') if 'x' in msg and isinstance(xaxis, DatetimeAxis): msg['x'] = convert_timestamp(msg['x']) if 'y' in msg and isinstance(yaxis, DatetimeAxis): msg['y'] = convert_timestamp(msg['y']) if isinstance(x_range, FactorRange) and isinstance(msg.get('x'), (int, float)): msg['x'] = x_range.factors[int(msg['x'])] elif 'x' in msg and isinstance(x_range, (Range1d, DataRange1d)): xstart, xend = x_range.start, x_range.end if xstart > xend: xstart, xend = xend, xstart x = self._process_out_of_bounds(msg['x'], xstart, xend) if x is None: msg = {} else: msg['x'] = x if isinstance(y_range, FactorRange) and isinstance(msg.get('y'), (int, float)): msg['y'] = y_range.factors[int(msg['y'])] elif 'y' in msg and isinstance(y_range, (Range1d, DataRange1d)): ystart, yend = y_range.start, y_range.end if ystart > yend: ystart, yend = yend, ystart y = self._process_out_of_bounds(msg['y'], ystart, yend) if y is None: msg = {} else: msg['y'] = y return self._transform(msg)
[docs]class PointerXCallback(PointerXYCallback): """ Returns the mouse x-position on mousemove event. """ attributes = {'x': 'cb_obj.x'}
[docs]class PointerYCallback(PointerXYCallback): """ Returns the mouse x/y-position on mousemove event. """ attributes = {'y': 'cb_obj.y'}
[docs]class DrawCallback(PointerXYCallback): on_events = ['pan', 'panstart', 'panend'] models = ['plot'] attributes = {'x': 'cb_obj.x', 'y': 'cb_obj.y', 'event': 'cb_obj.event_name'} def __init__(self, *args, **kwargs): self.stroke_count = 0 super().__init__(*args, **kwargs) def _process_msg(self, msg): event = msg.pop('event') if event == 'panend': self.stroke_count += 1 return self._transform(dict(msg, stroke_count=self.stroke_count))
[docs]class TapCallback(PointerXYCallback): """ Returns the mouse x/y-position on tap event. Note: As of bokeh 0.12.5, there is no way to distinguish the individual tap events within a doubletap event. """ on_events = ['tap', 'doubletap'] def _process_out_of_bounds(self, value, start, end): "Sets out of bounds values to None" if isinstance(value, np.datetime64): v = dt64_to_dt(value) if isinstance(start, (int, float)): start = convert_timestamp(start) if isinstance(end, (int, float)): end = convert_timestamp(end) s, e = start, end if isinstance(s, np.datetime64): s = dt64_to_dt(s) if isinstance(e, np.datetime64): e = dt64_to_dt(e) else: v, s, e = value, start, end if v < s or v > e: value = None return value
[docs]class SingleTapCallback(TapCallback): """ Returns the mouse x/y-position on tap event. """ on_events = ['tap']
[docs]class PressUpCallback(TapCallback): """ Returns the mouse x/y-position of a pressup mouse event. """ on_events = ['pressup']
[docs]class PanEndCallback(TapCallback): """ Returns the mouse x/y-position of a pan end event. """ on_events = ['panend']
[docs]class DoubleTapCallback(TapCallback): """ Returns the mouse x/y-position on doubletap event. """ on_events = ['doubletap']
[docs]class MouseEnterCallback(PointerXYCallback): """ Returns the mouse x/y-position on mouseenter event, i.e. when mouse enters the plot canvas. """ on_events = ['mouseenter']
[docs]class MouseLeaveCallback(PointerXYCallback): """ Returns the mouse x/y-position on mouseleave event, i.e. when mouse leaves the plot canvas. """ on_events = ['mouseleave']
[docs]class RangeXYCallback(Callback): """ Returns the x/y-axis ranges of a plot. """ on_events = ['rangesupdate'] models = ['plot'] extra_handles = ['x_range', 'y_range'] attributes = { 'x0': 'cb_obj.x0', 'y0': 'cb_obj.y0', 'x1': 'cb_obj.x1', 'y1': 'cb_obj.y1', } def _process_msg(self, msg): if self.plot.state.x_range is not self.plot.handles['x_range']: x_range = self.plot.handles['x_range'] msg['x0'], msg['x1'] = x_range.start, x_range.end if self.plot.state.y_range is not self.plot.handles['y_range']: y_range = self.plot.handles['y_range'] msg['y0'], msg['y1'] = y_range.start, y_range.end data = {} if 'x0' in msg and 'x1' in msg: x0, x1 = msg['x0'], msg['x1'] if isinstance(self.plot.handles.get('xaxis'), DatetimeAxis): if not isinstance(x0, datetime_types): x0 = convert_timestamp(x0) if not isinstance(x1, datetime_types): x1 = convert_timestamp(x1) if x0 > x1: x0, x1 = x1, x0 data['x_range'] = (x0, x1) if 'y0' in msg and 'y1' in msg: y0, y1 = msg['y0'], msg['y1'] if isinstance(self.plot.handles.get('yaxis'), DatetimeAxis): if not isinstance(y0, datetime_types): y0 = convert_timestamp(y0) if not isinstance(y1, datetime_types): y1 = convert_timestamp(y1) if y0 > y1: y0, y1 = y1, y0 data['y_range'] = (y0, y1) return self._transform(data)
[docs]class RangeXCallback(RangeXYCallback): """ Returns the x-axis range of a plot. """ on_events = ['rangesupdate'] models = ['plot'] attributes = { 'x0': 'cb_obj.x0', 'x1': 'cb_obj.x1', }
[docs]class RangeYCallback(RangeXYCallback): """ Returns the y-axis range of a plot. """ on_events = ['rangesupdate'] models = ['plot'] attributes = { 'y0': 'cb_obj.y0', 'y1': 'cb_obj.y1' }
[docs]class PlotSizeCallback(Callback): """ Returns the actual width and height of a plot once the layout solver has executed. """ models = ['plot'] attributes = {'width': 'cb_obj.inner_width', 'height': 'cb_obj.inner_height'} on_changes = ['inner_width', 'inner_height'] def _process_msg(self, msg): if msg.get('width') and msg.get('height'): return self._transform(msg) else: return {}
[docs]class SelectModeCallback(Callback): attributes = {'box_mode': 'box_select.mode', 'lasso_mode': 'lasso_select.mode'} models = ['box_select', 'lasso_select'] on_changes = ['mode'] def _process_msg(self, msg): stream = self.streams[0] if 'box_mode' in msg: mode = msg.pop('box_mode') if mode != stream.mode: msg['mode'] = mode if 'lasso_mode' in msg: mode = msg.pop('lasso_mode') if mode != stream.mode: msg['mode'] = mode return msg
[docs]class BoundsCallback(Callback): """ Returns the bounds of a box_select tool. """ attributes = {'x0': 'cb_obj.geometry.x0', 'x1': 'cb_obj.geometry.x1', 'y0': 'cb_obj.geometry.y0', 'y1': 'cb_obj.geometry.y1'} models = ['plot'] on_events = ['selectiongeometry'] skip_events = [lambda event: event.geometry['type'] != 'rect', lambda event: not event.final] def _process_msg(self, msg): if all(c in msg for c in ['x0', 'y0', 'x1', 'y1']): if isinstance(self.plot.handles.get('xaxis'), DatetimeAxis): msg['x0'] = convert_timestamp(msg['x0']) msg['x1'] = convert_timestamp(msg['x1']) if isinstance(self.plot.handles.get('yaxis'), DatetimeAxis): msg['y0'] = convert_timestamp(msg['y0']) msg['y1'] = convert_timestamp(msg['y1']) msg = {'bounds': (msg['x0'], msg['y0'], msg['x1'], msg['y1'])} return self._transform(msg) else: return {}
[docs]class SelectionXYCallback(BoundsCallback): """ Converts a bounds selection to numeric or categorical x-range and y-range selections. """ def _process_msg(self, msg): msg = super()._process_msg(msg) if 'bounds' not in msg: return msg el = self.plot.current_frame x0, y0, x1, y1 = msg['bounds'] x_range = self.plot.handles['x_range'] if isinstance(x_range, FactorRange): x0, x1 = int(round(x0)), int(round(x1)) xfactors = x_range.factors[x0: x1] if x_range.tags and x_range.tags[0]: xdim = el.get_dimension(x_range.tags[0][0][0]) if xdim and hasattr(el, 'interface'): dtype = el.interface.dtype(el, xdim) try: xfactors = list(np.array(xfactors).astype(dtype)) except Exception: pass msg['x_selection'] = xfactors else: msg['x_selection'] = (x0, x1) y_range = self.plot.handles['y_range'] if isinstance(y_range, FactorRange): y0, y1 = int(round(y0)), int(round(y1)) yfactors = y_range.factors[y0: y1] if y_range.tags and y_range.tags[0]: ydim = el.get_dimension(y_range.tags[0][0][0]) if ydim and hasattr(el, 'interface'): dtype = el.interface.dtype(el, ydim) try: yfactors = list(np.array(yfactors).astype(dtype)) except Exception: pass msg['y_selection'] = yfactors else: msg['y_selection'] = (y0, y1) return msg
[docs]class BoundsXCallback(Callback): """ Returns the bounds of a xbox_select tool. """ attributes = {'x0': 'cb_obj.geometry.x0', 'x1': 'cb_obj.geometry.x1'} models = ['plot'] on_events = ['selectiongeometry'] skip_events = [lambda event: event.geometry['type'] != 'quad', lambda event: not event.final] def _process_msg(self, msg): if all(c in msg for c in ['x0', 'x1']): if isinstance(self.plot.handles.get('xaxis'), DatetimeAxis): msg['x0'] = convert_timestamp(msg['x0']) msg['x1'] = convert_timestamp(msg['x1']) msg = {'boundsx': (msg['x0'], msg['x1'])} return self._transform(msg) else: return {}
[docs]class BoundsYCallback(Callback): """ Returns the bounds of a ybox_select tool. """ attributes = {'y0': 'cb_obj.geometry.y0', 'y1': 'cb_obj.geometry.y1'} models = ['plot'] on_events = ['selectiongeometry'] skip_events = [lambda event: event.geometry['type'] != 'quad', lambda event: not event.final] def _process_msg(self, msg): if all(c in msg for c in ['y0', 'y1']): if isinstance(self.plot.handles.get('yaxis'), DatetimeAxis): msg['y0'] = convert_timestamp(msg['y0']) msg['y1'] = convert_timestamp(msg['y1']) msg = {'boundsy': (msg['y0'], msg['y1'])} return self._transform(msg) else: return {}
[docs]class LassoCallback(Callback): attributes = {'xs': 'cb_obj.geometry.x', 'ys': 'cb_obj.geometry.y'} models = ['plot'] on_events = ['selectiongeometry'] skip_events = [lambda event: event.geometry['type'] != 'poly', lambda event: not event.final] def _process_msg(self, msg): if not all(c in msg for c in ('xs', 'ys')): return {} xs, ys = msg['xs'], msg['ys'] if isinstance(xs, dict): xs = ((int(i), x) for i, x in xs.items()) xs = [x for _, x in sorted(xs)] if isinstance(ys, dict): ys = ((int(i), y) for i, y in ys.items()) ys = [y for _, y in sorted(ys)] if xs is None or ys is None: return {} return {'geometry': np.column_stack([xs, ys])}
[docs]class Selection1DCallback(Callback): """ Returns the current selection on a ColumnDataSource. """ attributes = {'index': 'cb_obj.indices'} models = ['selected'] on_changes = ['indices'] def _process_msg(self, msg): el = self.plot.current_frame if 'index' in msg: msg = {'index': [int(v) for v in msg['index']]} if isinstance(el, Table): # Ensure that explicitly applied selection does not # trigger new events sel = el.opts.get('plot').kwargs.get('selected') if sel is not None and list(sel) == msg['index']: return {} return self._transform(msg) else: return {}
[docs]class ResetCallback(Callback): """ Signals the Reset stream if an event has been triggered. """ models = ['plot'] on_events = ['reset'] def _process_msg(self, msg): msg = {'resetting': True} return self._transform(msg)
[docs]class CDSCallback(Callback): """ A Stream callback that syncs the data on a bokeh ColumnDataSource model with Python. """ attributes = {'data': 'source.data'} models = ['source'] on_changes = ['data', 'patching'] def initialize(self, plot_id=None): super().initialize(plot_id) plot = self.plot data = self._process_msg({'data': plot.handles['source'].data})['data'] for stream in self.streams: stream.update(data=data) def _process_msg(self, msg): if 'data' not in msg: return {} msg['data'] = dict(msg['data']) for col, values in msg['data'].items(): if isinstance(values, dict): shape = values.pop('shape', None) dtype = values.pop('dtype', None) values.pop('dimension', None) items = sorted([(int(k), v) for k, v in values.items()]) values = [v for k, v in items] if dtype is not None: values = np.array(values, dtype=dtype).reshape(shape) elif isinstance(values, list) and values and isinstance(values[0], dict): new_values = [] for vals in values: if isinstance(vals, dict): shape = vals.pop('shape', None) dtype = vals.pop('dtype', None) vals.pop('dimension', None) vals = sorted([(int(k), v) for k, v in vals.items()]) vals = [v for k, v in vals] if dtype is not None: vals = np.array(vals, dtype=dtype).reshape(shape) new_values.append(vals) values = new_values elif any(isinstance(v, (int, float)) for v in values): values = [np.nan if v is None else v for v in values] elif ( isinstance(values, list) and len(values) == 4 and values[2] in ("big", "little") and isinstance(values[3], list) ): # Account for issue seen in https://github.com/holoviz/geoviews/issues/584 # This could be fixed in Bokeh 3.0, but has not been tested. # Example: # ['pm9vF9dSY8EAAADgPFNjwQAAAMAmU2PBAAAAAMtSY8E=','float64', 'little', [4]] buffer = base64.decodebytes(values[0].encode()) dtype = np.dtype(values[1]).newbyteorder(values[2]) values = np.frombuffer(buffer, dtype) msg['data'][col] = values return self._transform(msg)
[docs]class GlyphDrawCallback(CDSCallback): _style_callback = """ var types = Bokeh.require("core/util/types"); var changed = false for (var i = 0; i < cb_obj.length; i++) { for (var style in styles) { var value = styles[style]; if (types.isArray(value)) { value = value[i % value.length]; } if (cb_obj.data[style][i] !== value) { cb_obj.data[style][i] = value; changed = true; } } } if (changed) cb_obj.change.emit() """ def _create_style_callback(self, cds, glyph): stream = self.streams[0] col = cds.column_names[0] length = len(cds.data[col]) for style, values in stream.styles.items(): cds.data[style] = [values[i % len(values)] for i in range(length)] setattr(glyph, style, style) cb = CustomJS(code=self._style_callback, args={'styles': stream.styles, 'empty': stream.empty_value}) cds.js_on_change('data', cb) def _update_cds_vdims(self, data): """ Add any value dimensions not already in the data ensuring the element can be reconstituted in entirety. """ element = self.plot.current_frame stream = self.streams[0] for d in element.vdims: dim = dimension_sanitizer(d.name) if dim in data: continue values = element.dimension_values(d) if len(values) != len(next(iter(data.values()))): values = np.concatenate([values, [stream.empty_value]]) data[dim] = values
[docs]class PointDrawCallback(GlyphDrawCallback): def initialize(self, plot_id=None): plot = self.plot stream = self.streams[0] cds = plot.handles['source'] glyph = plot.handles['glyph'] renderers = [plot.handles['glyph_renderer']] kwargs = {} if stream.num_objects: kwargs['num_objects'] = stream.num_objects if stream.tooltip: kwargs['description'] = stream.tooltip if stream.styles: self._create_style_callback(cds, glyph) if stream.empty_value is not None: kwargs['empty_value'] = stream.empty_value point_tool = PointDrawTool( add=all(s.add for s in self.streams), drag=all(s.drag for s in self.streams), renderers=renderers, **kwargs) self.plot.state.tools.append(point_tool) self._update_cds_vdims(cds.data) # Add any value dimensions not already in the CDS data # ensuring the element can be reconstituted in entirety super().initialize(plot_id) def _process_msg(self, msg): self._update_cds_vdims(msg['data']) return super()._process_msg(msg)
[docs]class CurveEditCallback(GlyphDrawCallback): def initialize(self, plot_id=None): plot = self.plot stream = self.streams[0] cds = plot.handles['cds'] glyph = plot.handles['glyph'] renderer = plot.state.scatter(glyph.x, glyph.y, source=cds, visible=False, **stream.style) renderers = [renderer] kwargs = {} if stream.tooltip: kwargs['description'] = stream.tooltip point_tool = PointDrawTool( add=False, drag=True, renderers=renderers, **kwargs ) code="renderer.visible = tool.active || (cds.selected.indices.length > 0)" show_vertices = CustomJS(args={'renderer': renderer, 'cds': cds, 'tool': point_tool}, code=code) point_tool.js_on_change('change:active', show_vertices) cds.selected.js_on_change('indices', show_vertices) self.plot.state.tools.append(point_tool) self._update_cds_vdims(cds.data) super().initialize(plot_id) def _process_msg(self, msg): self._update_cds_vdims(msg['data']) return super()._process_msg(msg) def _update_cds_vdims(self, data): """ Add any value dimensions not already in the data ensuring the element can be reconstituted in entirety. """ element = self.plot.current_frame for d in element.vdims: dim = dimension_sanitizer(d.name) if dim not in data: data[dim] = element.dimension_values(d)
[docs]class PolyDrawCallback(GlyphDrawCallback): def initialize(self, plot_id=None): plot = self.plot stream = self.streams[0] cds = self.plot.handles['cds'] glyph = self.plot.handles['glyph'] renderers = [plot.handles['glyph_renderer']] kwargs = {} if stream.num_objects: kwargs['num_objects'] = stream.num_objects if stream.show_vertices: vertex_style = dict({'size': 10}, **stream.vertex_style) r1 = plot.state.scatter([], [], **vertex_style) kwargs['vertex_renderer'] = r1 if stream.styles: self._create_style_callback(cds, glyph) if stream.tooltip: kwargs['description'] = stream.tooltip if stream.empty_value is not None: kwargs['empty_value'] = stream.empty_value poly_tool = PolyDrawTool( drag=all(s.drag for s in self.streams), renderers=renderers, **kwargs ) plot.state.tools.append(poly_tool) self._update_cds_vdims(cds.data) super().initialize(plot_id) def _process_msg(self, msg): self._update_cds_vdims(msg['data']) return super()._process_msg(msg) def _update_cds_vdims(self, data): """ Add any value dimensions not already in the data ensuring the element can be reconstituted in entirety. """ element = self.plot.current_frame stream = self.streams[0] interface = element.interface scalar_kwargs = {'per_geom': True} if interface.multi else {} for d in element.vdims: scalar = element.interface.isunique(element, d, **scalar_kwargs) dim = dimension_sanitizer(d.name) if dim not in data: if scalar: values = element.dimension_values(d, not scalar) else: values = [arr[:, 0] for arr in element.split(datatype='array', dimensions=[dim])] if len(values) != len(data['xs']): values = np.concatenate([values, [stream.empty_value]]) data[dim] = values
[docs]class FreehandDrawCallback(PolyDrawCallback): def initialize(self, plot_id=None): plot = self.plot cds = plot.handles['cds'] glyph = plot.handles['glyph'] stream = self.streams[0] if stream.styles: self._create_style_callback(cds, glyph) kwargs = {} if stream.tooltip: kwargs['description'] = stream.tooltip if stream.empty_value is not None: kwargs['empty_value'] = stream.empty_value poly_tool = FreehandDrawTool( num_objects=stream.num_objects, renderers=[plot.handles['glyph_renderer']], **kwargs ) plot.state.tools.append(poly_tool) self._update_cds_vdims(cds.data) CDSCallback.initialize(self, plot_id)
[docs]class BoxEditCallback(GlyphDrawCallback): attributes = {'data': 'cds.data'} models = ['cds'] def _path_initialize(self): plot = self.plot cds = plot.handles['cds'] data = cds.data element = self.plot.current_frame l, b, r, t = [], [], [], [] for x, y in zip(data['xs'], data['ys']): x0, x1 = (np.nanmin(x), np.nanmax(x)) y0, y1 = (np.nanmin(y), np.nanmax(y)) l.append(x0) b.append(y0) r.append(x1) t.append(y1) data = {'left': l, 'bottom': b, 'right': r, 'top': t} data.update({vd.name: element.dimension_values(vd, expanded=False) for vd in element.vdims}) cds.data.update(data) style = self.plot.style[self.plot.cyclic_index] style.pop('cmap', None) r1 = plot.state.quad(left='left', bottom='bottom', right='right', top='top', source=cds, **style) if plot.handles['glyph_renderer'] in self.plot.state.renderers: self.plot.state.renderers.remove(plot.handles['glyph_renderer']) data = self._process_msg({'data': data})['data'] for stream in self.streams: stream.update(data=data) return r1 def initialize(self, plot_id=None): from .path import PathPlot stream = self.streams[0] cds = self.plot.handles['cds'] kwargs = {} if stream.num_objects: kwargs['num_objects'] = stream.num_objects if stream.tooltip: kwargs['description'] = stream.tooltip renderer = self.plot.handles['glyph_renderer'] if isinstance(self.plot, PathPlot): renderer = self._path_initialize() if stream.styles: self._create_style_callback(cds, renderer.glyph) if bokeh33: # First version with Quad support box_tool = BoxEditTool(renderers=[renderer], **kwargs) self.plot.state.tools.append(box_tool) else: warn("BoxEditTool requires Bokeh >= 3.3") self._update_cds_vdims(cds.data) super(CDSCallback, self).initialize() def _process_msg(self, msg): data = super()._process_msg(msg) if 'data' not in data: return {} values = dict(data['data']) values['x0'] = values.pop("left") values['y0'] = values.pop("bottom") values['x1'] = values.pop("right") values['y1'] = values.pop("top") msg = {'data': values} self._update_cds_vdims(msg['data']) return self._transform(msg)
[docs]class PolyEditCallback(PolyDrawCallback): def initialize(self, plot_id=None): plot = self.plot cds = plot.handles['cds'] vertex_tool = None if all(s.shared for s in self.streams): tools = [tool for tool in plot.state.tools if isinstance(tool, PolyEditTool)] vertex_tool = tools[0] if tools else None stream = self.streams[0] kwargs = {} if stream.tooltip: kwargs['description'] = stream.tooltip if vertex_tool is None: vertex_style = dict({'size': 10}, **stream.vertex_style) r1 = plot.state.scatter([], [], **vertex_style) vertex_tool = PolyEditTool(vertex_renderer=r1, **kwargs) plot.state.tools.append(vertex_tool) vertex_tool.renderers.append(plot.handles['glyph_renderer']) self._update_cds_vdims(cds.data) CDSCallback.initialize(self, plot_id)
Stream._callbacks['bokeh'].update({ PointerXY : PointerXYCallback, PointerX : PointerXCallback, PointerY : PointerYCallback, Tap : TapCallback, SingleTap : SingleTapCallback, DoubleTap : DoubleTapCallback, PressUp : PressUpCallback, PanEnd : PanEndCallback, MouseEnter : MouseEnterCallback, MouseLeave : MouseLeaveCallback, RangeXY : RangeXYCallback, RangeX : RangeXCallback, RangeY : RangeYCallback, BoundsXY : BoundsCallback, BoundsX : BoundsXCallback, BoundsY : BoundsYCallback, Lasso : LassoCallback, Selection1D : Selection1DCallback, PlotSize : PlotSizeCallback, SelectionXY : SelectionXYCallback, Draw : DrawCallback, PlotReset : ResetCallback, CDSStream : CDSCallback, BoxEdit : BoxEditCallback, PointDraw : PointDrawCallback, CurveEdit : CurveEditCallback, FreehandDraw: FreehandDrawCallback, PolyDraw : PolyDrawCallback, PolyEdit : PolyEditCallback, SelectMode : SelectModeCallback })