# validated: 2024-01-20 DS f29a7d2e501b Command.java
# Don't import stuff from the package here, it'll cause a circular import
from __future__ import annotations
from enum import Enum
from typing import TYPE_CHECKING, Any, Callable, Set, Union
from typing_extensions import Self, TypeAlias
if TYPE_CHECKING:
from .instantcommand import InstantCommand
from .subsystem import Subsystem
from .parallelracegroup import ParallelRaceGroup
from .sequentialcommandgroup import SequentialCommandGroup
from .paralleldeadlinegroup import ParallelDeadlineGroup
from .parallelcommandgroup import ParallelCommandGroup
from .repeatcommand import RepeatCommand
from .proxycommand import ProxyCommand
from .conditionalcommand import ConditionalCommand
from .wrappercommand import WrapperCommand
from wpiutil import Sendable, SendableRegistry, SendableBuilder
[docs]
class InterruptionBehavior(Enum):
    """
    An enum describing the command's behavior when another command with a shared requirement is
    scheduled.

    The numeric values are kept as-is for compatibility; only the member descriptions were
    corrected so that they match the member names and the default returned by
    :meth:`commands2.Command.getInterruptionBehavior`.
    """

    kCancelIncoming = 0
    """ This command continues, and the incoming command is not scheduled."""

    kCancelSelf = 1
    """
    This command ends, :func:`commands2.Command.end` is called, and the incoming command is
    scheduled normally.

    This is the default behavior.
    """
[docs]
class Command(Sendable):
"""
A state machine representing a complete action to be performed by the robot. Commands are run by
the :class:`commands2.CommandScheduler`, and can be composed into CommandGroups to allow users to build
complicated multistep actions without the need to roll the state machine logic themselves.
Commands are run synchronously from the main robot loop; no multithreading is used, unless
specified explicitly from the command implementation.
"""
InterruptionBehavior: TypeAlias = (
InterruptionBehavior # type alias for 2023 location
)
requirements: Set[Subsystem]
def __new__(cls, *args, **kwargs) -> Self:
    """
    Allocate the command and perform its base setup before ``__init__`` runs.

    Positional and keyword constructor arguments are accepted but not used here.

    :returns: the new instance, already registered and carrying an empty requirement set
    """
    command = super().__new__(cls)
    # The base initializer is invoked here instead of in __init__
    # (presumably so subclasses that never call super().__init__() still work -- TODO confirm).
    super().__init__(command)
    # Register under the concrete class name; this is the name getName() reports by default.
    SendableRegistry.add(command, cls.__name__)
    command.requirements = set()
    return command
def __init__(self):
    """Intentionally empty: instance setup is performed in ``__new__``."""
[docs]
def initialize(self):
    """
    The initial subroutine of a command. Called once when the command is initially scheduled.

    The base implementation does nothing.
    """
[docs]
def execute(self):
    """
    The main body of a command. Called repeatedly while the command is scheduled.

    The base implementation does nothing.
    """
[docs]
def end(self, interrupted: bool):
    """
    The action to take when the command ends. Called when either the command finishes
    normally, or when it is interrupted/canceled.

    Do not schedule commands here that share requirements with this command.
    Use :meth:`.andThen` instead.

    The base implementation does nothing.

    :param interrupted: whether the command was interrupted/canceled
    """
[docs]
def isFinished(self) -> bool:
    """
    Whether the command has finished. Once a command finishes, the scheduler will call
    its :meth:`commands2.Command.end` method and un-schedule it.

    The base implementation never reports completion.

    :returns: whether the command has finished.
    """
    finished = False
    return finished
[docs]
def getRequirements(self) -> Set[Subsystem]:
    """
    Specifies the set of subsystems used by this command. Two commands cannot use the
    same subsystem at the same time. If another command is scheduled that shares a
    requirement, :meth:`.getInterruptionBehavior` will be checked and followed. If no
    subsystems are required, return an empty set.

    .. note:: it is recommended that user implementations contain the requirements as
       a field, and return that field here, rather than allocating a new set every
       time this is called.

    :returns: the set of subsystems that are required (the stored set itself, not a copy)
    """
    required = self.requirements
    return required
