Source code for emlib.csvtools

"""
Utilities to read and write csv files
"""
from __future__ import annotations
import csv as _csv
import os as _os
import numpy
from fractions import Fraction as _Fraction
from collections import namedtuple as _namedtuple
import re as _re
from .containers import RecordList
from typing import Sequence as Seq, List
import dataclasses
from . import misc


def _as_number_if_possible(s: str, fallback=None, accept_fractions: bool = True,
                           accept_expon=False):
    """
    Parse ``s`` as a number via ``misc.asnumber``, or return ``fallback``

    Args:
        s: the text to parse
        fallback: the value returned when ``misc.asnumber`` yields None
        accept_fractions: forwarded unchanged to ``misc.asnumber``
        accept_expon: forwarded unchanged to ``misc.asnumber``

    Returns:
        whatever ``misc.asnumber`` returned, or ``fallback`` if that was None
    """
    parsed = misc.asnumber(s, accept_fractions=accept_fractions,
                           accept_expon=accept_expon)
    if parsed is None:
        return fallback
    return parsed


# Character table used by replace_non_alpha:
#   "(", ")", "-", "."  -> "_"
#   "*"                 -> "x"
#   "[", "]", "#", ":"  -> removed
# Every other character (including anything outside latin-1) is left as is.
_NON_ALPHA_TABLE = str.maketrans("()*-.", "__x__", "[]#:")


def replace_non_alpha(s: str) -> str:
    """
    Sanitize a string by replacing or removing a fixed set of symbols

    The following substitutions are applied:

    * ``(``, ``)``, ``-`` and ``.`` are replaced with ``_``
    * ``*`` is replaced with ``x``
    * ``[``, ``]``, ``#`` and ``:`` are removed
    * surrounding whitespace is stripped and each remaining space
      is replaced with ``_``

    Any other character is kept unchanged.

    Args:
        s: the string to sanitize

    Returns:
        a sanitized copy of s
    """
    # Strip before replacing spaces so that padding around the text
    # (common in CSV headers like "a, b") does not become a leading or
    # trailing underscore.
    return s.translate(_NON_ALPHA_TABLE).strip().replace(" ", "_")
def _normalize_column_name(name: str) -> str:
    """
    Turn a raw column header into a name usable as a namedtuple field

    The name is sanitized with :func:`replace_non_alpha`, all spaces are
    removed and surrounding underscores are stripped (namedtuple rejects
    field names starting with ``_``). If the result starts with a digit
    it is prefixed with ``n``; if nothing is left, ``'untitled'`` is used.

    Args:
        name: the raw column name

    Returns:
        the normalized column name, never empty
    """
    name = replace_non_alpha(name)
    name = name.strip().replace(" ", "").strip('_')
    # The digit check must come after stripping: the stripping itself can
    # expose a leading digit (" 1a", "_1a").
    if name and name[0] in '0123456789':
        name = 'n' + name
    return name if name else 'untitled'


def _treat_duplicates(columns: Seq[str]) -> list[str]:
    """
    Make all column names unique by appending a numeric suffix to repeats

    The first occurrence of a name is kept as is; the n-th occurrence
    becomes ``name_n`` (``a, a, a`` -> ``a, a_2, a_3``). If such a
    generated name is already taken, the counter is increased until a
    free name is found.

    Args:
        columns: the column names, possibly with repetitions

    Returns:
        a list of unique names, in the same order as ``columns``
    """
    counts: dict[str, int] = {}
    used: set[str] = set()
    new_names = []
    for column in columns:
        n = counts.get(column, 0) + 1
        candidate = column if n == 1 else "%s_%d" % (column, n)
        # A generated name can clash with a literal column (e.g. an input
        # column actually called "a_2"); skip ahead until it is unique.
        while candidate in used:
            n += 1
            candidate = "%s_%d" % (column, n)
        counts[column] = n
        used.add(candidate)
        new_names.append(candidate)
    return new_names
def readcsv_numpy(csvfile: str) -> numpy.ndarray:
    """
    Load a comma-separated file through ``numpy.genfromtxt``

    No column names are read (``names=None``).

    Args:
        csvfile: the path of the file to load

    Returns:
        the array built by ``numpy.genfromtxt`` from the file contents
    """
    data = numpy.genfromtxt(csvfile, delimiter=',', names=None)
    return data
class _Rows(list):
    """
    A list of namedtuples which remembers the field names of its first row

    Attributes:
        columns: the ``_fields`` of the first item added via :meth:`append`,
            or None while nothing has been appended. Only :meth:`append`
            (and the constructor, which uses it) updates this attribute.
    """

    def __init__(self, seq=None):
        super().__init__()
        self._firstappend = True
        self.columns = None
        if seq:
            for elem in seq:
                self.append(elem)

    def append(self, namedtup):
        """Append a row; the first appended row defines ``columns``"""
        if self._firstappend:
            self.columns = namedtup._fields
            # Clear the flag so later rows do not overwrite the columns
            self._firstappend = False
        list.append(self, namedtup)
