"""
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
"""
import copy
import re
from collections import OrderedDict, namedtuple
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Sequence, Tuple, Union
import dataproperty as dp
import typepy
from dataproperty import DataPropertyMatrix
from dataproperty.typing import TypeHint
from typepy import Nan
from ._constant import PatternMatch
from ._converter import to_value_matrix
from ._logger import logger
if TYPE_CHECKING:
import pandas
class TableData:
    """
    Class to represent a table data structure.

    :param table_name: Name of the table.
    :param headers: Table header names.
    :param rows: Data of the table.
    :param dp_extractor:
        Extractor to base the conversion on. It is deep-copied, so the
        settings applied by this class never touch the caller's instance.
    :param type_hints:
        Assigned to the extractor's ``column_type_hints`` when not empty.
    :param max_workers:
        Assigned to the extractor's ``max_workers`` when truthy.
    :param max_precision:
        Passed to the extractor created when ``dp_extractor`` is not given.
        Not used when ``dp_extractor`` is given.
    """

    def __init__(
        self,
        table_name: Optional[str],
        headers: Sequence[str],
        rows: Sequence,
        dp_extractor: Optional[dp.DataPropertyExtractor] = None,
        type_hints: Optional[Sequence[Union[str, TypeHint]]] = None,
        max_workers: Optional[int] = None,
        max_precision: Optional[int] = None,
    ) -> None:
        self.__table_name = table_name

        # Lazily-filled caches, see ``value_matrix`` / ``value_dp_matrix``.
        self.__value_matrix: List[List[Any]] = []
        self.__value_dp_matrix: Optional[DataPropertyMatrix] = None

        # Falsy ``rows`` (e.g. None or an empty sequence) becomes an empty list.
        self.__rows = rows if rows else []

        if dp_extractor:
            # Work on a private copy: the attributes set below must not leak
            # back into the extractor owned by the caller.
            self.__dp_extractor = copy.deepcopy(dp_extractor)
        else:
            self.__dp_extractor = dp.DataPropertyExtractor(max_precision=max_precision)

        if type_hints:
            self.__dp_extractor.column_type_hints = type_hints

        # NOTE(review): presumably makes the extractor strip double quotes
        # from header names -- confirm against dataproperty.
        self.__dp_extractor.strip_str_header = '"'

        if max_workers:
            self.__dp_extractor.max_workers = max_workers

        # Falsy ``headers`` is stored as an empty list.
        self.__dp_extractor.headers = headers if headers else []

    def __repr__(self) -> str:
        element_list = [f"table_name={self.table_name}"]

        try:
            # Stringify each header so that non-str headers are shown as they
            # are instead of raising TypeError and being reported as None.
            element_list.append("headers=[{}]".format(", ".join(map(str, self.headers))))
        except TypeError:
            # ``headers`` is not iterable (e.g. None).
            element_list.append("headers=None")

        element_list.extend([f"cols={self.num_columns}", f"rows={self.num_rows}"])

        return ", ".join(element_list)

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, TableData):
            return False

        return self.equals(other, cmp_by_dp=False)

    def __ne__(self, other: Any) -> bool:
        if not isinstance(other, TableData):
            return True

        return not self.equals(other, cmp_by_dp=False)

    @property
    def table_name(self) -> Optional[str]:
        """str: Name of the table."""

        return self.__table_name

    @table_name.setter
    def table_name(self, value: Optional[str]) -> None:
        self.__table_name = value

    @property
    def headers(self) -> Sequence[str]:
        """Sequence[str]: Table header names."""

        return self.__dp_extractor.headers

    @property
    def rows(self) -> Sequence:
        """Sequence: Original rows of tabular data."""

        return self.__rows

    @property
    def value_matrix(self) -> List[List[Any]]:
        """List[List[Any]]: Converted rows of tabular data.

        Each cell is the ``data`` attribute of the matching entry of
        :py:attr:`.value_dp_matrix`. A non-empty result is cached.
        """

        if self.__value_matrix:
            return self.__value_matrix

        self.__value_matrix = [
            [value_dp.data for value_dp in value_dp_list] for value_dp_list in self.value_dp_matrix
        ]

        return self.__value_matrix

    @property
    def has_value_dp_matrix(self) -> bool:
        """bool: |True| if :py:attr:`.value_dp_matrix` has already been built."""

        return self.__value_dp_matrix is not None

    @property
    def max_workers(self) -> int:
        """int: ``max_workers`` value of the underlying extractor."""

        return self.__dp_extractor.max_workers

    @max_workers.setter
    def max_workers(self, value: Optional[int]) -> None:
        self.__dp_extractor.max_workers = value

    @property
    def num_rows(self) -> Optional[int]:
        """Optional[int]:
        Number of rows in the tabular data.
        |None| if the ``rows`` is neither list nor tuple.
        """

        try:
            return len(self.rows)
        except TypeError:
            return None

    @property
    def num_columns(self) -> Optional[int]:
        """Optional[int]:
        Number of columns in the tabular data.
        Length of the ``headers`` when it is a not empty sequence,
        otherwise length of the first row.
        |None| if the first row cannot be accessed or measured.
        ``0`` if there are no rows.
        """

        if typepy.is_not_empty_sequence(self.headers):
            return len(self.headers)

        try:
            return len(self.rows[0])
        except TypeError:
            return None
        except IndexError:
            return 0

    @property
    def value_dp_matrix(self) -> DataPropertyMatrix:
        """DataPropertyMatrix: DataProperty for table data."""

        if self.__value_dp_matrix is None:
            # Built on first access and cached afterwards.
            self.__value_dp_matrix = self.__dp_extractor.to_dp_matrix(
                to_value_matrix(self.headers, self.rows)
            )

        return self.__value_dp_matrix

    @property
    def header_dp_list(self) -> List[dp.DataProperty]:
        """List[dp.DataProperty]: Result of the extractor's ``to_header_dp_list``."""

        return self.__dp_extractor.to_header_dp_list()

    @property
    def column_dp_list(self) -> List[dp.ColumnDataProperty]:
        """List[dp.ColumnDataProperty]:
        Result of the extractor's ``to_column_dp_list`` for
        :py:attr:`.value_dp_matrix`.
        """

        return self.__dp_extractor.to_column_dp_list(self.value_dp_matrix)

    @property
    def dp_extractor(self) -> dp.DataPropertyExtractor:
        """dp.DataPropertyExtractor: Extractor instance owned by this table."""

        return self.__dp_extractor

    def is_empty_header(self) -> bool:
        """
        :return: |True| if the data :py:attr:`.headers` is empty.
        :rtype: bool
        """

        return typepy.is_empty_sequence(self.headers)

    def is_empty_rows(self) -> bool:
        """
        :return: |True| if the tabular data has no rows.
        :rtype: bool
        """

        return self.num_rows == 0

    def is_empty(self) -> bool:
        """
        :return:
            |True| if the data :py:attr:`.headers` or
            :py:attr:`.value_matrix` is empty.
        :rtype: bool
        """

        return self.is_empty_header() or self.is_empty_rows()

    def equals(self, other: "TableData", cmp_by_dp: bool = True) -> bool:
        """
        Compare this table with another one.

        :param other: Table to compare with.
        :param cmp_by_dp:
            If |True|, compare the header/value DataProperty objects.
            Otherwise compare the raw headers and rows.
        :return: |True| if both tables are considered equal.
        :rtype: bool
        """

        if cmp_by_dp:
            return self.__equals_dp(other)

        return self.__equals_raw(other)

    def __equals_base(self, other: "TableData") -> bool:
        # Checks shared by both comparison modes: the table name and,
        # when this table's row count is known, the row count.
        if self.table_name != other.table_name:
            return False

        num_rows = self.num_rows
        if num_rows is not None and num_rows != other.num_rows:
            return False

        return True

    def __equals_raw(self, other: "TableData") -> bool:
        if not self.__equals_base(other):
            return False

        if self.headers != other.headers:
            return False

        for lhs_row, rhs_row in zip(self.rows, other.rows):
            if len(lhs_row) != len(rhs_row):
                return False

            # NOTE(review): a cell pair is skipped when either side is
            # detected by ``Nan(...).is_type()``, so such a cell never makes
            # the rows differ -- confirm this is intended.
            if not all(
                lhs == rhs
                for lhs, rhs in zip(lhs_row, rhs_row)
                if not Nan(lhs).is_type() and not Nan(rhs).is_type()
            ):
                return False

        return True

    def __equals_dp(self, other: "TableData") -> bool:
        if not self.__equals_base(other):
            return False

        if self.header_dp_list != other.header_dp_list:
            return False

        if self.value_dp_matrix is None or other.value_dp_matrix is None:
            return False

        for lhs_list, rhs_list in zip(self.value_dp_matrix, other.value_dp_matrix):
            if len(lhs_list) != len(rhs_list):
                return False

            if any(lhs != rhs for lhs, rhs in zip(lhs_list, rhs_list)):
                return False

        return True

    def in_tabledata_list(self, other: Sequence["TableData"], cmp_by_dp: bool = True) -> bool:
        """
        :param other: Tables to search.
        :param cmp_by_dp: Comparison mode, passed through to :py:meth:`.equals`.
        :return: |True| if any table in ``other`` equals this table.
        :rtype: bool
        """

        return any(self.equals(table_data, cmp_by_dp=cmp_by_dp) for table_data in other)

    def validate_rows(self) -> None:
        """
        Check every row against the headers.

        A list/tuple row is invalid when its length differs from the number
        of headers. A dict row is invalid when any header is missing from
        its keys. Rows of other types are not checked.

        :raises ValueError: If at least one row is invalid.
        """

        invalid_row_idx_list = []

        for row_idx, row in enumerate(self.rows):
            if isinstance(row, (list, tuple)) and len(self.headers) != len(row):
                invalid_row_idx_list.append(row_idx)

            if isinstance(row, dict):
                if not all(header in row for header in self.headers):
                    invalid_row_idx_list.append(row_idx)

        if not invalid_row_idx_list:
            return

        for invalid_row_idx in invalid_row_idx_list:
            logger.debug(f"invalid row (line={invalid_row_idx}): {self.rows[invalid_row_idx]}")

        raise ValueError(
            "table header length and row length are mismatch:\n"
            + f" header(len={len(self.headers)}): {self.headers}\n"
            + " # of miss match rows: {} ouf of {}\n".format(
                len(invalid_row_idx_list), self.num_rows
            )
        )

    def as_dict(self, default_key: str = "table") -> Dict[str, List["OrderedDict[str, Any]"]]:
        """
        Args:
            default_key:
                Key of a returning dictionary when the ``table_name`` is empty.

        Returns:
            dict: Table data as a |dict| instance.
            Empty rows are skipped and cells whose value is |None| are
            left out of each row.

        Sample Code:
            .. code:: python

                from tabledata import TableData

                TableData(
                    "sample",
                    ["a", "b"],
                    [[1, 2], [3.3, 4.4]]
                ).as_dict()

        Output:
            .. code:: json

                {'sample': [OrderedDict([('a', 1), ('b', 2)]), OrderedDict([('a', 3.3), ('b', 4.4)])]}
        """  # noqa

        dict_body = []

        for row in self.value_matrix:
            if not row:
                continue

            values = [
                (header, value) for header, value in zip(self.headers, row) if value is not None
            ]

            if not values:
                continue

            dict_body.append(OrderedDict(values))

        table_name = self.table_name
        if not table_name:
            table_name = default_key

        return {table_name: dict_body}

    def as_tuple(self) -> Iterator[Tuple]:
        """
        :return: Iterator that yields each not empty row as a |namedtuple|.
        :rtype: Iterator of |namedtuple|

        :Sample Code:
            .. code:: python

                from tabledata import TableData

                records = TableData(
                    "sample",
                    ["a", "b"],
                    [[1, 2], [3.3, 4.4]]
                ).as_tuple()
                for record in records:
                    print(record)

        :Output:
            .. code-block:: none

                Row(a=1, b=2)
                Row(a=Decimal('3.3'), b=Decimal('4.4'))
        """

        Row = namedtuple("Row", self.headers)  # type: ignore

        for value_dp_list in self.value_dp_matrix:
            if typepy.is_empty_sequence(value_dp_list):
                continue

            row = Row(*(value_dp.data for value_dp in value_dp_list))

            yield row

    def as_dataframe(self) -> "pandas.DataFrame":
        """
        :return: Table data as a ``pandas.DataFrame`` instance.
        :rtype: pandas.DataFrame
        :raises RuntimeError: If the ``pandas`` package cannot be imported.

        :Sample Code:
            .. code-block:: python

                from tabledata import TableData

                TableData(
                    "sample",
                    ["a", "b"],
                    [[1, 2], [3.3, 4.4]]
                ).as_dataframe()

        :Output:
            .. code-block:: none

                     a    b
                0    1    2
                1  3.3  4.4

        :Dependency Packages:
            - `pandas <https://pandas.pydata.org/>`__
        """

        try:
            from pandas import DataFrame
        except ImportError as exc:
            # Keep the original ImportError attached as the cause.
            raise RuntimeError("required 'pandas' package to execute as_dataframe method") from exc

        dataframe = DataFrame(self.value_matrix)
        if not self.is_empty_header():
            dataframe.columns = self.headers

        return dataframe

    def transpose(self) -> "TableData":
        """
        :return:
            New table whose rows are the columns of this table.
            Only ``table_name``, ``headers`` and ``max_workers`` are
            carried over to the new instance.
        :rtype: TableData
        """

        # NOTE(review): ``headers`` is passed through unchanged although the
        # rows are transposed -- confirm this is intended.
        return TableData(
            self.table_name,
            self.headers,
            list(zip(*self.rows)),
            max_workers=self.max_workers,
        )

    def filter_column(
        self,
        patterns: Optional[Union[str, Sequence[str]]] = None,
        is_invert_match: bool = False,
        is_re_match: bool = False,
        pattern_match: PatternMatch = PatternMatch.OR,
    ) -> "TableData":
        """
        Create a table that only has the columns whose header matches.

        :param patterns:
            Patterns to match against each header.
            A single string is treated as one pattern.
            If empty, this instance itself is returned.
        :param is_invert_match:
            If |True|, a pattern counts as matched when the header
            does NOT match it.
        :param is_re_match:
            If |True|, patterns are regular expressions searched in the
            header. Otherwise a header must be equal to the pattern.
        :param pattern_match:
            ``PatternMatch.OR`` selects a column when any pattern matches,
            ``PatternMatch.AND`` when all patterns match.
        :return:
            New table built from the matched columns.
            Only ``table_name`` and ``max_workers`` are carried over.
        :rtype: TableData
        :raises ValueError: If ``pattern_match`` is neither OR nor AND.
        """

        logger.debug(
            "filter_column: patterns={}, is_invert_match={}, "
            "is_re_match={}, pattern_match={}".format(
                patterns, is_invert_match, is_re_match, pattern_match
            )
        )

        if not patterns:
            return self

        if isinstance(patterns, str):
            # Iterating a bare string would match it character by character.
            patterns = [patterns]

        if pattern_match == PatternMatch.OR:
            match_method = any
        elif pattern_match == PatternMatch.AND:
            match_method = all
        else:
            raise ValueError(f"unknown matching: {pattern_match}")

        invert = bool(is_invert_match)
        match_header_list = []
        match_column_matrix = []

        # ``zip(*self.rows)`` turns the rows into columns; assumes every row
        # is a positional sequence ordered like ``headers``.
        for header, column in zip(self.headers, zip(*self.rows)):
            # A pattern is a hit when its match result differs from ``invert``.
            is_match_list = [
                self.__is_match(header, pattern, is_re_match) != invert for pattern in patterns
            ]

            if match_method(is_match_list):
                match_header_list.append(header)
                match_column_matrix.append(column)

        logger.debug(
            "filter_column: table={}, match_header_list={}".format(
                self.table_name, match_header_list
            )
        )

        return TableData(
            self.table_name,
            match_header_list,
            list(zip(*match_column_matrix)),
            max_workers=self.max_workers,
        )

    @staticmethod
    def from_dataframe(
        dataframe: "pandas.DataFrame",
        table_name: str = "",
        type_hints: Optional[Sequence[TypeHint]] = None,
        max_workers: Optional[int] = None,
    ) -> "TableData":
        """
        Initialize TableData instance from a pandas.DataFrame instance.

        :param pandas.DataFrame dataframe:
            Source data. Column labels become the headers and
            ``dataframe.values.tolist()`` becomes the rows.
        :param str table_name: Table name to create.
        :param type_hints: Passed through to the constructor.
        :param max_workers: Passed through to the constructor.
        :return: Created table.
        :rtype: TableData
        """

        return TableData(
            table_name,
            list(dataframe.columns.values),
            dataframe.values.tolist(),
            type_hints=type_hints,
            max_workers=max_workers,
        )

    @staticmethod
    def __is_match(header: str, pattern: str, is_re_match: bool) -> bool:
        # Regular expression search when requested, exact equality otherwise.
        if is_re_match:
            return re.search(pattern, header) is not None

        return header == pattern