"""
ECSV Engine Module
--------------------------
This module provides functionality for reading and writing Enhanced Character Separated
Values (ECSV) files using various backends, including:
- `PyArrow <https://arrow.apache.org/docs/python/>`_
- `pandas <https://pandas.pydata.org/>`_
- `Astropy's ASCII engine <https://docs.astropy.org/en/stable/io/ascii/>`_
ECSV is a human-readable, YAML-encoded table format used in the Astropy
ecosystem for storing tables with metadata, units, and complex data types.
Key Features
------------
- Defines data structures for representing ECSV column and header metadata.
- Implements multiple ECSV reader engines, supporting PyArrow, pandas, and Astropy's
ASCII CSV readers.
- Handles conversion between ECSV datatypes and numpy/pandas/pyarrow types, including
support for JSON-encoded columns, multidimensional arrays, and masked data.
- Provides robust parsing of ECSV headers and data, including support for compressed
files and in-memory file-like objects.
- Ensures compatibility with legacy ECSV files and provides liberal error handling for
unknown datatypes.
- Integrates with Astropy's Unified I/O registry for seamless reading and writing of
ECSV files.
Main Classes and Functions
--------------------------
- ``ColumnECSV``: Represents the attributes of a column as described in the ECSV header.
- ``ECSVHeader``: Encapsulates the parsed ECSV header, including column definitions and
table metadata.
- ``ECSVEngine`` and subclasses: Abstract base class and concrete implementations for
different CSV parsing engines.
- ``read_ecsv``: Reads an ECSV file and returns an Astropy ``Table`` object, handling all
necessary conversions and metadata.
- ``write_ecsv``: Writes an Astropy ``Table`` to an ECSV file.
- ``register_ecsv_table``: Registers the ECSV reader/writer with
Astropy's I/O registry.
Usage
-----
This module is intended for internal use within Astropy and for advanced users who need
fine-grained control over ECSV parsing and engine selection. For most users, reading and
writing ECSV files can be accomplished via the high-level ``Table.read`` and
``Table.write`` interfaces, e.g.:
```
Table.read(filename, format="ecsv", engine="pyarrow")
```
Dependencies
------------
- numpy
- astropy.table
- pyarrow (optional, for PyArrow engine)
- pandas (optional, for pandas engine)
"""
import abc
import collections
import functools
import io
import json
import os
import re
import warnings
from collections.abc import Iterable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Final, NamedTuple
import numpy as np
import numpy.typing as npt
from astropy.utils.data import get_readable_fileobj
if TYPE_CHECKING:
from astropy.table import SerializedColumn, Table
# Names exported by ``from <module> import *``; this list is the module's public API.
__all__ = [
    "ColumnECSV",
    "ECSVEngine",
    "ECSVEngineIoAscii",
    "ECSVEnginePandas",
    "ECSVEnginePyArrow",
    "ECSVHeader",
    "read_ecsv",
    "register_ecsv_table",
    "write_ecsv",
]

# Module-level dict annotated as an engine-name -> engine mapping. It starts empty and
# no code in the visible part of this module reads or fills it; the registry that
# ``ECSVEngine.__init_subclass__`` populates is the class attribute
# ``ECSVEngine.engines``. NOTE(review): confirm this constant is still needed.
ECSVEngines: Final[dict[str, "ECSVEngine"]] = {}
class DerivedColumnProperties(NamedTuple):
    """Derived properties of one ECSV column specification.

    Instances are produced by ``get_csv_np_type_dtype_shape`` from the ECSV
    ``datatype`` and ``subtype`` of a column.

    Attributes
    ----------
    csv_np_type : str
        Numpy type string used to parse the raw CSV column, e.g. ``"int64"``,
        ``"float32"`` or ``"str"``.
    dtype : str
        Numpy dtype string of the final column data. It can differ from
        ``csv_np_type``, e.g. for JSON-encoded columns.
    shape : tuple[int, ...]
        Per-row shape of the final column data taken from the ECSV ``subtype``;
        the empty tuple for scalar columns.
    """

    csv_np_type: str
    dtype: str
    shape: tuple[int, ...]
@dataclass(frozen=True)
class ColumnECSV:
    """
    Attributes of a single column as described in an ECSV header.

    Attributes
    ----------
    name : str
        The name of the column.
    datatype : str
        The data type of the column as specified in the ECSV header.
    subtype : str or None, optional
        The subtype of the column, if applicable.
    unit : str or None, optional
        The unit of the column values, if specified.
    description : str or None, optional
        A description of the column.
    format : str or None, optional
        The format string for the column values.
    meta : dict or None, optional
        Additional metadata associated with the column.

    Properties
    ----------
    csv_np_type : str
        Numpy type string describing the column CSV data. In practice this is the
        same as the ECSV ``datatype`` except that "string" => "str". It is handed to
        the engine ``convert_np_type()`` method to build the engine-specific type
        given to the CSV reader. For instance, for pandas the ``int32`` type gets
        converted to ``Int32`` to read columns as a nullable int32.
    dtype : np.dtype
        Numpy dtype in the final column data. This may be entirely different from
        ``csv_np_type`` in some cases, in particular JSON-encoded fields.
    shape : tuple of int
        Shape of the final column data.

    Notes
    -----
    The derived values are computed lazily and cached per instance with
    ``functools.cached_property``, which stores its result directly in the instance
    ``__dict__`` and therefore works although the dataclass is frozen.
    """

    name: str
    datatype: str
    subtype: str | None = None
    unit: str | None = None
    description: str | None = None
    format: str | None = None
    meta: dict | None = None

    @functools.cached_property
    def csv_np_type(self) -> str:
        """Numpy type string describing the column CSV data."""
        return self._derived_properties.csv_np_type

    @functools.cached_property
    def dtype(self) -> np.dtype:
        """Numpy dtype in the final column data."""
        return np.dtype(self._derived_properties.dtype)

    @functools.cached_property
    def shape(self) -> tuple[int, ...]:
        """Shape of the column data."""
        return self._derived_properties.shape

    @functools.cached_property
    def _derived_properties(self) -> DerivedColumnProperties:
        """Get the csv_np_type, dtype, and shape of the column from the ECSV header."""
        return get_csv_np_type_dtype_shape(self.datatype, self.subtype, self.name)