[docs]
def addRequirements(self, *requirements: Subsystem):
    """
    Adds the specified subsystems to the requirements of the command. The scheduler will
    prevent two commands that require the same subsystem from being scheduled
    simultaneously.

    .. note:: The scheduler determines the requirements of a command when it is
       scheduled, so this method should normally be called from the command's
       constructor.

    :param requirements: the requirements to add
    """
    # Mutates the existing set in place so previously handed-out references stay in sync.
    required = self.requirements
    required.update(requirements)
[docs]
def getName(self) -> str:
    """
    Gets the name of this Command, as recorded in the SendableRegistry.

    :returns: Name
    """
    name = SendableRegistry.getName(self)
    return name
[docs]
def setName(self, name: str):
    """
    Sets the name of this Command in the SendableRegistry.

    :param name: Name
    """
    SendableRegistry.setName(self, name)
[docs]
def getSubsystem(self) -> str:
    """
    Gets the subsystem name of this Command, as recorded in the SendableRegistry.

    :returns: Subsystem name
    """
    subsystem_name = SendableRegistry.getSubsystem(self)
    return subsystem_name
[docs]
def setSubsystem(self, subsystem: str):
    """
    Sets the subsystem name of this Command in the SendableRegistry.

    :param subsystem: subsystem name
    """
    SendableRegistry.setSubsystem(self, subsystem)
[docs]
def withTimeout(self, seconds: float) -> ParallelRaceGroup:
    """
    Decorates this command with a timeout. If the specified timeout is exceeded before
    the command finishes normally, the command will be interrupted and un-scheduled.
    Note that the timeout only applies to the command returned by this method; the
    calling command is not itself changed.

    .. note:: This decorator works by adding this command to a composition.
       The command the decorator was called on cannot be scheduled independently or
       be added to a different composition (namely, decorators), unless it is manually
       cleared from the list of composed commands with
       :func:`commands2.CommandScheduler.removeComposedCommand`. The composition
       returned from this method can be further decorated without issue.

    :param seconds: the timeout duration
    :returns: the command with the timeout added
    """
    # Imported lazily: a module-level import of package siblings would be circular.
    from .waitcommand import WaitCommand

    # Racing against a plain wait ends the group as soon as either one finishes.
    timer = WaitCommand(seconds)
    return self.raceWith(timer)
[docs]
def until(self, condition: Callable[[], bool]) -> ParallelRaceGroup:
    """
    Decorates this command with an interrupt condition. If the specified condition
    becomes true before the command finishes normally, the command will be interrupted
    and un-scheduled.

    .. note:: This decorator works by adding this command to a composition.
       The command the decorator was called on cannot be scheduled independently or
       be added to a different composition (namely, decorators), unless it is manually
       cleared from the list of composed commands with
       :func:`commands2.CommandScheduler.removeComposedCommand`. The composition
       returned from this method can be further decorated without issue.

    :param condition: the interrupt condition
    :returns: the command with the interrupt condition added
    """
    # Imported lazily: a module-level import of package siblings would be circular.
    from .waituntilcommand import WaitUntilCommand

    watcher = WaitUntilCommand(condition)
    return self.raceWith(watcher)
[docs]
def onlyWhile(self, condition: Callable[[], bool]) -> ParallelRaceGroup:
    """
    Decorates this command with a run condition. If the specified condition becomes
    false before the command finishes normally, the command will be interrupted and
    un-scheduled.

    .. note:: This decorator works by adding this command to a composition.
       The command the decorator was called on cannot be scheduled independently or
       be added to a different composition (namely, decorators), unless it is manually
       cleared from the list of composed commands with
       :func:`commands2.CommandScheduler.removeComposedCommand`. The composition
       returned from this method can be further decorated without issue.

    :param condition: the run condition; the command keeps running while it is true
    :returns: the command with the run condition added
    """
    assert callable(condition)

    def condition_no_longer_holds() -> bool:
        # The run condition is the logical inverse of an interrupt condition.
        return not condition()

    return self.until(condition_no_longer_holds)
