Cookbook

Subsequence extraction

myseq  = s'CAATAGAGACTAAGCATTAT'
sublen = 5
stride = 2

# explicit for-loop
for subseq in myseq.split(sublen, stride):
    print subseq

# pipelined
myseq |> split(sublen, stride) |> echo

k-mer extraction

myseq  = s'CAATAGAGACTAAGCATTAT'
type K = Kmer[5]
stride = 2

# explicit for-loop
for subseq in myseq.kmers[K](stride):
    print subseq

# pipelined
myseq |> kmers[K](stride) |> echo

Reverse complementation

# sequences
s = s'GGATC'
print ~s     # GATCC

# k-mers
k = k'GGATC'
print ~k     # GATCC

k-mer Hamming distance

k1 = k'ACGTC'
k2 = k'ACTTA'
#        ^ ^
print abs(k1 - k2)  # Hamming distance = 2

k-mer Hamming neighbors

def neighbors(kmer):
    for i in range(len(kmer)):
        for b in (k'A', k'C', k'G', k'T'):
            if kmer[i] != b:
                yield kmer |> base(i, b)

print list(neighbors(k'AGC'))  # CGC, GGC, etc.

k-mer minimizer

def minimizer[K](s: seq):
    assert len(s) >= K.len()
    kmer_min = K(s)
    for kmer in s[1:].kmers[K](1):
        kmer = min(kmer, ~kmer)
        if kmer < kmer_min: kmer_min = kmer
    return kmer_min

print minimizer[Kmer[10]](s'ACGTACGTACGT')

de Bruijn edge

def de_bruijn_edge[K](a: K, b: K):
    a = a |> base(0, k'A')  # reset first base: [T]GAG -> [A]GAG
    b = b >> s'A'           # shift right to A: [GAG]C -> A[GAG]
    return a == b           # suffix of a == prefix of b

print de_bruijn_edge(k'TGAG', k'GAGC')  # True
print de_bruijn_edge(k'TCAG', k'GAGC')  # False

Count bases

type BaseCount(A: int, C: int, G: int, T: int):
    def __add__(self: BaseCount, other: BaseCount):
        a1, c1, g1, t1 = self
        a2, c2, g2, t2 = other
        return (a1 + a2, c1 + c2, g1 + g2, t1 + t2)

def count_bases(s: seq) -> BaseCount:
    match s:
        case s'A...': return count_bases(s[1:]) + (1,0,0,0)
        case s'C...': return count_bases(s[1:]) + (0,1,0,0)
        case s'G...': return count_bases(s[1:]) + (0,0,1,0)
        case s'T...': return count_bases(s[1:]) + (0,0,0,1)
        case _: return BaseCount(0,0,0,0)

Reverse-complement palindrome

def is_own_revcomp(s: seq) -> bool:
    match s:
        case s'A...T' or s'T...A' or s'C...G' or s'G...C':
            return is_own_revcomp(s[1:-1])
        case s'':
            return True
        case _:
            return False

Sequence alignment

# default parameters
s1 = s'CGCGAGTCTT'
s2 = s'CGCAGAGTT'
aln = s1 @ s2
print aln.cigar, aln.score

# custom parameters
# match = 2; mismatch = 4; gap1(k) = 2k + 4; gap2(k) = k + 13
aln = s1.align(s2, a=2, b=4, gapo=4, gape=2, gapo2=13, gape2=1)
print aln.cigar, aln.score

Reading FASTA/FASTQ

# iterate over everything
for r in FASTA('genome.fa'):
    print r.name
    print r.seq

# iterate over sequences
for s in FASTA('genome.fa') |> seqs:
    print s

# iterate over everything
for r in FASTQ('reads.fq'):
    print r.name
    print r.read
    print r.qual

# iterate over sequences
for s in FASTQ('reads.fq') |> seqs:
    print s

Reading paired-end FASTQ

for r1, r2 in zip(FASTQ('reads_1.fq'), FASTQ('reads_2.fq')):
    print r1.name, r2.name
    print r1.read, r2.read
    print r1.qual, r2.qual

Parallel FASTQ processing

def process(s: seq):
    ...

# OMP_NUM_THREADS environment variable controls threads
FASTQ('reads.fq') |> iter ||> process

# Sometimes batching reads into blocks can improve performance,
# especially if each is quick to process.
FASTQ('reads.fq') |> blocks(size=1000) ||> iter |> process

Reading SAM/BAM/CRAM

# iterate over everything
for r in SAM('alignments.sam'):
    print r.name
    print r.read
    print r.pos
    print r.mapq
    print r.cigar
    print r.reversed
    # etc.

for r in BAM('alignments.bam'):
    # ...

for r in CRAM('alignments.cram'):
    # ...

# iterate over sequences
for s in SAM('alignments.sam') |> seqs:
    print s

for s in BAM('alignments.bam') |> seqs:
    print s

for s in CRAM('alignments.cram') |> seqs:
    print s

DNA to protein translation

dna = s'AGGTCTAACGGC'
protein = dna |> translate
print protein  # RSNG

Reading protein sequences from FASTA

for s in pFASTA('seqs.fasta') |> seqs:
    print s