class ECSVEngine(metaclass=abc.ABCMeta):
    """Base class for ECSV reader engines.

    An engine is responsible for reading the raw CSV data that follows the ECSV
    header. This assumes that the engine has a defined Table Unified I/O interface.

    - ``name`` and ``format`` must be defined as string class attributes in
      subclasses; defining a subclass without them raises `TypeError`.
    - ``engines`` is a base class-level dictionary that maps engine names to their
      respective engine classes. Subclasses should not modify this directly; it is
      filled automatically when a subclass is defined.

    Attributes
    ----------
    name : str
        Name of the engine, used for the ``engine`` parameter in a call like:
        ``Table.read(filename, format="ecsv", engine="pyarrow")``.
    format : str
        Format string for the engine CSV reader, e.g. "pyarrow.csv", "ascii.csv", etc.
    engines : dict[str, type[ECSVEngine]]
        Dictionary mapping engine names to their respective engine classes.
    """

    name: str | None = None
    format: str | None = None
    # Single registry shared by all subclasses (it is only ever defined here, so
    # ``cls.engines`` in ``__init_subclass__`` always resolves to this dict). The
    # values are the engine *classes*, not instances.
    engines: dict[str, type["ECSVEngine"]] = {}

    def __init_subclass__(cls, **kwargs):
        """Validate the subclass and register it as an ECSV engine.

        Raises
        ------
        TypeError
            If ``name`` or ``format`` is not a string on the new subclass.
        """
        super().__init_subclass__(**kwargs)

        # Ensure that the subclass has the required string class attributes.
        for attr in ("name", "format"):
            if not isinstance(val := getattr(cls, attr, None), str):
                raise TypeError(
                    f"Subclasses of ECSVEngine must define a class attribute '{attr}' "
                    f"as a string, got {type(val)}."
                )

        # A later subclass with the same name replaces the earlier registry entry.
        cls.engines[cls.name] = cls

    @abc.abstractmethod
    def convert_np_type(self, np_type: str) -> Any:
        """
        Convert a numpy type string to engine-specific type for parsing.

        For instance, for pandas the ``"int32"`` numpy type gets converted to an
        ``Int32Dtype()`` instance to read columns as a nullable int32.

        Parameters
        ----------
        np_type : str
            The numpy type string to be converted.

        Returns
        -------
        Any
            Corresponding engine-specific type.
        """

    def get_converters(self, header: ECSVHeader) -> dict[str, Any]:
        """
        Get a dictionary of converters for the columns in the ECSV header.

        Each column's ``csv_np_type`` is passed through ``convert_np_type`` to obtain
        the engine-specific type for that column.

        Parameters
        ----------
        header : ECSVHeader
            The ECSV header containing column definitions.

        Returns
        -------
        dict[str, Any]
            Dictionary mapping column names to engine-specific converters.
        """
        return {col.name: self.convert_np_type(col.csv_np_type) for col in header.cols}

    @abc.abstractmethod
    def get_data_kwargs(
        self,
        header: ECSVHeader,
        null_values: list[str],
    ) -> dict[str, Any]:
        """
        Generate a dictionary of keyword arguments for data parsing.

        This accounts for the API variations in each engine CSV reader.

        Parameters
        ----------
        header : ECSVHeader
            ECSVHeader object with the header information.
        null_values : list of str
            List of strings with values to be interpreted as null or missing data.

        Returns
        -------
        dict[str, Any]
            Dict of keyword arguments to be passed to engine CSV reader.
        """
class ECSVEnginePyArrow(ECSVEngine):
    """ECSV reader engine using PyArrow."""

    name = "pyarrow"
    format = "pyarrow.csv"

    def convert_np_type(self, np_type: str) -> str:
        """Return the type string handed to the PyArrow CSV reader for ``np_type``.

        Raises
        ------
        TypeError
            If ``np_type`` is ``"float128"``.
        """
        # PyArrow does not support float128 and there is no workaround (unlike float16).
        if np_type == "float128":
            raise TypeError(
                "pyarrow engine does not support float128, choose a different engine"
            )

        # PyArrow does not support float16, so we need to convert it to float32.
        # The final output is still cast as float16.
        return "float32" if np_type == "float16" else np_type

    def get_data_kwargs(
        self,
        header: ECSVHeader,
        null_values: list[str],
    ) -> dict[str, Any]:
        """Build the reader keyword arguments; see the base method for details."""
        return {
            "null_values": null_values,
            "header_start": header.n_header,
            "dtypes": self.get_converters(header),
        }