[docs]
def beforeStarting(
    self, before: Union[Command, Callable[[], None]]
) -> SequentialCommandGroup:
    """
    Decorates this command with a callable or command to run before this command starts.

    .. note:: This decorator works by adding this command to a composition.
       The command the decorator was called on cannot be scheduled independently or
       be added to a different composition (namely, decorators), unless it is manually
       cleared from the list of composed commands with
       :func:`commands2.CommandScheduler.removeComposedCommand`. The composition
       returned from this method can be further decorated without issue.

    :param before: the command, or plain callable, to run before this one
    :returns: the decorated command
    """
    # Imported lazily: a module-level import of package siblings would be circular.
    from .instantcommand import InstantCommand
    from .sequentialcommandgroup import SequentialCommandGroup

    # A bare callable is wrapped in an InstantCommand so it can be sequenced like a command.
    first = InstantCommand(before) if callable(before) else before
    return SequentialCommandGroup(first, self)
[docs]
def andThen(self, *next: Command) -> SequentialCommandGroup:
    """
    Decorates this command with a set of commands to run after it in sequence. Often
    more convenient/less-verbose than constructing a new SequentialCommandGroup
    explicitly.

    .. note:: This decorator works by adding this command to a composition.
       The command the decorator was called on cannot be scheduled independently or
       be added to a different composition (namely, decorators), unless it is manually
       cleared from the list of composed commands with
       :func:`commands2.CommandScheduler.removeComposedCommand`. The composition
       returned from this method can be further decorated without issue.

    :param next: the commands to run next
    :returns: the decorated command
    """
    # Imported lazily: a module-level import of package siblings would be circular.
    from .sequentialcommandgroup import SequentialCommandGroup

    # This command goes first, followed by the given commands in argument order.
    sequence = SequentialCommandGroup(self, *next)
    return sequence
[docs]
def deadlineWith(self, *parallel: Command) -> ParallelDeadlineGroup:
    """
    Decorates this command with a set of commands to run parallel to it, ending when
    the calling command ends and interrupting all the others. Often more
    convenient/less-verbose than constructing a new ParallelDeadlineGroup explicitly.

    Deprecated: emits a :class:`DeprecationWarning`; use :meth:`.deadlineFor` instead.

    .. note:: This decorator works by adding this command to a composition.
       The command the decorator was called on cannot be scheduled independently or
       be added to a different composition (namely, decorators), unless it is manually
       cleared from the list of composed commands with
       :func:`commands2.CommandScheduler.removeComposedCommand`. The composition
       returned from this method can be further decorated without issue.

    :param parallel: the commands to run in parallel
    :returns: the decorated command
    """
    import warnings

    # stacklevel=2 attributes the warning to the caller rather than to this method.
    warnings.warn(
        "deadlineWith is deprecated use deadlineFor instead",
        DeprecationWarning,
        stacklevel=2,
    )

    # Imported lazily: a module-level import of package siblings would be circular.
    from .paralleldeadlinegroup import ParallelDeadlineGroup

    # This command is passed first, i.e. as the deadline.
    group = ParallelDeadlineGroup(self, *parallel)
    return group
[docs]
def deadlineFor(self, *parallel: Command) -> ParallelDeadlineGroup:
    """
    Decorates this command with a set of commands to run parallel to it, ending when
    the calling command ends and interrupting all the others. Often more
    convenient/less-verbose than constructing a new ParallelDeadlineGroup explicitly.

    .. note:: This decorator works by adding this command to a composition.
       The command the decorator was called on cannot be scheduled independently or
       be added to a different composition (namely, decorators), unless it is manually
       cleared from the list of composed commands with
       :func:`commands2.CommandScheduler.removeComposedCommand`. The composition
       returned from this method can be further decorated without issue.

    :param parallel: the commands to run in parallel
    :returns: the decorated command
    """
    # Imported lazily: a module-level import of package siblings would be circular.
    from .paralleldeadlinegroup import ParallelDeadlineGroup

    # This command is passed first, i.e. as the deadline.
    group = ParallelDeadlineGroup(self, *parallel)
    return group
