|
9 | 9 | from subprocess import Popen, PIPE
|
10 | 10 | import ast
|
11 | 11 | from collections import defaultdict
|
12 |
| -from typing import Dict, List, Optional, Union |
| 12 | +from typing import Dict, List, Optional, Union, Generator, Tuple |
13 | 13 | from pathlib import Path
|
14 | 14 |
|
15 | 15 | # install libraries
|
@@ -245,49 +245,103 @@ def launch_infernal(
|
245 | 245 | return gene_objs
|
246 | 246 |
|
247 | 247 |
|
def check_sequence_tuple(name: str, sequence: str) -> Tuple[str, str]:
    """
    Checks and validates a sequence name and its corresponding sequence.

    :param name: The name (header) of the sequence, typically extracted from the FASTA file header.
    :param sequence: The sequence string corresponding to the name, containing the nucleotide or protein sequence.

    :return: A tuple containing the validated name and sequence.

    :raises ValueError:
        - If the sequence is empty, a ValueError is raised with a message containing the header name.
        - If the name is empty, a ValueError is raised with a message containing a preview of the sequence.
    """
    # The sequence is checked first, so a record with neither name nor
    # sequence is reported as an empty sequence.
    if not sequence:
        raise ValueError(f"Found an empty sequence with header '{name}'")

    if not name:
        raise ValueError(
            f"Found a sequence with empty name (sequence starts as '{sequence[:60]}')"
        )

    return name, sequence


def parse_fasta(
    fna_file: Union[TextIOWrapper, list]
) -> Generator[Tuple[str, str], None, None]:
    """Yields each sequence name and sequence from a FASTA file or stream as a tuple.

    Lines are stripped of surrounding whitespace and blank lines are ignored.
    The name of a record is the first whitespace-delimited word that follows
    the '>' of its header line; the sequence is the concatenation of all the
    lines up to the next header. The case of the sequence is left untouched.

    :param fna_file: Input FASTA file or list of lines as sequences.
    :yield: Tuple with contig header (without '>') and sequence.
    :raises ValueError: If the file does not contain valid FASTA format:
        no header at all, sequence data before the first header, a header
        without a name, or a header without any sequence.
    """
    # None means "no header seen yet". An empty string is a header that was
    # seen but carries no name; it must still reach check_sequence_tuple so
    # that it is reported instead of being silently skipped.
    name = None
    chunks = []  # sequence lines of the current record, joined once per record

    for line in fna_file:
        line = line.strip()

        if not line:  # Blank lines carry no information
            continue

        if line.startswith(">"):  # New header
            if name is not None:  # Flush the previous record
                yield check_sequence_tuple(name, "".join(chunks))

            # Strip '>' and keep the first word as the name. A bare '>' has
            # no word at all: record it as an empty name rather than
            # crashing with an IndexError.
            fields = line[1:].split()
            name = fields[0] if fields else ""
            chunks = []

        elif name is None:
            # Sequence data that belongs to no header: refuse it instead of
            # silently throwing it away.
            raise ValueError(
                "The file does not start with a FASTA header ('>'): "
                f"found '{line[:60]}' before any header."
            )

        else:
            chunks.append(line)

    # No header at all (e.g. empty input)
    if name is None:
        raise ValueError("The file does not contain any valid FASTA content.")

    # Yield the final record
    yield check_sequence_tuple(name, "".join(chunks))
| 310 | + |
| 311 | + |
def get_contigs_from_fasta_file(
    org: Organism, fna_file: Union[TextIOWrapper, list]
) -> Dict[str, str]:
    """Reads the contigs of a FASTA input and registers them in an organism.

    Every record produced by parse_fasta is matched to the contig of the same
    name in ``org``; when the organism has no such contig, a new one is
    created with the next value of the module-level ``contig_counter`` and
    added to the organism. The contig length is then set from the sequence.

    :param org: Organism instance to update with contig info.
    :param fna_file: Input FASTA file or list of lines as sequences.
    :return: Dictionary with contig names as keys and upper-cased sequences as values.
    :raises ValueError: If a contig already has a length that differs from
        the length of its FASTA sequence.
    """
    # NOTE(review): contig_counter is only read here (its .value is mutated
    # under its own lock), so no `global` declaration is required.
    sequences_by_name = {}

    for contig_name, sequence in parse_fasta(fna_file):
        observed_length = len(sequence)

        # Reuse the contig known to the organism, or register a new one
        try:
            contig = org.get(contig_name)
        except KeyError:
            with contig_counter.get_lock():
                contig = Contig(contig_counter.value, contig_name)
                contig_counter.value += 1
            org.add(contig)

        # A length that is already set must agree with the sequence
        known_length = contig.length
        if known_length is not None and known_length != observed_length:
            raise ValueError(
                f"Length mismatch for contig {contig_name}: expected {contig.length}, found {len(sequence)} from the fasta sequence."
            )

        contig.length = observed_length
        sequences_by_name[contig_name] = sequence.upper()

    return sequences_by_name
|
292 | 346 |
|
293 | 347 |
|
@@ -464,7 +518,7 @@ def annotate_organism(
|
464 | 518 |
|
465 | 519 | fasta_file = read_compressed_or_not(file_name)
|
466 | 520 |
|
467 |
| - contig_sequences = read_fasta(org, fasta_file) |
| 521 | + contig_sequences = get_contigs_from_fasta_file(org, fasta_file) |
468 | 522 | if is_compressed(file_name): # TODO simply copy file with shutil.copyfileobj
|
469 | 523 | fasta_file = write_tmp_fasta(contig_sequences, tmpdir)
|
470 | 524 | if procedure is None: # prodigal procedure is not force by user
|
|
0 commit comments