Introduction

In the previous post I have compared various languages and libraries in terms of their speed. This notebook contains the code used in the comparison as well as some details about the choices made to improve the performance of numpy implementation.

Setup

 
# ! pip install pandas

Getting data

import pandas as pd
def get_data(path):
    fasta_df = pd.read_csv(path, lineterminator=">", header=None)
    fasta_df[['id', 'seq']] = fasta_df[0].str.split('\n', expand=True)[[0,1]]
    return fasta_df.seq.to_numpy(dtype=str)
seqs = get_data('picked_msa.fasta')

Numpy implementation

Just to remind the pseudo code looks like this:

for seq1 in seqs:
  for seq2 in seqs:
    if count_mathes(seq1, seq2) > threshold:
      weight +=1
  meff += 1/weight

meff = meff/(len(seq1)^0.5)

This Numpy implementation is based on pure python implementation which can be found here.

The main differences are:

  • uses numpy arrays and operators.
  • sequences are in vectorised fashion using np.equal rather than looping over each element in the sequence.
  • sequences are converted to arrays of integers rather than characters.
import numpy as np
def get_nf_python_numpy(seqs, threshold=0.8):
    seqs = seqs.view(np.uint32).reshape(seqs.shape[0], -1)
    n_seqs = len(seqs)
    is_same_cluster = np.ones([n_seqs, n_seqs], np.bool_) 
    for i in range(n_seqs):
        for j in range(i+1, n_seqs):
            identity = np.equal(seqs[i], seqs[j]).mean()
            is_more = np.greater(identity, threshold)
            is_same_cluster[i,j] = is_more
            is_same_cluster[j,i] = is_more
    meff = 1.0/np.sum(is_same_cluster,1)
    return meff.sum()/(seqs.shape[-1]**0.5)
%%timeit -n 3 -r 3
get_nf_python_numpy(seqs[:2500])
23.3 s ± 181 ms per loop (mean ± std. dev. of 3 runs, 3 loops each)
get_nf_python_numpy(seqs[:100])
0.18006706787628665

Although performance gain is substantial, I still have two loops which is one of the first things you want to tackle when trying to improve algorithm speed. In practise, you want to use vectorization to take advantage of CPU ability to perform the same operation on vectors.

Ideally, I would like to do all to all pairwise comparison in one vectorized operation. Unfortunately, this is not a viable option due to memory constraints . Such operation will require n * n * l size matrix. In my case, that would result into ~70GB size matrix (n = 10000, l = 683, boolean uses 1 byte)

$$10000^2 *683 = 68.3*10^9 \ bytes = 68.3 \ gigabytes$$

To be able to run such an algorithm on an everyday computer, the memory requirement has to be lower. Usually, this can be solved with batching, taking only a portion of data at the time. In this case, I will perform a one-to-all comparison at the time. This will require n*l memory which is 10000 (~7MB) times less than for the all-to-all version.

Note, I still can exploit the fact that the matrix of identity is symmetric which means that only the first iteration has to be done with n-1 sequences, the second sequence needs only be compared with n-2 and so on.

def get_one_to_all_comparison(seqs, threshold=0.8):
    pairwise_id = np.equal(seqs[1:], seqs[0].T).mean(1)
    return pairwise_id > threshold
def get_nf_numpy_v2(seqs):
    seqs = seqs.view(np.uint32).reshape(seqs.shape[0], -1)
    n_seqs, seq_len = seqs.shape
    is_same_cluster = np.ones([n_seqs, n_seqs],np.bool_)
    for i in range(n_seqs-1):
        out = get_one_to_all_comparison(seqs[i:])
        is_same_cluster[i, i+1:] = out
        is_same_cluster[i+1:, i] = out
    s = 1.0/is_same_cluster.sum(1)
    return s.sum()/(seq_len**0.5)
%%timeit -n 3 -r 3
get_nf_numpy_v2(seqs[:2500])
1.99 s ± 47.7 ms per loop (mean ± std. dev. of 3 runs, 3 loops each)
get_nf_numpy_v2(seqs[:100])
0.18006706787628665

By using numpy, the algorithm's runtime decreases significantly. The gain on 1% of the data is roughly 100 times (might differ depending on the hardware). The improvement will scale with the amount of data that is being processed so on the data set the gain will be even larger.

Multiprocessing

There is a limit to how much an algorithm can be improved. However, more speed can be gained by simply exploiting the sheer size of infrastructure, i.e. multiple CPUs/machines. Unfortunately, this rarely happens automatically, unless you use libraries that have this ability built-in. Although Numpy has some functionality that runs in parallel, in this case some additional work is required.

In this case, sequence comparison does not need to be performed sequentially, thus can be parallelised.

Some effort were made to make code below work on Windows machines. As a result, we have to create a seperate file for worker code. More information here

 
import multiprocessing
from worker import *

def get_nf_numpy_mp(seqs, threads=None):
    
    seqs = seqs.view(np.uint32).reshape(seqs.shape[0], -1)
    n_seqs, seq_len = seqs.shape   
    
    with multiprocessing.Pool(threads, initializer=init, initargs=[seqs]) as pool:
        results = pool.map(get_one_to_all_comparison_global, range(n_seqs-1))

    is_same_cluster = np.ones([n_seqs, n_seqs],np.bool_) 
    for out in results:
        i = n_seqs - out.shape[0] - 1
        is_same_cluster[i, i+1:] = out
        is_same_cluster[i+1:, i] = out    
    meff = 1.0/is_same_cluster.sum(1)
    return meff.sum()/(seq_len**0.5)
seqs_ = seqs[:2500]
%%timeit -n 1 -r 1
if __name__ == "__main__":
    meff = get_nf_numpy_mp(seqs_)
    print(meff)
19.919439094557045
481 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%%timeit -n 1 -r 1
meff = get_nf_numpy_v2(seqs_)
print(meff)
19.919439094557045
2.04 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

The benefit of multiprocessing will be determined by hardware. However, it is important to remember that there is an overhead of distributing the work across CPUs. As a result, if the algorithm takes a couple of seconds, multiprocessing might make it even slower. On the other hand, really big tasks can be speeded up by nearly the factor of number of threads on the machine where overhead becomes negligible.