Source code for dnachisel.biotools.bowtie
import subprocess
import tempfile
import os
def run_process(name, parameters):
process = subprocess.run(
parameters,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
if process.returncode:
error = process.stderr.decode()
raise OSError("%s failed:\n\n%s" % (name, error))
return process.stdout
def create_bowtie_index_from_sequences(sequences, path):
fasta_path = os.path.join(path, 'sequences.fa')
bowtie_path = os.path.join(path, 'bowtie')
with open(fasta_path, 'w') as f:
f.write("\n".join([
">%d\n%s" % (i, sequence)
for i, sequence in enumerate(sequences)
]))
run_process("build-bowtie", [
"bowtie-build", "-f", fasta_path, bowtie_path, "--quiet"
])
return bowtie_path
[docs]def find_all_bowtie_matches(
sequence, bowtie_index_path, match_length, max_mismatches=0
):
"""Return (short) matches between a sequence and a Bowtie index.
The result is of the form [(start, end), n_mismatches)] where (start, end)
indicates the position of the match in the sequence, and n_mismatches is
the number of mismatches with the closest homology in the index.
"""
# CREATE THE PARAMETERS
parameters = ["bowtie"]
parameters += ["--best", "-k", "1"] # only return the best alignments
parameters += ["-v", str(max_mismatches)] # only allow that N mismatches
parameters += [bowtie_index_path]
parameters += ["--quiet", "--suppress", "2,3,4,5,6,7"] # small output
k = match_length
kmers = [sequence[i : i + k] for i in range(len(sequence) - k + 1)]
if k * len(kmers) < 10000:
# Input the sequences directly
tmp_fasta_path = None
parameters += ["-c", ",".join(kmers)]
else:
# Write sequences to a file if too many.
tmp_fasta_path = tempfile.mktemp(".fa")
with open(tmp_fasta_path, "w") as f:
entries = [">%d\n%s" % (i, s) for i, s in enumerate(kmers)]
f.write("\n\n".join(entries))
parameters += ["-f", tmp_fasta_path]
# RUN THE PROCESS
try:
output = run_process("BOWTIE", parameters)
except Exception as err:
raise err
finally:
if tmp_fasta_path is not None:
os.remove(tmp_fasta_path)
output_records = [
line.split("\t") for line in output.decode().split("\n") if len(line)
]
return [
((int(index), int(index) + k), edits.count(":"))
for index, edits in output_records
]