[docs]
def alongWith(self, *parallel: Command) -> ParallelCommandGroup:
    """
    Decorates this command with a set of commands to run parallel to it, ending when
    the last command ends. Often more convenient/less-verbose than constructing a new
    ParallelCommandGroup explicitly.

    .. note:: This decorator works by adding this command to a composition.
       The command the decorator was called on cannot be scheduled independently or
       be added to a different composition (namely, decorators), unless it is manually
       cleared from the list of composed commands with
       :func:`commands2.CommandScheduler.removeComposedCommand`. The composition
       returned from this method can be further decorated without issue.

    :param parallel: the commands to run in parallel
    :returns: the decorated command
    """
    # Imported lazily: a module-level import of package siblings would be circular.
    from .parallelcommandgroup import ParallelCommandGroup

    group = ParallelCommandGroup(self, *parallel)
    return group
[docs]
def raceWith(self, *parallel: Command) -> ParallelRaceGroup:
    """
    Decorates this command with a set of commands to run parallel to it, ending when
    the first command ends. Often more convenient/less-verbose than constructing a new
    ParallelRaceGroup explicitly.

    .. note:: This decorator works by adding this command to a composition.
       The command the decorator was called on cannot be scheduled independently or
       be added to a different composition (namely, decorators), unless it is manually
       cleared from the list of composed commands with
       :func:`commands2.CommandScheduler.removeComposedCommand`. The composition
       returned from this method can be further decorated without issue.

    :param parallel: the commands to run in parallel
    :returns: the decorated command
    """
    # Imported lazily: a module-level import of package siblings would be circular.
    from .parallelracegroup import ParallelRaceGroup

    group = ParallelRaceGroup(self, *parallel)
    return group
[docs]
def repeatedly(self) -> RepeatCommand:
    """
    Decorates this command to run repeatedly, restarting it when it ends, until this
    command is interrupted. The decorated command can still be canceled.

    .. note:: This decorator works by adding this command to a composition.
       The command the decorator was called on cannot be scheduled independently or
       be added to a different composition (namely, decorators), unless it is manually
       cleared from the list of composed commands with
       :func:`commands2.CommandScheduler.removeComposedCommand`. The composition
       returned from this method can be further decorated without issue.

    :returns: the decorated command
    """
    # Imported lazily: a module-level import of package siblings would be circular.
    from .repeatcommand import RepeatCommand

    repeating = RepeatCommand(self)
    return repeating
[docs]
def asProxy(self) -> ProxyCommand:
    """
    Decorates this command to run "by proxy" by wrapping it in a ProxyCommand. Use this
    for "forking off" from command compositions when the user does not wish to extend
    the command's requirements to the entire command composition. ProxyCommand has
    unique implications and semantics, see the `WPILib docs <https://docs.wpilib.org/en/stable/docs/software/commandbased/command-compositions.html#scheduling-other-commands>`_ for a full explanation.

    :returns: the decorated command
    """
    # Imported lazily: a module-level import of package siblings would be circular.
    from .proxycommand import ProxyCommand

    proxy = ProxyCommand(self)
    return proxy
[docs]
def fork(self, *commands: Command) -> Command:
    """
    Decorates this command to run "forked" by wrapping it in a ScheduleCommand. Use this
    for "forking off" from command compositions when the user does not wish to extend
    the command's requirements to the entire command composition. Note that if run from
    a composition, the composition will not know about the status of the scheduled
    commands, and will treat this command as finishing instantly. Commands can be added
    to this and will be scheduled in order with this command scheduled first. See the
    `WPILib docs <https://docs.wpilib.org/en/stable/docs/software/commandbased/command-compositions.html#scheduling-other-commands>`_ for a full explanation.

    :param commands: other commands to schedule along with this one. This command is
       scheduled first.
    :returns: the decorated command (a ``ScheduleCommand`` wrapping this command followed
       by ``commands``)
    """
    # Imported lazily: a module-level import of package siblings would be circular.
    from .schedulecommand import ScheduleCommand

    # The previous ``[self] + commands`` added a list to the varargs tuple, which raises
    # TypeError on every call, and it also listed this command twice. Each command is
    # passed as its own argument instead, with this command first.
    return ScheduleCommand(self, *commands)
