#!/usr/bin/env python
# coding=utf-8
# aeneas is a Python/C library and a set of tools
# to automagically synchronize audio and text (aka forced alignment)
#
# Copyright (C) 2012-2013, Alberto Pettarin (www.albertopettarin.it)
# Copyright (C) 2013-2015, ReadBeyond Srl (www.readbeyond.it)
# Copyright (C) 2015-2017, Alberto Pettarin (www.albertopettarin.it)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
This module contains the following classes:
* :class:`~aeneas.audiofile.AudioFile`, representing an audio file;
* :class:`~aeneas.audiofile.AudioFileConverterError`,
* :class:`~aeneas.audiofile.AudioFileNotInitializedError`,
* :class:`~aeneas.audiofile.AudioFileProbeError`, and
* :class:`~aeneas.audiofile.AudioFileUnsupportedFormatError`,
representing errors generated by audio files.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy
from aeneas.exacttiming import TimeValue
from aeneas.ffmpegwrapper import FFMPEGPathError
from aeneas.ffmpegwrapper import FFMPEGWrapper
from aeneas.ffprobewrapper import FFPROBEParsingError
from aeneas.ffprobewrapper import FFPROBEPathError
from aeneas.ffprobewrapper import FFPROBEUnsupportedFormatError
from aeneas.ffprobewrapper import FFPROBEWrapper
from aeneas.logger import Loggable
from aeneas.runtimeconfiguration import RuntimeConfiguration
from aeneas.wavfile import read as scipywavread
from aeneas.wavfile import write as scipywavwrite
import aeneas.globalfunctions as gf
class AudioFileConverterError(Exception):
    """
    Error raised when the audio converter executable cannot be executed.
    """


class AudioFileNotInitializedError(Exception):
    """
    Error raised when trying to access audio samples from
    an :class:`~aeneas.audiofile.AudioFile` object which
    has not been initialized yet.
    """


class AudioFileProbeError(Exception):
    """
    Error raised when the audio probe executable cannot be executed.
    """


class AudioFileUnsupportedFormatError(Exception):
    """
    Error raised when the format of the given audio file
    is not supported by the probe, the converter, or the WAVE reader.
    """
    # NOTE: this class is raised by AudioFile and listed in the module
    # docstring; it must be defined at module level or those code paths
    # fail with a NameError instead of the documented exception.


