ebilab.analysis.process_util のソースコード

from __future__ import annotations

import re
import os
from pathlib import Path
from typing import Union, Callable, List
from dataclasses import dataclass

import pandas as pd

from .paths import paths

@dataclass
class DfAction:
    """A DataFrame transformation paired with the key that identifies it.

    ``ProcessingData.apply`` calls ``handler`` on the current frame and
    appends ``key`` to the data key used for cache file names.
    """

    # Receives a DataFrame and returns the transformed DataFrame.
    handler: Callable[[pd.DataFrame], pd.DataFrame]
    # Short identifier for this step.
    key: str

    def get_key(self):
        """Return the key of this action."""
        return self.key
@dataclass
class DfPlotter:
    """A plotting routine for one DataFrame, paired with an identifying key.

    ``ProcessingData.plot`` calls ``handler`` with the frame and the target
    file path, and uses ``key`` as part of the output file name.
    """

    # Receives the DataFrame and the output path; its return value is unused.
    handler: Callable[[pd.DataFrame, Union[str, Path]], None]
    # Short identifier for this plot.
    key: str

    def get_key(self):
        """Return the key of this plotter."""
        return self.key
@dataclass
class AggregatedDfPlotter:
    """A plotting routine for a list of DataFrames, paired with a key.

    ``AggregatedProcessingData.plot`` calls ``handler`` with the list of
    frames and the target file path, and uses ``key`` in the file name.
    """

    # Receives the list of DataFrames and the output path.
    handler: Callable[[List[pd.DataFrame], Union[str, Path]], None]
    # Short identifier for this plot.
    key: str

    def get_key(self):
        """Return the key of this plotter."""
        return self.key
def df_action(key: str):
    """Return a decorator that turns a function into a ``DfAction``.

    Args:
        key: Identifier stored on the resulting ``DfAction``.

    Returns:
        A one-argument callable; applied to a DataFrame-to-DataFrame
        function it returns ``DfAction(handler=func, key=key)``. The decorated
        name is therefore bound to the ``DfAction``, not to the function.
    """

    def wrap(func: Callable[[pd.DataFrame], pd.DataFrame]):
        action = DfAction(handler=func, key=key)
        return action

    return wrap
def df_plotter(key: str):
    """Return a decorator that turns a function into a ``DfPlotter``.

    Args:
        key: Identifier stored on the resulting ``DfPlotter``.

    Returns:
        A one-argument callable; applied to a ``(DataFrame, path)`` function
        it returns ``DfPlotter(handler=func, key=key)``. The decorated name is
        therefore bound to the ``DfPlotter``, not to the function.
    """

    def wrap(func: Callable[[pd.DataFrame, Union[str, Path]], None]):
        plotter = DfPlotter(handler=func, key=key)
        return plotter

    return wrap
def agg_df_plotter(key: str):
    """Return a decorator that turns a function into an ``AggregatedDfPlotter``.

    Args:
        key: Identifier stored on the resulting ``AggregatedDfPlotter``.

    Returns:
        A one-argument callable; applied to a ``(list of DataFrames, path)``
        function it returns ``AggregatedDfPlotter(handler=func, key=key)``.
        The decorated name is therefore bound to that object, not to the
        function.
    """

    def wrap(func: Callable[[List[pd.DataFrame], Union[str, Path]], None]):
        plotter = AggregatedDfPlotter(handler=func, key=key)
        return plotter

    return wrap