class ECSVEngineIoAscii(ECSVEngine):
    """ECSV reader engine using astropy.io.ascii Python CSV reader."""

    name = "io.ascii"
    format = "ascii.csv"

    def convert_np_type(self, np_type: str) -> type[np.generic]:
        """Return the numpy scalar type (e.g. ``np.int32``) for ``np_type``."""
        # Convert the np_type string to a numpy dtype type like np.int32, np.float64,
        # etc. This output is compatible with the io.ascii `converters` option where
        # it gets used.
        return np.dtype(np_type).type

    def get_data_kwargs(
        self,
        header: ECSVHeader,
        null_values: list[str],
    ) -> dict[str, Any]:
        """Build the reader keyword arguments; see the base method for details."""
        return {
            "fill_values": get_null_values_per_column(
                header.cols, header.table_meta, null_values
            ),
            # Header position passed to the reader is the header line count with
            # the empty lines subtracted.
            "header_start": header.n_header - header.n_empty,
            "converters": self.get_converters(header),
            # Fast reader does not support converters (defining types in advance)
            # nor any encoding. Converters are required, e.g. for a string column
            # that looks like floats. Would be nice to fix this, but in mean time
            # use Python CSV reader.
            "fast_reader": False,
            "strip_column_names": False,
        }
class ECSVEnginePandas(ECSVEngine):
    """ECSV reader engine using pandas."""

    name = "pandas"
    format = "pandas.csv"

    def convert_np_type(self, np_type: str) -> Any:
        """Return the pandas dtype used to read a column of numpy type ``np_type``.

        Integer and boolean types map to the pandas nullable dtypes so that missing
        values can be represented; everything else is passed to pandas unchanged.
        The result is whatever ``pandas.api.types.pandas_dtype`` returns, which may
        be a numpy dtype or a pandas extension dtype.
        """
        import pandas as pd

        dtype = np.dtype(np_type)
        if dtype.kind in ("i", "u"):
            # Convert int64 to Int64, uint32 to UInt32, etc for nullable types
            converter = dtype.name.replace("i", "I").replace("u", "U")
        elif dtype.kind == "b":
            converter = "boolean"
        else:
            converter = np_type

        return pd.api.types.pandas_dtype(converter)

    def get_data_kwargs(
        self,
        header: ECSVHeader,
        null_values: list[str],
    ) -> dict[str, Any]:
        """Build the reader keyword arguments; see the base method for details."""
        fill_values = get_null_values_per_column(
            header.cols, header.table_meta, null_values
        )
        converters = self.get_converters(header)

        # Per-column null strings. Kept under its own name so the ``null_values``
        # argument is not shadowed.
        na_values_by_col = collections.defaultdict(list)
        for null_value, _, col_name in fill_values:
            na_values_by_col[col_name].append(null_value)
            # Pandas parser does not natively parse nan or NaN for floats, so we need
            # to declare this as a null value.
            if converters[col_name].kind == "f":
                for nan in ("nan", "NaN"):
                    na_values_by_col[col_name].append(nan)

        # Would prefer setting `"skiprows": header.n_header` (as in the original
        # implementation prior to #18756) instead of "comment": "#". However there is a
        # bug in pandas.read_csv where skiprows does not work when the line includes a
        # quote character, see https://github.com/pandas-dev/pandas/issues/62739.
        return {
            "na_values": na_values_by_col,
            "keep_default_na": False,
            "comment": "#",
            "dtype": converters,
        }
def is_numpy_dtype(np_type: str) -> bool:
    """Return True if ``np.dtype(np_type)`` succeeds, False if it raises."""
    try:
        np.dtype(np_type)
        return True
    except Exception:
        # Any failure to build a dtype means "not a numpy dtype".
        return False
def get_header_lines(
    input_file: str | os.PathLike | io.BytesIO,
    encoding: str = "utf-8",
) -> tuple[list[str], int, int, int]:
    """
    Extract header lines from a file or file-like object.

    This function reads a file or file-like object and extracts lines that
    start with a specific header prefix ("# ") while skipping blank lines
    and lines starting with a comment prefix ("##"). The function stops
    reading at the first non-blank, non-comment line that does not match
    the header prefix.

    Parameters
    ----------
    input_file : str | os.PathLike | io.BytesIO
        The input file path or file-like object to read. If a file path is
        provided, the function automatically handles compressed file formats
        that are supported by `~astropy.utils.data.get_readable_fileobj`.
    encoding : str, optional
        The encoding used to decode the file content. Default is "utf-8".

    Returns
    -------
    lines : list[str]
        List of decoded header lines without the header prefix.
    idx : int
        Index of the last line read (0 if the input contains no lines at all).
    n_empty : int
        Number of empty lines read.
    n_comment : int
        Number of comment lines read.
    """
    header_prefix = "# ".encode(encoding)
    comment_prefix = "##".encode(encoding)
    n_prefix = len(header_prefix)

    lines = []
    n_empty = 0
    n_comment = 0
    # Pre-bind the loop variable: for an input without any lines the ``for`` body
    # never runs and ``idx`` would otherwise be unbound at the ``return`` below.
    idx = 0

    with get_readable_fileobj(input_file, encoding="binary") as f:
        for idx, line in enumerate(f):
            line_strip = line.strip()
            if line_strip.startswith(header_prefix):
                lines.append(line_strip[n_prefix:].decode(encoding))
            elif not line_strip:
                n_empty += 1
            elif line_strip.startswith(comment_prefix):
                n_comment += 1
            else:
                # Stop iterating on first failed comment match for a non-blank line
                break

    # Need to rewind the input file if it is a file-like object
    if isinstance(input_file, io.BytesIO):
        input_file.seek(0)

    return lines, idx, n_empty, n_comment