[docs]
def unless(self, condition: Callable[[], bool]) -> ConditionalCommand:
    """
    Decorates this command to only run if this condition is not met. If the command is
    already running and the condition changes to true, the command will not stop
    running. The requirements of this command will be kept for the new conditional
    command.

    .. note:: This decorator works by adding this command to a composition.
       The command the decorator was called on cannot be scheduled independently or
       be added to a different composition (namely, decorators), unless it is manually
       cleared from the list of composed commands with
       :func:`commands2.CommandScheduler.removeComposedCommand`. The composition
       returned from this method can be further decorated without issue.

    :param condition: the condition that will prevent the command from running
    :returns: the decorated command
    """
    # Imported lazily: a module-level import of package siblings would be circular.
    from .conditionalcommand import ConditionalCommand
    from .instantcommand import InstantCommand

    # An argument-less InstantCommand is the first branch and this command the second,
    # selected by ``condition``.
    do_nothing = InstantCommand()
    return ConditionalCommand(do_nothing, self, condition)
[docs]
def onlyIf(self, condition: Callable[[], bool]) -> ConditionalCommand:
    """
    Decorates this command to only run if this condition is met. If the command is
    already running and the condition changes to false, the command will not stop
    running. The requirements of this command will be kept for the new conditional
    command.

    .. note:: This decorator works by adding this command to a composition.
       The command the decorator was called on cannot be scheduled independently or
       be added to a different composition (namely, decorators), unless it is manually
       cleared from the list of composed commands with
       :func:`commands2.CommandScheduler.removeComposedCommand`. The composition
       returned from this method can be further decorated without issue.

    :param condition: the condition that will allow the command to run
    :returns: the decorated command
    """

    def condition_not_met() -> bool:
        # "only if X" is expressed as "unless not X".
        return not condition()

    return self.unless(condition_not_met)
[docs]
def ignoringDisable(self, doesRunWhenDisabled: bool) -> WrapperCommand:
    """
    Decorates this command to run or stop when disabled.

    :param doesRunWhenDisabled: true to run when disabled.
    :returns: the decorated command
    """
    # Imported lazily: a module-level import of package siblings would be circular.
    from .wrappercommand import WrapperCommand

    class W(WrapperCommand):
        # One-off wrapper whose only override reports the flag captured from the call.
        def runsWhenDisabled(self) -> bool:
            return doesRunWhenDisabled

    wrapped = W(self)
    return wrapped
[docs]
def withInterruptBehavior(self, behavior: InterruptionBehavior) -> WrapperCommand:
    """
    Decorates this command to have a different InterruptionBehavior interruption behavior.

    :param behavior: the desired interrupt behavior
    :returns: the decorated command
    """
    # Imported lazily: a module-level import of package siblings would be circular.
    from .wrappercommand import WrapperCommand

    class W(WrapperCommand):
        # One-off wrapper whose only override reports the behavior captured from the call.
        def getInterruptionBehavior(self) -> InterruptionBehavior:
            return behavior

    wrapped = W(self)
    return wrapped
[docs]
def finallyDo(self, end: Callable[[bool], Any]) -> WrapperCommand:
    """
    Decorates this command with a lambda to call on interrupt or end, following the
    command's inherent :func:`commands2.Command.end` method.

    :param end: a lambda accepting a boolean parameter specifying whether the command
       was interrupted.
    :returns: the decorated command
    """
    # Imported lazily: a module-level import of package siblings would be circular.
    from .wrappercommand import WrapperCommand

    # Alias the callback so it is not confused with the ``end`` method defined below.
    callback = end

    class W(WrapperCommand):
        def end(self, interrupted: bool) -> None:
            # The wrapped command's own end() runs first, then the user callback.
            self._command.end(interrupted)
            callback(interrupted)

    wrapped = W(self)
    return wrapped
[docs]
def handleInterrupt(self, handler: Callable[[], Any]) -> WrapperCommand:
    """
    Decorates this command with a lambda to call on interrupt, following the command's
    inherent :func:`commands2.Command.end` method.

    :param handler: a lambda to run when the command is interrupted
    :returns: the decorated command
    """

    def on_end(interrupted: bool):
        # Only an interrupted ending triggers the handler; a normal finish is ignored.
        if interrupted:
            return handler()
        return None

    return self.finallyDo(on_end)
