Source code for tabledata._core

"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""

import copy
import re
from collections import OrderedDict, namedtuple
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Sequence, Tuple, Union

import dataproperty as dp
import typepy
from dataproperty import DataPropertyMatrix
from dataproperty.typing import TypeHint
from typepy import Nan

from ._constant import PatternMatch
from ._converter import to_value_matrix
from ._logger import logger


if TYPE_CHECKING:
    import pandas


[docs]class TableData: """ Class to represent a table data structure. :param table_name: Name of the table. :param headers: Table header names. :param rows: Data of the table. """ def __init__( self, table_name: Optional[str], headers: Sequence[str], rows: Sequence, dp_extractor: Optional[dp.DataPropertyExtractor] = None, type_hints: Optional[Sequence[Union[str, TypeHint]]] = None, max_workers: Optional[int] = None, max_precision: Optional[int] = None, ) -> None: self.__table_name = table_name self.__value_matrix: List[List[Any]] = [] self.__value_dp_matrix: Optional[DataPropertyMatrix] = None if rows: self.__rows = rows else: self.__rows = [] if dp_extractor: self.__dp_extractor = copy.deepcopy(dp_extractor) else: self.__dp_extractor = dp.DataPropertyExtractor(max_precision=max_precision) if type_hints: self.__dp_extractor.column_type_hints = type_hints self.__dp_extractor.strip_str_header = '"' if max_workers: self.__dp_extractor.max_workers = max_workers if not headers: self.__dp_extractor.headers = [] else: self.__dp_extractor.headers = headers def __repr__(self) -> str: element_list = [f"table_name={self.table_name}"] try: element_list.append("headers=[{}]".format(", ".join(self.headers))) except TypeError: element_list.append("headers=None") element_list.extend([f"cols={self.num_columns}", f"rows={self.num_rows}"]) return ", ".join(element_list) def __eq__(self, other: Any) -> bool: if not isinstance(other, TableData): return False return self.equals(other, cmp_by_dp=False) def __ne__(self, other: Any) -> bool: if not isinstance(other, TableData): return True return not self.equals(other, cmp_by_dp=False) @property def table_name(self) -> Optional[str]: """str: Name of the table.""" return self.__table_name @table_name.setter def table_name(self, value: Optional[str]) -> None: self.__table_name = value @property def headers(self) -> Sequence[str]: """Sequence[str]: Table header names.""" return self.__dp_extractor.headers @property def rows(self) -> Sequence: """Sequence: Original rows of tabular data.""" return self.__rows @property def value_matrix(self) -> DataPropertyMatrix: """DataPropertyMatrix: Converted rows of tabular data.""" if self.__value_matrix: return self.__value_matrix self.__value_matrix = [ [value_dp.data for value_dp in value_dp_list] for value_dp_list in self.value_dp_matrix ] return self.__value_matrix @property def has_value_dp_matrix(self) -> bool: return self.__value_dp_matrix is not None @property def max_workers(self) -> int: return self.__dp_extractor.max_workers @max_workers.setter def max_workers(self, value: Optional[int]) -> None: self.__dp_extractor.max_workers = value @property def num_rows(self) -> Optional[int]: """Optional[int]: Number of rows in the tabular data. |None| if the ``rows`` is neither list nor tuple. """ try: return len(self.rows) except TypeError: return None @property def num_columns(self) -> Optional[int]: if typepy.is_not_empty_sequence(self.headers): return len(self.headers) try: return len(self.rows[0]) except TypeError: return None except IndexError: return 0 @property def value_dp_matrix(self) -> DataPropertyMatrix: """DataPropertyMatrix: DataProperty for table data.""" if self.__value_dp_matrix is None: self.__value_dp_matrix = self.__dp_extractor.to_dp_matrix( to_value_matrix(self.headers, self.rows) ) return self.__value_dp_matrix @property def header_dp_list(self) -> List[dp.DataProperty]: return self.__dp_extractor.to_header_dp_list() @property def column_dp_list(self) -> List[dp.ColumnDataProperty]: return self.__dp_extractor.to_column_dp_list(self.value_dp_matrix) @property def dp_extractor(self) -> dp.DataPropertyExtractor: return self.__dp_extractor
[docs] def is_empty_header(self) -> bool: """bool: |True| if the data :py:attr:`.headers` is empty.""" return typepy.is_empty_sequence(self.headers)
[docs] def is_empty_rows(self) -> bool: """ :return: |True| if the tabular data has no rows. :rtype: bool """ return self.num_rows == 0
[docs] def is_empty(self) -> bool: """ :return: |True| if the data :py:attr:`.headers` or :py:attr:`.value_matrix` is empty. :rtype: bool """ return any([self.is_empty_header(), self.is_empty_rows()])
[docs] def equals(self, other: "TableData", cmp_by_dp: bool = True) -> bool: if cmp_by_dp: return self.__equals_dp(other) return self.__equals_raw(other)
def __equals_base(self, other: "TableData") -> bool: compare_item_list = [self.table_name == other.table_name] if self.num_rows is not None: compare_item_list.append(self.num_rows == other.num_rows) return all(compare_item_list) def __equals_raw(self, other: "TableData") -> bool: if not self.__equals_base(other): return False if self.headers != other.headers: return False for lhs_row, rhs_row in zip(self.rows, other.rows): if len(lhs_row) != len(rhs_row): return False if not all( [ lhs == rhs for lhs, rhs in zip(lhs_row, rhs_row) if not Nan(lhs).is_type() and not Nan(rhs).is_type() ] ): return False return True def __equals_dp(self, other: "TableData") -> bool: if not self.__equals_base(other): return False if self.header_dp_list != other.header_dp_list: return False if self.value_dp_matrix is None or other.value_dp_matrix is None: return False for lhs_list, rhs_list in zip(self.value_dp_matrix, other.value_dp_matrix): if len(lhs_list) != len(rhs_list): return False if any([lhs != rhs for lhs, rhs in zip(lhs_list, rhs_list)]): return False return True
[docs] def in_tabledata_list(self, other: Sequence["TableData"], cmp_by_dp: bool = True) -> bool: for table_data in other: if self.equals(table_data, cmp_by_dp=cmp_by_dp): return True return False
[docs] def validate_rows(self) -> None: """ :raises ValueError: """ invalid_row_idx_list = [] for row_idx, row in enumerate(self.rows): if isinstance(row, (list, tuple)) and len(self.headers) != len(row): invalid_row_idx_list.append(row_idx) if isinstance(row, dict): if not all([header in row for header in self.headers]): invalid_row_idx_list.append(row_idx) if not invalid_row_idx_list: return for invalid_row_idx in invalid_row_idx_list: logger.debug(f"invalid row (line={invalid_row_idx}): {self.rows[invalid_row_idx]}") raise ValueError( "table header length and row length are mismatch:\n" + f" header(len={len(self.headers)}): {self.headers}\n" + " # of miss match rows: {} ouf of {}\n".format( len(invalid_row_idx_list), self.num_rows ) )
[docs] def as_dict(self, default_key: str = "table") -> Dict[str, List["OrderedDict[str, Any]"]]: """ Args: default_key: Key of a returning dictionary when the ``table_name`` is empty. Returns: dict: Table data as a |dict| instance. Sample Code: .. code:: python from tabledata import TableData TableData( "sample", ["a", "b"], [[1, 2], [3.3, 4.4]] ).as_dict() Output: .. code:: json {'sample': [OrderedDict([('a', 1), ('b', 2)]), OrderedDict([('a', 3.3), ('b', 4.4)])]} """ # noqa dict_body = [] for row in self.value_matrix: if not row: continue values = [ (header, value) for header, value in zip(self.headers, row) if value is not None ] if not values: continue dict_body.append(OrderedDict(values)) table_name = self.table_name if not table_name: table_name = default_key return {table_name: dict_body}
[docs] def as_tuple(self) -> Iterator[Tuple]: """ :return: Rows of the tuple. :rtype: list of |namedtuple| :Sample Code: .. code:: python from tabledata import TableData records = TableData( "sample", ["a", "b"], [[1, 2], [3.3, 4.4]] ).as_tuple() for record in records: print(record) :Output: .. code-block:: none Row(a=1, b=2) Row(a=Decimal('3.3'), b=Decimal('4.4')) """ Row = namedtuple("Row", self.headers) # type: ignore for value_dp_list in self.value_dp_matrix: if typepy.is_empty_sequence(value_dp_list): continue row = Row(*(value_dp.data for value_dp in value_dp_list)) yield row
[docs] def as_dataframe(self) -> "pandas.DataFrame": """ :return: Table data as a ``pandas.DataFrame`` instance. :rtype: pandas.DataFrame :Sample Code: .. code-block:: python from tabledata import TableData TableData( "sample", ["a", "b"], [[1, 2], [3.3, 4.4]] ).as_dataframe() :Output: .. code-block:: none a b 0 1 2 1 3.3 4.4 :Dependency Packages: - `pandas <https://pandas.pydata.org/>`__ """ try: from pandas import DataFrame except ImportError: raise RuntimeError("required 'pandas' package to execute as_dataframe method") dataframe = DataFrame(self.value_matrix) if not self.is_empty_header(): dataframe.columns = self.headers return dataframe
[docs] def transpose(self) -> "TableData": return TableData( self.table_name, self.headers, [row for row in zip(*self.rows)], max_workers=self.max_workers, )
[docs] def filter_column( self, patterns: Optional[str] = None, is_invert_match: bool = False, is_re_match: bool = False, pattern_match: PatternMatch = PatternMatch.OR, ) -> "TableData": logger.debug( "filter_column: patterns={}, is_invert_match={}, " "is_re_match={}, pattern_match={}".format( patterns, is_invert_match, is_re_match, pattern_match ) ) if not patterns: return self match_header_list = [] match_column_matrix = [] if pattern_match == PatternMatch.OR: match_method = any elif pattern_match == PatternMatch.AND: match_method = all else: raise ValueError(f"unknown matching: {pattern_match}") for header, column in zip(self.headers, zip(*self.rows)): is_match_list = [] for pattern in patterns: is_match = self.__is_match(header, pattern, is_re_match) is_match_list.append( any([is_match and not is_invert_match, not is_match and is_invert_match]) ) if match_method(is_match_list): match_header_list.append(header) match_column_matrix.append(column) logger.debug( "filter_column: table={}, match_header_list={}".format( self.table_name, match_header_list ) ) return TableData( self.table_name, match_header_list, list(zip(*match_column_matrix)), max_workers=self.max_workers, )
[docs] @staticmethod def from_dataframe( dataframe: "pandas.DataFrame", table_name: str = "", type_hints: Optional[Sequence[TypeHint]] = None, max_workers: Optional[int] = None, ) -> "TableData": """ Initialize TableData instance from a pandas.DataFrame instance. :param pandas.DataFrame dataframe: :param str table_name: Table name to create. """ return TableData( table_name, list(dataframe.columns.values), dataframe.values.tolist(), type_hints=type_hints, max_workers=max_workers, )
@staticmethod def __is_match(header: str, pattern: str, is_re_match: bool) -> bool: if is_re_match: return re.search(pattern, header) is not None return header == pattern