Source code for wpilib.interruptablesensorbase

# validated: 2016-12-22 JW aafca4ed7fff athena/java/edu/wpi/first/wpilibj/InterruptableSensorBase.java
#----------------------------------------------------------------------------
# Copyright (c) FIRST 2008-2012. All Rights Reserved.
# Open Source Software - may be modified and shared by FRC teams. The code
# must be accompanied by the FIRST BSD license file in the root directory of
# the project.
#----------------------------------------------------------------------------

import hal
import weakref

from .sensorbase import SensorBase

__all__ = ["InterruptableSensorBase"]

[docs]class InterruptableSensorBase(SensorBase): """Base for sensors to be used with interrupts""" def __init__(self): """Create a new InterrupatableSensorBase""" # The interrupt resource self._interrupt = None self._interrupt_finalizer = None # Flags if the interrupt being allocated is synchronous self.isSynchronousInterrupt = False
[docs] def getAnalogTriggerTypeForRouting(self): raise NotImplementedError
[docs] def getPortHandleForRouting(self): raise NotImplementedError
@property def interrupt(self): if self._interrupt_finalizer is None: return None if not self._interrupt_finalizer.alive: return None return self._interrupt
[docs] def requestInterrupts(self, handler=None): """Request one of the 8 interrupts asynchronously on this digital input. :param handler: (optional) The function that will be called whenever there is an interrupt on this device. Request interrupts in synchronous mode where the user program interrupt handler will be called when an interrupt occurs. The default is interrupt on rising edges only. If not specified, the user program will have to explicitly wait for the interrupt to occur using waitForInterrupt. """ if self.interrupt is not None: raise ValueError("The interrupt has already been allocated") self.allocateInterrupts(handler is not None) assert self.interrupt is not None hal.requestInterrupts(self.interrupt, self.getPortHandleForRouting(), self.getAnalogTriggerTypeForRouting()) self.setUpSourceEdge(True, False) if handler is not None: hal.attachInterruptHandler(self.interrupt, handler)
[docs] def allocateInterrupts(self, watcher): """Allocate the interrupt :param watcher: True if the interrupt should be in synchronous mode where the user program will have to explicitly wait for the interrupt to occur. """ if self.interrupt is not None: raise ValueError("The interrupt has already been allocated") self.isSynchronousInterrupt = watcher self._interrupt = hal.initializeInterrupts(watcher) self._interrupt_finalizer = weakref.finalize(self, hal.cleanInterrupts, self._interrupt)
[docs] def cancelInterrupts(self): """Cancel interrupts on this device. This deallocates all the chipobject structures and disables any interrupts. """ if self.interrupt is None: raise ValueError("The interrupt is not allocated.") self._interrupt_finalizer() self.interrupt = None
[docs] def waitForInterrupt(self, timeout, ignorePrevious=True): """In synchronous mode, wait for the defined interrupt to occur. You should **NOT** attempt to read the sensor from another thread while waiting for an interrupt. This is not threadsafe, and can cause memory corruption :param timeout: Timeout in seconds :param ignorePrevious: If True (default), ignore interrupts that happened before waitForInterrupt was called. """ if self.interrupt is None: raise ValueError("The interrupt is not allocated.") return hal.waitForInterrupt(self.interrupt, timeout, ignorePrevious)
[docs] def enableInterrupts(self): """Enable interrupts to occur on this input. Interrupts are disabled when the RequestInterrupt call is made. This gives time to do the setup of the other options before starting to field interrupts. """ if self.interrupt is None: raise ValueError("The interrupt is not allocated.") if self.isSynchronousInterrupt: raise ValueError("You do not need to enable synchronous interrupts") hal.enableInterrupts(self.interrupt)
[docs] def disableInterrupts(self): """Disable Interrupts without without deallocating structures.""" if self.interrupt is None: raise ValueError("The interrupt is not allocated.") if self.isSynchronousInterrupt: raise ValueError("You can not disable synchronous interrupts") hal.disableInterrupts(self.interrupt)
[docs] def readRisingTimestamp(self): """Return the timestamp for the rising interrupt that occurred most recently. This is in the same time domain as getClock(). The rising-edge interrupt should be enabled with setUpSourceEdge. :returns: Timestamp in seconds since boot. """ if self.interrupt is None: raise ValueError("The interrupt is not allocated.") return hal.readInterruptRisingTimestamp(self.interrupt)
[docs] def readFallingTimestamp(self): """Return the timestamp for the falling interrupt that occurred most recently. This is in the same time domain as getClock(). The falling-edge interrupt should be enabled with setUpSourceEdge. :returns: Timestamp in seconds since boot. """ if self.interrupt is None: raise ValueError("The interrupt is not allocated.") return hal.readInterruptFallingTimestamp(self.interrupt)
[docs] def setUpSourceEdge(self, risingEdge, fallingEdge): """Set which edge to trigger interrupts on :param risingEdge: True to interrupt on rising edge :param fallingEdge: True to interrupt on falling edge """ if self.interrupt is not None: hal.setInterruptUpSourceEdge(self.interrupt, 1 if risingEdge else 0, 1 if fallingEdge else 0) else: raise ValueError("You must call RequestInterrupts before setUpSourceEdge")