# Source code for ntcore.util

import threading
import weakref

from typing import Callable, Dict, Optional, Sequence

from ._ntcore import NetworkTableInstance, NetworkTableEntry, NetworkTableType, Value

# Public names exported by this module; _NtProperty is an implementation
# detail that is reached through ntproperty().
__all__ = ["ntproperty", "ChooserControl"]


class _NtProperty:
    """
    Backing object for a single :func:`ntproperty`.

    Don't use this directly, use @ntproperty instead.

    Each object wraps one NetworkTables entry and remembers the default
    value it was created with. Objects register themselves in a class-level
    registry keyed by the handle of their NetworkTableInstance so that they
    can be re-initialized via :meth:`onInstancePostReset`.
    """

    entry: NetworkTableEntry
    defaultValue: Value

    # Guards every access to _instances.
    _instlock = threading.Lock()
    # instance handle -> weakly-held set of properties attached to it. Weak
    # references mean the registry alone never keeps a property alive.
    _instances: Dict[int, weakref.WeakSet] = {}

    @classmethod
    def attach(cls, self: "_NtProperty", inst: NetworkTableInstance):
        """
        Register the property ``self`` under the handle of ``inst``.

        :param self: The property object to register
        :param inst: The NetworkTables instance the property belongs to
        """
        with cls._instlock:
            handle = inst._getHandle()
            props = cls._instances.get(handle)
            if props is None:
                props = weakref.WeakSet()
                cls._instances[handle] = props
            props.add(self)

    @classmethod
    def onInstancePostReset(cls, inst: NetworkTableInstance):
        """
        Call :meth:`reset` on every property registered for ``inst``.

        NOTE(review): presumably invoked by the bindings after an instance
        has been reset -- the caller is not visible in this module.

        :param inst: The NetworkTables instance that was reset
        """
        with cls._instlock:
            props = cls._instances.get(inst._getHandle(), [])
            for prop in props:
                prop.reset()

    @classmethod
    def onInstanceDestroy(cls, inst: NetworkTableInstance):
        """
        Drop the registry slot for ``inst``; a no-op if nothing was attached.

        :param inst: The NetworkTables instance being destroyed
        """
        with cls._instlock:
            cls._instances.pop(inst._getHandle(), None)

    def __init__(
        self,
        key: str,
        defaultValue,
        writeDefault: bool,
        persistent: bool,
        type: Optional[NetworkTableType],
        inst: NetworkTableInstance,
    ) -> None:
        """
        :param key: NetworkTables key of the entry to wrap
        :param defaultValue: Default value for the entry
        :param writeDefault: If True, write the default value to the entry
                             instead of only setting it as the default
        :param persistent: If True, mark the entry persistent; this forces
                           *writeDefault* to False
        :param type: Type of the entry, or None to autodetect it from
                     *defaultValue*
        :param inst: The NetworkTables instance that owns the entry
        """
        # Autodetect the type if not provided, and store the default
        # value as that specific type
        if type is None:
            self.defaultValue = Value.makeValue(defaultValue)
            self.mkv = Value.getFactoryByType(self.defaultValue.type())
        else:
            self.mkv = Value.getFactoryByType(type)
            self.defaultValue = self.mkv(defaultValue)

        self.key = key
        self.writeDefault = writeDefault
        self.persistent = persistent
        # never overwrite persistent values with defaults
        if persistent:
            self.writeDefault = False
        self.inst = inst
        _NtProperty.attach(self, inst)

        # Set it up
        self.reset()

    def reset(self):
        """
        (Re)acquire the entry from the instance and apply the default value.

        The default is written with ``setValue`` when *writeDefault* is set,
        otherwise with ``setDefaultValue``. Persistent properties are flagged
        as such on the entry afterwards.
        """
        self.entry = self.inst.getEntry(self.key)
        if self.writeDefault:
            self.entry.setValue(self.defaultValue)
        else:
            self.entry.setDefaultValue(self.defaultValue)

        if self.persistent:
            self.entry.setPersistent()

    def get(self, _):
        """
        Property getter: return the entry's current value.

        The positional argument is the object the property is accessed on;
        it is ignored because the value lives in the entry.
        """
        return self.entry.value

    def set(self, _, value):
        """
        Property setter: convert ``value`` with the type factory chosen at
        construction and write it to the entry. The owning object is ignored.
        """
        self.entry.setValue(self.mkv(value))


