# Source code for ebilab.experiment._experiment_controller

import datetime
import numbers
import os
import sys
import csv
import abc
import socket
import copy
import time
import queue
from pathlib import Path
from typing import List, Optional, Type, Dict
import weakref
from threading import Thread
import dataclasses
from logging import getLogger

import matplotlib.pyplot as plt

from .options import OptionField
from ..project import get_current_project

logger = getLogger(__name__)

class ExperimentStoppedByUser(Exception):
    """Raised when the user pressed the 'stop' button.

    The string form is a fixed message; constructor arguments are kept in
    ``args`` as usual but never appear in ``str(exc)``.
    """

    def __str__(self) -> str:
        message = "User has stopped the experiment"
        return message

# dependencies of ExperimentController
class ExperimentContextDelegate(metaclass=abc.ABCMeta):
    """Callbacks that back an ``ExperimentContext``.

    All five methods are abstract, so a concrete subclass has to override
    each of them before it can be instantiated. The bodies defined here only
    raise ``NotImplementedError``.
    """

    @abc.abstractmethod
    def experiment_ctx_delegate_send_row(self, row):
        """Handle one data row sent through the context."""
        raise NotImplementedError

    @abc.abstractmethod
    def experiment_ctx_delegate_send_log(self, log: str):
        """Handle one log message sent through the context."""
        raise NotImplementedError

    @abc.abstractmethod
    def experiment_ctx_delegate_get_t(self) -> float:
        """Return the value exposed as ``ExperimentContext.t``."""
        raise NotImplementedError

    @abc.abstractmethod
    def experiment_ctx_delegate_get_options(self) -> dict:
        """Return the value exposed as ``ExperimentContext.options``."""
        raise NotImplementedError

    @abc.abstractmethod
    def experiment_ctx_delegate_loop(self) -> None:
        """Hook invoked by ``ExperimentContext.loop``."""
        raise NotImplementedError

class ExperimentContext:
    """Object passed to ``ExperimentProtocol.steps``.

    Every operation is forwarded to the delegate given to the constructor;
    the context itself keeps no other state.
    """

    _delegate: ExperimentContextDelegate

    def __init__(self, delegate: ExperimentContextDelegate):
        self._delegate = delegate

    def send_row(self, row: dict):
        """Forward one data row to the delegate."""
        self._delegate.experiment_ctx_delegate_send_row(row)

    def log(self, log: str):
        """Forward one log message to the delegate."""
        self._delegate.experiment_ctx_delegate_send_log(log)

    @property
    def t(self) -> float:
        """Value reported by the delegate's ``experiment_ctx_delegate_get_t``."""
        return self._delegate.experiment_ctx_delegate_get_t()

    @property
    def options(self) -> dict:
        """Value reported by the delegate's ``experiment_ctx_delegate_get_options``."""
        return self._delegate.experiment_ctx_delegate_get_options()

    def loop(self) -> None:
        """Call the delegate's loop hook.

        Whatever the hook raises propagates to the caller; this is how
        ``sleep`` gets interrupted.
        """
        self._delegate.experiment_ctx_delegate_loop()

    def sleep(self, sleep_time: float) -> None:
        """
        Cancelable sleep

        You should use ctx.sleep instead of time.sleep

        The wait is split into 1-second slices with ``loop()`` called after
        each slice. The final slice (at most one second) is slept in one go
        and is not followed by a ``loop()`` call.

        Args:
            sleep_time (float): Time to sleep. Zero or negative values return
                without sleeping.
        """
        # Monotonic clock: a wall-clock adjustment during a long wait must not
        # stretch or shorten the sleep.
        deadline = time.monotonic() + sleep_time
        while deadline - time.monotonic() > 1.0:
            time.sleep(1)
            self.loop()
        # The remainder can be negative (sleep_time <= 0, or the clock moved
        # past the deadline since the check above); time.sleep() raises
        # ValueError for negative values, so clamp at zero.
        time.sleep(max(0.0, deadline - time.monotonic()))
@dataclasses.dataclass
class PlotterContext:
    """Pair of option dictionaries passed to ``ExperimentPlotter.prepare``/``update``."""

    # presumably the values chosen for the plotter's own ``options`` -- TODO confirm
    plotter_options: dict
    # presumably the values chosen for the protocol's ``options`` -- TODO confirm
    protocol_options: dict