def get_csv_np_type_dtype_shape(
    datatype: str, subtype: str | None, name: str
) -> DerivedColumnProperties:
    """Derive the CSV parsing type, final dtype and shape of an ECSV column.

    This function implements most of the complexity of the ECSV data type handling.
    The ECSV standard allows for a wide variety of data types and subtypes, and we
    also need to handle some legacy cases and be liberal in what we accept.

    Parameters
    ----------
    datatype : str
        The data type of the column as specified in the ECSV header.
    subtype : str or None
        The subtype of the column, if applicable. This can include additional
        information like array shape or JSON serialization.
    name : str
        The name of the column, used for error messages.

    Returns
    -------
    DerivedColumnProperties
        A named tuple containing:

        - ``csv_np_type``: Numpy type string for the CSV column data.
        - ``dtype``: Numpy dtype string of the final column data.
        - ``shape``: Shape of the final column data as a tuple.

    Raises
    ------
    ValueError
        If a ``subtype`` is given for a column whose CSV type is not string.
    InconsistentTableError
        If the ``datatype`` is not in the allowed ECSV datatypes and cannot be parsed
        as a numpy dtype.

    Warns
    -----
    InvalidEcsvDatatypeWarning
        If ``datatype`` is not an ECSV datatype but is a valid numpy dtype, or if the
        ``subtype`` does not correspond to a valid numpy dtype (it is then ignored).
    """
    from astropy.io.ascii.core import InconsistentTableError
    from astropy.io.ascii.ecsv import ECSV_DATATYPES, InvalidEcsvDatatypeWarning

    # ECSV spells the string type "string", numpy spells it "str".
    csv_np_type = datatype if datatype != "string" else "str"
    final_dtype = csv_np_type
    final_shape = ()

    if datatype not in ECSV_DATATYPES:
        msg = (
            f"unexpected datatype {datatype!r} of column {name!r} "
            f"is not in allowed ECSV datatypes {ECSV_DATATYPES}."
        )
        if not is_numpy_dtype(csv_np_type):
            # No joy, this is an exception
            raise InconsistentTableError(msg)

        # Be liberal on input when the datatype looks like a numpy dtype: parse the
        # column as string and cast to that dtype afterwards (``final_dtype`` already
        # holds it). This allows for back-compatibility with early versions of
        # io.ascii.ecsv that wrote and read e.g. datatype=datetime64.
        csv_np_type = "str"
        warnings.warn(msg, InvalidEcsvDatatypeWarning)

    if not subtype:
        return DerivedColumnProperties(csv_np_type, final_dtype, final_shape)

    if csv_np_type != "str":
        # Note: the "column .. failed to convert" bit is odd here but it is to match
        # the io.ascii.ecsv behavior.
        raise ValueError(
            f"column {name!r} failed to convert: "
            f'datatype of column {name!r} must be "string"'
        )

    # Subtype can be written like "int64[2,null]" and we want to split this
    # out to "int64" and (2, None).
    bracket_pos = subtype.find("[")
    if bracket_pos == -1:
        final_dtype = subtype
    else:
        final_dtype = subtype[:bracket_pos]
        final_shape = tuple(json.loads(subtype[bracket_pos:]))

    # Map ECSV types to numpy dtypes
    if final_dtype == "json":
        final_dtype = "object"
    elif final_dtype == "string":
        final_dtype = "str"

    # Check if the subtype corresponds to a valid numpy dtype. This is required by
    # the astropy implementation, but not by the ECSV standard. The standard states
    # that an unknown subtype can be ignored, so that is what we do here (but with
    # a warning).
    if not is_numpy_dtype(final_dtype):
        warnings.warn(
            f"unexpected subtype {subtype!r} set for column "
            f"{name!r}, using dtype={csv_np_type!r} instead.",
            category=InvalidEcsvDatatypeWarning,
        )
        final_dtype = csv_np_type

    return DerivedColumnProperties(csv_np_type, final_dtype, final_shape)
def read_header(
    input_file: str | os.PathLike | io.BytesIO,
    encoding: str = "utf-8",
) -> ECSVHeader:
    """
    Read and parse the header of an ECSV (Enhanced Character Separated Values) input.

    The header lines are extracted with ``get_header_lines``, the first one is
    checked against the ``%ECSV <version>`` pattern, the YAML content is parsed and
    the result is packed into an ``ECSVHeader``.

    Parameters
    ----------
    input_file : str, os.PathLike, or io.BytesIO
        The path to the ECSV file or a file-like object containing the ECSV data.
    encoding : str, optional
        The encoding to use when reading the file. Default is 'utf-8'.

    Returns
    -------
    ECSVHeader
        An object containing header information, including the number of header
        lines, number of empty lines, number of comment lines, column attributes,
        table metadata, and delimiter.

    Raises
    ------
    InconsistentTableError
        If the ECSV header is missing, malformed, or the YAML metadata cannot be
        parsed.
    ValueError
        If the delimiter specified in the header is not supported.

    Notes
    -----
    The function expects the first non-blank comment line to be the ECSV version
    header, and only space and comma are allowed as delimiters in the ECSV format.
    """
    from astropy.io.ascii.core import InconsistentTableError
    from astropy.io.ascii.ecsv import DELIMITERS
    from astropy.table import meta

    # Extract non-blank comment (header) lines with comment character stripped
    header_lines, n_header, n_empty, n_comment = get_header_lines(
        input_file, encoding=encoding
    )

    # Validate that this is a ECSV file. The pattern is compiled in verbose mode, so
    # the whitespace and line breaks inside it are not significant.
    ecsv_header_re = r"""%ECSV [ ]
(?P<major> \d+)
\. (?P<minor> \d+)
\.? (?P<bugfix> \d+)? $"""
    no_header_msg = (
        'ECSV header line like "# %ECSV <version>" not found as first line.'
        " This is required for a ECSV file."
    )
    has_version_line = bool(header_lines) and re.match(
        ecsv_header_re, header_lines[0].strip(), re.VERBOSE
    )
    if not has_version_line:
        raise InconsistentTableError(no_header_msg)

    try:
        header_dict = meta.get_header_from_yaml(header_lines)
    except meta.YamlParseError as e:
        raise InconsistentTableError("unable to parse yaml in meta header") from e

    delimiter = header_dict.get("delimiter", " ")
    if delimiter not in DELIMITERS:
        raise ValueError(
            "only space and comma are allowed for delimiter in ECSV format"
        )

    # One ColumnECSV per entry of the header "datatype" list.
    columns = [ColumnECSV(**col_spec) for col_spec in header_dict["datatype"]]

    return ECSVHeader(
        n_header,
        n_empty,
        n_comment,
        columns,
        header_dict.get("meta", None),
        delimiter,
    )
