"""
Copyright 2017 Purdue University
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Provide classes to deal with different types of cameras.
This module is used to deal with different types of cameras. The module
provides the Camera base class which provides a uniform way of dealing with
all types of cameras. The module provides different subclasses, each for a
different type of cameras (e.g. IP cameras, and non-IP cameras). The module
also provides the StreamFormat Enum for the different camera stream formats.
Examples
--------
Example 1: To get frames from a non-IP camera image stream:
1. Initialize a NonIPCamera object using the ID and the URL of the camera
image stream.
2. Use the get_frame method to get the most recent frame at any point of time,
as well as the frame size. There is no need to call open_stream or close_stream.
::
camera = NonIPCamera(1, 'http://images.webcams.travel/preview/1169307993.jpg')
frame, frame_size = camera.get_frame()
cv2.imshow('frame', frame)
print frame_size
cv2.waitKey()
Example 2: To get frames from an IP camera image stream:
1. Initialize an IPCamera object using the ID, IP, image stream path, and
other optional parameters.
2. Use the get_frame method to get the most recent frame at any point of time,
as well as the frame size. While dealing with image streams of IP cameras,
there is no need to call open_stream or close_stream.
::
camera = IPCamera(1, '128.10.29.33', '/axis-cgi/jpg/image.cgi')
frame, frame_size = camera.get_frame()
cv2.imshow('frame', frame)
print frame_size
cv2.waitKey()
Example 3: To get frames from an IP camera MJPEG stream:
1. Initialize an IPCamera object using the ID, IP, image stream path, MJPEG
stream path, and other optional parameters.
2. Open the camera MJPEG stream by calling the open_stream method with the
StreamFormat.MJPEG parameter.
3. Use the get_frame method to get the most recent frame at any point of time,
as well as the frame size.
4. At the end when no more frames are needed, Close the camera MJPEG stream by
calling the close_stream method.
::
camera = IPCamera(1, '128.10.29.33', '/axis-cgi/jpg/image.cgi',
'/axis-cgi/mjpg/video.cgi')
camera.open_stream(StreamFormat.MJPEG)
t = time.time()
while time.time() - t < 5:
frame, frame_size = camera.get_frame()
cv2.imshow('frame', frame)
print frame_size
cv2.waitKey(30)
camera.close_stream()
"""
from __future__ import absolute_import
from CAM2ImageArchiver.error import ClosedStreamError
from CAM2ImageArchiver.StreamParser import ImageStreamParser, MJPEGStreamParser, MJPGm3u8StreamParser
[docs]class Camera(object):
"""
Represent the base class for all types of cameras.
Parameters
----------
id : int
The unique camera ID.
Attributes
----------
id : int
The unique camera ID.
parser : StreamParser
The parser of the camera stream.
Notes
-----
A camera handles a single stream at a time. Use the open_stream method to
open the stream of the desired format (e.g. MJPEG). Then, use the method
get_frame to get frames from the currently open stream. Then, use the
method close_stream to close the currently open stream. This class does
not allow dealing with multiple streams from the same camera object at
the same time.
"""
def __init__(self, _id):
self.id = _id
self.parser = None
self.last_frame = None
[docs] def open_stream(self, stream_format):
"""
Open the camera stream of the given format.
Parameters
----------
stream_format : int
The stream format of the camera. This can be any of the StreamFormat
class variables (e.g. StreamFormat.IMAGE or StreamFormat.MJPEG)
Raises
------
error.UnreachableCameraError
If the camera is unreachable.
"""
pass
[docs] def close_stream(self):
""" Close the currently open camera stream.
"""
pass
[docs] def restart_stream(self):
"""
Restart the currently open camera stream.
This method restarts the stream by closing then opening it. This is
useful because some cameras closes a stream if it is open for a long
period of time.
"""
self.parser.restart_stream()
[docs] def get_frame(self):
"""
Get the most recent frame from the currently open camera stream.
Returns
-------
numpy.ndarray
The downloaded frame.
int
The size of the downloaded frame in bytes.
Raises
------
error.CorruptedFrameError
If the frame is corrupted.
error.UnreachableCameraError
If the camera is unreachable.
error.ClosedStreamError
If the stream needs to be opened first.
"""
if self.parser is None:
raise ClosedStreamError
return self.parser.get_frame()
[docs]class IPCamera(Camera):
"""
Represent an IP camera.
This class subclasses the Camera class and inherits its attributes and
extends its constructor.
Parameters
----------
id : int
The unique camera id.
ip : str
The IP address of the camera.
image_path : str
The path to the camera image stream.
mjpeg_path : str, optional
The path to the camera MJPEG stream.
port : int, optional
The port of the camera.
Attributes
----------
ip : str
The IP address of the camera.
image_path : str
The path to the camera image stream.
mjpeg_path : str
The path to the camera MJPEG stream.
port : int
The port of the camera.
Notes
-----
By default, the constructor of this class initializes an ImageStreamParser
so that frames can be retrieved from the image stream without the need to
call the open_stream method.
"""
def __init__(self, _id, ip, image_path, mjpeg_path=None, port=None):
super(IPCamera, self).__init__(_id)
self.is_video = 1
self.ip = ip
self.image_path = image_path
self.mjpeg_path = mjpeg_path
self.port = port
# Initializes an ImageStreamParser so that frames can be retrieved from
# the image stream without the need to call the open_stream method.
url = self.get_url()
self.parser = ImageStreamParser(url)
[docs] def open_stream(self, stream_format):
"""
Open the camera stream of the given format.
Parameters
----------
stream_format : int
The stream format of the camera. This can be any of the StreamFormat
class variables (e.g. StreamFormat.IMAGE or StreamFormat.MJPEG)
Raises
------
ValueError
If the value of stream_format is invalid.
error.UnreachableCameraError
If the camera is unreachable.
"""
# Get the URL of the stream of the given format.
url = self.get_url(stream_format)
# Initialize and open the parser according to the stream format.
if stream_format == StreamFormat.MJPEG:
self.parser = MJPEGStreamParser(url)
self.parser.open_stream()
elif stream_format == StreamFormat.IMAGE:
# The image stream parser is always initialized, and the stream
# does not need to be opened.
self.parser = ImageStreamParser(url)
else:
raise ValueError('Invalid Argument: stream_format')
[docs] def close_stream(self):
"""
Close the currently open camera stream.
Notes
-----
After closing the currently open camera stream, this method initializes
an ImageStreamParser so that frames can be retrieved from the image
stream without the need to call the open_stream method.
"""
if self.parser is not None:
self.parser.close_stream()
self.parser = ImageStreamParser(self.get_url())
[docs] def get_url(self, stream_format=StreamFormat.IMAGE):
"""
Get the URL to the camera stream of the given format.
Parameters
----------
stream_format : int, optional
The stream format of the camera. This can be any of the StreamFormat
class variables (e.g. StreamFormat.IMAGE or StreamFormat.MJPEG)
Returns
-------
url : str
The URL to the camera stream of the given format.
Raises
------
ValueError
If the value of stream_format is invalid.
"""
# Set the path according to the stream format.
if stream_format == StreamFormat.IMAGE:
path = self.image_path
elif stream_format == StreamFormat.MJPEG:
path = self.mjpeg_path
else:
raise ValueError('Invalid Argument: stream_format')
# Construct the URL using the IP, port, and path.
if self.port in ('', None):
url = 'http://{}{}'.format(self.ip, path)
else:
if path is None:
url = 'http://{}:{}'.format(
self.ip, self.port)
else:
url = 'http://{}:{}{}'.format(
self.ip, self.port, path)
return url
def __del__(self):
"""
Close the currently open camera stream before destroying the object.
This destructor is a backup plan in case the user of this class did not
call the close_stream method. The close_stream method has to be called,
without relying on this destructor, because __del__ is not guaranteed
to be called in some cases and it is also better to close the stream as
soon as possible to avoid unnecessary network workload.
"""
self.close_stream()
[docs]class NonIPCamera(Camera):
"""
Represent a non-IP camera.
This class represents a camera whose IP is not known. A web server hides
the information about the camera, and provides only a URL to get the most
recent frame from the camera. This class subclasses the Camera class and
inherits its attributes and extends its constructor.
Parameters
----------
id : int
The unique camera ID.
url : str
The URL that is used to get the most recent frame from the camera.
Attributes
----------
url : str
The URL that is used to get the most recent frame from the camera.
Notes
-----
By default, the constructor of this class initializes an ImageStreamParser
so that frames can be retrieved from the image stream without the need to
call the open_stream method.
"""
def __init__(self, _id, url):
super(NonIPCamera, self).__init__(_id)
self.is_video = 0
self.url = url
self.parser = ImageStreamParser(url)
[docs] def get_url(self):
return self.url
[docs]class StreamCamera(Camera):
"""
Represent a Stream camera similar to a non-ip camera, but faster frame rates.
This class represents a camera whose IP is not known. A web server hides
the information about the camera, and provides only a URL to get the most
recent frame from the camera. The This class subclasses the Camera class and
inherits its attributes and extends its constructor.
Parameters
----------
id : int
The unique camera ID.
url : str
The URL that is used to get the most recent frame from the camera.
Attributes
----------
url : str
The URL that is used to get the most recent frame from the camera.
"""
def __init__(self, _id, url):
super(StreamCamera, self).__init__(_id)
self.is_video = 0
self.url = url
self.parser = MJPGm3u8StreamParser(url)