class ExperimentPlotter(metaclass=abc.ABCMeta):
    """Abstract base class for plotters attached to an experiment protocol.

    Subclasses must implement ``prepare`` and ``update``.
    """

    # Annotated only; no default is assigned here.
    # presumably the matplotlib figure the plotter draws on -- TODO confirm
    fig: plt.Figure
    # Annotated only; presumably a display name -- TODO confirm
    name: str
    # Option definitions keyed by option name; ``None`` means none declared.
    options: Optional[Dict[str, OptionField]] = None

    @abc.abstractmethod
    def prepare(self, ctx: PlotterContext):
        """Set the plotter up; ``ctx`` carries plotter and protocol options."""
        raise NotImplementedError()

    @abc.abstractmethod
    def update(self, df, ctx: PlotterContext):
        """Refresh the plot from ``df``.

        NOTE(review): ``df`` is presumably a pandas DataFrame of the rows
        collected so far -- confirm against the UI implementation.
        """
        raise NotImplementedError()
class ExperimentProtocol(metaclass=abc.ABCMeta):
    """Abstract base class describing one runnable experiment.

    Subclasses implement ``steps``; ``ExperimentController`` instantiates the
    class with no arguments and calls ``steps`` with an ``ExperimentContext``.
    """

    # Used in the CSV comment header and as the fallback output-file label.
    name: str
    # Data column names; the controller writes them after "t" and "time".
    columns: List[str]
    # Plotter classes attached to this protocol. ``None`` until something is
    # registered or ``prepare_experiments`` replaces it with an empty list.
    plotter_classes: Optional[List[Type[ExperimentPlotter]]] = None
    # Option definitions keyed by option name; ``None`` means none declared.
    options: Optional[Dict[str, OptionField]] = None

    @abc.abstractmethod
    def steps(self, ctx: ExperimentContext) -> None:
        """Run the experiment, reporting through ``ctx``."""
        raise NotImplementedError()

    @classmethod
    def register_plotter(cls, plotter):
        """Attach ``plotter`` to this protocol class.

        The list is owned by ``cls`` itself: if ``cls`` only inherits a list
        from a parent protocol, it is copied first so that the registration
        does not leak into the parent or sibling protocols.

        Args:
            plotter: Plotter class to register.

        Returns:
            The ``plotter`` argument unchanged, so the method can also be
            used as a class decorator without rebinding the class to ``None``.
        """
        own = cls.__dict__.get("plotter_classes")
        if own is None:
            # Look through the MRO for anything a parent already registered.
            inherited = cls.plotter_classes
            own = [] if inherited is None else list(inherited)
            cls.plotter_classes = own
        own.append(plotter)
        return plotter
@dataclasses.dataclass(frozen=True)
class ExperimentProtocolGroup:
    """Immutable named collection of protocol classes.

    ``prepare_experiments`` recurses into ``protocols`` when it meets an
    instance of this class in an experiment list.
    """

    name: str
    protocols: List[Type[ExperimentProtocol]]
