Source code for rolumns.columns

from typing import Any, Dict, List, Optional, Tuple, Union

from rolumns.column import Column
from rolumns.cursor import Cursor
from rolumns.exceptions import MultipleGroups
from rolumns.group import Group
from rolumns.source import Source


class Columns:
    """
    A set of columns.

    .. testcode::

        from rolumns import Columns

        data = [
            {
                "name": "Robert Pringles",
                "email": "bob@pringles.pop",
            },
        ]

        columns = Columns()
        columns.add("Name", "name")
        columns.add("Email", "email")
    """

    def __init__(
        self,
        cursor: Optional[Union[Cursor, Group, str]] = None,
    ) -> None:
        # Anything that is not already a cursor (a group, a path string or
        # nothing at all) is wrapped in a new cursor.
        if isinstance(cursor, (Group, str)) or cursor is None:
            cursor = Cursor(cursor)

        self._columns: List[Column] = []
        self._cursor = cursor
        # At most one grouped (child) column set is allowed per set.
        self._grouped_set: Optional[Columns] = None

    def add(
        self,
        name: str,
        source: Optional[Union[Source, str]] = None,
    ) -> None:
        """
        Adds a column.

        `source` describes the data source for the column, which can be:

        - an explicit :class:`Source`
        - a string that describes the :code:`.`-separated path to the value
        - `None` if this set's record is an iterable list of primitives

        .. testcode::

            from rolumns import Columns, Source

            data = [
                {
                    "name": "Robert Pringles",
                    "email": "bob@pringles.pop",
                    "awards": [
                        "Fastest Doughnut Run",
                        "Tie of the Year",
                    ],
                },
            ]

            columns = Columns()
            columns.add("Name", "name")
            columns.add("Email", Source(path="email"))

            awards = columns.group("awards")
            # "awards" is also a column set
            awards.add("Awards")
        """

        if not isinstance(source, Source):
            source = Source(path=source)

        self._columns.append(Column(name, source))

    @property
    def cursor(self) -> Cursor:
        """
        Cursor.
        """

        return self._cursor

    def group(
        self,
        cursor: Union[Cursor, Group, str],
    ) -> "Columns":
        """
        Creates and adds a grouped column set.

        A column set cannot have multiple groups (though each group can have
        its own group). Will raise :class:`exceptions.MultipleGroups` if you
        try to add a second group.

        .. testcode::

            from rolumns import Columns, Source

            data = [
                {
                    "name": "Robert Pringles",
                    "awards": [
                        "Fastest Doughnut Run",
                        "Tie of the Year",
                    ],
                },
            ]

            columns = Columns()
            columns.add("Name", "name")

            awards = columns.group("awards")
            awards.add("Awards")
        """

        if self._grouped_set is not None:
            raise MultipleGroups()

        # Groups and path strings are resolved relative to this set's cursor.
        if isinstance(cursor, (Group, str)):
            cursor = self._cursor.group(cursor)

        self._grouped_set = Columns(cursor)
        return self._grouped_set

    def names(self) -> List[str]:
        """
        Gets the names of the columns within this set and its children.

        This set's own column names come first, in the order they were added,
        followed by the names of the grouped set (recursively).
        """

        names = [column.name for column in self._columns]

        if self._grouped_set is not None:
            names.extend(self._grouped_set.names())

        return names

    def normalize(self) -> List[Dict[str, Any]]:
        """
        Normalises the records yielded by this set's cursor into a list of
        dictionaries describing column names and values.

        Each record becomes one dictionary keyed by column name. If this set
        has a grouped set then its normalised records are nested, as a list,
        under the name of that set's cursor group.

        Raises `Exception` if a column's source yields more than one value for
        a single record.
        """

        result: List[Dict[str, Any]] = []

        for record in self._cursor:
            resolved: Dict[str, Any] = {}

            for column in self._columns:
                # A source may yield nothing, in which case the column is
                # simply absent from this record, but never several values.
                for index, value in enumerate(column.source.read(record)):
                    if index == 0:
                        resolved[column.name] = value
                    else:
                        raise Exception("Encountered multiple values")

            if self._grouped_set is not None:
                key = self._grouped_set.cursor.cursor_group.name()
                resolved[key] = self._grouped_set.normalize()

            result.append(resolved)

        return result

    @staticmethod
    def normalized_to_column_values(
        normalized: List[Dict[str, Any]],
    ) -> Dict[str, List[Any]]:
        """
        Translates the normalised list of values `normalized` to a dictionary
        of column names and values.

        Any list value within a record is treated as that record's group and
        is flattened recursively. A record occupies as many rows as its group
        produces (at least one), and the record's scalar values are repeated
        on each of those rows.

        Every returned column has the same length. A column that is absent
        from a record is padded with `None` for that record's rows so values
        always stay aligned with the record they came from.

        Raises `Exception` if a record holds more than one list (group).
        """

        columns: Dict[str, List[Any]] = {}
        height = 0

        for record in normalized:
            record_columns: Dict[str, List[Any]] = {}
            record_height = 1
            group: Optional[Tuple[str, List[Any]]] = None

            for key, value in record.items():
                if isinstance(value, list):
                    if group is not None:
                        raise Exception("Encountered multiple groups")
                    group = (key, value)
                else:
                    record_columns[key] = [value]

            if group is not None:
                group_values = Columns.normalized_to_column_values(group[1])

                for key, values in group_values.items():
                    # The recursive call returns columns of equal height, so
                    # this only guards against that invariant being broken.
                    if record_height > 1 and record_height != len(values):
                        raise Exception(
                            "Encountered inconsistent group column heights"
                        )
                    record_height = max(record_height, len(values))
                    record_columns[key] = values

            for values in record_columns.values():
                # Repeat the first value so the column spans every row of the
                # record's group.
                shortfall = record_height - len(values)
                if shortfall > 0:
                    values.extend([values[0]] * shortfall)

            for key, values in record_columns.items():
                column = columns.setdefault(key, [])
                # Cover the rows of earlier records that lacked this column.
                column.extend([None] * (height - len(column)))
                column.extend(values)

            height += record_height

        # Cover the rows of trailing records that lacked a column.
        for column in columns.values():
            column.extend([None] * (height - len(column)))

        return columns

    def to_column_values(self) -> Dict[str, List[Any]]:
        """
        Translates the records yielded by this set's cursor to a dictionary of
        column names and values.
        """

        return Columns.normalized_to_column_values(self.normalize())