class ProcessingData:
    """A DataFrame together with a key that names its cache files.

    Every ``apply`` step appends ``"__" + action.key`` to the key and stores
    the result as ``<key>.csv`` under ``paths.output``; plots are written as
    ``<key>__<plotter.key>.png`` under ``paths.plot``. While ``_use_cache`` is
    true, existing files are reused instead of being recomputed.

    Most methods return ``self`` so calls can be chained.
    """

    _df: pd.DataFrame
    _key: str
    # Class-level default. An instance turns it off through ``nocache()`` or
    # when ``apply()`` had to recompute a step.
    _use_cache = True

    def __init__(self, df: pd.DataFrame, key: str):
        """Wrap ``df`` under the cache key ``key``."""
        self._df = df
        self._key = key

    def apply(self, action: DfAction):
        """Run ``action`` on the data, or load its cached CSV if available.

        The key is extended with ``"__" + action.key`` before the cache is
        consulted. On a cache miss the handler result is saved as CSV and
        caching is disabled for this object, so later steps and plots on it
        are recomputed too.

        Status messages about cache hits are suppressed when the environment
        variable ``EBILAB_SOURCE`` equals ``"WATCH"``.

        Returns:
            This object, for chaining.
        """
        self._key += "__" + action.key
        cache = self._load_csv()
        if cache is None:
            self._df = action.handler(self._df)
            self._save()
            # The data changed, so anything derived from it must be rebuilt.
            self._use_cache = False
            print(f"Saved: {self._key}.csv")
        else:
            self._df = cache
            if os.environ.get("EBILAB_SOURCE") != "WATCH":
                print(f"Cache available: {self._key}.csv")
        return self

    def query(self, q: str, caption: str):
        """Filter rows with ``DataFrame.query(q)`` as a cached step.

        Args:
            q: Expression passed to ``DataFrame.query``.
            caption: Key of the step; it becomes part of the cache file name.

        Returns:
            This object, for chaining.
        """

        def func(df: pd.DataFrame) -> pd.DataFrame:
            return df.query(q)

        return self.apply(DfAction(func, caption))

    def concat(self, other: ProcessingData) -> ProcessingData:
        """Return a new object holding the rows of ``self`` followed by ``other``.

        The new key is ``"(<key1>+<key2>)"``. A key that is entirely wrapped
        in one pair of parentheses has them stripped first, so chained
        concatenations give ``"(a+b+c)"`` rather than ``"((a+b)+c)"``.

        The result uses caching only if both operands still do. Otherwise a
        step applied after the concatenation could load a CSV computed from
        data that has since been recomputed.
        """
        key1 = re.sub(r"^\((.+)\)$", r"\1", self._key)
        key2 = re.sub(r"^\((.+)\)$", r"\1", other._key)
        merged = ProcessingData(
            df=pd.concat([self._df, other._df]),
            key=f"({key1}+{key2})",
        )
        # Same rule as AggregatedProcessingData: every input must allow caching.
        merged._use_cache = self._use_cache and other._use_cache
        return merged

    def nocache(self):
        """Disable cache reuse for this object and return it for chaining."""
        self._use_cache = False
        return self

    def _load_csv(self):
        """Return the cached DataFrame for the current key, or ``None``.

        ``None`` is returned when caching is disabled or when
        ``paths.output / "<key>.csv"`` does not exist.
        """
        if not self._use_cache:
            return None
        path = paths.output / (self._key + ".csv")
        if path.exists():
            return pd.read_csv(path)
        return None

    def _check_png(self) -> bool:
        """Report whether caching is on and ``paths.output / "<key>.png"`` exists.

        NOTE(review): not called anywhere in the visible code, and it looks
        under ``paths.output`` while ``plot`` writes to ``paths.plot``.
        Confirm whether this helper is still needed.
        """
        path = paths.output / (self._key + ".png")
        return self._use_cache and path.exists()

    def _save(self):
        """Write the current DataFrame to ``paths.output / "<key>.csv"``."""
        self._df.to_csv(paths.output / (self._key + ".csv"), index=False)
        return self

    def plot(self, plotter: DfPlotter):
        """Render the data with ``plotter`` unless the image already exists.

        The target is ``paths.plot / "<key>__<plotter.key>.png"``. When
        caching is enabled and that file exists, the handler is not called.
        The "already exists" message is suppressed when the environment
        variable ``EBILAB_SOURCE`` equals ``"WATCH"``.

        Returns:
            This object, for chaining.
        """
        filename = paths.plot / (self._key + "__" + plotter.key + ".png")

        # cache
        if self._use_cache and filename.exists():
            if os.environ.get("EBILAB_SOURCE") != "WATCH":
                print(f"Plot already exists: {filename.name}")
        else:
            plotter.handler(self._df, filename)
            print(f"Saved plot: {filename.name}")
        return self

    def __del__(self):
        # Calling plt.close here might make it possible to offer a show()
        # method.
        pass

    @classmethod
    def fromCsv(cls, filename: Union[str, Path]):
        """Read ``filename`` as CSV and use its stem as the key."""
        path = Path(filename)
        df = pd.read_csv(path)
        return cls(df, path.stem)
class AggregatedProcessingData:
    """Several DataFrames and their keys bundled for one combined plot."""

    _dfs: List[pd.DataFrame]
    _keys: List[str]
    _use_cache = True

    def __init__(self, data: List[ProcessingData]):
        """Collect the frames, keys and cache flags of ``data``.

        Caching stays enabled only if it is enabled on every element.
        """
        self._dfs = [item._df for item in data]
        self._keys = [item._key for item in data]
        self._use_cache = all(item._use_cache for item in data)

    def plot(self, plotter: AggregatedDfPlotter):
        """Render all frames with ``plotter`` unless the image already exists.

        The target is ``paths.plot / "[<key1>,<key2>,...]__<plotter.key>.png"``.
        When caching is enabled and that file exists, the handler is not
        called. The "already exists" message is suppressed when the
        environment variable ``EBILAB_SOURCE`` equals ``"WATCH"``.

        Returns:
            This object, for chaining.
        """
        stem = "[" + ",".join(self._keys) + "]__" + plotter.key
        filename = paths.plot / (stem + ".png")

        reuse_existing = self._use_cache and filename.exists()
        if not reuse_existing:
            plotter.handler(self._dfs, filename)
            print(f"Saved plot: {filename.name}")
        elif os.environ.get("EBILAB_SOURCE") != "WATCH":
            print(f"Plot already exists: {filename.name}")
        return self

    def nocache(self):
        """Disable cache reuse for this object and return it for chaining."""
        self._use_cache = False
        return self
def aggregate(data: List[ProcessingData]):
    """Bundle ``data`` into a single ``AggregatedProcessingData``."""
    bundle = AggregatedProcessingData(data)
    return bundle
def input(filename: str):
    """Load the CSV ``filename`` from ``paths.input`` as ``ProcessingData``.

    Note that this name shadows the builtin ``input`` inside this module.
    """
    source = paths.input / filename
    return ProcessingData.fromCsv(source)
def output(filename: str):
    """Load the CSV ``filename`` from ``paths.output`` as ``ProcessingData``."""
    source = paths.output / filename
    return ProcessingData.fromCsv(source)
def fromDf(df: pd.DataFrame, key: str):
    """Wrap an in-memory DataFrame as ``ProcessingData`` under ``key``."""
    wrapped = ProcessingData(df, key)
    return wrapped