Source code for dnachisel.biotools.bowtie
import subprocess
import tempfile
import os
def run_process(name, parameters):
process = subprocess.run(parameters, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if process.returncode:
error = process.stderr.decode()
raise OSError("%s failed:\n\n%s" % (name, error))
return process.stdout
def create_bowtie_index_from_sequences(sequences, path):
fasta_path = os.path.join(path, "sequences.fa")
bowtie_path = os.path.join(path, "bowtie")
with open(fasta_path, "w") as f:
f.write(
"\n".join(
[">%d\n%s" % (i, sequence) for i, sequence in enumerate(sequences)]
)
)
run_process(
"build-bowtie", ["bowtie-build", "-f", fasta_path, bowtie_path, "--quiet"]
)
return bowtie_path
[docs]
def find_all_bowtie_matches(
sequence, bowtie_index_path, match_length, max_mismatches=0
):
"""Return (short) matches between a sequence and a Bowtie index.
The result is of the form [(start, end), n_mismatches)] where (start, end)
indicates the position of the match in the sequence, and n_mismatches is
the number of mismatches with the closest homology in the index.
"""
# CREATE THE PARAMETERS
parameters = ["bowtie"]
parameters += ["--best", "-k", "1"] # only return the best alignments
parameters += ["-v", str(max_mismatches)] # only allow that N mismatches
parameters += [bowtie_index_path]
parameters += ["--quiet", "--suppress", "2,3,4,5,6,7"] # small output
k = match_length
kmers = [sequence[i : i + k] for i in range(len(sequence) - k + 1)]
if k * len(kmers) < 10000:
# Input the sequences directly
tmp_fasta_path = None
parameters += ["-c", ",".join(kmers)]
else:
# Write sequences to a file if too many.
tmp_fasta_path = tempfile.mktemp(".fa")
with open(tmp_fasta_path, "w") as f:
entries = [">%d\n%s" % (i, s) for i, s in enumerate(kmers)]
f.write("\n\n".join(entries))
parameters += ["-f", tmp_fasta_path]
# RUN THE PROCESS
try:
output = run_process("BOWTIE", parameters)
except Exception as err:
raise err
finally:
if tmp_fasta_path is not None:
os.remove(tmp_fasta_path)
output_records = [
line.split("\t") for line in output.decode().split("\n") if len(line)
]
return [
((int(index), int(index) + k), edits.count(":"))
for index, edits in output_records
]