# validated: 2024-01-19 DS e07de37e64f2 ParallelDeadlineGroup.java
from __future__ import annotations
from typing import Dict
from wpiutil import SendableBuilder
from .command import Command, InterruptionBehavior
from .commandscheduler import CommandScheduler
from .exceptions import IllegalCommandUse
from .util import flatten_args_commands
[docs]
class ParallelDeadlineGroup(Command):
"""
A command composition that runs a set of commands in parallel, ending only when a specific
command (the "deadline") ends, interrupting all other commands that are still running at that
point.
The rules for command compositions apply: command instances that are passed to it cannot be
added to any other composition or scheduled individually, and the composition requires all
subsystems its components require.
"""
def __init__(self, deadline: Command, *commands: Command):
"""
Creates a new ParallelDeadlineGroup. The given commands (including the
deadline) will be executed simultaneously. The composition will finish when
the deadline finishes, interrupting all other still-running commands. If
the composition is interrupted, only the commands still running will be
interrupted.
:param deadline: the command that determines when the composition ends
:param commands: the commands to be executed
:raises IllegalCommandUse: if the deadline command is also in the otherCommands argument
"""
super().__init__()
self._commands: Dict[Command, bool] = {}
self._runsWhenDisabled = True
self._finished = True
self._interruptBehavior = InterruptionBehavior.kCancelIncoming
self.addCommands(*commands)
self.setDeadline(deadline)
[docs]
def setDeadline(self, deadline: Command):
"""
Sets the deadline to the given command. The deadline is added to the group if it is not already
contained.
:param deadline: the command that determines when the group ends
:raises IllegalCommandUse: if the deadline command is already in the composition
"""
# use getattr here because deadline not set in constructor
isAlreadyDeadline = deadline == getattr(self, "_deadline", None)
if isAlreadyDeadline:
return
if deadline in self._commands:
raise IllegalCommandUse(
f"The deadline command cannot also be in the other commands!",
deadline=deadline,
)
self.addCommands(deadline)
self._deadline = deadline
[docs]
def addCommands(self, *commands: Command):
"""
Adds the given commands to the group.
:param commands: Commands to add to the group.
:raises IllegalCommandUse: if the deadline command is already in the composition
"""
commands = flatten_args_commands(commands)
if not self._finished:
raise IllegalCommandUse(
"Commands cannot be added to a composition while it is running"
)
CommandScheduler.getInstance().registerComposedCommands(commands)
for command in commands:
in_common = command.getRequirements().intersection(self.requirements)
if in_common:
raise IllegalCommandUse(
"Multiple commands in a parallel composition cannot require the same subsystems.",
common=in_common,
)
self._commands[command] = False
self.requirements.update(command.getRequirements())
self._runsWhenDisabled = (
self._runsWhenDisabled and command.runsWhenDisabled()
)
if command.getInterruptionBehavior() == InterruptionBehavior.kCancelSelf:
self._interruptBehavior = InterruptionBehavior.kCancelSelf
[docs]
def initialize(self):
for command in self._commands:
command.initialize()
self._commands[command] = True
self._finished = False
[docs]
def execute(self):
for command, isRunning in self._commands.items():
if not isRunning:
continue
command.execute()
if command.isFinished():
command.end(False)
self._commands[command] = False
if command == self._deadline:
self._finished = True
[docs]
def end(self, interrupted: bool):
for command, isRunning in self._commands.items():
if not isRunning:
continue
command.end(True)
self._commands[command] = False
[docs]
def isFinished(self) -> bool:
return self._finished
[docs]
def runsWhenDisabled(self) -> bool:
return self._runsWhenDisabled
[docs]
def getInterruptionBehavior(self) -> InterruptionBehavior:
return self._interruptBehavior
[docs]
def initSendable(self, builder: SendableBuilder):
super().initSendable(builder)
builder.addStringProperty(
"deadline", lambda: self._deadline.getName(), lambda _: None
)