Friday, May 26, 2017

A Generic Sequence Alignment Algorithm

In this post I will describe how a general sequence alignment could be implemented in Scala.
For readers that don't know what sequence alignment is, I recommend reading my
post on building a gesture recogniser under Dynamic Time Warping distance (DTW).

Sequence alignment is often used in several contexts including speech recognition,
gesture recognition, approximate string matching, typing correction, biological
sequence alignment and time series classification and even stereo vision.
All these algorithms are based on a technique called dynamic programming and the algorithms
are all very similar. However, in literature this alignment technique is often found under different names (Dynamic Time Warping, Levensthein Distance, Needleman-Wunsch).

I will first formulate my idea of a general alignment algorithm with a scala implementation and afterwards describe an implementation of the three above algorithms.

Given two sequences X = [x1 .. xi .. xN] and Y = [y .. yj .. yM], we use the following
recursion to find the alignment score S(x, y). At each position i in sequence X and position j in sequence Y we look backwards at the matching score in X only, in Y only and in X and Y simultaneously resulting in the following recursion:


The rank function scores the alignment at position i and j based on the previous alignment scores and
the current elements. The rank function is the main component that changes in all three algorithms.
Looking back in X direction is also called an insertion, in Y direction a deletion and for both it is
called a match so the ranking function becomes:



Furthermore we will also implement a generic initialisation function that starts our recursion.
Ok now the Scala code:

class Alignment[A](
  init: (Int, Int) => Double,
  rank: (Double, Double, Double, A, A) => Double
) {

  def score(x: Array[A], y: Array[A]): Double = {  
    val n  = x.size
    val m  = y.size
    val dp = Array.ofDim[Double](n + 1, m + 1)
    
    for(i <- 0 until n + 1) dp(i)(0) = init(i, 0)
    for(j <- 0 until m + 1) dp(0)(j) = init(0, j)

    for(i <- 1 until n + 1) {
      for(j <- 1 until m + 1) {
        dp(i)(j) = rank(
          dp(i - 1)(j),     // insertion
          dp(i - 1)(j - 1), // match
          dp(i)(j - 1),     // deletion 
          x(i - 1),         // current sample in x 
          y(j - 1)          // current sample in y
        )
      }
    } 

    dp(n)(m)
  }

}



The alignment class has a type parameter specifying the type of the elements in the sequences we want to align and takes the initialising function and the ranking function as input. Now DTW and the Edit distance are just specialisations with a different type parameter and scoring function:

  class DTW extends Alignment[Double](
    init = (i: Int, j: Int) => if(i == 0 && j == 0) 0.0 else Double.PositiveInfinity,
    rank = (insert: Double, matching: Double, delete: Double, x: Double, y: Double) =>
      if(insert < matching && insert < delete) insert + math.pow(x - y, 2)
      else if (delete < insert && delete < matching) delete + math.pow(x - y, 2)
      else matching + math.pow(x - y, 2)
  )


DTW is defined over doubles and is initialising the recursion with 0.0 at position (0,0) and all others to infinity. The ranking adds the euclidian distance to the insertion, deletion and match and then takes the minimum. It is easy to see how this can be generalised to multivariate sequences. The type parameter becomes Seq[Double] and the euclidean distance is implemented for vectors. One can also use kernels instead or the cosine similarity. The next function is the edit distance for discrete sequence elements (here characters):

  class Levensthein extends Alignment[Char](
    init = (i: Int, j: Int) => if(i == 0 && j == 0) 0.0 else if(i == 0) j else i,
    rank = (insert: Double, matching: Double, delete: Double, x: Char, y: Char) =>
      if(insert < matching && insert < delete) insert + 1
      else if (delete < insert && delete < matching) delete + 1
      else matching + (if(x == y) 0.0 else 1.0)
  )


The function is initialised as the index at the borders and the function again takes the minimum of insertion, deletion and match. Insertions and deletions count as an error so we add 1. A match can also be a substitution so we add 1 to the error if the characters do not match. The last algorithm is the Needleman-Wunsch algorithm. Instead of minimising the distance we are maximising a score. The similarity between two symbols can be defined and the 'gap' penalty is not 1 as in the edit distance but given.

  final val Sim = Map(
    ('a', 'a') -> 10,
    ('a', 'g') -> -1,
    ('g', 'a') -> -1,
    ('a', 'c') -> -3,
    ('c', 'a') -> -3,
    ('a', 't') -> -4,
    ('t', 'a') -> -4,
    ('g', 'g') ->  7,
    ('g', 'c') -> -5,
    ('c', 'g') -> -5,
    ('g', 't') -> -3,
    ('t', 'g') -> -3,
    ('c', 'c') ->  9,
    ('c', 't') ->  0,
    ('t', 'c') ->  0,
    ('t', 't') ->  8
  )

  final val Gap = -5
  
  class NeedlemanWunsch extends Alignment[Char](
    init = (i: Int, j: Int) => if(i == 0 && j == 0) 0.0 else if(i == 0) j * Gap else i * Gap,
    rank = (insert: Double, matching: Double, delete: Double, x: Char, y: Char) =>
      if(insert > matching && insert > delete) insert + Gap
      else if (delete > insert && delete > matching) delete + Gap
      else matching + Sim(x, y)
  )



If one needs the actual alignment, we can backtrack the alignment process in all above cases. The code can be found here [GitHub]