Source code for commands2.sequentialcommandgroup

# validated: 2024-01-19 DS aaea85ff1656 SequentialCommandGroup.java
from __future__ import annotations

from typing import List

from wpiutil import SendableBuilder

from .command import Command, InterruptionBehavior
from .commandscheduler import CommandScheduler
from .exceptions import IllegalCommandUse
from .util import flatten_args_commands


[docs] class SequentialCommandGroup(Command): """ A command composition that runs a list of commands in sequence. The rules for command compositions apply: command instances that are passed to it cannot be added to any other composition or scheduled individually, and the composition requires all subsystems its components require. """ def __init__(self, *commands: Command): """ Creates a new SequentialCommandGroup. The given commands will be run sequentially, with the composition finishing when the last command finishes. :param commands: the commands to include in this composition. """ super().__init__() self._commands: List[Command] = [] self._currentCommandIndex = -1 self._runsWhenDisabled = True self._interruptBehavior = InterruptionBehavior.kCancelIncoming self.addCommands(*commands)
[docs] def addCommands(self, *commands: Command): """ Adds the given commands to the group. :param commands: Commands to add to the group. """ commands = flatten_args_commands(commands) if self._currentCommandIndex != -1: raise IllegalCommandUse( "Commands cannot be added to a composition while it is running" ) CommandScheduler.getInstance().registerComposedCommands(commands) for command in commands: self._commands.append(command) self.requirements.update(command.getRequirements()) self._runsWhenDisabled = ( self._runsWhenDisabled and command.runsWhenDisabled() ) if command.getInterruptionBehavior() == InterruptionBehavior.kCancelSelf: self._interruptBehavior = InterruptionBehavior.kCancelSelf
[docs] def initialize(self): self._currentCommandIndex = 0 if self._commands: self._commands[0].initialize()
[docs] def execute(self): if not self._commands: return currentCommand = self._commands[self._currentCommandIndex] currentCommand.execute() if currentCommand.isFinished(): currentCommand.end(False) self._currentCommandIndex += 1 if self._currentCommandIndex < len(self._commands): self._commands[self._currentCommandIndex].initialize()
[docs] def end(self, interrupted: bool): if ( interrupted and self._commands and -1 < self._currentCommandIndex < len(self._commands) ): self._commands[self._currentCommandIndex].end(True) self._currentCommandIndex = -1
[docs] def isFinished(self) -> bool: return self._currentCommandIndex == len(self._commands)
[docs] def runsWhenDisabled(self) -> bool: return self._runsWhenDisabled
[docs] def getInterruptionBehavior(self) -> InterruptionBehavior: return self._interruptBehavior
[docs] def initSendable(self, builder: SendableBuilder) -> None: super().initSendable(builder) builder.addIntegerProperty( "index", lambda: self._currentCommandIndex, lambda _: None )