def read_data(
    input_file: str | os.PathLike | io.BytesIO,
    header: ECSVHeader,
    null_values: list[str],
    encoding: str = "utf-8",
    engine_name: str = "io.ascii",
) -> "Table":
    """
    Read the data from an ECSV table using the specified engine.

    The engine class registered under ``engine_name`` supplies the reader format
    and the engine-specific keyword arguments; the actual parsing is delegated to
    ``Table.read``.

    Parameters
    ----------
    input_file : str, os.PathLike, or io.BytesIO
        The path to the input file or a file-like object containing the ECSV data.
    header : ECSVHeader
        The parsed ECSV header containing column definitions and metadata.
    null_values : list of str
        List of string values to interpret as null/missing values in the data.
    encoding : str, optional
        The encoding to use when reading the file. Default is "utf-8".
    engine_name : str, optional
        The backend engine to use for reading the data. Default is "io.ascii".
        Built-in options are "pyarrow", "pandas", and "io.ascii".

    Returns
    -------
    Table
        An Astropy Table containing the data read from the ECSV file.

    Raises
    ------
    InconsistentTableError
        If the column names from the ECSV header do not match the column names
        in the data.
    """
    from astropy.table import Table

    engine_cls = ECSVEngine.engines[engine_name]
    engine = engine_cls()

    # Engine-specific keyword arguments for the CSV reader.
    reader_kwargs = engine.get_data_kwargs(header, null_values)

    data = Table.read(
        input_file,
        format=engine.format,
        delimiter=header.delimiter,
        encoding=encoding,
        **reader_kwargs,
    )

    # The names declared in the ECSV header must equal the names on the CSV header
    # line, in the same order.
    ecsv_header_names = [col.name for col in header.cols]
    if ecsv_header_names == data.colnames:
        return data

    from astropy.io.ascii.core import InconsistentTableError

    raise InconsistentTableError(
        f"column names from ECSV header {ecsv_header_names} do not "
        f"match names from header line of CSV data {data.colnames}"
    )
def get_str_vals(
data: np.ndarray | np.ma.MaskedArray,
) -> tuple[list[str] | npt.NDArray[np.str_], npt.NDArray[np.bool_] | None]:
"""Get the string values and the mask if available.
This assumes a 1-d input array of strings, possibly masked. This array comes from
reading the ECSV data, which is always a 1-d array. This function is only called if
that array is a numpy string array or a masked array of strings.
For a masked array it converts the data to the equivalent Python representation
(list of strings) and returns the mask as a separate array.
A list of strings is required in this case because the subsequent
``process_*_data`` functions substitute (in-place) a new string with the appropriate
JSON for an empty/masked fill value. In particular, if the original input consists
solely of empty strings (which is legal), the numpy string array will be not be wide
enough to hold the fill value.
For regular numpy arrays it simply returns the original data as a numpy array.
Parameters
----------
data : np.ndarray | np.ma.MaskedArray
The input data array to extract string values from.
Returns
-------
str_vals : list[str] | npt.NDArray[np.str_]
A list of strings or a 1D numpy array of strings representing the data.
mask : npt.NDArray[np.bool_] | None
A 1D numpy array of booleans indicating the mask, or None if not applicable.
"""
# For masked we need a list because for multidim the data under the mask is set
# to a compatible value.
if hasattr(data, "mask"):
# TODO: for not NUMPY_LT_2_0, try changing this to:
# str_vals = data.astype("T")
str_vals = data.view(np.ndarray).tolist()
mask = data.mask
else:
str_vals = data
mask = None
return str_vals, mask
def convert_column(
    col: ColumnECSV,
    data_in: np.ndarray | np.ma.MaskedArray,
) -> np.ndarray | np.ma.MaskedArray:
    """
    Convert column data from original CSV numpy type to specified output numpy dtype.

    Three kinds of JSON-encoded columns are dispatched to a dedicated routine:

    - Object dtype columns containing arbitrary Python objects serializable to JSON.
    - Variable-length array columns, where the last axis may vary in length.
    - Fixed-shape multidimensional columns.

    Any other column is a regular scalar column and is only cast to the target
    dtype when its dtype differs.

    Parameters
    ----------
    col : ColumnECSV
        The column specification, including dtype, shape, and name.
    data_in : np.ndarray | np.ma.MaskedArray
        The input data array to be converted.

    Returns
    -------
    np.ndarray | np.ma.MaskedArray
        The converted data array with the appropriate dtype and shape.

    Raises
    ------
    ValueError
        If the conversion fails for any reason, including a shape mismatch or
        invalid JSON content. The message names the offending column.
    """
    try:
        json_encoded = col.dtype == "object" or col.shape
        if json_encoded:
            # ``data_in`` holds the JSON text of every row. See the docstring of
            # ``get_str_vals`` for why masked input is turned into a list first.
            str_vals, mask = get_str_vals(data_in)

            if col.dtype == "object":
                # Any Python objects serializable to JSON
                handler = process_object_data
            elif col.shape[-1] is None:
                # Variable length arrays with shape (n, m, ..., *) for fixed
                # n, m, .. and variable in last axis.
                handler = process_variable_length_array_data
            else:
                # Multidim columns with consistent shape (n, m, ...).
                handler = process_fixed_shape_multidim_data

            data_out, expected_shape = handler(col, str_vals, mask)
        else:
            # Regular scalar value column: cast only when the dtype differs.
            expected_shape = col.shape
            if data_in.dtype != col.dtype:
                data_out = data_in.astype(col.dtype)
            else:
                data_out = data_in

        # Final sanity check on the per-row shape. Raised inside the ``try`` so the
        # message gets the column name prepended by the generic handler below.
        if data_out.shape[1:] != tuple(expected_shape):
            raise ValueError("shape mismatch between value and column specifier")

    except json.JSONDecodeError:
        raise ValueError(
            f"column {col.name!r} failed to convert: column value is not valid JSON"
        )
    except Exception as exc:
        raise ValueError(f"column {col.name!r} failed to convert: {exc}") from exc

    return data_out