[docs]
def schedule(self) -> None:
    """Schedules this command with the CommandScheduler singleton."""
    # Imported lazily: a module-level import of package siblings would be circular.
    from .commandscheduler import CommandScheduler

    scheduler = CommandScheduler.getInstance()
    scheduler.schedule(self)
[docs]
def cancel(self) -> None:
    """
    Cancels this command. Will call ``end(true)``. Commands will be canceled
    regardless of InterruptionBehavior interruption behavior.
    """
    # Imported lazily: a module-level import of package siblings would be circular.
    from .commandscheduler import CommandScheduler

    scheduler = CommandScheduler.getInstance()
    scheduler.cancel(self)
[docs]
def isScheduled(self) -> bool:
    """
    Whether the command is currently scheduled. Note that this does not detect whether
    the command is in a composition, only whether it is directly being run by the
    scheduler.

    :returns: Whether the command is scheduled.
    """
    # Imported lazily: a module-level import of package siblings would be circular.
    from .commandscheduler import CommandScheduler

    scheduler = CommandScheduler.getInstance()
    return scheduler.isScheduled(self)
[docs]
def hasRequirement(self, requirement: Subsystem) -> bool:
    """
    Whether the command requires a given subsystem.

    :param requirement: the subsystem to inquire about
    :returns: whether the subsystem is required
    """
    required = self.requirements
    return requirement in required
[docs]
def getInterruptionBehavior(self) -> InterruptionBehavior:
    """
    How the command behaves when another command with a shared requirement is scheduled.

    :returns: a variant of InterruptionBehavior, defaulting to
       :attr:`InterruptionBehavior.kCancelSelf`.
    """
    default_behavior = InterruptionBehavior.kCancelSelf
    return default_behavior
[docs]
def runsWhenDisabled(self) -> bool:
    """
    Whether the given command should run when the robot is disabled. Override to return
    true if the command should run when disabled.

    The base implementation returns ``False``.

    :returns: whether the command should run when the robot is disabled
    """
    runs_while_disabled = False
    return runs_while_disabled
[docs]
def withName(self, name: str) -> WrapperCommand:
    """
    Decorates this command with a name.

    The name is set on the returned wrapper; this method does not call ``setName`` on
    the command it was invoked on.

    :param name: the name of the command
    :returns: the decorated command
    """
    # Imported lazily: a module-level import of package siblings would be circular.
    from .wrappercommand import WrapperCommand

    named = WrapperCommand(self)
    named.setName(name)
    return named
[docs]
def initSendable(self, builder: SendableBuilder) -> None:
    """
    Describes this command to a SendableBuilder.

    Publishes the type ``"Command"`` and five properties: ``.name``, ``running``,
    ``.isParented``, ``interruptBehavior`` and ``runsWhenDisabled``. Only ``running``
    reacts to writes (it schedules or cancels this command); the setters of the other
    properties discard the incoming value.

    :param builder: the builder to register the type and properties on
    """
    # Imported lazily: a module-level import of package siblings would be circular.
    from .commandscheduler import CommandScheduler

    def discard(_) -> None:
        # Setter for properties that ignore writes.
        return None

    def set_running(value: bool) -> None:
        # Act only when the requested state differs from the current one.
        currently_scheduled = self.isScheduled()
        if value and not currently_scheduled:
            self.schedule()
        elif not value and currently_scheduled:
            self.cancel()

    def is_parented() -> bool:
        return CommandScheduler.getInstance().isComposed(self)

    def interrupt_behavior_name() -> str:
        return self.getInterruptionBehavior().name

    builder.setSmartDashboardType("Command")
    builder.addStringProperty(".name", self.getName, discard)
    builder.addBooleanProperty("running", self.isScheduled, set_running)
    builder.addBooleanProperty(".isParented", is_parented, discard)
    builder.addStringProperty("interruptBehavior", interrupt_behavior_name, discard)
    builder.addBooleanProperty("runsWhenDisabled", self.runsWhenDisabled, discard)