class ExperimentUIDelegate(metaclass=abc.ABCMeta):
    """Operations the UI invokes on the core (UI -> Core)."""

    @abc.abstractmethod
    def handle_ui_start(self, experiment_index: int):
        """Called by the UI to start an experiment.

        NOTE(review): ``ExperimentController`` implements this with a protocol
        class as the argument rather than an index -- confirm which one the
        UI actually passes.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def handle_ui_stop(self):
        """Called by the UI to stop the running experiment."""
        raise NotImplementedError


class IExperimentUI(metaclass=abc.ABCMeta):
    """Abstract interface the controller uses to talk to a user interface."""

    # weakref to the delegate, or None when no delegate is set.
    __delegate_ref = None

    @property
    def delegate(self) -> Optional[ExperimentUIDelegate]:
        """The current delegate, or ``None`` if unset or already collected."""
        if self.__delegate_ref is None:
            return None
        return self.__delegate_ref()

    @delegate.setter
    def delegate(self, delegate: Optional[ExperimentUIDelegate]):
        # The delegate is held weakly. ``weakref.ref(None)`` raises TypeError,
        # so ``None`` is handled explicitly and clears the reference.
        self.__delegate_ref = None if delegate is None else weakref.ref(delegate)

    @property
    @abc.abstractmethod
    def data_queue(self) -> queue.Queue:
        """Queue the controller puts data rows on."""
        raise NotImplementedError()

    # Assigned by ``ExperimentController.__init__``.
    experiments: List[Type[ExperimentProtocol]]
    # Queue the controller puts log records on.
    log_queue: queue.Queue

    @abc.abstractmethod
    def launch(self):
        """Start the UI."""
        raise NotImplementedError()

    @abc.abstractmethod
    def update_state(self, state: str):
        """Reflect a state change; the controller sends "running", "stopping" and "stopped"."""
        raise NotImplementedError()

    @abc.abstractmethod
    def reset_data(self):
        """Called by the controller at the start of each run."""
        raise NotImplementedError()

    @abc.abstractmethod
    def get_options(self) -> dict:
        """Return the option values for the run about to start."""
        raise NotImplementedError()

    @property
    def experiment_label(self) -> str:
        """Label used as the output-file prefix; not abstract, but raises unless overridden."""
        raise NotImplementedError()

    def show_error(self, msg: str):
        """Display an error message; not abstract, but raises unless overridden."""
        raise NotImplementedError()


def prepare_experiments(experiments):
    """Replace a ``None`` ``plotter_classes`` with an empty list on each protocol.

    ``ExperimentProtocolGroup`` entries are handled by recursing into their
    ``protocols``. The protocol classes are modified in place.
    """
    for experiment in experiments:
        if isinstance(experiment, ExperimentProtocolGroup):
            prepare_experiments(experiment.protocols)
            continue
        if experiment.plotter_classes is None:
            experiment.plotter_classes = []
class ExperimentController(ExperimentContextDelegate, ExperimentUIDelegate):
    """Runs an experiment protocol on a background thread.

    The controller is the delegate of both the UI (start/stop requests) and
    the ``ExperimentContext`` handed to the protocol (rows, logs, elapsed
    time, options, cancellation). Rows go to the UI's ``data_queue`` and a
    CSV file; log messages go to the UI's ``log_queue`` and a ``.log`` file.
    """

    _experiments: List[Type[ExperimentProtocol]]
    _ui: IExperimentUI
    _ctx: ExperimentContext

    _running = False
    _file = None
    _log_file = None
    _experiment_thread = None

    def __init__(self, *, experiments: List[Type[ExperimentProtocol]], ui: IExperimentUI):
        # fix plotter_classes in experiments
        prepare_experiments(experiments)

        self._experiments = experiments
        self._ui = ui
        self._ui.delegate = self
        self._ui.experiments = experiments

    def launch(self):
        """Launch the UI."""
        self._ui.launch()

    def _get_comment_line(self, experiment: ExperimentProtocol, options: dict) -> str:
        """Build the ``#``-prefixed header written at the top of the CSV file.

        Returns:
            str: includes trailing NL
        """
        exp_name = experiment.name
        date = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
        pc_name = socket.gethostname()
        options_str = ", ".join(f"{k}: {v}" for k, v in options.items()) if options else ""
        return (
            f"# {exp_name} experiment: Ran at {date} in {pc_name}\n"
            f"# {options_str}\n"
            "#\n"
        )

    def _resolve_data_dir(self):
        """Return the current project's original-data directory, or ``./data``."""
        try:
            data_dir = get_current_project().path.data_original
        except Exception:
            # Deliberate best effort: any failure to resolve a project falls
            # back to a local directory instead of aborting the run.
            logger.debug("ebilab project not found")
            return Path(".") / "data"
        # Logged only after the lookup succeeded.
        logger.debug("ebilab project found")
        return data_dir

    def _open_output_files(self):
        """Create the dated output directory and open the CSV and log files."""
        # Release anything an earlier, failed start may have left open.
        self._close_files()

        # One timestamp for the directory and both file names, so the CSV and
        # its log always share the same stem.
        started_at = datetime.datetime.now()
        out_dir = self._resolve_data_dir() / started_at.strftime("%y%m%d")
        os.makedirs(out_dir, exist_ok=True)

        label = self._ui.experiment_label or self._running_experiment.name
        stamp = started_at.strftime("%Y%m%d-%H%M%S")
        filename = label + "-" + stamp + ".csv"
        log_filename = label + "-" + stamp + ".log"

        self._filename = out_dir / filename
        logger.info(f"Output file: {self._filename}")
        self._file = open(self._filename, "w", newline="")
        self._log_file = open(out_dir / log_filename, "w", newline="")
        self._csv_writer = csv.writer(self._file, quoting=csv.QUOTE_NONNUMERIC)

        comment_lines = self._get_comment_line(self._running_experiment, self._options)
        logger.debug("Comment_lines: " + comment_lines)
        self._file.write(comment_lines)
        header = ["t", "time"] + self._running_experiment.columns
        logger.debug("Header: " + str(header))
        self._csv_writer.writerow(header)

    def _close_files(self):
        """Close the CSV and log files if they are open; safe to call repeatedly."""
        csv_file, self._file = self._file, None
        log_file, self._log_file = self._log_file, None
        try:
            if csv_file is not None:
                csv_file.close()
        finally:
            # Close the log even when closing the CSV file failed.
            if log_file is not None:
                log_file.close()

    def _run(self, experiment: Type[ExperimentProtocol]):
        """Instantiate ``experiment``, open its output files and start its thread."""
        logger.info("starting experiment")
        self._ui.update_state("running")
        self._running_experiment_class = experiment
        self._running_experiment = self._running_experiment_class()
        self._ui.reset_data()
        self._options = self._ui.get_options()
        self._ctx = ExperimentContext(delegate=self)

        # file
        self._open_output_files()

        def run():
            try:
                self._running_experiment.steps(self._ctx)
            except ExperimentStoppedByUser:
                logger.info("experiment stopped by user")
            except Exception as e:
                # print error info
                self._ctx.log(f"Python error occured: {e}")
                logger.exception("Python error occured during experiment")
                self._ui.show_error(f"Python error occured: {e}")
            finally:
                logger.debug("running_experiment finished, waiting 1sec")
                time.sleep(1)
                # Close here as well: when the protocol finishes by itself
                # nobody calls _stop(), and buffered rows would otherwise
                # stay unwritten while the handles remain open.
                try:
                    self._close_files()
                except OSError:
                    logger.exception("failed to close output files")
                self._running = False
                self._ui.update_state("stopped")
                logger.info("running_experiment finished")

        self._started_time = time.perf_counter()
        logger.debug(f"started_time: {self._started_time}")
        self._running = True
        self._experiment_thread = Thread(target=run)
        self._experiment_thread.daemon = True
        self._experiment_thread.start()
        logger.info("experiment thread started")

    def _stop(self):
        """Request cancellation, wait for the experiment thread, close the files."""
        logger.debug("stopping experiment")
        self._ui.update_state("stopping")
        # experiment_ctx_delegate_loop() raises once this flag is False.
        self._running = False
        if self._experiment_thread is not None:
            logger.info("joining experiment thread")
            self._experiment_thread.join()
        self._ui.update_state("stopped")
        self._close_files()
        logger.debug("stopped experiment")

    def _get_t(self) -> float:
        """Seconds elapsed since the experiment thread was started."""
        return time.perf_counter() - self._started_time

    # ExperimentContextDelegate
    def experiment_ctx_delegate_send_row(self, row):
        """Stamp ``row`` with ``t`` and ``time``, queue it for the UI and append it to the CSV.

        The caller's dict is not modified; a shallow copy is stamped instead.
        Columns missing from ``row`` are written as empty values.
        """
        row = copy.copy(row)
        row["t"] = self._get_t()
        row["time"] = datetime.datetime.now()
        self._ui.data_queue.put(row)

        # write to file
        cols = ["t", "time"] + self._running_experiment.columns
        row_list = [row.get(col) for col in cols]
        self._csv_writer.writerow(row_list)

    def experiment_ctx_delegate_send_log(self, message):
        """Queue ``message`` for the UI and append it to the log file."""
        t = self._get_t()
        # Named ``now`` so the ``time`` module is not shadowed.
        now = datetime.datetime.now()
        self._ui.log_queue.put({
            "t": t,
            "time": now,
            "message": message,
        })
        self._log_file.write(f"{now} t={t}: {message}\n")

    def experiment_ctx_delegate_get_t(self) -> float:
        """Seconds elapsed since the experiment thread was started."""
        return self._get_t()

    def experiment_ctx_delegate_get_options(self) -> dict:
        """Options captured from the UI when the run was started."""
        return self._options

    def experiment_ctx_delegate_loop(self) -> None:
        """Raise ``ExperimentStoppedByUser`` once the run is no longer marked running."""
        if not self._running:
            raise ExperimentStoppedByUser()

    # ExperimentUIDelegate
    def handle_ui_start(self, experiment: Type[ExperimentProtocol]):
        """Start ``experiment``.

        NOTE(review): the abstract ``ExperimentUIDelegate.handle_ui_start``
        names this parameter ``experiment_index: int``; here it is treated as
        a protocol class and instantiated by ``_run``.
        """
        self._run(experiment)

    def handle_ui_stop(self):
        """Stop the running experiment."""
        self._stop()

    def __del__(self):
        # Covers both handles; the class-level ``None`` defaults keep this
        # safe even if ``__init__`` never completed.
        self._close_files()