def process_object_data(
    col: ColumnECSV,
    str_vals: list[str] | npt.NDArray[np.str_],
    mask: npt.NDArray[np.bool_] | None,
) -> tuple[np.ndarray | np.ma.MaskedArray, tuple[int, ...]]:
    """
    Handle object columns where each row element is a JSON-encoded object.

    The ECSV format only allows a 1-d column of object type.

    Example::

      # %ECSV 1.0
      # ---
      # datatype:
      # - {name: objects, datatype: string, subtype: json}
      # schema: astropy-2.0
      objects
      "{""a"":1}"
      "{""b"":[2.5,null]}"
      true

    Parameters
    ----------
    col : ColumnECSV
        The column specification, including dtype, shape, and name.
    str_vals : list[str] or 1-D ndarray[str]
        JSON-encoded string representations of the data. Masked entries are
        overwritten in place with ``"0"``.
    mask : 1-D ndarray[bool] or None
        Boolean mask array 1-D indicating invalid or missing values. If None, no
        masking is applied.

    Returns
    -------
    data_out : numpy.ndarray or numpy.ma.MaskedArray
        Object array holding the decoded value of every row. If `mask` is provided,
        a masked array is returned with the mask applied.
    col_shape : tuple[int, ...]
        Expected shape of data_out, used in final sanity check of reading.
    """
    is_masked = mask is not None

    if is_masked:
        # Give masked rows something json.loads can decode.
        for masked_idx in np.nonzero(mask)[0]:
            str_vals[masked_idx] = "0"  # could be "null" but io.ascii uses "0"

    decoded = [json.loads(text) for text in str_vals]
    out_shape = (len(decoded), *col.shape)

    if is_masked:
        data_out = np.ma.empty(out_shape, dtype=object)
    else:
        data_out = np.empty(out_shape, dtype=object)
    data_out[...] = decoded

    if is_masked:
        data_out.mask = mask

    return data_out, col.shape
def process_fixed_shape_multidim_data(
    col: ColumnECSV,
    str_vals: list[str] | npt.NDArray[np.str_],
    mask: npt.NDArray[np.bool_] | None,
) -> tuple[np.ndarray | np.ma.MaskedArray, tuple[int, ...]]:
    """
    Handle fixed-shape multidimensional columns as JSON-encoded strings.

    Example::

      # %ECSV 1.0
      # ---
      # datatype:
      # - {name: array3x2, datatype: string, subtype: 'float64[3,2]'}
      # schema: astropy-2.0
      array3x2
      [[0.0,1.0],[2.0,3.0],[4.0,5.0]]
      [[6.0,7.0],[8.0,null],[10.0,11.0]]

    Parameters
    ----------
    col : ColumnECSV
        The column specification, including dtype, shape, and name.
    str_vals : array-like of str
        Array of string representations of the data, typically JSON-encoded.
        Masked entries are overwritten in place with an all-``null`` JSON array.
    mask : numpy.ndarray or None
        Boolean mask array indicating invalid or missing values. If None, no masking
        is applied.

    Returns
    -------
    data_out : numpy.ndarray or numpy.ma.MaskedArray
        Array of dtype ``col.dtype`` decoded from `str_vals`. A masked array is
        returned if any decoded element is ``null``; otherwise a plain ndarray. For
        zero rows an empty ndarray of shape ``(0, *col.shape)`` is returned.
    col_shape : tuple[int, ...]
        Expected shape of data_out, used in final sanity check of reading.
    """
    # A table without rows gives no nested lists to infer the trailing dimensions
    # from: ``np.array([], dtype=object)`` has shape (0,), which would then fail the
    # caller's shape check. Build the correctly shaped empty array directly.
    if len(str_vals) == 0:
        return np.empty((0, *col.shape), dtype=col.dtype), col.shape

    # Change empty (blank) values in original ECSV to something
    # like "[[null, null],[null,null]]" so subsequent JSON
    # decoding works.
    if mask is not None:
        all_none_arr = np.full(shape=col.shape, fill_value=None, dtype=object)
        fill_value = json.dumps(all_none_arr.tolist())
        for idx in np.nonzero(mask)[0]:
            str_vals[idx] = fill_value

    col_vals = [json.loads(val) for val in str_vals]

    # Make a numpy object array of col_vals to look for None (masked values).
    # ``== None`` is deliberate: it is an element-wise comparison on the array.
    arr_vals = np.array(col_vals, dtype=object)
    arr_vals_mask = arr_vals == None  # noqa: E711

    if not np.any(arr_vals_mask):
        return arr_vals.astype(col.dtype), col.shape

    # Replace all the None with an appropriate fill value
    arr_vals[arr_vals_mask] = {"U": "", "S": b""}.get(col.dtype.kind, 0)
    # Finally make a MaskedArray with the filled data + mask
    data_out = np.ma.array(arr_vals.astype(col.dtype), mask=arr_vals_mask)
    return data_out, col.shape
