# validated: 2024-01-20 DS 1144115da01f Subsystem.java
# Don't import stuff from the package here, it'll cause a circular import
from __future__ import annotations
from typing import TYPE_CHECKING, Callable, Optional
from typing_extensions import Self
if TYPE_CHECKING:
from .command import Command
from .commandscheduler import CommandScheduler
from wpiutil import Sendable, SendableBuilder, SendableRegistry
[docs]
class Subsystem(Sendable):
"""
A robot subsystem. Subsystems are the basic unit of robot organization in the Command-based
framework; they encapsulate low-level hardware objects (motor controllers, sensors, etc.) and
provide methods through which they can be used by Commands. Subsystems are used by the
CommandScheduler's resource management system to ensure multiple robot actions are not
"fighting" over the same hardware; Commands that use a subsystem should include that subsystem in
their :func:`commands2.Command.getRequirements` method, and resources used within a subsystem should
generally remain encapsulated and not be shared by other parts of the robot.
Subsystems must be registered with the scheduler with the :func:`commands2.CommandScheduler.registerSubsystem`
method in order for the :func:`.periodic` method to be called. It is recommended that this method be called from the
constructor of users' Subsystem implementations.
"""
def __new__(cls, *arg, **kwargs) -> Self:
instance = super().__new__(cls)
super().__init__(instance)
SendableRegistry.addLW(instance, cls.__name__, cls.__name__)
# add to the scheduler
from .commandscheduler import CommandScheduler
CommandScheduler.getInstance().registerSubsystem(instance)
return instance
def __init__(self) -> None:
pass
[docs]
def periodic(self) -> None:
"""
This method is called periodically by the CommandScheduler. Useful for updating
subsystem-specific state that you don't want to offload to a Command. Teams should try
to be consistent within their own codebases about which responsibilities will be handled by
Commands, and which will be handled here.
"""
pass
[docs]
def simulationPeriodic(self) -> None:
"""
This method is called periodically by the CommandScheduler. Useful for updating
subsystem-specific state that needs to be maintained for simulations, such as for updating simulation classes and setting simulated sensor readings.
"""
pass
[docs]
def setDefaultCommand(self, command: Command) -> None:
"""
Sets the default Command of the subsystem. The default command will be automatically
scheduled when no other commands are scheduled that require the subsystem. Default commands
should generally not end on their own, i.e. their :func:`commands2.Command.isFinished` method should
always return false. Will automatically register this subsystem with the CommandScheduler.
:param defaultCommand: the default command to associate with this subsystem
"""
from .commandscheduler import CommandScheduler
CommandScheduler.getInstance().setDefaultCommand(self, command)
[docs]
def removeDefaultCommand(self) -> None:
"""
Removes the default command for the subsystem. This will not cancel the default command if it
is currently running.
"""
CommandScheduler.getInstance().removeDefaultCommand(self)
[docs]
def getDefaultCommand(self) -> Optional[Command]:
"""
Gets the default command for this subsystem. Returns None if no default command is currently
associated with the subsystem.
:returns: the default command associated with this subsystem
"""
from .commandscheduler import CommandScheduler
return CommandScheduler.getInstance().getDefaultCommand(self)
[docs]
def getCurrentCommand(self) -> Optional[Command]:
"""
Returns the command currently running on this subsystem. Returns None if no command is
currently scheduled that requires this subsystem.
:returns: the scheduled command currently requiring this subsystem
"""
from .commandscheduler import CommandScheduler
return CommandScheduler.getInstance().requiring(self)
[docs]
def register(self):
"""
Registers this subsystem with the :class:`.CommandScheduler`, allowing its
:func:`.periodic` method to be called when the scheduler runs.
"""
from .commandscheduler import CommandScheduler
return CommandScheduler.getInstance().registerSubsystem(self)
[docs]
def runOnce(self, action: Callable[[], None]) -> Command:
"""
Constructs a command that runs an action once and finishes. Requires this subsystem.
:param action: the action to run
:return: the command
"""
from .cmd import runOnce
return runOnce(action, self)
[docs]
def run(self, action: Callable[[], None]) -> Command:
"""
Constructs a command that runs an action every iteration until interrupted. Requires this
subsystem.
:param action: the action to run
:returns: the command"""
from .cmd import run
return run(action, self)
[docs]
def startEnd(self, start: Callable[[], None], end: Callable[[], None]) -> Command:
"""
Constructs a command that runs an action once and another action when the command is
interrupted. Requires this subsystem.
:param start: the action to run on start
:param end: the action to run on interrupt
:returns: the command
"""
from .cmd import startEnd
return startEnd(start, end, self)
[docs]
def runEnd(self, run: Callable[[], None], end: Callable[[], None]) -> Command:
"""
Constructs a command that runs an action every iteration until interrupted, and then runs a
second action. Requires this subsystem.
:param run: the action to run every iteration
:param end: the action to run on interrupt
:returns: the command
"""
from .cmd import runEnd
return runEnd(run, end, self)
[docs]
def startRun(self, start: Callable[[], None], run: Callable[[], None]) -> Command:
"""
Constructs a command that runs an action once and another action every iteration until interrupted. Requires this subsystem.
:param start: the action to run on start
:param run: the action to run every iteration
:returns: the command
"""
from .cmd import startRun
return startRun(start, run, self)
#
# From SubsystemBase
#
[docs]
def getName(self) -> str:
"""
Gets the name of this Subsystem.
:returns: Name
"""
return SendableRegistry.getName(self)
[docs]
def setName(self, name: str) -> None:
"""
Set the name of this Subsystem.
"""
SendableRegistry.setName(self, name)
[docs]
def getSubsystem(self) -> str:
"""
Gets the subsystem name of this Subsystem.
:returns: Subsystem name
"""
return SendableRegistry.getSubsystem(self)
[docs]
def setSubsystem(self, subsystem: str):
"""
Sets the subsystem name of this Subsystem.
:param subsystem: subsystem name
"""
SendableRegistry.setSubsystem(self, subsystem)
[docs]
def addChild(self, name: str, child: Sendable) -> None:
"""
Associates a :class:`wpiutil.Sendable` with this Subsystem. Also update the child's name.
:param name: name to give child
:param child: sendable
"""
SendableRegistry.addLW(child, self.getSubsystem(), name)
[docs]
def initSendable(self, builder: SendableBuilder) -> None:
builder.setSmartDashboardType("Subsystem")
builder.addBooleanProperty(
".hasDefault", lambda: self.getDefaultCommand() is not None, lambda _: None
)
def get_default():
command = self.getDefaultCommand()
if command is not None:
return command.getName()
return "none"
builder.addStringProperty(".default", get_default, lambda _: None)
builder.addBooleanProperty(
".hasCommand", lambda: self.getCurrentCommand() is not None, lambda _: None
)
def get_current():
command = self.getCurrentCommand()
if command is not None:
return command.getName()
return "none"
builder.addStringProperty(".command", get_current, lambda _: None)