Source code for polymerist.genutils.sequences.similarity.edits

'''For calculating the edit distance between sequences and inspecting the edits needed to go between them'''

__author__ = 'Timotej Bernat'
__email__ = 'timotej.bernat@colorado.edu'

from typing import Any, Generator, Sequence, Type, TypeVar, TypeAlias
from dataclasses import dataclass, field, replace
T = TypeVar('T')

from enum import Enum
import numpy as np

from ...typetools.numpytypes import N, M, Shape
from ...bits import int_to_bits


# REPRESENTATION CLASSES
[docs] class EditOperation(Enum): # TODO: reimplement as bitwise flags '''For annotating distinct kinds of sequence edits and their associated index offsets''' NULL = 0 # NOTE: order of fields here intentional and CANNOT be modified! INSERTION = 1 # ... this is because the bits of each index are the row and column DELETION = 2 # ... offsets corresponding to that edit operation in a Wagner-Fischer matrix SUBSTITUTION = 3 @property def bits(self) -> tuple[int, int]: '''Convert the integer value of the Enum field into its binary bits''' return tuple(int_to_bits(self.value, num_bits=2, as_list=True)) offsets = bits # alias to make dependent code more readable
[docs] @dataclass class EditInfo: '''for bundling together information about a sequence edit step''' edit_op : EditOperation indices : tuple[int, int] distance : int
# WAGNER-FISCHER MATRIX OPERATION
[docs] def compute_wf_matrix(seq1 : Sequence[T], seq2 : Sequence[T], int_type : Type=int) -> np.ndarray[Shape[N, M], int]: '''Compute (N+1)x(M+1) matrix of Levenshtein distances between all partial prefices of a pair of sequences where N and M are the lengths of the first and second sequence, respectively. Implements the Wagner-Fischer algorithm''' n, m = len(seq1), len(seq2) n_aug, m_aug = n + 1, m + 1 # initialize matrix with all zeros apart from first row and column, wf_matrix = np.zeros((n_aug, m_aug), dtype=int_type) wf_matrix[:, 0] = np.arange(n_aug, dtype=int_type) # index along first column is same as number of deletions to get to empty sequence - NOTE: element [0, 0] overlaps here wf_matrix[0, :] = np.arange(m_aug, dtype=int_type) # index along first row is same as number of insertions to get from empty sequence - NOTE: element [0, 0] overlaps here # populate matrix by iterating on distances between sub-sequence problems for i, elem1 in enumerate(seq1, start=1): for j, elem2 in enumerate(seq2, start=1): wf_matrix[i, j] = 1 + np.min([ wf_matrix[i-1, j], # deletion at end of second sequence (after augmentation to end of first sequence) wf_matrix[i, j-1], # insertion at end of second sequence wf_matrix[i-1, j-1] - int(elem1 == elem2) # substition of last elements between sequences (if elements are equal, then substitution costs nothing) ]) # TODO: implement support for transposition weighting (i.e. Damerau-Levenshtein distance) return wf_matrix
[docs] def traverse_wf_matrix(wf_matrix : np.ndarray[Shape[N, M], int], begin_idxs : tuple[int, int]=(0, 0), end_idxs : tuple[int, int]=(-1, -1)) -> Generator[list[EditInfo], None, None]: '''Takes a Wagner-Fischer Levenshtein distance matrix and returns the indices of the minimal path through the matrix from the origin (i.e. empty sequences) to the ''' assert(wf_matrix.ndim == 2) n_aug, m_aug = wf_matrix.shape if end_idxs == begin_idxs: yield [ # need to wrap in list for consistent typing in recursive calls EditInfo( # base case edit_op=EditOperation.NULL, indices=begin_idxs, distance=0 ) ] else: i, j = end_idxs i %= n_aug # ensure values are positive j %= m_aug # ensure values are positive curr_dist = wf_matrix[i, j] prev_edits = [] for edit_op in EditOperation: if edit_op == EditOperation.NULL: continue # need to skip over to avoid RecursionError - TOSELF: would be nice to reimplement EditOperations as bitwise flags to streamline this di, dj = edit_op.offsets # unpack index offsets i_prev, j_prev = i - di, j - dj prev_edits.append( EditInfo( edit_op=edit_op, indices=(i_prev, j_prev), distance=wf_matrix[i_prev, j_prev] ) ) min_prev_dist = min(ei.distance for ei in prev_edits) for edit_info in prev_edits: ret_edit_info = replace(edit_info, indices=(i, j)) # create a copy of the current edit info which reports the current if (edit_info.edit_op == EditOperation.SUBSTITUTION) and (curr_dist == min_prev_dist): ret_edit_info.edit_op = EditOperation.NULL if edit_info.distance == min_prev_dist: for edit_steps in traverse_wf_matrix(wf_matrix, begin_idxs=begin_idxs, end_idxs=edit_info.indices): # recursive tail call through all possible predecessors yield edit_steps + [ret_edit_info]
[docs] def describe_edits(seq1 : Sequence[T], seq2 : Sequence[T], int_type : Type=int, indicator : str=' -> ', delimiter : str='\n') -> Generator[str, None, None]: '''Describes step-by-step the insertion, deletion, or substitution operations needed to transform one sequence into another''' seqs = (seq1, seq2) # need to bundle for zipping later wf_matrix = compute_wf_matrix(seq1, seq2, int_type=int_type) for edits in traverse_wf_matrix(wf_matrix): edit_descs : list[str] = [] for edit_info in edits: edit_op = edit_info.edit_op if edit_op == EditOperation.NULL: continue elem_edit_str = indicator.join( # TODO : add printout for how the word being modified looks after each change (notnjust which symbols are changing) str(seq[idx - 1] if to_show else None) # need to subtract 1 to get index of step previous to current edit for idx, to_show, seq in zip(edit_info.indices, edit_op.bits, seqs) ) edit_descs.append(f'{edit_info.edit_op.name}: {elem_edit_str}') yield delimiter.join(edit_descs)
# EDIT DISTANCES
[docs] def levenshtein_distance(seq1 : Sequence[T], seq2 : Sequence[T], int_type : Type=int) -> int: '''Compute the Levenshtein (edit) distance between a pair of sequences with elements of compatible type Denotes the minimal number of insertion, deletion, or substitution operations needed to transform either sequence into the other''' return compute_wf_matrix(seq1, seq2, int_type=int_type)[-1, -1]