def process_variable_length_array_data(
    col: ColumnECSV,
    str_vals: list[str] | npt.NDArray[np.str_],
    mask: npt.NDArray[np.bool_] | None,
) -> tuple[np.ndarray | np.ma.MaskedArray, tuple[int, ...]]:
    """
    Handle variable length arrays with shape (n, m, ..., *) as JSON-encoded strings.

    Shape is fixed for n, m, .. and variable in last axis. The output is a 1-d
    object array with each row element being an ``np.ndarray`` or
    ``np.ma.masked_array`` of the appropriate shape.

    Example::

      # %ECSV 1.0
      # ---
      # datatype:
      # - {name: array_var, datatype: string, subtype: 'int64[null]'}
      # schema: astropy-2.0
      array_var
      [1,2]
      [3,4,5,null,7]
      [8,9,10]

    Parameters
    ----------
    col : ColumnECSV
        The column specification, including dtype, shape, and name.
    str_vals : list[str] or 1-D ndarray[str]
        JSON-encoded string representations of the data. Masked entries are
        overwritten in place with ``"[]"``.
    mask : 1-D ndarray[bool] or None
        Boolean mask array 1-D indicating invalid or missing values. If None, no
        masking is applied.

    Returns
    -------
    data_out : numpy.ndarray or numpy.ma.MaskedArray
        1-d object array with one array per row. If `mask` is provided, a masked
        array is returned with the mask applied.
    col_shape : tuple[int, ...]
        Expected shape of data_out, used in final sanity check of reading. Always
        the empty tuple because the output is a 1-d object array.
    """
    is_masked = mask is not None

    # Empty (blank) values in original ECSV are masked. Instead set the values
    # to "[]" indicating an empty list.
    if is_masked:
        for masked_idx in np.nonzero(mask)[0]:
            str_vals[masked_idx] = "[]"

    def decode_row(json_text):
        """Decode one row into an ndarray, or a MaskedArray if conversion fails."""
        obj_val = json.loads(json_text)  # list or nested lists
        try:
            return np.array(obj_val, dtype=col.dtype)
        except TypeError:
            # obj_val has entries that are inconsistent with dtype. For a valid
            # ECSV file the only possibility is None values (indicating missing
            # values).
            raw = np.array(obj_val, dtype=object)
            # Element-wise comparison on purpose (``is None`` would not work here).
            none_mask = raw == None  # noqa: E711
            # Replace all the None with an appropriate fill value
            raw[none_mask] = {"U": "", "S": b""}.get(col.dtype.kind, 0)
            return np.ma.array(raw.astype(col.dtype), mask=none_mask)

    # Remake as a 1-d object column of numpy ndarrays or MaskedArray using the
    # datatype specified in the ECSV file.
    row_arrays = [decode_row(text) for text in str_vals]

    if is_masked:
        data_out = np.ma.empty(len(row_arrays), dtype=object)
    else:
        data_out = np.empty(len(row_arrays), dtype=object)
    data_out[:] = row_arrays

    if is_masked:
        data_out.mask = mask

    return data_out, ()
def get_null_values_per_column(
    cols: list[ColumnECSV],
    table_meta: dict | None,
    null_values: list[str],
) -> list[tuple[str, str, str]]:
    """Build the per-column null-value substitution list.

    Every column gets one entry per null string, except for columns that belong to a
    MaskedColumn serialized with ``serialize_method='data_mask'``. That method writes
    the full data and the mask as separate columns, so a zero-length string ("") in
    such a string column is valid data and must not be replaced by a fill value.

    Parameters
    ----------
    cols : list[ColumnECSV]
        Column specifications from the ECSV header. Only the ``name`` and
        ``csv_np_type`` attributes are read here.
    table_meta : dict or None
        Table metadata from the ECSV header. The ``"__serialized_columns__"`` entry,
        if present, is inspected to find serialized MaskedColumn columns. ``None``
        is treated as an empty dict.
    null_values : list[str]
        Strings to interpret as null/missing in every column. No default is defined
        here; the upstream default from ``read_ecsv`` is ``[""]``.

    Returns
    -------
    fill_values : list[tuple[str, str, str]]
        Tuples of ``(null_value, fill_value, column_name)``, ordered by column and
        then by null value. The fill value is ``""`` for columns whose
        ``csv_np_type`` is ``"str"`` and ``"0"`` otherwise. The list is empty when
        no column needs filling.
    """
    meta = {} if table_meta is None else table_meta
    serialized_columns: dict[str, SerializedColumn] = meta.get(
        "__serialized_columns__", {}
    )

    # Names to leave untouched: both the data column and its companion mask column
    # of every serialized MaskedColumn. For example:
    #   - __serialized_columns__:
    #       a:
    #         __class__: astropy.table.column.MaskedColumn
    #         data: !astropy.table.SerializedColumn {name: a}
    #         mask: !astropy.table.SerializedColumn {name: a.mask}
    skip_names = set()
    for col_name, spec in serialized_columns.items():
        if spec["__class__"] == "astropy.table.column.MaskedColumn":
            skip_names.update((col_name, col_name + ".mask"))

    fill_values = []
    for col in cols:
        if col.name not in skip_names:
            # String columns are filled with an empty string, all others with "0".
            fill = "0" if col.csv_np_type != "str" else ""
            fill_values.extend((null, fill, col.name) for null in null_values)
    return fill_values
