# sample of multi-thread measuremnt & plotting
import queue
import inspect
import copy
import sys
from threading import Thread
import time
import datetime
from typing import Optional, List
import os
import matplotlib.pyplot as plt
from matplotlib.figure import Figure
import pandas as pd
[ドキュメント]class Plotter:
[ドキュメント] def prepare(self):
raise NotImplementedError("Plotter::prepare() must be implemented")
[ドキュメント] def update(self, df: pd.DataFrame):
raise NotImplementedError("Plotter::update() must be implemented")
[ドキュメント]class Experiment:
started_time: float # in sec
running = False
columns = None
filename = "experiment"
_plotter: Optional[Plotter] = None
_data_queue: queue.Queue
_data = []
_experiment_thread = None
_completed = False
@property
def plotter(self) -> Optional[Plotter]:
return self._plotter
@plotter.setter
def plotter(self, plotter: Optional[Plotter]):
if self.running:
raise RuntimeError("Plotter cannnot be set during experiment!!")
self._plotter = plotter
def _get_data_from_queue(self):
d = []
while True:
try:
d.append(self._data_queue.get(False))
except queue.Empty:
return d
def _write_to_file(self, row):
row_list = [str(row["t"])]
for col in self.columns:
if col in row:
row_list.append(str(row[col]))
else:
row_list.append("")
self._f.write(",".join(row_list)+ "\n")
print(",\t".join(row_list))
def _main_loop(self):
while self._experiment_thread is not None and self._experiment_thread.is_alive():
# get data from experiment thread
data = self._get_data_from_queue()
for d in data:
self._data.append(d)
self._write_to_file(d)
if self._plotter is not None:
if len(self._data) > 0:
df = pd.DataFrame(self._data)
self._plotter.update(df)
plt.gcf().canvas.draw_idle()
plt.gcf().canvas.flush_events()
[ドキュメント] def start(self):
if self.columns is None:
raise NotImplementedError("Experiment::columns is not specified")
if self._plotter is not None:
self._plotter.prepare()
plt.pause(0.01)
self.running = True
self.started_time = time.perf_counter()
self._data_queue = queue.Queue()
# start experiment thread
def run():
self.steps()
time.sleep(1)
self._completed = True
self._experiment_thread = Thread(target=run)
self._experiment_thread.daemon = True
self._experiment_thread.start()
dir = "data"
if not os.path.exists(dir):
os.mkdir(dir)
filename = dir + "/" + self.filename + "-" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + ".csv"
with open(filename, "w") as self._f:
# write headers
header = ["t", "time"] + self.columns
self._f.write(",".join(header) + "\n")
print(",\t".join(header))
try:
self._main_loop()
# show plot in successful exit
if self._completed:
print("Measurement completed.", file=sys.stderr)
if self._plotter is not None:
if len(self._data) > 0:
df = pd.DataFrame(self._data)
self._plotter.update(df)
plt.show()
finally:
self.running = False
plt.close()
[ドキュメント] def get_t(self):
return time.perf_counter() - self.started_time
[ドキュメント] def send_row(self, row, *, capture: List[str] = None):
row = copy.copy(row)
row["t"] = self.get_t()
row["time"] = time.time()
if capture:
frame = inspect.currentframe()
try:
local_vars = frame.f_back.f_locals
finally:
del frame
for var in capture:
if var in row:
raise ValueError(f"Duplicate key: `{var}` is defined in row and capture.")
if var not in local_vars:
raise RuntimeError(f"No local variable `{var}` is found.")
row[var] = local_vars[var]
self._data_queue.put(row)
[ドキュメント] def steps(self):
raise NotImplementedError("Experiment::steps() must be implemented")