class AudioFile(Loggable):
    """
    A class representing an audio file.

    This class can be used either to extract properties
    from an audio file on disk,
    or to load/edit/save a monoaural (single channel) audio file,
    represented as an array of audio samples.

    The properties of the audio file (length, format, etc.)
    can be set by invoking the :func:`~aeneas.audiofile.AudioFile.read_properties` function,
    which calls an audio file probe.
    (Currently, the probe is :class:`~aeneas.ffprobewrapper.FFPROBEWrapper`)

    Moreover, this class can read the audio data,
    by converting the original file format
    into a temporary PCM16 Mono WAVE (RIFF) file,
    which is deleted as soon as audio data is read in memory.
    (Currently, the converter is :class:`~aeneas.ffmpegwrapper.FFMPEGWrapper`)

    The internal representation of the wave is
    a NumPy 1D array of ``float64`` values in ``[-1.0, 1.0]``.
    It supports append, reverse, and trim operations.
    Audio samples can be written to file.
    Memory can be pre-allocated to speed append operations up.
    Allocated memory is doubled when an append operation
    requires more memory than what is available;
    this leads to an amortized linear complexity
    (in the number of audio samples)
    for append operations.

    .. note:: Support for stereo WAVE files might be implemented in a future version

    :param string file_path: the path of the audio file
    :param tuple file_format: the format of the audio file, if known in advance: ``(codec, channels, rate)`` or ``None``
    :param rconf: a runtime configuration
    :type  rconf: :class:`~aeneas.runtimeconfiguration.RuntimeConfiguration`
    :param logger: the logger object
    :type  logger: :class:`~aeneas.logger.Logger`
    """

    FILE_EXTENSIONS = [
        u"3g2",
        u"3gp",
        u"aa",
        u"aa3",
        u"aac",
        u"aax",
        u"aiff",
        u"alac",
        u"amr",
        u"ape",
        u"asf",
        u"at3",
        u"at9",
        u"au",
        u"avi",
        u"awb",
        u"celt",
        u"dct",
        u"dss",
        u"dvf",
        u"eac",
        u"flac",
        u"flv",
        u"gsm",
        u"m4a",
        u"m4b",
        u"m4p",
        u"m4v",
        u"mid",
        u"midi",
        u"mkv",
        u"mmf",
        u"mov",
        u"mp2",
        u"mp3",
        u"mp4",
        u"mpc",
        u"mpeg",
        u"mpg",
        u"mpv",
        u"msv",
        u"oga",
        u"ogg",
        u"ogv",
        u"oma",
        u"opus",
        u"pcm",
        u"qt",
        u"ra",
        u"ram",
        u"raw",
        u"riff",
        u"rm",
        u"rmvb",
        u"shn",
        u"sln",
        u"theora",
        u"tta",
        u"vob",
        u"vorbis",
        u"vox",
        u"wav",
        u"webm",
        u"wma",
        u"wmv",
        u"wv",
        u"yuv",
    ]
    """ Extensions of common formats for audio (and video) files. """

    TAG = u"AudioFile"

    def __init__(self, file_path=None, file_format=None, rconf=None, logger=None):
        super(AudioFile, self).__init__(rconf=rconf, logger=logger)
        self.file_path = file_path
        self.file_format = file_format
        # properties populated by read_properties() / read_samples_from_file()
        self.file_size = None
        self.audio_length = None
        self.audio_format = None
        self.audio_sample_rate = None
        self.audio_channels = None
        # sample buffer: only the first __samples_length entries
        # of __samples (which holds __samples_capacity entries) are valid
        self.__samples_capacity = 0
        self.__samples_length = 0
        self.__samples = None

    def __unicode__(self):
        fmt = self.file_format
        if isinstance(fmt, tuple):
            fmt = u"%s %d %d" % fmt
        msg = [
            u"File path:         %s" % self.file_path,
            u"File format:       %s" % fmt,
            u"File size (bytes): %s" % gf.safe_int(self.file_size),
            u"Audio length (s):  %s" % gf.safe_float(self.audio_length),
            u"Audio format:      %s" % self.audio_format,
            u"Audio sample rate: %s" % gf.safe_int(self.audio_sample_rate),
            u"Audio channels:    %s" % gf.safe_int(self.audio_channels),
            u"Samples capacity:  %s" % gf.safe_int(self.__samples_capacity),
            u"Samples length:    %s" % gf.safe_int(self.__samples_length),
        ]
        return u"\n".join(msg)

    def __str__(self):
        return gf.safe_str(self.__unicode__())

    @property
    def file_path(self):
        """
        The path of the audio file.

        :rtype: string
        """
        return self.__file_path

    @file_path.setter
    def file_path(self, file_path):
        self.__file_path = file_path

    @property
    def file_size(self):
        """
        The size of the audio file, in bytes.

        :rtype: int
        """
        return self.__file_size

    @file_size.setter
    def file_size(self, file_size):
        self.__file_size = file_size

    @property
    def audio_length(self):
        """
        The length of the audio file, in seconds.

        :rtype: :class:`~aeneas.exacttiming.TimeValue`
        """
        return self.__audio_length

    @audio_length.setter
    def audio_length(self, audio_length):
        self.__audio_length = audio_length

    @property
    def audio_format(self):
        """
        The format of the audio file.

        :rtype: string
        """
        return self.__audio_format

    @audio_format.setter
    def audio_format(self, audio_format):
        self.__audio_format = audio_format

    @property
    def audio_sample_rate(self):
        """
        The sample rate of the audio file, in samples per second.

        :rtype: int
        """
        return self.__audio_sample_rate

    @audio_sample_rate.setter
    def audio_sample_rate(self, audio_sample_rate):
        self.__audio_sample_rate = audio_sample_rate

    @property
    def audio_channels(self):
        """
        The number of channels of the audio file.

        :rtype: int
        """
        return self.__audio_channels

    @audio_channels.setter
    def audio_channels(self, audio_channels):
        self.__audio_channels = audio_channels

    @property
    def audio_samples(self):
        """
        The audio samples, that is, an array of ``float64`` values,
        each representing an audio sample in ``[-1.0, 1.0]``.

        Note that this function returns a view into the
        first ``self.__samples_length`` elements of ``self.__samples``.
        If you want to clone the values,
        you must use e.g. ``numpy.array(audiofile.audio_samples)``.

        If the samples are not in memory yet and a file path is set,
        they are loaded from file first.

        :rtype: :class:`numpy.ndarray` (1D, view)
        :raises: :class:`~aeneas.audiofile.AudioFileNotInitializedError`: if the audio file is not initialized yet
        """
        self._ensure_samples_loaded()
        return self.__samples[0:self.__samples_length]

    def _ensure_samples_loaded(self):
        """
        Make sure the audio samples are available in memory.

        If no samples are stored yet, load them from ``self.file_path``;
        if there is no file path either, the object is unusable
        and an error is raised.

        :raises: :class:`~aeneas.audiofile.AudioFileNotInitializedError`: if the audio file is not initialized yet
        """
        if self.__samples is not None:
            return
        if self.file_path is None:
            # log_exc(..., True, ExcType) is relied upon to raise ExcType
            self.log_exc(u"AudioFile object not initialized", None, True, AudioFileNotInitializedError)
        else:
            self.read_samples_from_file()

    def read_properties(self):
        """
        Populate this object by reading
        the audio properties of the file at the given path.

        Currently this function uses
        :class:`~aeneas.ffprobewrapper.FFPROBEWrapper`
        to get the audio file properties.

        :raises: :class:`~aeneas.audiofile.AudioFileProbeError`: if the path to the ``ffprobe`` executable cannot be called
        :raises: :class:`~aeneas.audiofile.AudioFileUnsupportedFormatError`: if the audio file has a format not supported
        :raises: OSError: if the audio file cannot be read
        """
        self.log(u"Reading properties...")

        # check the file can be read
        if not gf.file_can_be_read(self.file_path):
            self.log_exc(u"File '%s' cannot be read" % (self.file_path), None, True, OSError)

        # get the file size
        self.log([u"Getting file size for '%s'", self.file_path])
        self.file_size = gf.file_size(self.file_path)
        self.log([u"File size for '%s' is '%d'", self.file_path, self.file_size])

        # get the audio properties using FFPROBEWrapper
        try:
            self.log(u"Reading properties with FFPROBEWrapper...")
            properties = FFPROBEWrapper(
                rconf=self.rconf,
                logger=self.logger
            ).read_properties(self.file_path)
            self.log(u"Reading properties with FFPROBEWrapper... done")
        except FFPROBEPathError:
            self.log_exc(u"Unable to call ffprobe executable", None, True, AudioFileProbeError)
        except (FFPROBEUnsupportedFormatError, FFPROBEParsingError):
            self.log_exc(u"Audio file format not supported by ffprobe", None, True, AudioFileUnsupportedFormatError)

        # save relevant properties in results inside the audiofile object
        self.audio_length = TimeValue(properties[FFPROBEWrapper.STDOUT_DURATION])
        self.audio_format = properties[FFPROBEWrapper.STDOUT_CODEC_NAME]
        self.audio_sample_rate = gf.safe_int(properties[FFPROBEWrapper.STDOUT_SAMPLE_RATE])
        self.audio_channels = gf.safe_int(properties[FFPROBEWrapper.STDOUT_CHANNELS])
        self.log([u"Stored audio_length: '%s'", self.audio_length])
        self.log([u"Stored audio_format: '%s'", self.audio_format])
        self.log([u"Stored audio_sample_rate: '%s'", self.audio_sample_rate])
        self.log([u"Stored audio_channels: '%s'", self.audio_channels])
        self.log(u"Reading properties... done")

    def read_samples_from_file(self):
        """
        Load the audio samples from file into memory.

        If ``self.file_format`` is ``None`` or it is not
        ``("pcm_s16le", 1, self.rconf.sample_rate)``,
        the file will be first converted
        to a temporary PCM16 mono WAVE file.
        Audio data will be read from this temporary file,
        which will be then deleted from disk immediately,
        even if reading it fails.

        Otherwise,
        the audio data will be read directly
        from the given file,
        which will not be deleted from disk.

        :raises: :class:`~aeneas.audiofile.AudioFileConverterError`: if the path to the ``ffmpeg`` executable cannot be called
        :raises: :class:`~aeneas.audiofile.AudioFileUnsupportedFormatError`: if the audio file has a format not supported
        :raises: OSError: if the audio file cannot be read
        """
        self.log(u"Loading audio data...")

        # check the file can be read
        if not gf.file_can_be_read(self.file_path):
            self.log_exc(u"File '%s' cannot be read" % (self.file_path), None, True, OSError)

        # determine if we need to convert the audio file:
        # always when the format is unknown; when it is known,
        # only if safety checks are on and the format is not the expected one
        convert_audio_file = (
            (self.file_format is None) or
            (
                (self.rconf.safety_checks) and
                (self.file_format != ("pcm_s16le", 1, self.rconf.sample_rate))
            )
        )

        # convert the audio file if needed
        if convert_audio_file:
            # convert file to PCM16 mono WAVE with correct sample rate
            self.log(u"self.file_format is None or not good => converting self.file_path")
            tmp_handler, tmp_file_path = gf.tmp_file(suffix=u".wav", root=self.rconf[RuntimeConfiguration.TMP_PATH])
            self.log([u"Temporary PCM16 mono WAVE file: '%s'", tmp_file_path])
            try:
                self.log(u"Converting audio file to mono...")
                converter = FFMPEGWrapper(rconf=self.rconf, logger=self.logger)
                converter.convert(self.file_path, tmp_file_path)
                self.file_format = ("pcm_s16le", 1, self.rconf.sample_rate)
                self.log(u"Converting audio file to mono... done")
            except FFMPEGPathError:
                gf.delete_file(tmp_handler, tmp_file_path)
                self.log_exc(u"Unable to call ffmpeg executable", None, True, AudioFileConverterError)
            except OSError:
                gf.delete_file(tmp_handler, tmp_file_path)
                self.log_exc(u"Audio file format not supported by ffmpeg", None, True, AudioFileUnsupportedFormatError)
        else:
            # read the file directly
            if self.rconf.safety_checks:
                self.log(u"self.file_format is good => reading self.file_path directly")
            else:
                self.log_warn(u"Safety checks disabled => reading self.file_path directly")
            tmp_handler = None
            tmp_file_path = self.file_path

        # TODO allow calling C extension cwave to read samples faster
        try:
            self.audio_format = "pcm16"
            self.audio_channels = 1
            self.audio_sample_rate, self.__samples = scipywavread(tmp_file_path)
            # scipy reads a sample as an int16_t, that is, a number in [-32768, 32767]
            # so we convert it to a float64 in [-1, 1]
            self.__samples = self.__samples.astype("float64") / 32768
            self.__samples_capacity = len(self.__samples)
            self.__samples_length = self.__samples_capacity
        except ValueError:
            self.log_exc(u"Audio format not supported by scipywavread", None, True, AudioFileUnsupportedFormatError)
        finally:
            # if we converted the audio file, delete the temporary converted
            # audio file, also when reading it failed: otherwise the
            # temporary file would be left behind on disk
            if convert_audio_file:
                gf.delete_file(tmp_handler, tmp_file_path)
                self.log([u"Deleted temporary audio file: '%s'", tmp_file_path])

        self._update_length()
        self.log([u"Sample length:  %.3f", self.audio_length])
        self.log([u"Sample rate:    %d", self.audio_sample_rate])
        self.log([u"Audio format:   %s", self.audio_format])
        self.log([u"Audio channels: %d", self.audio_channels])
        self.log(u"Loading audio data... done")

    def preallocate_memory(self, capacity):
        """
        Preallocate memory to store audio samples,
        to avoid repeated new allocations and copies
        while performing several consecutive append operations.

        If ``self.__samples`` is not initialized,
        it will become an array of ``capacity`` zeros.

        If ``capacity`` is larger than the current capacity,
        the current ``self.__samples`` will be extended with zeros.

        If ``capacity`` is smaller than the current capacity,
        the first ``capacity`` values of ``self.__samples``
        will be retained.

        :param int capacity: the new capacity, in number of samples
        :raises: ValueError: if ``capacity`` is negative

        .. versionadded:: 1.5.0
        """
        if capacity < 0:
            raise ValueError(u"The capacity value cannot be negative")
        if self.__samples is None:
            self.log(u"Not initialized")
            self.__samples = numpy.zeros(capacity)
            self.__samples_length = 0
        else:
            self.log([u"Previous sample length was   (samples): %d", self.__samples_length])
            self.log([u"Previous sample capacity was (samples): %d", self.__samples_capacity])
            # NOTE numpy.resize() pads an enlarged array with repeated copies
            #      of the original data, not with zeros, so allocate a zeroed
            #      buffer explicitly and copy over the values to be retained
            resized = numpy.zeros(capacity, dtype=self.__samples.dtype)
            retained = min(len(self.__samples), capacity)
            resized[:retained] = self.__samples[:retained]
            self.__samples = resized
            previous_length = self.__samples_length
            self.__samples_length = min(previous_length, capacity)
            if self.__samples_length != previous_length:
                # valid samples were cut off: keep audio_length consistent
                self._update_length()
        self.__samples_capacity = capacity
        self.log([u"Current sample capacity is   (samples): %d", self.__samples_capacity])

    def minimize_memory(self):
        """
        Reduce the allocated memory to the minimum
        required to store the current audio samples.

        This function is meant to be called
        when building a wave incrementally,
        after the last append operation.

        .. versionadded:: 1.5.0
        """
        if self.__samples is None:
            self.log(u"Not initialized, returning")
        else:
            self.log(u"Initialized, minimizing memory...")
            self.preallocate_memory(self.__samples_length)
            self.log(u"Initialized, minimizing memory... done")

    def add_samples(self, samples, reverse=False):
        """
        Concatenate the given new samples to the current audio data.

        This function initializes the memory if no audio data
        is present already.

        If ``reverse`` is ``True``, the new samples
        will be reversed and then concatenated.

        :param samples: the new samples to be concatenated
        :type  samples: :class:`numpy.ndarray` (1D)
        :param bool reverse: if ``True``, concatenate new samples after reversing them

        .. versionadded:: 1.2.1
        """
        self.log(u"Adding samples...")
        samples_length = len(samples)
        current_length = self.__samples_length
        future_length = current_length + samples_length
        if (self.__samples is None) or (self.__samples_capacity < future_length):
            # allocate twice what is needed, so that repeated appends
            # do not trigger a reallocation every time
            self.preallocate_memory(2 * future_length)
        if reverse:
            self.__samples[current_length:future_length] = samples[::-1]
        else:
            self.__samples[current_length:future_length] = samples[:]
        self.__samples_length = future_length
        self._update_length()
        self.log(u"Adding samples... done")

    def reverse(self):
        """
        Reverse the audio data.

        If audio data is not loaded, load it and then reverse it.

        :raises: :class:`~aeneas.audiofile.AudioFileNotInitializedError`: if the audio file is not initialized yet

        .. versionadded:: 1.2.0
        """
        self._ensure_samples_loaded()
        self.log(u"Reversing...")
        # only the valid samples are reversed, spare capacity is left alone
        self.__samples[0:self.__samples_length] = numpy.flipud(self.__samples[0:self.__samples_length])
        self.log(u"Reversing... done")

    def trim(self, begin=None, length=None):
        """
        Get a slice of the audio data of ``length`` seconds,
        starting from ``begin`` seconds.

        If audio data is not loaded, load it and then slice it.

        If both ``begin`` and ``length`` are ``None``,
        this function does nothing.
        Both values are clamped so that the slice
        lies within the current audio data.

        :param begin: the start position, in seconds
        :type  begin: :class:`~aeneas.exacttiming.TimeValue`
        :param length: the length of the slice, in seconds
        :type  length: :class:`~aeneas.exacttiming.TimeValue`
        :raises: TypeError: if one of the arguments is not ``None``
                            or :class:`~aeneas.exacttiming.TimeValue`
        :raises: :class:`~aeneas.audiofile.AudioFileNotInitializedError`: if the audio file is not initialized yet

        .. versionadded:: 1.2.0
        """
        for variable, name in [(begin, "begin"), (length, "length")]:
            if (variable is not None) and (not isinstance(variable, TimeValue)):
                raise TypeError(u"%s is not None or TimeValue" % name)

        self.log(u"Trimming...")
        if (begin is None) and (length is None):
            self.log(u"begin and length are both None: nothing to do")
        else:
            # the samples (and hence audio_length and audio_sample_rate)
            # are needed below: load them if they are not in memory yet
            self._ensure_samples_loaded()
            if begin is None:
                begin = TimeValue("0.000")
                self.log([u"begin was None, now set to %.3f", begin])
            begin = min(max(TimeValue("0.000"), begin), self.audio_length)
            self.log([u"begin is %.3f", begin])
            if length is None:
                length = self.audio_length - begin
                self.log([u"length was None, now set to %.3f", length])
            length = min(max(TimeValue("0.000"), length), self.audio_length - begin)
            self.log([u"length is %.3f", length])
            begin_index = int(begin * self.audio_sample_rate)
            end_index = int((begin + length) * self.audio_sample_rate)
            new_idx = end_index - begin_index
            # move the selected slice to the beginning of the buffer
            self.__samples[0:new_idx] = self.__samples[begin_index:end_index]
            self.__samples_length = new_idx
            self._update_length()
        self.log(u"Trimming... done")

    def write(self, file_path):
        """
        Write the audio data to file, as 16-bit PCM samples.

        If audio data is not loaded, load it and then write it.

        This function does not return a value:
        a failure is reported by raising ``OSError``.

        :param string file_path: the path of the output file to be written
        :raises: :class:`~aeneas.audiofile.AudioFileNotInitializedError`: if the audio file is not initialized yet
        :raises: OSError: if the audio data cannot be written to ``file_path``

        .. versionadded:: 1.2.0
        """
        self._ensure_samples_loaded()
        self.log([u"Writing audio file '%s'...", file_path])
        try:
            # our value is a float64 in [-1, 1]
            # scipy writes the sample as an int16_t, that is, a number in [-32768, 32767]
            # NOTE a sample equal to 1.0 scales to 32768, which does not fit
            #      into an int16 and would wrap around to -32768:
            #      clip to the representable range before casting
            data = numpy.clip(self.audio_samples * 32768, -32768, 32767).astype("int16")
            scipywavwrite(file_path, self.audio_sample_rate, data)
        except Exception as exc:
            self.log_exc(u"Error writing audio file to '%s'" % (file_path), exc, True, OSError)
        self.log([u"Writing audio file '%s'... done", file_path])

    def clear_data(self):
        """
        Clear the audio data, freeing memory.
        """
        self.log(u"Clear audio_data")
        self.__samples_capacity = 0
        self.__samples_length = 0
        self.__samples = None

    def _update_length(self):
        """
        Update the audio length property,
        according to the length of the current audio data
        and audio sample rate.

        This function fails silently if one of the two is ``None``.
        """
        if (self.audio_sample_rate is not None) and (self.__samples is not None):
            # NOTE computing TimeValue (... / ...) yields wrong results,
            #      see issue #168
            #      self.audio_length = TimeValue(self.__samples_length / self.audio_sample_rate)
            self.audio_length = TimeValue(self.__samples_length) / TimeValue(self.audio_sample_rate)