Source code for qurry.qurrium.multimanager.multimanager

"""MultiManager - The manager of multiple experiments.
(:mod:`qurry.qurrium.multimanager.multimanager`)"""

import os
import gc
import shutil
import tarfile
import warnings
from pathlib import Path
from typing import Union, Optional, Any, Type, Generic
from uuid import uuid4
from multiprocessing import get_context

from qiskit.providers import Backend

from .arguments import MultiCommonparams, PendingStrategyLiteral, PendingTargetProviderLiteral
from .beforewards import Before
from .afterwards import After
from .process import datetimedict_process
from .utils import experiment_writer, multimanager_report_naming
from ..utils.chunk import very_easy_chunk_size
from ..container import ExperimentContainer, QuantityContainer, _E
from ..utils.iocontrol import naming, IOComplex
from ...tools import qurry_progressbar, GeneralSimulator, DatetimeDict, DEFAULT_POOL_SIZE
from ...capsule import quickJSON, DEFAULT_ENCODING, DEFAULT_MODE, DEFAULT_INDENT
from ...capsule.mori import TagList, GitSyncControl
from ...declare import BaseRunArgs, AnalyzeArgs, SpecificAnalsisArgs
from ...exceptions import QurryResetAccomplished, QurryResetSecurityActivated


[docs] class MultiManager(Generic[_E]): """The manager of multiple experiments.""" __name__ = "MultiManager" multicommons: MultiCommonparams beforewards: Before afterwards: After quantity_container: QuantityContainer[tuple[str, ...]] """The container of quantity.""" exps: ExperimentContainer[_E] """The experiments container.""" _unexports: list[str] = ["allCounts", "retrievedResult"] """The content would not be exported.""" _not_sync = ["allCounts", "retrievedResult"] """The content would not be synchronized.""" after_lock: bool = False """Protect the `afterward` content to be overwritten. When setitem is called and completed, it will be setted as `False` automatically. """ mute_auto_lock: bool = False """Whether mute the auto-lock message.""" qurryinfo: dict[str, dict[str, str]] = {} """The qurryinfo of the multi-experiment. This is a dictionary with experiment IDs as keys, and the values are dictionaries containing the exported information. """
[docs] def reset_afterwards( self, *args, security: bool = False, mute_warning: bool = False, ) -> None: """Reset the measurement and release memory for overwrite. Args: security (bool, optional): Security for reset. Defaults to `False`. mute_warning (bool, optional): Mute warning. Defaults to `False`. """ if len(args) > 0: raise ValueError("Use '.reset(security=True)' to reset the afterwards.") if security and isinstance(security, bool): self.afterwards = self.afterwards._replace(retrievedResult=TagList(), allCounts={}) gc.collect() if not mute_warning: warnings.warn("Afterwards reset accomplished.", QurryResetAccomplished) else: warnings.warn( "Reset does not execute to prevent executing accidentally, " + "if you are sure to do this, then use '.reset(security=True)'.", QurryResetSecurityActivated, )
[docs] def clear_all_exps_result( self, *args, security: bool = False, mute_warning: bool = False, ) -> None: """Clear the result of all experiments. Args: security (bool, optional): Security for clearing. Defaults to `False`. mute_warning (bool, optional): Mute the warning when clearing. Defaults to `False`. """ if len(args) > 0: raise ValueError("Use '.clear_all_exps_result(security=True)' to clear all results.") if security and isinstance(security, bool): for exp in qurry_progressbar( self.exps.values(), desc="Clear jobs result...", bar_format="qurry-barless", ): exp.afterwards.clear_result(security=security, mute_warning=True) if not mute_warning: warnings.warn( "All experiments' results are cleared.", QurryResetAccomplished, ) else: warnings.warn( "Reset does not execute to prevent executing accidentally, " + "if you are sure to do this, then use '.reset(security=True)'.", QurryResetSecurityActivated, )
def __getitem__(self, key) -> Any: if key in self.beforewards._fields: return getattr(self.beforewards, key) if key in self.afterwards._fields: return getattr(self.afterwards, key) raise KeyError( f"{key} is not a valid field of '{Before.__name__}' and '{After.__name__}'." + f" Valid fields are {list(self.beforewards._fields) + list(self.afterwards._fields)}." ) @property def summoner_id(self) -> str: """ID of experiment of the MultiManager.""" return self.multicommons.summoner_id @property def id(self) -> str: """ID of experiment of the MultiManager.""" return self.multicommons.summoner_id def __hash__(self): """Hash the MultiManager by its ID.""" return hash(self.multicommons.summoner_id) @property def summoner_name(self) -> str: """Name of experiment of the MultiManager.""" return self.multicommons.summoner_name @property def name(self) -> str: """Name of experiment of the MultiManager.""" return self.multicommons.summoner_name def __init__( self, naming_complex: IOComplex, multicommons: MultiCommonparams, beforewards: Before, afterwards: After, quantity_container: QuantityContainer, outfields: dict[str, Any], gitignore: Optional[Union[GitSyncControl, list[str]]] = None, ): """Initialize the multi-experiment.""" if gitignore is None: self.gitignore = GitSyncControl() elif isinstance(gitignore, list): self.gitignore = GitSyncControl(gitignore) elif isinstance(gitignore, GitSyncControl): self.gitignore = gitignore else: raise ValueError(f"gitignore must be list or GitSyncControl, not {type(gitignore)}.") self.exps = ExperimentContainer() self.naming_complex = naming_complex self.multicommons = multicommons self.beforewards = beforewards self.afterwards = afterwards self.quantity_container = quantity_container self.outfields = outfields def __repr__(self): return ( f"<{self.__name__}(" + f'id="{self.multicommons.summoner_id}", ' + f'name="{self.multicommons.summoner_name}", ' + f"tags={self.multicommons.tags}, " + f'jobstype="{self.multicommons.jobstype}", ' + f'pending_strategy="{self.multicommons.pending_strategy}", ' + f"last_events={dict(self.multicommons.datetimes.last_events(3))}, " + f"exps_num={len(self.beforewards.exps_config)})>" ) def _repr_oneline(self): return ( f"<{self.__name__}(" + f'id="{self.multicommons.summoner_id}", ' + f'name="{self.multicommons.summoner_name}", ' + f'jobstype="{self.multicommons.jobstype}", ..., ' + f"exps_num={len(self.beforewards.exps_config)})>" ) def _repr_oneline_no_id(self): return ( f"<{self.__name__}(" + f'name="{self.multicommons.summoner_name}", ' + f'jobstype="{self.multicommons.jobstype}", ..., ' + f"exps_num={len(self.beforewards.exps_config)})>" ) def _repr_pretty_(self, p, cycle): max_events = 5 if cycle: p.text( f"<{self.__name__}(" + f'id="{self.multicommons.summoner_id}", ' + f'name="{self.multicommons.summoner_name}", ' + f'jobstype="{self.multicommons.jobstype}", ..., ' + f"exps_num={len(self.beforewards.exps_config)})>" ) else: with p.group(2, f"<{type(self).__name__}(", ")>"): p.text(f'id="{self.multicommons.summoner_id}",') p.breakable() p.text(f'name="{self.multicommons.summoner_name}",') p.breakable() p.text(f"tags={self.multicommons.tags},") p.breakable() p.text(f'jobstype="{self.multicommons.jobstype}",') p.breakable() p.text(f'pending_strategy="{self.multicommons.pending_strategy}",') p.breakable() p.text("last_events={") if len(self.multicommons.datetimes) > max_events: p.breakable() p.text(" ...,") for k, v in self.multicommons.datetimes.last_events(max_events): p.breakable() p.text(f" '{k}': '{v}',") p.text("},") p.breakable() p.text(f"exps_num={len(self.beforewards.exps_config)}")
[docs] def register( self, current_id: str, config: dict[str, Any], exps_instance: _E, ): """Register the experiment to multimanager. Args: current_id (str): ID of experiment. config (dict[str, Any]): The config of experiment. exps_instance (ExperimentPrototype): The instance of experiment. """ assert exps_instance.commons.exp_id == current_id, ( f"ID is not consistent, exp_id: {exps_instance.commons.exp_id} and " + f"current_id: {current_id}." ) self.beforewards.exps_config[current_id] = config self.beforewards.circuits_num[current_id] = len(exps_instance.beforewards.circuit) self.beforewards.job_taglist[exps_instance.commons.tags].append(current_id) assert isinstance(exps_instance.commons.serial, int), ( f"Serial is not int, exp_id: {exps_instance.commons.exp_id} and " + f"serial: {exps_instance.commons.serial}." + "It should be int." ) self.beforewards.index_taglist[exps_instance.commons.tags].append( exps_instance.commons.serial ) self.exps[current_id] = exps_instance
[docs] @classmethod def build( cls, config_list: list[dict[str, Any]], experiment_instance: Type[_E], summoner_name: Optional[str] = None, shots: Optional[int] = None, backend: Backend = GeneralSimulator(), tags: Optional[tuple[str, ...]] = None, manager_run_args: Optional[Union[BaseRunArgs, dict[str, Any]]] = None, jobstype: PendingTargetProviderLiteral = "local", pending_strategy: PendingStrategyLiteral = "tags", # save parameters save_location: Union[Path, str] = Path("./"), skip_writing: bool = False, multiprocess_build: bool = False, multiprocess_write: bool = False, ) -> "MultiManager[_E]": """Build the multi-experiment. Args: config_list (list[dict[str, Any]]): The list of config of experiments. This config is used to build the experiments. experiment_instance (ExperimentPrototype): The instance of experiment. summoner_name (Optional[str], optional): Name of experiment of the :class:`MultiManager`. Defaults to None. shots (Optional[int], optional): The shots of experiments. Defaults to None. backend (Backend, optional): The backend of experiments. Defaults to GeneralSimulator(). tags (Optional[tuple[str, ...]], optional): The tags of experiments. Defaults to None. manager_run_args (Optional[Union[BaseRunArgs, dict[str, Any]]], optional): The arguments of manager run. Defaults to None. jobstype (PendingTargetProviderLiteral, optional): The jobstype of experiments. Defaults to "local". pending_strategy (PendingStrategyLiteral, optional): The pending strategy of experiments. Defaults to "tags". save_location (Union[Path, str], optional): Location of saving experiment. Defaults to Path("./"). skip_writing (bool, optional): Whether skip writing. Defaults to False. multiprocess_build (bool, optional): Whether use multiprocess for building. Defaults to False. multiprocess_write (bool, optional): Whether use multiprocess for writing. Defaults to False. Returns: MultiManager: The container of experiments and multi-experiment. """ if summoner_name is None: summoner_name = "multiexps" if tags is None: tags = () if manager_run_args is None: manager_run_args = {} naming_complex = naming( exps_name=summoner_name, save_location=save_location, ) multicommons, outfields = MultiCommonparams.build( { "summoner_id": str(uuid4()), "summoner_name": naming_complex.expsName, "tags": tags, "shots": shots, "backend": backend, "save_location": naming_complex.save_location, "export_location": naming_complex.export_location, "files": {}, "jobstype": jobstype, "pending_strategy": pending_strategy, "manager_run_args": manager_run_args, "filetype": "json", "datetimes": DatetimeDict(), "outfields": {}, } ) assert ( naming_complex.save_location == multicommons.save_location ), "| save_location is not consistent with namingCpx.save_location." current_multimanager = cls( naming_complex=naming_complex, multicommons=multicommons, beforewards=Before( exps_config={}, circuits_num={}, circuits_map=TagList(), pending_pool=TagList(), job_id=[], job_taglist=TagList(), index_taglist=TagList(), ), afterwards=After( retrievedResult=TagList(), allCounts={}, ), quantity_container=QuantityContainer(), outfields=outfields, ) initial_config_list: list[dict[str, Any]] = [] for serial, config in enumerate(config_list): config.pop("export", None) config.pop("pbar", None) config.pop("multiprocess", None) initial_config_list.append( { **config, "shots": config.get("shots", shots), "backend": backend, "exp_name": current_multimanager.multicommons.summoner_name, "save_location": current_multimanager.multicommons.save_location, "serial": serial, "summoner_id": current_multimanager.multicommons.summoner_id, "summoner_name": current_multimanager.multicommons.summoner_name, } ) if multiprocess_build: chunks_num = very_easy_chunk_size( tasks_num=len(initial_config_list), num_process=DEFAULT_POOL_SIZE, max_chunk_size=min(max(1, len(initial_config_list) // DEFAULT_POOL_SIZE), 20), ) pool = get_context("spawn").Pool(processes=DEFAULT_POOL_SIZE, maxtasksperchild=4) with pool as p: exps_iterable = qurry_progressbar( p.imap_unordered( experiment_instance.build_for_multiprocess, initial_config_list, chunksize=chunks_num, ), total=len(initial_config_list), desc="MultiManager building...", ) exps_iterable.set_description_str( f"Loading {len(initial_config_list)} experiments..." ) for new_exps, config in exps_iterable: current_multimanager.register( current_id=new_exps.commons.exp_id, config=config, exps_instance=new_exps, ) exps_iterable.set_description_str( f"Loading {len(initial_config_list)} experiments done" ) else: exps_iterable = qurry_progressbar( ( (experiment_instance.build(multiprocess=True, **config), config) for config in initial_config_list ), total=len(initial_config_list), desc="MultiManager building...", ) for new_exps, config in exps_iterable: current_multimanager.register( current_id=new_exps.commons.exp_id, config=config, exps_instance=new_exps, ) if not skip_writing: current_multimanager.write(multiprocess=multiprocess_write) return current_multimanager
[docs] @classmethod def read( cls, summoner_name: str, experiment_instance: Type[_E], save_location: Union[Path, str] = Path("./"), is_read_or_retrieve: bool = False, read_from_tarfile: bool = False, ) -> "MultiManager[_E]": """Read the multi-experiment. Args: experiment_instance (ExperimentPrototype): The instance of experiment. summoner_name (Optional[str], optional): Name of experiment of the :class:`MultiManager`. Defaults to None. save_location (Union[Path, str], optional): Location of saving experiment. Defaults to Path("./"). is_read_or_retrieve (bool, optional): Whether read or retrieve. Defaults to False. read_from_tarfile (bool, optional): Whether read from tarfile. Defaults to False. Returns: MultiManager: The container of experiments and multi-experiment. """ naming_complex = naming( is_read=is_read_or_retrieve, exps_name=summoner_name, save_location=save_location, ) gitignore = GitSyncControl() gitignore.load(naming_complex.export_location) multiconfig_name_v5 = ( naming_complex.export_location / f"{naming_complex.expsName}.multiConfig.json" ) multiconfig_name_v7 = naming_complex.export_location / "multi.config.json" if naming_complex.tarLocation.exists(): print( f"| Found the tarfile '{naming_complex.tarName}' " + f"in '{naming_complex.save_location}', decompressing is available." ) if (not multiconfig_name_v5.exists()) and (not multiconfig_name_v7.exists()): print( "| No multi.config file found, " + f"decompressing all files in the tarfile '{naming_complex.tarName}'." ) cls.easydecompress(naming_complex) elif read_from_tarfile: print( f"| Decompressing all files in the tarfile '{naming_complex.tarName}'" + f", replace all files in '{naming_complex.export_location}'." ) cls.easydecompress(naming_complex) if multiconfig_name_v5.exists(): print("| Found the multiConfig.json, reading in 'v5' file structure.") raw_multiconfig = MultiCommonparams.rawread( mutlticonfig_name=multiconfig_name_v5, save_location=naming_complex.save_location, export_location=naming_complex.export_location, ) files: dict[str, Union[str, dict[str, str]]] = raw_multiconfig["files"] old_files = raw_multiconfig["files"].copy() beforewards = Before.read( export_location=naming_complex.export_location, file_location=files, version="v5", ) afterwards = After( retrievedResult=TagList(), allCounts={}, ) quantity_container = QuantityContainer() assert isinstance(files["tagMapQuantity"], dict), "Quantity must be dict." for qk in files["tagMapQuantity"].keys(): quantity_container.read( key=qk, name=f"{naming_complex.expsName}.{qk}", save_location=naming_complex.export_location, version="v5", ) elif multiconfig_name_v7.exists(): raw_multiconfig = MultiCommonparams.rawread( mutlticonfig_name=multiconfig_name_v7, save_location=naming_complex.save_location, export_location=naming_complex.export_location, ) files = raw_multiconfig["files"] old_files = {} beforewards = Before.read(export_location=naming_complex.export_location, version="v7") afterwards = After( retrievedResult=TagList(), allCounts={}, ) quantity_container = QuantityContainer() assert isinstance(files["quantity"], dict), "Quantity must be dict." for qk in files["quantity"].keys(): quantity_container.read( key=qk, save_location=naming_complex.export_location, name=f"{qk}", ) else: print(f"| v5: {multiconfig_name_v5}") print(f"| v7: {multiconfig_name_v7}") raise FileNotFoundError( f"Can't find the multi.config file in '{naming_complex.expsName}'." ) multicommons, outfields = MultiCommonparams.build(raw_multiconfig) datetimedict_process( multicommons=multicommons, naming_complex=naming_complex, multiconfig_name_v5=multiconfig_name_v5, multiconfig_name_v7=multiconfig_name_v7, is_read_or_retrieve=is_read_or_retrieve, read_from_tarfile=read_from_tarfile, old_files=old_files, ) assert ( naming_complex.save_location == multicommons.save_location ), "| save_location is not consistent with namingCpx.save_location." current_multimanager = cls( naming_complex=naming_complex, multicommons=multicommons, beforewards=beforewards, afterwards=afterwards, quantity_container=quantity_container, outfields=outfields, gitignore=gitignore, ) if multiconfig_name_v5.exists(): print( f"| {current_multimanager.naming_complex.expsName} auto-export " + 'in "v7" format and remove "v5" format.' ) current_multimanager.write() remove_v5_progress = qurry_progressbar( old_files.items(), bar_format="| {percentage:3.0f}%[{bar}] - remove v5 - {desc} - {elapsed}", ) for k, pathstr in remove_v5_progress: if isinstance(pathstr, str): remove_v5_progress.set_description_str(f"{k}") path = Path(pathstr) if path.exists(): path.unlink() elif isinstance(pathstr, dict): for k2, pathstr2 in pathstr.items(): remove_v5_progress.set_description_str(f"{k} - {k2}") path = Path(pathstr2) if path.exists(): path.unlink() reading_results: list[_E] = experiment_instance.read( # type: ignore save_location=current_multimanager.multicommons.save_location, name_or_id=current_multimanager.multicommons.summoner_name, ) for read_exps in reading_results: current_multimanager.exps[read_exps.commons.exp_id] = read_exps return current_multimanager
[docs] def update_save_location( self, save_location: Union[Path, str], without_serial: bool = True, ) -> dict[str, Any]: """Update the save location of the multi-experiment. Args: save_location (Union[Path, str]): Location of saving experiment. without_serial (bool, optional): Whether without serial number. Defaults to True. Returns: dict[str, Any]: The dict of multiConfig. """ save_location = Path(save_location) self.naming_complex = naming( without_serial=without_serial, exps_name=self.multicommons.summoner_name, save_location=save_location, ) self.multicommons = self.multicommons._replace( save_location=self.naming_complex.save_location, export_location=self.naming_complex.export_location, ) return self.naming_complex._asdict()
def _write_multiconfig(self) -> dict[str, Any]: multiconfig_name = Path(self.multicommons.export_location) / "multi.config.json" self.multicommons.files["multi.config"] = str(multiconfig_name) self.gitignore.sync("multi.config.json") multiconfig = { **self.multicommons._asdict(), "outfields": self.outfields, "files": self.multicommons.files, } quickJSON( content=multiconfig, filename=multiconfig_name, mode=DEFAULT_MODE, jsonable=True, encoding=DEFAULT_ENCODING, mute=True, ) return multiconfig
[docs] def write( self, save_location: Optional[Union[Path, str]] = None, export_transpiled_circuit: bool = False, skip_before_and_after: bool = False, skip_exps: bool = False, skip_quantities: bool = False, multiprocess: bool = False, ) -> dict[str, Any]: """Export the multi-experiment. Args: save_location (Union[Path, str], optional): Location of saving experiment. Defaults to None. export_transpiled_circuit (bool, optional): Export the transpiled circuit. Defaults to False. skip_before_and_after (bool, optional): Skip the beforewards and afterwards. Defaults to False. skip_exps (bool, optional): Skip the experiments. Defaults to False. skip_quantities (bool, optional): Skip the quantities container. Defaults to False. multiprocess (bool, optional): Whether to use multiprocess for exporting. Defaults to False. Returns: dict[str, Any]: The dict of multiConfig. """ print("| Export multimanager...") if save_location is None: save_location = self.multicommons.save_location else: self.update_save_location(save_location=save_location) self.gitignore.ignore("*.json") self.gitignore.sync("qurryinfo.json") if not os.path.exists(save_location): os.makedirs(save_location) if not os.path.exists(self.multicommons.export_location): os.makedirs(self.multicommons.export_location) self.gitignore.export(self.multicommons.export_location) # pylint: disable=protected-access exporting_name = self.afterwards._exporting_name() exporting_name.update(self.beforewards._exporting_name()) # pylint: enable=protected-access export_progress = qurry_progressbar( self.beforewards._fields + self.afterwards._fields, desc="Exporting MultiManager content...", bar_format="qurry-barless", ) # beforewards amd afterwards for i, k in enumerate(export_progress): if skip_before_and_after or (k in self._unexports): export_progress.set_description_str(f"{k} as {exporting_name[k]} - skip") elif isinstance(self[k], TagList): export_progress.set_description_str(f"{k} as {exporting_name[k]}") tmp: TagList = self[k] filename = tmp.export( name=None, save_location=self.multicommons.export_location, taglist_name=f"{exporting_name[k]}", ) self.multicommons.files[exporting_name[k]] = str(filename) self.gitignore.sync(f"{exporting_name[k]}.json") elif isinstance(self[k], (dict, list)): export_progress.set_description_str(f"{k} as {exporting_name[k]}") filename = Path(self.multicommons.export_location) / f"{exporting_name[k]}.json" self.multicommons.files[exporting_name[k]] = str(filename) if k not in self._not_sync: self.gitignore.sync(f"{exporting_name[k]}.json") quickJSON( content=self[k], filename=filename, mode=DEFAULT_MODE, jsonable=True, indent=DEFAULT_INDENT, encoding=DEFAULT_ENCODING, mute=True, ) else: warnings.warn(f"'{k}' is type '{type(self[k])}' which is not supported to export.") if i == len(export_progress) - 1: export_progress.set_description_str("Exporting done") # tagMapQuantity or quantity if not skip_quantities: self.multicommons.files["quantity"] = self.quantity_container.write( save_location=self.multicommons.export_location ) self.gitignore.sync("*.quantity.json") # multiConfig multiconfig = self._write_multiconfig() print(f"| Export multi.config.json for {self.summoner_id}") # gitignore self.gitignore.export(self.multicommons.export_location) # experiments if not skip_exps: self.qurryinfo.clear() self.qurryinfo.update( experiment_writer( experiment_container=self.exps, beforewards=self.beforewards, multicommons=self.multicommons, export_transpiled_circuit=export_transpiled_circuit, multiprocess=multiprocess, ) ) return multiconfig
[docs] def compress( self, compress_overwrite: bool = False, remain_only_compressed: bool = False, ) -> Path: """Compress the export_location to tar.xz. Args: compress_overwrite (bool, optional): Reproduce all the compressed files. Defaults to False. remain_only_compressed (bool, optional): Remove uncompressed files. Defaults to False. Returns: Path: Path of the compressed file. """ if remain_only_compressed: self.multicommons.datetimes.add_serial("uncompressedRemove") self._write_multiconfig() print(f"| Compress multimanager of '{self.naming_complex.expsName}'...", end="\r") loc = self.easycompress(overwrite=compress_overwrite) print(f"| Compress multimanager of '{self.naming_complex.expsName}'...done") if remain_only_compressed: print( f"| Remove uncompressed files in '{self.naming_complex.export_location}' ...", end="\r", ) shutil.rmtree(self.multicommons.export_location) print(f"| Remove uncompressed files in '{self.naming_complex.export_location}' ...done") return loc
[docs] def analyze( self, analysis_name: str = "report", no_serialize: bool = False, specific_analysis_args: SpecificAnalsisArgs = None, **analysis_args: Union[dict[str, Any], AnalyzeArgs], ) -> str: """Analyze the experiments. Args: exps_container (ExperimentContainer[_ExpInst]): The container of experiments. analysis_name (str, optional): The name of analysis. Defaults to "report". no_serialize (bool, optional): Whether serialize the analysis. Defaults to False. specific_analysis_args (SpecificAnalsisArgs, optional): The specific analysis arguments. Defaults to None. **analysis_args (Union[dict[str, Any], AnalyzeArgs]): The arguments of analysis. Returns: str: The name of analysis. """ if specific_analysis_args is None: specific_analysis_args = {} counts_check = [ k for k in self.beforewards.circuits_map.keys() if len(self.exps[k].afterwards.counts) == 0 ] if len(counts_check) > 0: raise ValueError( f"Counts of {len(counts_check)} experiments are empty, " + f"please check them before analysis: {counts_check}." ) name = multimanager_report_naming(analysis_name, no_serialize, self.quantity_container) self.quantity_container[name] = TagList() all_counts_progress = qurry_progressbar( self.beforewards.circuits_map.keys(), bar_format=("| {n_fmt}/{total_fmt} - Analysis: {desc} - {elapsed} < {remaining}"), ) for k in all_counts_progress: if k in specific_analysis_args: v_args = specific_analysis_args[k] if isinstance(v_args, bool): if v_args is False: all_counts_progress.set_description_str( f"Skipped {k} in {self.summoner_id}." ) continue report = self.exps[k].analyze( **analysis_args, **({"pbar": all_counts_progress}), ) else: report = self.exps[k].analyze( **v_args, **({"pbar": all_counts_progress}), ) else: report = self.exps[k].analyze( **analysis_args, **({"pbar": all_counts_progress}), ) main, _tales = report.export(jsonable=False) self.quantity_container[name][self.exps[k].commons.tags].append(main) self.multicommons.datetimes.add_only(name) return name
[docs] def remove_analysis(self, name: str): """Removes the analysis. Args: name (str): The name of the analysis. """ self.quantity_container.remove(name) print(f"| Removing analysis: {name}") self.multicommons.datetimes.add_only(f"{name}_remove")
[docs] def easycompress( self, overwrite: bool = False, ) -> Path: """Compress the export_location to tar.xz. Args: overwrite (bool, optional): Reproduce all the compressed files. Defaults to False. Returns: Path: Path of the compressed file. """ self.multicommons.datetimes.add_serial("compressed") self._write_multiconfig() is_exists = os.path.exists(self.naming_complex.tarLocation) if is_exists and overwrite: os.remove(self.naming_complex.tarLocation) with tarfile.open(self.naming_complex.tarLocation, "x:xz") as tar: tar.add( self.naming_complex.export_location, arcname=os.path.basename(self.naming_complex.export_location), ) else: with tarfile.open(self.naming_complex.tarLocation, "w:xz") as tar: tar.add( self.naming_complex.export_location, arcname=os.path.basename(self.naming_complex.export_location), ) return self.naming_complex.tarLocation
[docs] @classmethod def easydecompress( cls, naming_complex: IOComplex, ) -> Path: """Decompress the tar.xz file of experiment. Args: naming_complex (IOComplex): The naming complex of experiment. Returns: Path: Path of the decompressed file. """ with tarfile.open(naming_complex.tarLocation, "r:xz") as tar: tar.extractall(naming_complex.save_location) return naming_complex.tarLocation