# Source code for _pynetworktables.table

__all__ = ["NetworkTable"]

from typing import Callable, List, Optional, Sequence

from ._impl.constants import (
    NT_BOOLEAN,
    NT_DOUBLE,
    NT_STRING,
    NT_RAW,
    NT_BOOLEAN_ARRAY,
    NT_DOUBLE_ARRAY,
    NT_STRING_ARRAY,
    NT_PERSISTENT,
    NT_NOTIFY_IMMEDIATE,
    NT_NOTIFY_LOCAL,
    NT_NOTIFY_NEW,
    NT_NOTIFY_DELETE,
    NT_NOTIFY_UPDATE,
    NT_NOTIFY_FLAGS,
)

from ._impl.value import Value

from .entry import NetworkTableEntry

import logging

# Logger used by this module, e.g. to report exceptions raised by sub-table
# listeners.
logger = logging.getLogger("nt")
# Mask of notification flags for which a listener's ``isNew`` argument is
# reported as True (see ``NetworkTable.addEntryListenerEx``).
_is_new = NT_NOTIFY_IMMEDIATE | NT_NOTIFY_NEW


class NetworkTable:
    """
    This is a NetworkTable object, it allows you to interact with
    NetworkTables in a table-based manner. You should not directly
    create a NetworkTable object, but instead use
    :meth:`.NetworkTables.getTable` to retrieve a NetworkTable instance.

    For example, to interact with the SmartDashboard::

        from networktables import NetworkTables
        sd = NetworkTables.getTable('SmartDashboard')

        someNumberEntry = sd.getEntry('someNumber')
        someNumberEntry.putNumber(1234)
        ...

    .. seealso::
       - The examples in the documentation.
       - :class:`.NetworkTablesInstance`
    """

    PATH_SEPARATOR = "/"

    def __init__(self, path, api, inst):
        """Create a table view rooted at *path*.

        :param path: Path of the table, without a trailing slash
        :param api: Object through which entry values, flags and listeners
                    are read and written
        :param inst: Object used to look up entries and tables by full path
        """
        #: Path of table without trailing slash
        self.path = path
        # Prefix shared by every key of this table (path + separator). Its
        # length is cached because callbacks strip it from every event key.
        self._path = path + self.PATH_SEPARATOR
        self._pathsz = len(self._path)
        self._api = api
        self._inst = inst
        # Maps each listener callable to the list of listener uids that were
        # registered for it, so removeEntryListener can undo all of them.
        self._listeners = {}

    def __str__(self):
        return "NetworkTable: %s" % self._path

    def __repr__(self):
        return "<NetworkTable path=%s>" % self._path

    def getEntry(self, key: str) -> "NetworkTableEntry":
        """Gets the entry for a subkey. This is the preferred API to use
        to access NetworkTable keys.

        :param key: the key name, relative to this table

        :rtype: :class:`.NetworkTableEntry`

        .. versionadded:: 2018.0.0
        """
        return self._inst.getEntry(self._path + key)

    def getPath(self) -> str:
        """Gets the full path of this table. Does not include the
        trailing "/".

        :returns: The path (e.g "", "/foo").
        """
        # self._path carries the trailing separator; self.path is the
        # documented form without it.
        return self.path

    def addEntryListener(
        self,
        listener: Callable,
        immediateNotify: bool = False,
        key: Optional[str] = None,
        localNotify: bool = False,
    ) -> None:
        """Adds a listener that will be notified when any key in this
        NetworkTable is changed, or when a specified key changes.

        The listener is called from the NetworkTables I/O thread, and should
        return as quickly as possible.

        :param listener: A callable with signature
                         `callable(source, key, value, isNew)`
        :param immediateNotify: If True, the listener will be called
                                immediately with the current values of the
                                table
        :param key: If specified, the listener will only be called when this
                    key is changed
        :param localNotify: True if you wish to be notified of changes made
                            locally (default is False)

        .. warning:: You may call the NetworkTables API from within the
                     listener, but it is not recommended

        .. versionchanged:: 2017.0.0
           Added localNotify parameter (defaults to False, which is different
           from NT2)
        """
        flags = NT_NOTIFY_NEW | NT_NOTIFY_UPDATE
        if immediateNotify:
            flags |= NT_NOTIFY_IMMEDIATE
        if localNotify:
            flags |= NT_NOTIFY_LOCAL

        self.addEntryListenerEx(listener, flags, key=key)

    def addEntryListenerEx(
        self,
        listener: Callable,
        flags: int,
        key: Optional[str] = None,
        paramIsNew: bool = True,
    ) -> None:
        """Adds a listener that will be notified when any key in this
        NetworkTable is changed, or when a specified key changes.

        The listener is called from the NetworkTables I/O thread, and should
        return as quickly as possible.

        :param listener: A callable with signature
                         `callable(source, key, value, param)`
        :param flags: Bitmask of flags that indicate the types of
                      notifications you wish to receive
        :type flags: :class:`.NotifyFlags`
        :param key: If specified, the listener will only be called when this
                    key is changed
        :param paramIsNew: If True, the listener fourth parameter is a
                           boolean set to True if the listener is being
                           called because of a new value in the table.
                           Otherwise, the parameter is an integer of the raw
                           `NT_NOTIFY_*` flags

        .. warning:: You may call the NetworkTables API from within the
                     listener, but it is not recommended

        .. versionadded:: 2017.0.0
        """
        if key is None:
            # Any key in this table (but not subtables)
            _pathsz = self._pathsz
            if paramIsNew:

                def callback(item):
                    key_, value_, flags_, _ = item
                    key_ = key_[_pathsz:]
                    # A separator left in the relative key means the entry
                    # belongs to a subtable, which this listener ignores.
                    if "/" not in key_:
                        listener(self, key_, value_.value, (flags_ & _is_new) != 0)

            else:

                def callback(item):
                    key_, value_, flags_, _ = item
                    key_ = key_[_pathsz:]
                    if "/" not in key_:
                        listener(self, key_, value_.value, flags_)

            uid = self._api.addEntryListener(self._path, callback, flags)

        # Hack: Internal flag used by addGlobalListener*
        elif key == 0xDEADBEEF:
            if paramIsNew:

                def callback(item):
                    key_, value_, flags_, _ = item
                    listener(key_, value_.value, (flags_ & _is_new) != 0)

            else:
                # The listener is registered as-is and receives whatever the
                # API passes to its callbacks.
                callback = listener

            uid = self._api.addEntryListener("/", callback, flags)
        else:
            entry_id = self._api.getEntryId(self._path + key)
            uid = self._api.addEntryListenerByIdEx(
                self, key, entry_id, listener, flags, paramIsNew
            )

        self._listeners.setdefault(listener, []).append(uid)

    # deprecated aliases
    addTableListener = addEntryListener
    addTableListenerEx = addEntryListenerEx

    def addSubTableListener(
        self, listener: Callable, localNotify: bool = False
    ) -> None:
        """Adds a listener that will be notified when any key in a subtable
        of this NetworkTable is changed.

        The listener is called from the NetworkTables I/O thread, and should
        return as quickly as possible.

        :param listener: Callable to call when previously unseen table
                         appears. Function signature is
                         `callable(source, key, subtable, True)`
        :param localNotify: True if you wish to be notified when local
                            changes result in a new table

        .. warning:: You may call the NetworkTables API from within the
                     listener, but it is not recommended as we are not
                     currently sure if deadlocks will occur

        .. versionchanged:: 2017.0.0
           Added localNotify parameter
        """
        # Names of the subtables the listener has already been told about.
        notified_tables = {}

        def _callback(item):
            key, value_, _1, _2 = item
            key = key[self._pathsz :]
            if "/" in key:
                skey = key[: key.index("/")]

                # setdefault returns our fresh sentinel only when skey was
                # not present yet, i.e. on the first event for that subtable.
                o = object()
                if notified_tables.setdefault(skey, o) is o:
                    try:
                        listener(self, skey, self.getSubTable(skey), True)
                    except Exception:
                        logger.warning(
                            "Unhandled exception in %s", listener, exc_info=True
                        )

        flags = NT_NOTIFY_NEW | NT_NOTIFY_IMMEDIATE
        if localNotify:
            flags |= NT_NOTIFY_LOCAL

        uid = self._api.addEntryListener(self._path, _callback, flags)
        self._listeners.setdefault(listener, []).append(uid)

    def removeEntryListener(self, listener: Callable) -> None:
        """Removes a table listener

        :param listener: callable that was passed to
                         :meth:`.addTableListener` or
                         :meth:`.addSubTableListener`
        """
        uids = self._listeners.pop(listener, [])
        for uid in uids:
            self._api.removeEntryListener(uid)

    # Deprecated alias
    removeTableListener = removeEntryListener

    def getSubTable(self, key: str) -> "NetworkTable":
        """Returns the table at the specified key. If there is no table at
        the specified key, it will create a new table

        :param key: the key name

        :returns: the networktable to be returned
        """
        path = self._path + key
        return self._inst.getTable(path)

    def containsKey(self, key: str) -> bool:
        """Determines whether the given key is in this table.

        :param key: the key to search for

        :returns: True if the table has a value assigned to the given key
        """
        path = self._path + key
        return self._api.getEntryValue(path) is not None

    def __contains__(self, key: str) -> bool:
        return self.containsKey(key)

    def containsSubTable(self, key: str) -> bool:
        """Determines whether there exists a non-empty subtable for this key
        in this table.

        :param key: the key to search for (must not end with path separator)

        :returns: True if there is a subtable with the key which contains at
                  least one key/subtable of its own
        """
        path = self._path + key + self.PATH_SEPARATOR
        return len(self._api.getEntryInfo(path, 0)) > 0

    def getKeys(self, types: int = 0) -> List[str]:
        """
        :param types: bitmask of types; 0 is treated as a "don't care".
        :type types: :class:`.EntryTypes`

        :returns: keys currently in the table

        .. versionadded:: 2017.0.0
        """
        prefix_len = self._pathsz
        separator = self.PATH_SEPARATOR
        keys = []
        for entry in self._api.getEntryInfo(self._path, types):
            relative_key = entry.name[prefix_len:]
            # Entries that live in a subtable are not keys of this table.
            if separator not in relative_key:
                keys.append(relative_key)
        return keys

    def getSubTables(self) -> List[str]:
        """:returns: subtables currently in the table

        .. versionadded:: 2017.0.0
        """
        prefix_len = self._pathsz
        separator = self.PATH_SEPARATOR
        keys = set()
        for entry in self._api.getEntryInfo(self._path, 0):
            relative_key = entry.name[prefix_len:]
            subtable, found, _ = relative_key.partition(separator)
            # Without a separator the entry is a direct key, not a subtable.
            if found:
                keys.add(subtable)
        return list(keys)

    def setPersistent(self, key: str) -> None:
        """Makes a key's value persistent through program restarts.

        :param key: the key to make persistent

        .. versionadded:: 2017.0.0
        """
        self.setFlags(key, NT_PERSISTENT)

    def clearPersistent(self, key: str) -> None:
        """Stop making a key's value persistent through program restarts.

        :param key: the key name

        .. versionadded:: 2017.0.0
        """
        self.clearFlags(key, NT_PERSISTENT)

    def isPersistent(self, key: str) -> bool:
        """Returns whether the value is persistent through program restarts.

        :param key: the key name

        .. versionadded:: 2017.0.0
        """
        return (self.getFlags(key) & NT_PERSISTENT) != 0

    def delete(self, key: str) -> None:
        """Deletes the specified key in this table.

        :param key: the key name

        .. versionadded:: 2017.0.0
        """
        path = self._path + key
        self._api.deleteEntry(path)

    def setFlags(self, key: str, flags: int) -> None:
        """Sets entry flags on the specified key in this table.

        :param key: the key name
        :param flags: the flags to set (bitmask)
        :type flags: :class:`.EntryFlags`

        .. versionadded:: 2017.0.0
        """
        path = self._path + key
        self._api.setEntryFlags(path, self._api.getEntryFlags(path) | flags)

    def clearFlags(self, key: str, flags: int) -> None:
        """Clears entry flags on the specified key in this table.

        :param key: the key name
        :param flags: the flags to clear (bitmask)
        :type flags: :class:`.EntryFlags`

        .. versionadded:: 2017.0.0
        """
        path = self._path + key
        self._api.setEntryFlags(path, self._api.getEntryFlags(path) & ~flags)

    def getFlags(self, key: str):
        """Returns the entry flags for the specified key.

        :param key: the key name

        :returns: the flags, or 0 if the key is not defined
        :rtype: :class:`.EntryFlags`

        .. versionadded:: 2017.0.0
        """
        path = self._path + key
        return self._api.getEntryFlags(path)

    def putNumber(self, key: str, value: float) -> bool:
        """Put a number in the table

        :param key: the key to be assigned to
        :param value: the value that will be assigned

        :returns: False if the table key already exists with a different type
        """
        path = self._path + key
        return self._api.setEntryValue(path, Value.makeDouble(value))

    def setDefaultNumber(self, key: str, defaultValue: float) -> bool:
        """If the key doesn't currently exist, then the specified value will
        be assigned to the key.

        :param key: the key to be assigned to
        :param defaultValue: the default value to set if key doesn't exist.
        :type defaultValue: int, float

        :returns: False if the table key exists with a different type

        .. versionadded:: 2017.0.0
        """
        path = self._path + key
        return self._api.setDefaultEntryValue(path, Value.makeDouble(defaultValue))

    def getNumber(self, key: str, defaultValue: float) -> float:
        """Gets the number associated with the given name. If the key does
        not exist or is of different type, it will return the default value.

        :param key: the key to look up
        :param defaultValue: the value to be returned if no value is found

        :returns: the value associated with the given key or the given
                  default value if there is no value associated with the key
        """
        path = self._path + key
        value = self._api.getEntryValue(path)
        if not value or value.type != NT_DOUBLE:
            return defaultValue

        return value.value

    def putString(self, key: str, value: str) -> bool:
        """Put a string in the table

        :param key: the key to be assigned to
        :param value: the value that will be assigned

        :returns: False if the table key already exists with a different type
        """
        path = self._path + key
        return self._api.setEntryValue(path, Value.makeString(value))

    def setDefaultString(self, key: str, defaultValue: str) -> bool:
        """If the key doesn't currently exist, then the specified value will
        be assigned to the key.

        :param key: the key to be assigned to
        :param defaultValue: the default value to set if key doesn't exist.

        :returns: False if the table key exists with a different type

        .. versionadded:: 2017.0.0
        """
        path = self._path + key
        return self._api.setDefaultEntryValue(path, Value.makeString(defaultValue))

    def getString(self, key: str, defaultValue: str) -> str:
        """Gets the string associated with the given name. If the key does
        not exist or is of different type, it will return the default value.

        :param key: the key to look up
        :param defaultValue: the value to be returned if no value is found

        :returns: the value associated with the given key or the given
                  default value if there is no value associated with the key
        """
        path = self._path + key
        value = self._api.getEntryValue(path)
        if not value or value.type != NT_STRING:
            return defaultValue

        return value.value

    def putBoolean(self, key: str, value: bool) -> bool:
        """Put a boolean in the table

        :param key: the key to be assigned to
        :param value: the value that will be assigned

        :returns: False if the table key already exists with a different type
        """
        path = self._path + key
        return self._api.setEntryValue(path, Value.makeBoolean(value))

    def setDefaultBoolean(self, key: str, defaultValue: bool) -> bool:
        """If the key doesn't currently exist, then the specified value will
        be assigned to the key.

        :param key: the key to be assigned to
        :param defaultValue: the default value to set if key doesn't exist.

        :returns: False if the table key exists with a different type

        .. versionadded:: 2017.0.0
        """
        path = self._path + key
        return self._api.setDefaultEntryValue(path, Value.makeBoolean(defaultValue))

    def getBoolean(self, key: str, defaultValue: bool) -> bool:
        """Gets the boolean associated with the given name. If the key does
        not exist or is of different type, it will return the default value.

        :param key: the key name
        :param defaultValue: the default value if no value is found

        :returns: the value associated with the given key or the given
                  default value if there is no value associated with the key
        """
        path = self._path + key
        value = self._api.getEntryValue(path)
        if not value or value.type != NT_BOOLEAN:
            return defaultValue

        return value.value

    def putBooleanArray(self, key: str, value: Sequence[bool]) -> bool:
        """Put a boolean array in the table

        :param key: the key to be assigned to
        :param value: the value that will be assigned

        :returns: False if the table key already exists with a different type

        .. versionadded:: 2017.0.0
        """
        path = self._path + key
        return self._api.setEntryValue(path, Value.makeBooleanArray(value))

    def setDefaultBooleanArray(self, key: str, defaultValue: Sequence[bool]) -> bool:
        """If the key doesn't currently exist, then the specified value will
        be assigned to the key.

        :param key: the key to be assigned to
        :param defaultValue: the default value to set if key doesn't exist.

        :returns: False if the table key exists with a different type

        .. versionadded:: 2017.0.0
        """
        path = self._path + key
        return self._api.setDefaultEntryValue(
            path, Value.makeBooleanArray(defaultValue)
        )

    def getBooleanArray(self, key: str, defaultValue) -> Sequence[bool]:
        """Returns the boolean array the key maps to. If the key does not
        exist or is of different type, it will return the default value.

        :param key: the key to look up
        :type key: str
        :param defaultValue: the value to be returned if no value is found

        :returns: the value associated with the given key or the given
                  default value if there is no value associated with the key

        .. versionadded:: 2017.0.0
        """
        path = self._path + key
        value = self._api.getEntryValue(path)
        if not value or value.type != NT_BOOLEAN_ARRAY:
            return defaultValue

        return value.value

    def putNumberArray(self, key: str, value: Sequence[float]) -> bool:
        """Put a number array in the table

        :param key: the key to be assigned to
        :param value: the value that will be assigned

        :returns: False if the table key already exists with a different type

        .. versionadded:: 2017.0.0
        """
        path = self._path + key
        return self._api.setEntryValue(path, Value.makeDoubleArray(value))

    def setDefaultNumberArray(self, key: str, defaultValue: Sequence[float]) -> bool:
        """If the key doesn't currently exist, then the specified value will
        be assigned to the key.

        :param key: the key to be assigned to
        :param defaultValue: the default value to set if key doesn't exist.

        :returns: False if the table key exists with a different type

        .. versionadded:: 2017.0.0
        """
        path = self._path + key
        return self._api.setDefaultEntryValue(path, Value.makeDoubleArray(defaultValue))

    def getNumberArray(self, key: str, defaultValue) -> Sequence[float]:
        """Returns the number array the key maps to. If the key does not
        exist or is of different type, it will return the default value.

        :param key: the key to look up
        :param defaultValue: the value to be returned if no value is found

        :returns: the value associated with the given key or the given
                  default value if there is no value associated with the key

        .. versionadded:: 2017.0.0
        """
        path = self._path + key
        value = self._api.getEntryValue(path)
        if not value or value.type != NT_DOUBLE_ARRAY:
            return defaultValue

        return value.value

    def putStringArray(self, key: str, value: Sequence[str]) -> bool:
        """Put a string array in the table

        :param key: the key to be assigned to
        :param value: the value that will be assigned

        :returns: False if the table key already exists with a different type

        .. versionadded:: 2017.0.0
        """
        path = self._path + key
        return self._api.setEntryValue(path, Value.makeStringArray(value))

    def setDefaultStringArray(self, key: str, defaultValue: Sequence[str]) -> bool:
        """If the key doesn't currently exist, then the specified value will
        be assigned to the key.

        :param key: the key to be assigned to
        :param defaultValue: the default value to set if key doesn't exist.

        :returns: False if the table key exists with a different type

        .. versionadded:: 2017.0.0
        """
        path = self._path + key
        return self._api.setDefaultEntryValue(path, Value.makeStringArray(defaultValue))

    def getStringArray(self, key: str, defaultValue) -> Sequence[str]:
        """Returns the string array the key maps to. If the key does not
        exist or is of different type, it will return the default value.

        :param key: the key to look up
        :param defaultValue: the value to be returned if no value is found

        :returns: the value associated with the given key or the given
                  default value if there is no value associated with the key

        .. versionadded:: 2017.0.0
        """
        path = self._path + key
        value = self._api.getEntryValue(path)
        if not value or value.type != NT_STRING_ARRAY:
            return defaultValue

        return value.value

    def putRaw(self, key: str, value: bytes) -> bool:
        """Put a raw value (byte array) in the table

        :param key: the key to be assigned to
        :param value: the value that will be assigned

        :returns: False if the table key already exists with a different type

        .. versionadded:: 2017.0.0
        """
        path = self._path + key
        return self._api.setEntryValue(path, Value.makeRaw(value))

    def setDefaultRaw(self, key: str, defaultValue: bytes) -> bool:
        """If the key doesn't currently exist, then the specified value will
        be assigned to the key.

        :param key: the key to be assigned to
        :param defaultValue: the default value to set if key doesn't exist.

        :returns: False if the table key exists with a different type

        .. versionadded:: 2017.0.0
        """
        path = self._path + key
        return self._api.setDefaultEntryValue(path, Value.makeRaw(defaultValue))

    def getRaw(self, key: str, defaultValue: bytes) -> bytes:
        """Returns the raw value (byte array) the key maps to. If the key
        does not exist or is of different type, it will return the default
        value.

        :param key: the key to look up
        :param defaultValue: the value to be returned if no value is found

        :returns: the value associated with the given key or the given
                  default value if there is no value associated with the key

        .. versionadded:: 2017.0.0
        """
        path = self._path + key
        value = self._api.getEntryValue(path)
        if not value or value.type != NT_RAW:
            return defaultValue

        return value.value

    def putValue(self, key: str, value) -> bool:
        """Put a value in the table, trying to autodetect the NT type of
        the value. Refer to this table to determine the type mapping:

        ======= ============================ =================================
        PyType  NT Type                      Notes
        ======= ============================ =================================
        bool    :attr:`.EntryTypes.BOOLEAN`
        int     :attr:`.EntryTypes.DOUBLE`
        float   :attr:`.EntryTypes.DOUBLE`
        str     :attr:`.EntryTypes.STRING`
        bytes   :attr:`.EntryTypes.RAW`
        list    Error                        Use `putXXXArray` methods instead
        tuple   Error                        Use `putXXXArray` methods instead
        ======= ============================ =================================

        :param key: the key to be assigned to
        :param value: the value that will be assigned
        :type value: bool, int, float, str, bytes

        :returns: False if the table key already exists with a different type

        .. versionadded:: 2017.0.0
        """
        value = Value.getFactory(value)(value)
        path = self._path + key
        return self._api.setEntryValue(path, value)

    def setDefaultValue(self, key: str, defaultValue) -> bool:
        """If the key doesn't currently exist, then the specified value will
        be assigned to the key.

        :param key: the key to be assigned to
        :param defaultValue: the default value to set if key doesn't exist.
        :type defaultValue: bool, int, float, str, bytes

        :returns: False if the table key exists with a different type

        .. versionadded:: 2017.0.0

        .. seealso:: :meth:`.putValue`
        """
        defaultValue = Value.getFactory(defaultValue)(defaultValue)
        path = self._path + key
        return self._api.setDefaultEntryValue(path, defaultValue)

    def getValue(self, key: str, defaultValue):
        """Gets the value associated with a key. This supports all
        NetworkTables types (unlike :meth:`putValue`).

        :param key: the key of the value to look up
        :param defaultValue: The default value to return if the key doesn't
                             exist
        :type defaultValue: any

        :returns: the value associated with the given key
        :rtype: bool, int, float, str, bytes, tuple

        .. versionadded:: 2017.0.0
        """
        path = self._path + key
        value = self._api.getEntryValue(path)
        if not value:
            return defaultValue

        return value.value

    def getAutoUpdateValue(
        self, key: str, defaultValue, writeDefault: bool = True
    ) -> "NetworkTableEntry":
        """Returns an object that will be automatically updated when the
        value is updated by networktables.

        :param key: the key name
        :param defaultValue: Default value to use if not in the table
        :type defaultValue: any
        :param writeDefault: If True, put the default value to the table,
                             overwriting existing values

        :rtype: :class:`.NetworkTableEntry`

        .. note:: If you modify the returned value, the value will NOT
                  be written back to NetworkTables (though now there are
                  functions you can use to write values). See
                  :func:`.ntproperty` if you're looking for that sort of
                  thing.

        .. seealso:: :func:`.ntproperty` is a better alternative to use

        .. versionadded:: 2015.1.3

        .. versionchanged:: 2018.0.0
           This now returns the same as :meth:`.NetworkTable.getEntry`
        """
        return self._inst.getGlobalAutoUpdateValue(
            self._path + key, defaultValue, writeDefault
        )