Source code for cscore.imagewriter

import os.path
import time
import threading

import cv2
import numpy as np

import logging

# Module-level logger; messages from this module are emitted under the
# "cscore.storage" logger name.
logger = logging.getLogger("cscore.storage")


class ImageWriter:
    """
    Creates a thread that periodically writes images to a specified
    directory. Useful for looking at images after a match has completed.

    The default location is ``/media/sda1/camera``. The folder
    ``/media/sda1`` is the default location that USB drives inserted into
    the RoboRIO are mounted at. The USB drive must have a directory in it
    named ``camera``.

    .. note:: It is recommended to only write images when something useful
              (such as targeting) is happening, otherwise you'll end up
              with a lot of images written to disk that you probably aren't
              interested in.

    Intended usage is::

        self.image_writer = ImageWriter()

        ..

        while True:

            img = ..

            if self.logging_enabled:
                self.image_writer.setImage(img)

    """

    def __init__(
        self,
        *,
        location_root="/media/sda1/camera",
        capture_period=0.5,
        image_format="jpg"
    ):
        """
        :param location_root: Directory to write images to. A subdirectory
                              with the current time will be created, and
                              timestamped images will be written to the
                              subdirectory.
        :param capture_period: How often to write images to disk
        :param image_format: File extension of files to write
        """

        self.location_root = os.path.abspath(location_root)
        self.capture_period = capture_period
        self.image_format = image_format

        # Cleared by the storage thread when it exits; setImage() becomes a
        # no-op from then on.
        self.active = True
        self._location = None
        self.has_image = False
        self.size = None

        # Two buffers: setImage() copies into out1 while the storage thread
        # writes out2 to disk. They are allocated on the first setImage()
        # call, once the image size is known.
        self.out1 = None
        self.out2 = None

        # Guards has_image, size and the out1/out2 buffers, and is used to
        # wake the storage thread when a new image arrives.
        self.lock = threading.Condition()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def setImage(self, img):
        """
        Call this function when you wish to write the image to disk. Not
        every image is written to disk. Makes a copy of the image.

        Does nothing once the storage thread has exited.

        :param img: A numpy array representing an OpenCV image
        """

        if not self.active:
            return

        h, w = img.shape[:2]

        with self.lock:
            # (Re)allocate while holding the lock so the storage thread can
            # never swap the buffers halfway through a resize.
            if self.size != (h, w):
                self.size = (h, w)
                self.out1 = np.empty((h, w, 3), dtype=np.uint8)
                self.out2 = np.empty((h, w, 3), dtype=np.uint8)

            # All border widths are zero, so this is a plain copy of img
            # into the preallocated out1 buffer.
            # NOTE(review): the buffers are always allocated as 3-channel
            # uint8; confirm how images with another layout are handled.
            cv2.copyMakeBorder(
                img, 0, 0, 0, 0, cv2.BORDER_CONSTANT, value=(0, 0, 255), dst=self.out1
            )
            self.has_image = True
            self.lock.notify()

    @property
    def location(self):
        """
        Directory that images are written to.

        Created on first access as a timestamped subdirectory of
        ``location_root`` and cached afterwards.

        :raises IOError: if ``location_root`` does not exist
        """
        if self._location is None:
            # This assures that we only log when a USB memory stick is plugged in
            if not os.path.exists(self.location_root):
                raise IOError(
                    "Logging disabled, %s does not exist" % self.location_root
                )

            # Can't do this when program starts, time might be wrong. Ideally by now the DS
            # has connected, so the time will be correct
            self._location = os.path.join(
                self.location_root, time.strftime("%Y-%m-%d %H.%M.%S")
            )
            logger.info("Logging to %s", self._location)
            os.makedirs(self._location, exist_ok=True)

        return self._location

    def _run(self):
        """
        Body of the storage thread.

        Waits until an image is pending and at least ``capture_period``
        seconds have passed since the previous write, then writes that
        image to ``location``. Whenever the loop ends, ``active`` is
        cleared so that setImage() stops copying images.
        """
        last = time.time()

        logger.info("Storage thread started")

        try:
            while True:
                with self.lock:
                    now = time.time()
                    # The wait has no timeout, so the period is only
                    # re-checked when setImage() notifies.
                    while (not self.has_image) or (now - last) < self.capture_period:
                        self.lock.wait()
                        now = time.time()

                    # Take the freshly filled buffer and hand the other one
                    # back to setImage(). Keep a local reference so a
                    # concurrent resize cannot replace it before the write.
                    self.out2, self.out1 = self.out1, self.out2
                    frame = self.out2
                    self.has_image = False

                fname = os.path.join(
                    self.location, "%.2f.%s" % (now, self.image_format)
                )
                # imwrite reports some failures through its return value
                # instead of raising; don't let those pass silently.
                if not cv2.imwrite(fname, frame):
                    logger.warning("Failed to write image %s", fname)

                last = now

        except IOError as e:
            logger.error("Error logging images: %s", e)
        finally:
            # Runs for unexpected exceptions too, so setImage() never keeps
            # feeding a dead thread.
            self.active = False
            logger.warning("Storage thread exited")