Source code for aeneas.syncmap.fragmentlist

#!/usr/bin/env python
# coding=utf-8

# aeneas is a Python/C library and a set of tools
# to automagically synchronize audio and text (aka forced alignment)
#
# Copyright (C) 2012-2013, Alberto Pettarin (www.albertopettarin.it)
# Copyright (C) 2013-2015, ReadBeyond Srl   (www.readbeyond.it)
# Copyright (C) 2015-2017, Alberto Pettarin (www.albertopettarin.it)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import absolute_import
from __future__ import print_function
from copy import deepcopy
from bisect import insort

from aeneas.exacttiming import TimeInterval
from aeneas.exacttiming import TimeValue
from aeneas.logger import Loggable
from aeneas.syncmap.fragment import SyncMapFragment
from aeneas.textfile import TextFragment
import aeneas.globalconstants as gc


[docs]class SyncMapFragmentList(Loggable): """ A type representing a list of sync map fragments, with some constraints: * the begin and end time of each fragment should be within the list begin and end times; * two time fragments can only overlap at the boundary; * the list is kept sorted. This class has some convenience methods for clipping, offsetting, moving fragment boundaries, and fixing fragments with zero length. :param begin: the begin time :type begin: :class:`~aeneas.exacttiming.TimeValue` :param end: the end time :type end: :class:`~aeneas.exacttiming.TimeValue` :raises TypeError: if ``begin`` or ``end`` are not instances of :class:`~aeneas.exacttiming.TimeValue` :raises ValueError: if ``begin`` is negative or if ``begin`` is bigger than ``end`` .. versionadded:: 1.7.0 """ ALLOWED_POSITIONS = [ TimeInterval.RELATIVE_POSITION_PP_L, TimeInterval.RELATIVE_POSITION_PP_C, TimeInterval.RELATIVE_POSITION_PP_G, TimeInterval.RELATIVE_POSITION_PI_LL, TimeInterval.RELATIVE_POSITION_PI_LC, TimeInterval.RELATIVE_POSITION_PI_CG, TimeInterval.RELATIVE_POSITION_PI_GG, TimeInterval.RELATIVE_POSITION_IP_L, TimeInterval.RELATIVE_POSITION_IP_B, TimeInterval.RELATIVE_POSITION_IP_E, TimeInterval.RELATIVE_POSITION_IP_G, TimeInterval.RELATIVE_POSITION_II_LL, TimeInterval.RELATIVE_POSITION_II_LB, TimeInterval.RELATIVE_POSITION_II_EG, TimeInterval.RELATIVE_POSITION_II_GG, ] """ Allowed positions for any pair of time intervals in the list """ TAG = u"SyncMapFragmentList" def __init__(self, begin, end, rconf=None, logger=None): if not isinstance(begin, TimeValue): raise TypeError(u"begin is not an instance of TimeValue") if not isinstance(end, TimeValue): raise TypeError(u"end is not an instance of TimeValue") if begin < 0: raise ValueError(u"begin is negative") if begin > end: raise ValueError(u"begin is bigger than end") super(SyncMapFragmentList, self).__init__(rconf=rconf, logger=logger) self.begin = begin self.end = end self.__sorted = True self.__fragments = [] def __len__(self): return len(self.__fragments) def __getitem__(self, index): return self.__fragments[index] def __setitem__(self, index, value): self.__fragments[index] = value def _is_valid_index(self, index): """ Return ``True`` if and only if the given ``index`` is valid. """ if isinstance(index, int): return (index >= 0) and (index < len(self)) if isinstance(index, list): valid = True for i in index: valid = valid or self._is_valid_index(i) return valid return False def _check_boundaries(self, fragment): """ Check that the interval of the given fragment is within the boundaries of the list. Raises an error if not OK. """ if not isinstance(fragment, SyncMapFragment): raise TypeError(u"fragment is not an instance of SyncMapFragment") interval = fragment.interval if not isinstance(interval, TimeInterval): raise TypeError(u"interval is not an instance of TimeInterval") if (self.begin is not None) and (interval.begin < self.begin): raise ValueError(u"interval.begin is before self.begin") if (self.end is not None) and (interval.end > self.end): raise ValueError(u"interval.end is after self.end") def _check_overlap(self, fragment): """ Check that the interval of the given fragment does not overlap any existing interval in the list (except at its boundaries). Raises an error if not OK. """ # # NOTE bisect does not work if there is a configuration like: # # *********** <- existing interval # *** <- query interval # # TODO one should probably check this by doing bisect # over the begin and end lists separately # for existing_fragment in self.fragments: if existing_fragment.interval.relative_position_of(fragment.interval) not in self.ALLOWED_POSITIONS: self.log_exc(u"interval overlaps another already present interval", None, True, ValueError) def _check_min_max_indices(self, min_index=None, max_index=None): """ Ensure the given start/end fragment indices make sense: if one of them is ``None`` (i.e., not specified), then set it to ``0`` or ``len(self)``. """ min_index = min_index or 0 max_index = max_index or len(self) if min_index < 0: self.log_exc(u"min_index is negative", None, True, ValueError) if max_index > len(self): self.log_exc(u"max_index is bigger than the number of intervals in the list", None, True, ValueError) return min_index, max_index
[docs] def clone(self): """ Return a deep copy of this configuration object. :rtype: :class:`~aeneas.syncmap.fragmentlist.SyncMapFragmentList` """ return deepcopy(self)
@property def is_guaranteed_sorted(self): """ Return ``True`` if the list is sorted, and ``False`` if it might not be sorted (for example, because an ``add(..., sort=False)`` operation was performed). :rtype: bool """ return self.__sorted @property def fragments(self): """ Iterates through the fragments in the list (which are sorted). :rtype: generator of :class:`~aeneas.syncmap.SyncMapFragment` """ for fragment in self.__fragments: yield fragment @property def regular_fragments(self): """ Iterates through the regular fragments in the list (which are sorted). :rtype: generator of (int, :class:`~aeneas.syncmap.SyncMapFragment`) """ for i, fragment in enumerate(self.__fragments): if fragment.fragment_type == SyncMapFragment.REGULAR: yield (i, fragment) @property def nonspeech_fragments(self): """ Iterates through the nonspeech fragments in the list (which are sorted). :rtype: generator of (int, :class:`~aeneas.syncmap.SyncMapFragment`) """ for i, fragment in enumerate(self.__fragments): if fragment.fragment_type == SyncMapFragment.NONSPEECH: yield (i, fragment)
[docs] def remove(self, indices): """ Remove the fragments corresponding to the given list of indices. :param indices: the list of indices to be removed :type indices: list of int :raises ValueError: if one of the indices is not valid """ if not self._is_valid_index(indices): self.log_exc(u"The given list of indices is not valid", None, True, ValueError) new_fragments = [] sorted_indices = sorted(indices) i = 0 j = 0 while (i < len(self)) and (j < len(sorted_indices)): if i != sorted_indices[j]: new_fragments.append(self[i]) else: j += 1 i += 1 while i < len(self): new_fragments.append(self[i]) i += 1 self.__fragments = new_fragments
[docs] def sort(self): """ Sort the fragments in the list. :raises ValueError: if there is a fragment which violates the list constraints """ if self.is_guaranteed_sorted: self.log(u"Already sorted, returning") return self.log(u"Sorting...") self.__fragments = sorted(self.__fragments) self.log(u"Sorting... done") self.log(u"Checking relative positions...") for i in range(len(self) - 1): current_interval = self[i].interval next_interval = self[i + 1].interval if current_interval.relative_position_of(next_interval) not in self.ALLOWED_POSITIONS: self.log(u"Found overlapping fragments:") self.log([u" Index %d => %s", i, current_interval]) self.log([u" Index %d => %s", i + 1, next_interval]) self.log_exc(u"The list contains two fragments overlapping in a forbidden way", None, True, ValueError) self.log(u"Checking relative positions... done") self.__sorted = True
[docs] def remove_nonspeech_fragments(self, zero_length_only=False): """ Remove ``NONSPEECH`` fragments from the list. If ``zero_length_only`` is ``True``, remove only those fragments with zero length, and make all the others ``REGULAR``. :param bool zero_length_only: remove only zero length NONSPEECH fragments """ self.log(u"Removing nonspeech fragments...") nonspeech = list(self.nonspeech_fragments) if zero_length_only: nonspeech = [(i, f) for i, f in nonspeech if f.has_zero_length] nonspeech_indices = [i for i, f in nonspeech] self.remove(nonspeech_indices) if zero_length_only: for i, f in list(self.nonspeech_fragments): f.fragment_type = SyncMapFragment.REGULAR self.log(u"Removing nonspeech fragments... done")
[docs] def has_zero_length_fragments(self, min_index=None, max_index=None): """ Return ``True`` if the list has at least one interval with zero length withing ``min_index`` and ``max_index``. If the latter are not specified, check all intervals. :param int min_index: examine fragments with index greater than or equal to this index (i.e., included) :param int max_index: examine fragments with index smaller than this index (i.e., excluded) :raises ValueError: if ``min_index`` is negative or ``max_index`` is bigger than the current number of fragments :rtype: bool """ min_index, max_index = self._check_min_max_indices(min_index, max_index) zero = [i for i in range(min_index, max_index) if self[i].has_zero_length] self.log([u"Fragments with zero length: %s", zero]) return (len(zero) > 0)
[docs] def has_adjacent_fragments_only(self, min_index=None, max_index=None): """ Return ``True`` if the list contains only adjacent fragments, that is, if it does not have gaps. :param int min_index: examine fragments with index greater than or equal to this index (i.e., included) :param int max_index: examine fragments with index smaller than this index (i.e., excluded) :raises ValueError: if ``min_index`` is negative or ``max_index`` is bigger than the current number of fragments :rtype: bool """ min_index, max_index = self._check_min_max_indices(min_index, max_index) for i in range(min_index, max_index - 1): current_interval = self[i].interval next_interval = self[i + 1].interval if not current_interval.is_adjacent_before(next_interval): self.log(u"Found non adjacent fragments") self.log([u" Index %d => %s", i, current_interval]) self.log([u" Index %d => %s", i + 1, next_interval]) return False return True
[docs] def add(self, fragment, sort=True): """ Add the given fragment to the list (and keep the latter sorted). An error is raised if the fragment cannot be added, for example if its interval violates the list constraints. :param fragment: the fragment to be added :type fragment: :class:`~aeneas.syncmap.SyncMapFragment` :param bool sort: if ``True`` ensure that after the insertion the list is kept sorted :raises TypeError: if ``interval`` is not an instance of ``TimeInterval`` :raises ValueError: if ``interval`` does not respect the boundaries of the list or if it overlaps an existing interval, or if ``sort=True`` but the list is not guaranteed sorted """ self._check_boundaries(fragment) if sort: if not self.is_guaranteed_sorted: self.log_exc(u"Unable to add with sort=True if the list is not guaranteed sorted", None, True, ValueError) self._check_overlap(fragment) insort(self.__fragments, fragment) # self.log(u"Inserted and kept sorted flag true") else: self.__fragments.append(fragment) self.__sorted = False
# self.log(u"Appended at the end and invalidated sorted flag")
[docs] def offset(self, offset): """ Move all the intervals in the list by the given ``offset``. :param offset: the shift to be applied :type offset: :class:`~aeneas.exacttiming.TimeValue` :raises TypeError: if ``offset`` is not an instance of ``TimeValue`` """ self.log(u"Applying offset to all fragments...") self.log([u" Offset %.3f", offset]) for fragment in self.fragments: fragment.interval.offset( offset=offset, allow_negative=False, min_begin_value=self.begin, max_end_value=self.end ) self.log(u"Applying offset to all fragments... done")
[docs] def move_transition_point(self, fragment_index, value): """ Change the transition point between fragment ``fragment_index`` and the next fragment to the time value ``value``. This method fails silently (without changing the fragment list) if at least one of the following conditions holds: * ``fragment_index`` is negative * ``fragment_index`` is the last or the second-to-last * ``value`` is after the current end of the next fragment * the current fragment and the next one are not adjacent and both proper intervals (not zero length) The above conditions ensure that the move makes sense and that it keeps the list satisfying the constraints. :param int fragment_index: the fragment index whose end should be moved :param value: the new transition point :type value: :class:`~aeneas.exacttiming.TimeValue` """ self.log(u"Called move_transition_point with") self.log([u" fragment_index %d", fragment_index]) self.log([u" value %.3f", value]) if (fragment_index < 0) or (fragment_index > (len(self) - 3)): self.log(u"Bad fragment_index, returning") return current_interval = self[fragment_index].interval next_interval = self[fragment_index + 1].interval if value > next_interval.end: self.log(u"Bad value, returning") return if not current_interval.is_non_zero_before_non_zero(next_interval): self.log(u"Bad interval configuration, returning") return current_interval.end = value next_interval.begin = value self.log(u"Moved transition point")
[docs] def fragments_ending_inside_nonspeech_intervals( self, nonspeech_intervals, tolerance ): """ Determine a list of pairs (nonspeech interval, fragment index), such that the nonspeech interval contains exactly one fragment ending inside it (within the given tolerance) and adjacent to the next fragment. :param nonspeech_intervals: the list of nonspeech intervals to be examined :type nonspeech_intervals: list of :class:`~aeneas.exacttiming.TimeInterval` :param tolerance: the tolerance to be applied when checking if the end point falls within a given nonspeech interval :type tolerance: :class:`~aeneas.exacttiming.TimeValue` :rtype: list of (:class:`~aeneas.exacttiming.TimeInterval`, int) """ self.log(u"Called fragments_ending_inside_nonspeech_intervals") self.log([u" List begin: %.3f", self.begin]) self.log([u" List end: %.3f", self.end]) nsi_index = 0 frag_index = 0 nsi_counter = [(n, []) for n in nonspeech_intervals] # NOTE the last fragment is not eligible to be returned while (nsi_index < len(nonspeech_intervals)) and (frag_index < len(self) - 1): nsi = nonspeech_intervals[nsi_index] if nsi.end > self.end: self.log(u" nsi ends after self.end => breaking") break nsi_shadow = nsi.shadow(tolerance) frag = self[frag_index] self.log([u" nsi %s", nsi]) self.log([u" nsi_shadow %s", nsi_shadow]) self.log([u" frag %s", frag.interval]) if not frag.is_head_or_tail: self.log(u" Fragment is not HEAD or TAIL => inspecting it") if nsi_shadow.contains(frag.end): if nsi_shadow.contains(frag.begin): # # *************** nsi shadow # | *********** | nsi # | ***X | frag (X=frag.end) # # NOTE this case might happen as the following: # # *************** nsi shadow # | *** | nsi # | **X | frag (X=frag.end) # # so we must invalidate the nsi if this happens # nsi_counter[nsi_index] = (None, []) nsi_index += 1 frag_index += 1 self.log(u" nsi_shadow entirely contains frag => invalidate nsi, and skip to next fragment, nsi") else: # # *************** nsi shadow # | *********** | nsi # *****|***X | frag (X=frag.end) # nsi_counter[nsi_index][1].append(frag_index) frag_index += 1 self.log(u" nsi_shadow contains frag end only => save it and go to next fragment") elif nsi_shadow.begin > frag.end: # # *************** nsi shadow # | *********** | nsi # **X | | frag (X=frag.end) # frag_index += 1 self.log(u" nsi_shadow begins after frag end => skip to next fragment") else: # # *************** nsi shadow # | *********** | nsi # | *****|**X frag (X=frag.end) # nsi_index += 1 self.log(u" nsi_shadow ends before frag end => skip to next nsi") else: self.log(u" Fragment is HEAD or TAIL => skipping it") frag_index += 1 self.log(u"") tbr = [(n, c[0]) for (n, c) in nsi_counter if len(c) == 1] self.log([u"Returning: %s", tbr]) return tbr
[docs] def inject_long_nonspeech_fragments(self, pairs, replacement_string): """ Inject nonspeech fragments corresponding to the given intervals in this fragment list. It is assumed that ``pairs`` are consistent, e.g. they are produced by ``fragments_ending_inside_nonspeech_intervals``. :param list pairs: list of ``(TimeInterval, int)`` pairs, each identifying a nonspeech interval and the corresponding fragment index ending inside it :param string replacement_string: the string to be applied to the nonspeech intervals """ self.log(u"Called inject_long_nonspeech_fragments") # set the appropriate fragment text if replacement_string in [None, gc.PPV_TASK_ADJUST_BOUNDARY_NONSPEECH_REMOVE]: self.log(u" Remove long nonspeech") lines = [] else: self.log([u" Replace long nonspeech with '%s'", replacement_string]) lines = [replacement_string] # first, make room for the nonspeech intervals self.log(u" First pass: making room...") for nsi, index in pairs: self[index].interval.end = nsi.begin self[index + 1].interval.begin = nsi.end self.log(u" First pass: making room... done") self.log(u" Second pass: append nonspeech intervals...") for i, (nsi, index) in enumerate(pairs, 1): identifier = u"n%06d" % i self.add(SyncMapFragment( text_fragment=TextFragment( identifier=identifier, language=None, lines=lines, filtered_lines=lines ), interval=nsi, fragment_type=SyncMapFragment.NONSPEECH ), sort=False) self.log(u" Second pass: append nonspeech intervals... done") self.log(u" Third pass: sorting...") self.sort() self.log(u" Third pass: sorting... done")
[docs] def fix_zero_length_fragments(self, duration=TimeValue("0.001"), min_index=None, max_index=None, ensure_adjacent=True): """ Fix fragments with zero length, enlarging them to have length ``duration``, reclaiming the difference from the next fragment(s), or moving the next fragment(s) forward. This function assumes the fragments to be adjacent. :param duration: set the zero length fragments to have this duration :type duration: :class:`~aeneas.exacttiming.TimeValue` :param int min_index: examine fragments with index greater than or equal to this index (i.e., included) :param int max_index: examine fragments with index smaller than this index (i.e., excluded) :raises ValueError: if ``min_index`` is negative or ``max_index`` is bigger than the current number of fragments """ self.log(u"Called fix_zero_length_fragments") self.log([u" Duration %.3f", duration]) min_index, max_index = self._check_min_max_indices(min_index, max_index) if len(self) < 1: self.log(u"The list has no fragments: returning") return if not self.has_adjacent_fragments_only(min_index, max_index): self.log_warn(u"There are non adjacent fragments: aborting") return original_first_begin = None if ( (ensure_adjacent) and (min_index > 0) and (self[min_index - 1].interval.is_adjacent_before(self[min_index].interval)) ): original_first_begin = self[min_index].begin self.log([u"Original first was adjacent with previous, starting at %.3f", original_first_begin]) original_last_end = None if ( (ensure_adjacent) and (len(self) > 1) and (max_index < len(self)) and (self[max_index - 1].interval.is_adjacent_before(self[max_index].interval)) ): original_last_end = self[max_index - 1].end self.log([u"Original last was adjacent with next, ending at %.3f", original_last_end]) i = min_index while i < max_index: if self[i].has_zero_length: self.log([u" Fragment %d (%s) has zero length => ENLARGE", i, self[i].interval]) moves = [(i, "ENLARGE", duration)] slack = duration j = i + 1 self.log([u" Entered while with j == %d", j]) while (j < max_index) and (self[j].interval.length < slack): if self[j].has_zero_length: self.log([u" Fragment %d (%s) has zero length => ENLARGE", j, self[j].interval]) moves.append((j, "ENLARGE", duration)) slack += duration else: self.log([u" Fragment %d (%s) has non zero length => MOVE", j, self[j].interval]) moves.append((j, "MOVE", None)) j += 1 self.log([u" Exited while with j == %d", j]) fixable = False if (j == max_index) and (self[j - 1].interval.end + slack <= self.end): self.log(u" Fixable by moving back") current_time = self[j - 1].interval.end + slack fixable = True elif j < max_index: self.log(u" Fixable by shrinking") self[j].interval.shrink(slack) current_time = self[j].interval.begin fixable = True if fixable: for index, move_type, move_amount in moves[::-1]: self.log([u" Calling move_end_at with %.3f at index %d", current_time, index]) self[index].interval.move_end_at(current_time) if move_type == "ENLARGE": self.log([u" Calling enlarge with %.3f at index %d", move_amount, index]) self[index].interval.enlarge(move_amount) self.log([u" Interval %d is now: %s", index, self[index].interval]) current_time = self[index].interval.begin else: self.log([u"Unable to fix fragment %d (%s)", i, self[i].interval]) i = j - 1 i += 1 if original_first_begin is not None: if self[min_index].begin != self[min_index - 1].end: self.log(u"First fragment begin moved, restoring adjacency") self.log([u" Original was %.3f", original_first_begin]) self.log([u" New is %.3f", self[min_index - 1].end]) self[min_index].begin = self[min_index - 1].end if original_last_end is not None: if self[max_index].begin != self[max_index - 1].end: self.log(u"Last fragment end moved, restoring adjacency") self.log([u" Original was %.3f", original_last_end]) self.log([u" New is %.3f", self[max_index].begin]) self[max_index].begin = self[max_index - 1].end self.log(u"Fragments after fixing:") for i, fragment in enumerate(self): self.log([u" %d => %.3f %.3f", i, fragment.interval.begin, fragment.interval.end])
def fix_fragment_rate(self, fragment_index, max_rate, aggressive=False): def fix_pair(current_index, donor_index): self.log(u"Called fix_pair") if ( (current_index < 0) or (current_index >= len(self)) or (donor_index < 0) or (donor_index >= len(self)) or (abs(current_index - donor_index) > 1) ): self.log(u"Invalid index, returning False") return False donor_is_previous = donor_index < current_index current_fragment = self[current_index] donor_fragment = self[donor_index] if (current_fragment.rate is not None) and (current_fragment.rate <= max_rate): self.log(u"Current fragment rate is already <= max_rate, returning True") return True if donor_is_previous: if not donor_fragment.interval.is_non_zero_before_non_zero(current_fragment.interval): self.log(u"donor fragment is not adjacent before current fragment, returning False") return False else: if not current_fragment.interval.is_non_zero_before_non_zero(donor_fragment.interval): self.log(u"current fragment is not adjacent before donor fragment, returning False") return False self.log(u"Current and donor fragments are adjacent and not zero length") current_lack = current_fragment.rate_lack(max_rate) donor_slack = donor_fragment.rate_slack(max_rate) self.log([u"Current lack %.3f", current_lack]) self.log([u"Donor slack %.3f", donor_slack]) if donor_slack <= 0: self.log(u"Donor has no slack, returning False") return False self.log(u"Donor has some slack") effective_slack = min(current_lack, donor_slack) if donor_is_previous: self.move_transition_point(donor_index, donor_fragment.end - effective_slack) else: self.move_transition_point(current_index, current_fragment.end + effective_slack) if effective_slack == current_lack: self.log(u"Current lack can be fully stolen from donor") return True else: self.log(u"Current lack can be partially stolen from donor") return False # try fixing rate stealing slack from the previous fragment if fix_pair(fragment_index, fragment_index - 1): return True # if aggressive, try fixing rate stealing slack from the next fragment if aggressive: return fix_pair(fragment_index, fragment_index + 1) # cannot be fixed return False