def readcsv(csvfile: str,
            columns: list[str] = None,
            asnumber: bool = True,
            accept_exponential_numbers: bool = False,
            typeconversions: dict = None,
            prefer_fractions: bool = False,
            dialect: str = 'excel',
            first_row_header=True
            ) -> RecordList:
    """
    Read a CSV file into a list of namedtuples

    If no column names are given and no cell of the first row can be
    parsed as a number, the first row is taken as the column names.
    Column names are normalized so that they are valid namedtuple fields
    and repeated names are made unique. Rows shorter than the number of
    columns are padded with empty strings, longer rows are truncated.

    Args:
        csvfile: the path of the file to read
        columns: a seq of column names, if the first row of data is not
            a header
        asnumber: convert strings to numbers if they can be converted
        accept_exponential_numbers: if True, parse a string 1.5e4 as a number
        typeconversions: if given, a dict of the form {column: func}, where
            column is either the index of the column or its (normalized)
            name and func is called with the cell text. Only used if
            ``asnumber`` is False
        prefer_fractions: if True, interpret expressions like 3/4 as
            Fractions, otherwise as str
        dialect: the name of a registered csv dialect
        first_row_header: if True, the first row is a header and is never
            returned as data. If False, ``columns`` must be given and the
            first row is read as data

    Returns:
        a :class:`~emlib.containers.RecordList`

    Raises:
        TypeError: if no columns were given and they can't be inferred
            from the first row (this includes an empty file)
    """
    assert dialect in _csv.list_dialects()
    # newline='' is the mode the csv module asks for. The "U" mode used
    # previously was removed in python 3.11
    with open(csvfile, newline='') as f:
        reader = _csv.reader(f, dialect=dialect)
        firstrow = next(reader, None)
        if columns is not None:
            assert isinstance(columns, (tuple, list))
        elif (firstrow is not None and first_row_header
              and all(misc.asnumber(x) is None for x in firstrow)):
            columns = firstrow
        else:
            raise TypeError("Can't infer column names. Pass the column names as arguments.")
        # Without a header the first row is ordinary data and must be kept
        rawrows = [] if (first_row_header or firstrow is None) else [firstrow]
        rawrows.extend(reader)

    normalized_columns = [_normalize_column_name(col) for col in columns]
    columns = _treat_duplicates(normalized_columns)
    Row = _namedtuple('Row', columns)
    numcolumns = len(columns)
    rows = _Rows()
    for raw in rawrows:
        if asnumber:
            row = [_as_number_if_possible(cell, fallback=cell,
                                          accept_fractions=prefer_fractions,
                                          accept_expon=accept_exponential_numbers)
                   for cell in raw]
        elif typeconversions:
            row = []
            for i, cell in enumerate(raw):
                func = typeconversions.get(i)
                if func is None and i < numcolumns:
                    func = typeconversions.get(columns[i])
                if func:
                    cell = func(cell)
                row.append(cell)
        else:
            row = raw
        missing = numcolumns - len(row)
        if missing > 0:
            row.extend([''] * missing)
        rows.append(Row(*row[:numcolumns]))
    return RecordList(rows)
def write_records_as_csv(records: list, outfile: str) -> None:
    """
    Write the records as a csv file

    The first row of the file holds the field names of the first record.
    The file is written as utf-8.

    Args:
        records: a non-empty list of dataclass objects or namedtuples
            (anything with a '_fields' attribute). The column names are
            taken from the first record
        outfile: the path to save the csv file

    Raises:
        TypeError: if the first record is neither a dataclass nor has
            a '_fields' attribute
        IndexError: if records is empty
    """
    r0 = records[0]
    if dataclasses.is_dataclass(r0):
        column_names = [field.name for field in dataclasses.fields(r0)]
        records = [dataclasses.astuple(rec) for rec in records]
    elif hasattr(r0, "_fields"):
        column_names = r0._fields
    else:
        raise TypeError("records should be a namedtuple or a dataclass")
    # The context manager closes the file even if a row fails to serialize
    with open(outfile, 'w', newline='', encoding='utf-8') as f:
        w = _csv.writer(f)
        w.writerow(column_names)
        w.writerows(records)
def writecsv(rows: list, outfile: str, column_names: Seq[str] = None) -> None:
    """
    Write a sequence of tuples/namedtuples/dataclasses to outfile as CSV

    The file is written as utf-8. The extension of ``outfile`` is always
    replaced with ``.csv``.

    Args:
        rows: a list of tuples (one per row), namedtuples, dataclasses, etc.
            If namedtuples/dataclasses are passed and no column names are
            given, the field names of the first row are used. If empty,
            only the header (if any) is written
        outfile: the path of the file to write
        column_names: needed if simple tuples/lists are passed and a
            header row is wanted
    """
    firstrow = rows[0] if len(rows) else None
    rowsiter = rows
    if dataclasses.is_dataclass(firstrow):
        if column_names is None:
            column_names = [fld.name for fld in dataclasses.fields(firstrow)]
        rowsiter = (dataclasses.astuple(row) for row in rows)
    elif hasattr(firstrow, '_fields'):
        if column_names is None:
            column_names = firstrow._fields
    outfile = _os.path.splitext(outfile)[0] + '.csv'
    # The context manager closes the file even if a row fails to serialize
    with open(outfile, 'w', newline='', encoding='utf-8') as f:
        w = _csv.writer(f)
        if column_names:
            w.writerow(column_names)
        w.writerows(rowsiter)