Source code for pymlst.wg.extractors
"""Set of methods to extract different types of results from wgMLST"""
import abc
import importlib
import logging
import click
from abc import ABC
import pandas as pd
from pymlst.common import mafft, exceptions, utils
from pymlst.wg.core import Extractor
def read_gene_list(base, gene_file):
core = base.get_core_genes()
if gene_file is None:
return core
else:
select = []
for g in utils.strip_file(gene_file):
if g in core:
select.append(g)
else:
logging.debug("Gene {} not found in the database".format(g))
return select
[docs]class SequenceExtractor(Extractor):
""" Extracts coregene sequences into fasta file."""
[docs] def __init__(self, file=None, reference=False):
"""
:param file: Path of the file containing the coregens to extract
"""
self.list_file = file
self.reference = reference
[docs] def extract(self, base, output):
coregene = read_gene_list(base, self.list_file)
logging.info("Number of gene to analyse : %s", len(coregene))
for gene in coregene:
if self.reference:
seq = base.get_gene_sequence_reference(gene)
output.write(">" + gene + "|reference" + "\n")
output.write(seq + "\n")
else:
seqs = base.get_gene_sequences(gene)
for seq in seqs:
output.write(">" + gene + "|" + str(seq[0]) + " "
+ ";".join(seq[1]) + "\n")
output.write(seq[2] + "\n")
[docs]class MsaExtractor(Extractor):
""" Compute Multiple Sequence Alignment (MSA) and extracts the aligned sequences. """
[docs] def __init__(self, file=None, realign=False):
"""
:param file: Path of the file containing the coregens to extract
:param realign: Realign genes with same length
"""
self.list_file = file
self.realign = realign
[docs] def extract(self, base, output):
coregene = read_gene_list(base, self.list_file)
if len(coregene) == 0:
raise exceptions.PyMLSTError('No valid genes selected, verify your genes list')
strains = base.get_all_strains()
duplicated = base.get_duplicated_genes()
sequences = {s: [] for s in strains}
for index, gene in enumerate(coregene):
if gene in duplicated:
logging.info("%s/%s | %s %s", index + 1, len(coregene), gene, "No: Repeat gene")
continue
seqs = base.get_gene_sequences(gene)
size = set()
for seq in seqs:
size.add(len(seq[2]))
if len(size) == 1 and self.realign is False:
self.add_sequence_strain(seqs, strains, sequences)
logging.info("%s/%s | %s %s", index + 1, len(coregene), gene, "Direct")
else:
genes = {str(s[0]): s[2] for s in seqs}
corrseqs = mafft.align(genes)
for seq in seqs:
seq[2] = corrseqs.get(str(seq[0]))
self.add_sequence_strain(seqs, strains, sequences)
logging.info("%s/%s | %s %s", index + 1, len(coregene), gene, "Align")
# output align result
for strain in strains:
output.write('>' + strain + "\n")
output.write("\n".join(map(str, sequences.get(strain))) + "\n")
[docs] def add_sequence_strain(self, seqs, strains, sequences):
"""Add a sequence to multi-align, take the first gene in case of repetition"""
size = 0
if len(seqs) > 0:
size = len(seqs[0][2])
for strain in strains:
seq = [i[2] for i in seqs if strain in i[1]]
if len(seq) == 0:
sequences.get(strain).append('-' * size)
elif len(seq) == 1:
sequences.get(strain).append(seq[0])
else:
raise exceptions.PyMLSTError(
'Repeated genes must be excluded in order to export alignment')
[docs]class TableExtractor(Extractor):
""" Extraction of cgMLST distance matrix, MLST profiles, Genes and Strains list from a wgMLST database. """
[docs] def __init__(self,
mincover=0,
keep=False,
duplicate=False,
inverse=False):
self.mincover = mincover
self.keep = keep
self.duplicate = duplicate
self.inverse = inverse
def get_valid_shema(self, base):
# read samples mlst
strains = base.get_all_strains()
# Minimun number of strain
if self.mincover < 0 or self.mincover > len(strains):
raise exceptions.PyMLSTError(
'Mincover must be between 0 and number of strains {}'.format(len(strains)))
# allgene
allgene = base.get_core_genes()
# duplicate gene
dupli = base.get_duplicated_genes()
# cover without duplication
count_souches = base.count_souches_per_gene()
# Count distinct gene
diff = base.count_sequences_per_gene()
# filter coregene that is not sufficient mincover or keep only different or return inverse
valid_shema = []
# Test different case for validation
for gene in allgene:
valid = []
if self.keep is True:
if diff.get(gene, 0) > 1:
valid.append(True)
else:
valid.append(False)
else:
valid.append(True)
if count_souches.get(gene, 0) >= self.mincover:
valid.append(True)
else:
valid.append(False)
if not self.duplicate:
if gene in dupli:
valid.append(False)
else:
valid.append(True)
else:
valid.append(True)
if self.inverse is False:
if sum(valid) == 3:
valid_shema.append(gene)
else:
if sum(valid) < 3:
valid_shema.append(gene)
# report
logging.info("Number of coregene used : %s/%s", len(valid_shema), len(allgene))
return(valid_shema)
[docs]class TableExtractorCommand(click.core.Command):
""" Options supported by :class:`~pymlst.wg.extractors.TableExtractor`. """
[docs] def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.params.insert(0, click.core.Option(('--mincover', '-m'),
type=click.INT,
help='Minimum number of strains found to retain a gene (default:0)'))
self.params.insert(1, click.core.Option(('--keep', '-k'),
is_flag=True,
help='Keeps only gene with different alleles (omit missing).'))
self.params.insert(2, click.core.Option(('--duplicate', '-d'),
is_flag=True,
help='Keeps duplicate genes (default remove).'))
self.params.insert(3, click.core.Option(('--inverse', '-V'),
is_flag=True,
help='Keeps only gene that do not ' \
'match the filter of mincover or keep options.'))
[docs]class GeneExtractor(TableExtractor):
""" Extracts a list of genes from a wgMLST database. """
[docs] def extract(self, base, output):
valid_schema = super().get_valid_shema(base)
output.write("\n".join(sorted(valid_schema)) + "\n")
[docs]class StatsExtractor(Extractor):
""" Extracts stats, number of strains, coregenes and sequences from a wgMLST database. """
[docs] def extract(self, base, output):
infos = base.get_infos()
for c,v in zip(['name', 'source', 'species', 'version'], infos):
if v is None:
v = ""
output.write(c + "\t" + v + "\n")
output.write("Coregenes\t" + str(len(base.get_core_genes())) + "\n")
output.write("Strains\t" + str(len(base.get_all_strains())) + "\n")
output.write("Sequences\t" + str(base.count_sequences()) + "\n")
[docs]class StrainExtractor(TableExtractor):
""" Extracts a list of strains from a wgMLST database. """
[docs] def extract(self, base, output):
if self.count is False:
output.write("\n".join(base.get_all_strains()) + "\n")
else:
tmp = base.count_genes_per_souche(super().get_valid_shema(base))
for strain in base.get_all_strains():
output.write(strain + "\t" + str(tmp.get(strain)) + "\n")
[docs]class DistanceExtractor(TableExtractor):
""" Extracts a distance matrix from a wgMLST database. """
[docs] def extract(self, base, output):
if self.duplicate:
logging.warning("Calculate distance between strains " +
"using duplicate genes could reported bad result.")
strains = base.get_all_strains()
output.write(str(len(strains)) + "\n")
distance = base.get_strains_distances(super().get_valid_shema(base))
for strain in strains:
output.write(strain + "\t")
dist = [str(distance.get(strain, {}).get(s2, 0)) for s2 in strains]
output.write("\t".join(dist) + "\n")
[docs]class MlstExtractor(TableExtractor):
""" Extracts an MLST table from a wgMLST database. """
[docs] def extract(self, base, output):
valid_shema = super().get_valid_shema(base)
strains = base.get_all_strains()
mlst = base.get_mlst(valid_shema)
table = pd.DataFrame(columns=["#GeneId"] + strains)
rows = []
for gene in valid_shema:
row = {"#GeneId": gene}
mlstg = mlst.get(gene, {})
for strain in strains:
row[strain] = mlstg.get(strain, None)
rows.append(row)
table = pd.concat([table, pd.DataFrame.from_dict(rows)], ignore_index=True)
table = table.set_index('#GeneId')
if self.form == 'grapetree':
if self.duplicate:
logging.warnings("Export grapetree table " +
"using duplicate genes is not recommended.")
table = table.fillna(-1)
table = table.transpose()
else:
table = table.fillna("")
table.to_csv(output, sep='\t')