# validated: 2016-12-22 JW aafca4ed7fff athena/java/edu/wpi/first/wpilibj/InterruptableSensorBase.java
#----------------------------------------------------------------------------
# Copyright (c) FIRST 2008-2012. All Rights Reserved.
# Open Source Software - may be modified and shared by FRC teams. The code
# must be accompanied by the FIRST BSD license file in the root directory of
# the project.
#----------------------------------------------------------------------------
import hal
import weakref
from .sensorbase import SensorBase
__all__ = ["InterruptableSensorBase"]
class InterruptableSensorBase(SensorBase):
    """Base for sensors to be used with interrupts"""

    def __init__(self):
        """Create a new InterruptableSensorBase"""
        # The interrupt resource (HAL handle); only meaningful while the
        # finalizer below is still alive.
        self._interrupt = None
        # weakref.finalize object that releases the HAL handle, either when
        # cancelInterrupts() is called or when this object is collected.
        self._interrupt_finalizer = None

        # Flags if the interrupt being allocated is synchronous
        self.isSynchronousInterrupt = False

    def getAnalogTriggerTypeForRouting(self):
        """Must be overridden; the result is passed to
        ``hal.requestInterrupts`` as the analog trigger type."""
        raise NotImplementedError

    def getPortHandleForRouting(self):
        """Must be overridden; the result is passed to
        ``hal.requestInterrupts`` as the port handle."""
        raise NotImplementedError

    @property
    def interrupt(self):
        """The allocated interrupt handle, or None if no interrupt is
        allocated or it has already been cleaned up. Read-only."""
        finalizer = self._interrupt_finalizer
        if finalizer is None or not finalizer.alive:
            return None
        return self._interrupt

    def _requireInterrupt(self):
        """Return the allocated interrupt handle.

        :raises ValueError: if no interrupt is currently allocated
        """
        interrupt = self.interrupt
        if interrupt is None:
            raise ValueError("The interrupt is not allocated.")
        return interrupt

    def requestInterrupts(self, handler=None):
        """Request an interrupt on this sensor.

        The interrupt is configured to trigger on rising edges only; use
        :meth:`setUpSourceEdge` afterwards to change that.

        :param handler: (optional)
            The function that will be called whenever there is an interrupt
            on this device (asynchronous mode). If not specified, the
            interrupt is allocated in synchronous mode and the user program
            will have to explicitly wait for the interrupt to occur using
            waitForInterrupt.
        :raises ValueError: if an interrupt has already been allocated
        """
        if self.interrupt is not None:
            raise ValueError("The interrupt has already been allocated")

        # watcher=True selects synchronous mode, which is only correct when
        # there is no handler to call back into. (This flag used to be
        # inverted, which made enableInterrupts() reject handler-based
        # interrupts as "synchronous".)
        self.allocateInterrupts(handler is None)

        assert self.interrupt is not None

        hal.requestInterrupts(self.interrupt, self.getPortHandleForRouting(),
                              self.getAnalogTriggerTypeForRouting())
        self.setUpSourceEdge(True, False)
        if handler is not None:
            hal.attachInterruptHandler(self.interrupt, handler)

    def allocateInterrupts(self, watcher):
        """Allocate the interrupt

        :param watcher: True if the interrupt should be in synchronous mode
            where the user program will have to explicitly wait for the
            interrupt to occur.
        :raises ValueError: if an interrupt has already been allocated
        """
        if self.interrupt is not None:
            raise ValueError("The interrupt has already been allocated")

        self.isSynchronousInterrupt = watcher
        self._interrupt = hal.initializeInterrupts(watcher)
        # The finalizer must not hold a reference to self, so it is handed
        # the raw handle rather than a bound method.
        self._interrupt_finalizer = weakref.finalize(self, hal.cleanInterrupts,
                                                     self._interrupt)

    def cancelInterrupts(self):
        """Cancel interrupts on this device. This deallocates all the
        chipobject structures and disables any interrupts.

        :raises ValueError: if no interrupt is currently allocated
        """
        self._requireInterrupt()
        # Calling the finalizer runs hal.cleanInterrupts exactly once and
        # marks the finalizer dead.
        self._interrupt_finalizer()
        # ``interrupt`` is a read-only property, so reset the backing fields
        # directly instead of assigning to it.
        self._interrupt = None
        self._interrupt_finalizer = None

    def waitForInterrupt(self, timeout, ignorePrevious=True):
        """In synchronous mode, wait for the defined interrupt to occur.
        You should **NOT** attempt to read the sensor from another thread
        while waiting for an interrupt. This is not threadsafe, and can cause
        memory corruption

        :param timeout: Timeout in seconds
        :param ignorePrevious: If True (default), ignore interrupts that
            happened before waitForInterrupt was called.
        :returns: whatever ``hal.waitForInterrupt`` returns
        :raises ValueError: if no interrupt is currently allocated
        """
        return hal.waitForInterrupt(self._requireInterrupt(), timeout,
                                    ignorePrevious)

    def enableInterrupts(self):
        """Enable interrupts to occur on this input. Interrupts are disabled
        when the RequestInterrupt call is made. This gives time to do the
        setup of the other options before starting to field interrupts.

        :raises ValueError: if no interrupt is allocated, or if the
            interrupt is synchronous
        """
        interrupt = self._requireInterrupt()
        if self.isSynchronousInterrupt:
            raise ValueError("You do not need to enable synchronous interrupts")
        hal.enableInterrupts(interrupt)

    def disableInterrupts(self):
        """Disable interrupts without deallocating structures.

        :raises ValueError: if no interrupt is allocated, or if the
            interrupt is synchronous
        """
        interrupt = self._requireInterrupt()
        if self.isSynchronousInterrupt:
            raise ValueError("You can not disable synchronous interrupts")
        hal.disableInterrupts(interrupt)

    def readRisingTimestamp(self):
        """Return the timestamp for the rising interrupt that occurred most
        recently. This is in the same time domain as getClock(). The
        rising-edge interrupt should be enabled with setUpSourceEdge.

        :returns: Timestamp in seconds since boot.
        :raises ValueError: if no interrupt is currently allocated
        """
        return hal.readInterruptRisingTimestamp(self._requireInterrupt())

    def readFallingTimestamp(self):
        """Return the timestamp for the falling interrupt that occurred most
        recently. This is in the same time domain as getClock(). The
        falling-edge interrupt should be enabled with setUpSourceEdge.

        :returns: Timestamp in seconds since boot.
        :raises ValueError: if no interrupt is currently allocated
        """
        return hal.readInterruptFallingTimestamp(self._requireInterrupt())

    def setUpSourceEdge(self, risingEdge, fallingEdge):
        """Set which edge to trigger interrupts on

        :param risingEdge: True to interrupt on rising edge
        :param fallingEdge: True to interrupt on falling edge
        :raises ValueError: if no interrupt is currently allocated
        """
        interrupt = self.interrupt
        if interrupt is None:
            raise ValueError("You must call RequestInterrupts before setUpSourceEdge")
        # The HAL call is given integer 1/0 flags rather than booleans.
        hal.setInterruptUpSourceEdge(interrupt,
                                     1 if risingEdge else 0,
                                     1 if fallingEdge else 0)