Source code for pantable.io
from __future__ import annotations
import csv
import io
from logging import getLogger
from pathlib import Path
from typing import TYPE_CHECKING
import numpy as np
from .util import EmptyTableError
if TYPE_CHECKING:
from typing import List
from .ast import PanTableOption
logger = getLogger('pantable')
[docs]def load_csv(
data: str,
options: PanTableOption,
) -> List[List[str]]:
'''loading CSV table
Note that this can emit EmptyTableError, FileNotFoundError
'''
include = options.include
# TODO: PY37
# encoding = encoding_ if (encoding_ := options.include_encoding) else None
encoding = options.include_encoding
# default include_encoding is ''
# default encoding below is None
if not encoding:
encoding = None
try:
with (
open(include, encoding=encoding, newline='')
) if include else (
io.StringIO(data, newline='')
) as f:
table_list = list(csv.reader(f, **options.csv_kwargs))
if table_list:
for row in table_list:
if row:
for i in row:
if i.strip():
return table_list
raise EmptyTableError
except FileNotFoundError:
raise FileNotFoundError(f'include path {include} not found.')
[docs]def load_csv_array(
data: str,
options: PanTableOption,
) -> np.ndarray[np.str_]:
'''loading CSV table in `numpy.ndarray`
Note that this can emit EmptyTableError, FileNotFoundError
'''
table_list = load_csv(data, options)
m = len(table_list)
n = max(len(row) for row in table_list)
res = np.full((m, n), '', dtype=np.object_)
for i, row in enumerate(table_list):
for j, cell in enumerate(row):
res[i, j] = cell
return res
[docs]def dump_csv(
data: np.ndarray[np.str_],
options: PanTableOption,
) -> str:
'''dump data as CSV string
'''
with io.StringIO(newline='') as f:
writer = csv.writer(f, **options.csv_kwargs)
writer.writerows(data)
return f.getvalue()
[docs]def dump_csv_io(
data: np.ndarray[np.str_],
options: PanTableOption,
) -> str:
'''dump data as CSV
it will mutate options.include if it is an invalid write path.
'''
_include = options.include
text = dump_csv(data, options)
if _include:
try:
include = Path(_include)
include.parent.mkdir(parents=True, exist_ok=True)
with open(include, 'x', encoding=options.include_encoding, newline='') as f:
f.write(text)
return ''
except (PermissionError, FileExistsError):
logger.error(f'Data cannot be written to file {options.include}, Overriding include path to empty...')
options.include = ''
return text