def ntproperty(
    key: str,
    defaultValue,
    *,
    writeDefault: bool = True,
    doc: Optional[str] = None,
    persistent: bool = False,
    type: Optional[NetworkTableType] = None,
    inst: Optional[NetworkTableInstance] = None,
) -> property:
    """
    A property that you can add to your classes to access NetworkTables
    variables like a normal variable.

    :param key: A full NetworkTables key (eg ``/SmartDashboard/foo``)
    :param defaultValue: Default value to use if not in the table
    :type  defaultValue: any
    :param writeDefault: If True, put the default value to the table,
                         overwriting existing values
    :param doc: If given, will be the docstring of the property.
    :param persistent: If True, persist set values across restarts.
                       *writeDefault* is ignored if this is True.
    :param type: Specify the type of this entry. If not specified,
                 will autodetect the type from the default value
    :param inst: The NetworkTables instance to use.

    Example usage::

        class Foo(object):

            something = ntproperty('/SmartDashboard/something', True)

            ...

            def do_thing(self):
                if self.something:    # reads from value
                    ...

                    self.something = False # writes value

    .. note:: When using empty lists/tuples, you must explicitly specify
              the type.

    .. warning:: This function assumes that the value's type
                 never changes. If it does, you'll get really strange
                 errors... so don't do that.
    """
    if inst is None:
        inst = NetworkTableInstance.getDefault()

    ntprop = _NtProperty(key, defaultValue, writeDefault, persistent, type, inst)
    # The accessors are bound to the shared ntprop object, so the value is
    # read from / written to the entry rather than stored per instance.
    return property(fget=ntprop.get, fset=ntprop.set, doc=doc)
class ChooserControl:
    """
    Interacts with a :class:`wpilib.SendableChooser`
    object over NetworkTables.
    """

    def __init__(
        self,
        key: str,
        on_choices: Optional[Callable[[Sequence[str]], None]] = None,
        on_selected: Optional[Callable[[str], None]] = None,
        *,
        inst: Optional[NetworkTableInstance] = None
    ) -> None:
        """
        :param key: NetworkTables key
        :param on_choices: A function that will be called when the
                           choices change.
        :param on_selected: A function that will be called when the
                            selection changes.
        :param inst: The NetworkTables instance to use.
        """
        if inst is None:
            inst = NetworkTableInstance.getDefault()

        self.subtable = inst.getTable("SmartDashboard").getSubTable(key)

        self.on_choices = on_choices
        self.on_selected = on_selected

        # Only listen when there is at least one callback to deliver to
        if on_choices or on_selected:
            self.subtable.addTableListener(self._on_change, True)

    def close(self) -> None:
        """Stops listening for changes to the ``SendableChooser``"""
        if self.on_choices or self.on_selected:
            self.subtable.removeTableListener(self._on_change)

    def getChoices(self) -> Sequence[str]:
        """
        Returns the current choices. If the chooser doesn't exist, this
        will return an empty sequence.
        """
        return self.subtable.getStringArray("options", [])

    def getSelected(self) -> Optional[str]:
        """
        Returns the current selection, falling back to the chooser's
        default, or None if neither is set
        """
        selected = self.subtable.getString("selected", None)
        if selected is None:
            selected = self.subtable.getString("default", None)
        return selected

    def setSelected(self, selection: str) -> None:
        """
        Sets the active selection on the chooser

        :param selection: Active selection name
        """
        self.subtable.putString("selected", selection)

    def _on_change(self, table, key, value, isNew):
        """
        Table listener: dispatches a changed key to the matching callback.

        ``options`` goes to *on_choices* and ``selected`` goes to
        *on_selected*. ``default`` is only forwarded to *on_selected* while
        no explicit selection exists, since it is the effective selection
        in that case.
        """
        if key == "options":
            if self.on_choices is not None:
                self.on_choices(value)

        elif key == "selected":
            if self.on_selected is not None:
                self.on_selected(value)

        elif key == "default":
            if (
                self.on_selected is not None
                and self.subtable.getString("selected", None) is None
            ):
                self.on_selected(value)