[docs]
def read_ecsv(
    input_file: str | os.PathLike | io.BytesIO | io.StringIO | Iterable[str],
    *,
    encoding: str = "utf-8",
    engine: str = "io.ascii",
    null_values: list[str] | None = None,
) -> "Table":
    """
    Read ECSV (Enhanced Character Separated Values) input into an Astropy Table.

    Parameters
    ----------
    input_file : str, os.PathLike, io.BytesIO, io.StringIO, Iterable[str]
        The ECSV input to read. This can be a file path, a file-like object, a string
        containing the file contents, or a sequence of strings representing lines of
        the file. A ``str`` is treated as file contents only when it contains a
        newline, and only ``list`` and ``tuple`` are recognised as line sequences.
        ``io.StringIO``, content strings and line sequences are less memory
        efficient because they are re-encoded into an ``io.BytesIO`` first.
    encoding : str, optional
        The encoding to use when reading the file. Default is "utf-8".
    engine : str, optional
        The engine to use for reading the CSV data. Default is "io.ascii", which uses
        astropy to read the CSV data. Other built-in options are "pyarrow" and
        "pandas". The "pyarrow" engine is optimized for performance and can handle
        large datasets efficiently. The "pandas" engine uses the pandas CSV reader,
        which is also faster than the default "io.ascii" engine.
    null_values : list of str or None, optional
        List of string values to interpret as null/missing values. Default is [""].
        The ECSV standard requires that null values are represented as empty strings
        in the CSV data, but this allows reading non-compliant ECSV files. A notable
        example are the Gaia source download files which are ECSV but use "null".

    Returns
    -------
    table : astropy.table.Table
        The table read from the ECSV input.

    Raises
    ------
    astropy.io.ascii.core.InconsistentTableError
        If the column names in the ECSV header do not match the column names in the
        CSV data.

    Notes
    -----
    - Column attributes ``unit``, ``description``, ``format`` and ``meta`` are copied
      from the ECSV header to the resulting columns when they are not None.
    - Table-level metadata from the header is merged into ``table.meta``.
    - Every column is passed through ``convert_column``, which is mostly concerned
      with JSON-encoded data but also handles cases like pyarrow not supporting
      float16.
    """
    from astropy.table import Table, serialize

    null_values = [""] if null_values is None else null_values

    # Input types that are historically supported by io.ascii are collapsed to text
    # and re-encoded into a bytes stream. Not memory or speed efficient, but it works.
    if isinstance(input_file, io.StringIO):
        text = input_file.getvalue()
    elif isinstance(input_file, str) and "\n" in input_file:
        text = input_file
    elif isinstance(input_file, (list, tuple)):
        # TODO: better way to check for an iterable of str?
        text = "\n".join(input_file)
    else:
        text = None
    if text is not None:
        input_file = io.BytesIO(text.encode(encoding))

    # Parse the ECSV header first; the data reader needs it.
    header = read_header(input_file, encoding=encoding)

    # Read the CSV payload that follows the header. Engine-specific handling lives in
    # ``read_data``.
    data_raw = read_data(
        input_file,
        header,
        null_values=null_values,
        encoding=encoding,
        engine_name=engine,
    )

    # Bring every raw column to its final numpy representation, keyed by column name
    # in header order.
    data = {}
    for header_col in header.cols:
        data[header_col.name] = convert_column(header_col, data_raw[header_col.name])

    table = Table(data)

    # Copy the per-column attributes that the header actually defines.
    for header_col in header.cols:
        col = table[header_col.name]
        for attr in ("unit", "description", "format", "meta"):
            val = getattr(header_col, attr)
            if val is not None:
                setattr(col.info, attr, val)

    # Merge table-level metadata, if any.
    if header.table_meta:
        table.meta.update(header.table_meta)

    # Construct any mixin columns from the raw columns.
    return serialize._construct_mixins_from_columns(table)
[docs]
def write_ecsv(tbl, output, engine="io.ascii", **kwargs):
    """Write a table as ECSV by delegating to the ``io.ascii`` ECSV writer.

    Parameters
    ----------
    tbl : astropy.table.Table
        The table to write to ECSV format.
    output : str or os.PathLike or file-like object
        The output file path or file-like object to write the ECSV data to.
    engine : str, optional
        The engine to use for writing the CSV data. Default is "io.ascii", which uses
        astropy to write the CSV data. Currently this is the only option.
    **kwargs : dict, optional
        Additional keyword arguments forwarded unchanged to
        ``tbl.write(output, format="ascii.ecsv", ...)``. These can include options
        like ``delimiter``, ``encoding``, and others supported by the
        `astropy.io.ascii.Ecsv` writer.

    Raises
    ------
    ValueError
        If ``engine`` is anything other than "io.ascii".
    """
    if engine == "io.ascii":
        tbl.write(output, format="ascii.ecsv", **kwargs)
        return

    # Any other engine name is rejected; nothing is written.
    raise ValueError(
        f"{engine=} is not a supported engine for writing, use 'io.ascii'"
    )
[docs]
def register_ecsv_table():
    """
    Register the ECSV reader and writer for ``Table`` with Unified I/O.

    ``read_ecsv`` is registered as the reader and ``write_ecsv`` as the writer, both
    under the format name "ecsv".
    """
    from astropy.io import registry as io_registry
    from astropy.table import Table

    # Reader first, then writer.
    registrations = (
        (io_registry.register_reader, read_ecsv),
        (io_registry.register_writer, write_ecsv),
    )
    for register, func in registrations:
        register("ecsv", Table, func)