Source code for qurry.qurrium.experiment.experiment

"""ExperimentPrototype - The instance of experiment (:mod:`qurry.qurrium.experiment.experiment`)"""

import os
import json
import warnings
from abc import abstractmethod, ABC
from typing import Union, Optional, Any, Type, Literal, Generic
from collections.abc import Hashable
from multiprocessing import get_context
from pathlib import Path
import tqdm

from qiskit import transpile, QuantumCircuit
from qiskit.providers import Backend, JobV1 as Job
from qiskit.transpiler.passmanager import PassManager

from .arguments import Commonparams, _A, create_exp_args, create_exp_commons, create_exp_outfields
from .beforewards import Before, create_beforewards
from .afterwards import After, create_afterwards
from .analyses import AnalysesContainer, _R
from .export import Export
from .utils import (
    exp_id_process,
    memory_usage_factor_expect,
    implementation_check,
    summonner_check,
    make_statesheet,
    create_save_location,
    decide_folder_and_filename,
)
from ..utils import get_counts_and_exceptions, qasm_dumps, outfields_check, outfields_hint
from ..utils.chunk import very_easy_chunk_size
from ...tools import (
    ParallelManager,
    DatetimeDict,
    set_pbar_description,
    backend_name_getter,
    DEFAULT_POOL_SIZE,
    qurry_progressbar,
    GeneralSimulator,
)
from ...capsule import quickJSON, DEFAULT_MODE, DEFAULT_ENCODING
from ...capsule.hoshi import Hoshi
from ...declare import RunArgsType, TranspileArgs
from ...exceptions import QurryResetSecurityActivated, QurryTranspileConfigurationIgnored


[docs] class ExperimentPrototype(ABC, Generic[_A, _R]): """The instance of experiment.""" __name__ = "ExperimentPrototype" """Name of the QurryExperiment which could be overwritten.""" @property @abstractmethod def arguments_instance(self) -> Type[_A]: """The arguments instance for this experiment.""" raise NotImplementedError("This method should be implemented.") @property @abstractmethod def analysis_instance(self) -> Type[_R]: """The analysis instance for this experiment.""" raise NotImplementedError("This method should be implemented.") @property def is_auto_analysis(self) -> bool: """Check if the experiment has auto analysis. Returns: bool: True if the experiment has auto analysis, False otherwise. """ return len(self.analysis_instance.input_type()._fields) == 0 @property def is_hold_by_multimanager(self) -> bool: """Check if the experiment is hold by a multimanager. Returns: bool: True if the experiment is hold by a multimanager, False otherwise. """ return summonner_check( self.commons.serial, self.commons.summoner_id, self.commons.summoner_name ) args: _A """The arguments of the experiment.""" commons: Commonparams """The common parameters of the experiment.""" outfields: dict[str, Any] """The outfields of the experiment.""" beforewards: Before """The beforewards of the experiment.""" afterwards: After """The afterwards of the experiment.""" memory_usage_factor: int = -1 """The factor of the memory usage of the experiment. When the experiment is created, it will be set to -1 for no measurement yet. When the experiment is built, it will be set to the memory usage of the experiment. The memory usage is estimated by the number of instructions in the circuits and the number of shots. The factor is calculated by the formula: .. code-block:: text factor = target_circuit_instructions_num + sqrt(shots) * target_circuit_instructions_num where `target_circuit_instructions_num` is the number of instructions in the target circuits, `transpiled_circuit_instructions_num` is the number of instructions in the circuits which has been transpiled and will be run on the backend, and `shots` is the number of shots. The factor is rounded to the nearest integer. The factor is used to estimate the memory usage of the experiment. """ def __init__( self, arguments: Union[_A, dict[str, Any]], commonparams: Union[Commonparams, dict[str, Any]], outfields: dict[str, Any], beforewards: Optional[Before] = None, afterwards: Optional[After] = None, reports: Optional[AnalysesContainer] = None, ) -> None: """Initialize the experiment. Args: arguments (Optional[Union[NamedTuple, dict[str, Any]]]): The arguments of the experiment. commonparams (Optional[Union[Commonparams, dict[str, Any]]]): The common parameters of the experiment. outfields (Optional[dict[str, Any]]): The outfields of the experiment. beforewards (Optional[Before], optional): The beforewards of the experiment. Defaults to None. afterwards (Optional[After], optional): The afterwards of the experiment. Defaults to None. reports (Optional[AnalysesContainer], optional): The reports of the experiment. Defaults to None. """ self.args, arguments_deprecated = create_exp_args(arguments, self.arguments_instance) self.commons, commonparams_deprecated = create_exp_commons(commonparams) self.outfields = create_exp_outfields(outfields) # Add deprecated arguments to outfields only if they are not empty if len(arguments_deprecated): self.outfields["arguments_deprecated"] = arguments_deprecated if len(commonparams_deprecated): self.outfields["commonparams_deprecated"] = commonparams_deprecated implementation_check(self.__name__, self.args, self.commons) summonner_check(self.commons.serial, self.commons.summoner_id, self.commons.summoner_name) self.beforewards = create_beforewards(beforewards) self.afterwards = create_afterwards(afterwards) self.reports: AnalysesContainer[_R] = ( reports if isinstance(reports, AnalysesContainer) else AnalysesContainer() ) """The reports of the experiment."""
[docs] @classmethod @abstractmethod def params_control( cls, targets: list[tuple[Hashable, QuantumCircuit]], exp_name: str, **custom_kwargs: Any ) -> tuple[_A, Commonparams, dict[str, Any]]: """Control the experiment's parameters. Args: targets (list[tuple[Hashable, QuantumCircuit]]): The circuits of the experiment. exp_name (str): Naming this experiment to recognize it when the jobs are pending to IBMQ Service. This name is also used for creating a folder to store the exports. custom_kwargs (Any): Other custom arguments. Raises: NotImplementedError: This method should be implemented. """ raise NotImplementedError("This method should be implemented.")
@classmethod def _params_control_core( cls, targets: list[tuple[Hashable, QuantumCircuit]], exp_id: Optional[str] = None, shots: int = 1024, backend: Optional[Backend] = None, exp_name: str = "experiment", run_args: RunArgsType = None, transpile_args: Optional[TranspileArgs] = None, # multimanager tags: Optional[tuple[str, ...]] = None, serial: Optional[int] = None, summoner_id: Optional[Hashable] = None, summoner_name: Optional[str] = None, # process tool mute_outfields_warning: bool = False, pbar: Optional[tqdm.tqdm] = None, **custom_kwargs: Any, ): """Control the experiment's general parameters. Args: targets (list[tuple[Hashable, QuantumCircuit]]): The circuits of the experiment. exp_id (Optional[str], optional): If input is `None`, then create an new experiment. If input is a existed experiment ID, then use it. Otherwise, use the experiment with given specific ID. Defaults to None. shots (int, optional): Shots of the job. Defaults to `1024`. backend (Optional[Backend], optional): The quantum backend. Defaults to None. exp_name (str, optional): The name of the experiment. Naming this experiment to recognize it when the jobs are pending to IBMQ Service. This name is also used for creating a folder to store the exports. Defaults to `'experiment'`. run_args (RunArgsType, optional): Arguments for :meth:`Backend.run`. Defaults to None. transpile_args (Optional[TranspileArgs], optional): Arguments of :func:`~qiskit.compiler.transpile`. Defaults to None. tags (Optional[tuple[str, ...]], optional): Given the experiment multiple tags to make a dictionary for recongnizing it. Defaults to None. serial (Optional[int], optional): Index of experiment in :class:`~qurry.qurrium.multimanager.multimanager.MultiManager`. **!!ATTENTION, this should only be used by :class:`~qurry.qurrium.multimanager.multimanager.MultiManager`!!** Defaults to None. summoner_id (Optional[Hashable], optional): ID of experiment of :class:`~qurry.qurrium.multimanager.multimanager.MultiManager`. **!!ATTENTION, this should only be used by :class:`~qurry.qurrium.multimanager.multimanager.MultiManager`!!** Defaults to None. summoner_name (Optional[str], optional): Name of experiment of :class:`~qurry.qurrium.multimanager.multimanager.MultiManager`. **!!ATTENTION, this should only be used by :class:`~qurry.qurrium.multimanager.multimanager.MultiManager`!!** Defaults to None. mute_outfields_warning (bool, optional): Mute the warning when there are unused arguments detected and stored in outfields. Defaults to False. pbar (Optional[tqdm.tqdm], optional): The progress bar for showing the progress of the experiment. Defaults to None. custom_kwargs (Any): Other custom arguments. Returns: ExperimentPrototype: The experiment. """ if run_args is None: run_args = {} if transpile_args is None: transpile_args = {} if backend is None: backend = GeneralSimulator() if tags is None: tags = () # Given parameters and default parameters set_pbar_description(pbar, "Prepaing parameters...") arguments, commonparams, outfields = cls.params_control( targets=targets, exp_id=exp_id_process(exp_id), shots=shots, backend=backend, run_args=run_args, transpile_args=transpile_args, exp_name=exp_name, tags=tags, save_location=Path("./"), serial=serial, summoner_id=summoner_id, summoner_name=summoner_name, datetimes=DatetimeDict(), **custom_kwargs, ) outfield_maybe, outfields_unknown = outfields_check( outfields, arguments._fields + commonparams._fields ) outfields_hint(outfield_maybe, outfields_unknown, mute_outfields_warning) set_pbar_description(pbar, "Create experiment instance... ") new_exps = cls(arguments, commonparams, outfields) assert isinstance(new_exps.commons.backend, Backend), "Require a valid backend." assert len(new_exps.beforewards.circuit) == 0, "New experiment should have no circuit." assert len(new_exps.beforewards.circuit_qasm) == 0, "New experiment should have no qasm." assert len(new_exps.afterwards.result) == 0, "New experiment should have no result." assert len(new_exps.afterwards.counts) == 0, "New experiment should have no counts." return new_exps
[docs] @classmethod @abstractmethod def method( cls, targets: list[tuple[Hashable, QuantumCircuit]], arguments: _A, pbar: Optional[tqdm.tqdm] = None, multiprocess: bool = True, ) -> tuple[list[QuantumCircuit], dict[str, Any]]: """The method to construct circuit. Where should be overwritten by each construction of new measurement. Args: targets (list[tuple[Hashable, QuantumCircuit]]): The circuits of the experiment. arguments (_Arg): The arguments of the experiment. pbar (Optional[tqdm.tqdm], optional): The progress bar for showing the progress of the experiment. Defaults to None. multiprocess (bool, optional): Whether to use multiprocessing. Defaults to `True`. Returns: tuple[list[QuantumCircuit], dict[str, Any]]: The circuits of the experiment and the outfields. """ raise NotImplementedError("This method should be implemented.")
[docs] @classmethod def build( cls, targets: list[tuple[Hashable, QuantumCircuit]], shots: int = 1024, backend: Optional[Backend] = None, exp_name: str = "experiment", run_args: RunArgsType = None, transpile_args: Optional[TranspileArgs] = None, passmanager_pair: Optional[tuple[str, PassManager]] = None, tags: Optional[tuple[str, ...]] = None, # multimanager serial: Optional[int] = None, summoner_id: Optional[Hashable] = None, summoner_name: Optional[str] = None, # process tool qasm_version: Literal["qasm2", "qasm3"] = "qasm3", export: bool = False, save_location: Optional[Union[Path, str]] = None, pbar: Optional[tqdm.tqdm] = None, multiprocess: bool = True, **custom_and_main_kwargs: Any, ): """Construct the experiment. Args: targets (list[tuple[Hashable, QuantumCircuit]]): The circuits of the experiment. shots (int, optional): Shots of the job. Defaults to `1024`. backend (Optional[Backend], optional): The quantum backend. Defaults to None. exp_name (str, optional): The name of the experiment. Naming this experiment to recognize it when the jobs are pending to IBMQ Service. This name is also used for creating a folder to store the exports. Defaults to `'experiment'`. run_args (RunArgsType, optional): Arguments for :meth:`Backend.run`. Defaults to None. transpile_args (Optional[TranspileArgs], optional): Arguments of :func:`~qiskit.compiler.transpile`. Defaults to None. passmanager_pair (Optional[tuple[str, PassManager]], optional): The passmanager pair for transpile. Defaults to None. tags (Optional[tuple[str, ...]], optional): Given the experiment multiple tags to make a dictionary for recongnizing it. Defaults to None. serial (Optional[int], optional): Index of experiment in :class:`~qurry.qurrium.multimanager.multimanager.MultiManager`. **!!ATTENTION, this should only be used by :class:`~qurry.qurrium.multimanager.multimanager.MultiManager`!!** Defaults to None. summoner_id (Optional[Hashable], optional): ID of experiment of :class:`~qurry.qurrium.multimanager.multimanager.MultiManager`. **!!ATTENTION, this should only be used by :class:`~qurry.qurrium.multimanager.multimanager.MultiManager`!!** Defaults to None. summoner_name (Optional[str], optional): Name of experiment of :class:`~qurry.qurrium.multimanager.multimanager.MultiManager`. **!!ATTENTION, this should only be used by :class:`~qurry.qurrium.multimanager.multimanager.MultiManager`!!** Defaults to None. qasm_version (Literal["qasm2", "qasm3"], optional): The export version of OpenQASM. Defaults to 'qasm3'. export (bool, optional): Whether to export the experiment. Defaults to False. save_location (Optional[Union[Path, str]], optional): The location to save the experiment. Defaults to None. pbar (Optional[tqdm.tqdm], optional): The progress bar for showing the progress of the experiment. Defaults to None. multiprocess (bool, optional): Whether to use multiprocessing. Defaults to `True`. custom_and_main_kwargs (Any): Other custom arguments. Returns: ExperimentPrototype: The experiment. """ # preparing set_pbar_description(pbar, "Parameter loading...") current_exp = cls._params_control_core( targets=targets, shots=shots, backend=backend, run_args=run_args, transpile_args=transpile_args, tags=tags, exp_name=exp_name, serial=serial, summoner_id=summoner_id, summoner_name=summoner_name, pbar=pbar, **custom_and_main_kwargs, ) if not isinstance(current_exp.commons.backend, Backend): if isinstance(backend, Backend): set_pbar_description(pbar, "Backend replacing...") current_exp.replace_backend(backend) else: raise ValueError( "No vaild backend to run, exisited backend: " + f"{current_exp.commons.backend} as type " + f"{type(current_exp.commons.backend)}, " + f"given backend: {backend} as type {type(backend)}." ) assert isinstance(current_exp.commons.backend, Backend), ( f"Invalid backend: {current_exp.commons.backend} as " + f"type {type(current_exp.commons.backend)}." ) # circuit set_pbar_description(pbar, "Circuit creating...") current_exp.beforewards.target.extend(targets) cirqs, side_prodict = current_exp.method( targets=targets, arguments=current_exp.args, pbar=pbar, multiprocess=multiprocess ) current_exp.beforewards.side_product.update(side_prodict) # qasm set_pbar_description(pbar, "Exporting OpenQASM string...") targets_keys, targets_values = zip(*targets) targets_keys: tuple[Hashable, ...] targets_values: tuple[QuantumCircuit, ...] if multiprocess: pool = ParallelManager() current_exp.beforewards.circuit_qasm.extend( pool.starmap(qasm_dumps, [(q, qasm_version) for q in cirqs]) ) current_exp.beforewards.target_qasm.extend( zip( [str(k) for k in targets_keys], pool.starmap(qasm_dumps, [(q, qasm_version) for q in targets_values]), ) ) else: current_exp.beforewards.circuit_qasm.extend( [qasm_dumps(q, qasm_version) for q in cirqs] ) current_exp.beforewards.target_qasm.extend( zip( [str(k) for k in targets_keys], [qasm_dumps(q, qasm_version) for q in targets_values], ) ) # transpile if passmanager_pair is not None: passmanager_name, passmanager = passmanager_pair set_pbar_description( pbar, f"Circuit transpiling by passmanager '{passmanager_name}'..." ) transpiled_circs = passmanager.run( circuits=cirqs, num_processes=None if multiprocess else 1 # type: ignore ) if len(current_exp.commons.transpile_args) > 0: warnings.warn( f"Passmanager '{passmanager_name}' is given, " + f"the transpile_args will be ignored in '{current_exp.exp_id}'", category=QurryTranspileConfigurationIgnored, ) else: set_pbar_description(pbar, "Circuit transpiling...") transpile_args = current_exp.commons.transpile_args.copy() transpile_args.pop("num_processes", None) transpiled_circs: list[QuantumCircuit] = transpile( cirqs, backend=current_exp.commons.backend, num_processes=None if multiprocess else 1, **current_exp.commons.transpile_args, ) set_pbar_description(pbar, "Circuit loading...") current_exp.beforewards.circuit.extend(transpiled_circs) # memory usage factor current_exp.memory_usage_factor = memory_usage_factor_expect( target=current_exp.beforewards.target, circuits=current_exp.beforewards.circuit, commonparams=current_exp.commons, ) # commons note_and_date = current_exp.commons.datetimes.add_only("build") set_pbar_description( pbar, f"Building Completed, denoted '{note_and_date[0]}' date: {note_and_date[1]}..." ) # export may be slow, consider export at finish or something if isinstance(save_location, (Path, str)) and export: set_pbar_description(pbar, "Setup data exporting...") current_exp.write(save_location=save_location) return current_exp
[docs] @classmethod def build_for_multiprocess(cls, config: dict[str, Any]): """Build wrapper for multiprocess. Args: config (dict[str, Any]): The arguments of the experiment. Returns: ExperimentPrototype: The experiment. """ config.pop("multiprocess", None) config.pop("pbar", None) config["multiprocess"] = False return cls.build(**config), config
# local execution
[docs] def run(self, pbar: Optional[tqdm.tqdm] = None) -> str: """Export the result after running the job. Args: pbar (Optional[tqdm.tqdm], optional): The progress bar for showing the progress of the experiment. Defaults to None. Raises: ValueError: No circuit ready. ValueError: The circuit has not been constructed yet. Returns: str: The ID of the experiment. """ if len(self.beforewards.circuit) == 0: raise ValueError("The circuit has not been constructed yet.") assert isinstance(self.commons.backend, Backend), ( f"Current backend {self.commons.backend} needs to be backend not " + f"{type({self.commons.backend})}." ) assert hasattr(self.commons.backend, "run"), "Current backend is not runnable." set_pbar_description(pbar, "Executing...") event_name, date = self.commons.datetimes.add_serial("run") execution: Job = self.commons.backend.run( # type: ignore self.beforewards.circuit, shots=self.commons.shots, **self.commons.run_args ) # commons set_pbar_description(pbar, f"Executing completed '{event_name}', denoted date: {date}...") # beforewards self.beforewards.job_id.append(execution.job_id()) # afterwards result = execution.result() self.afterwards.result.append(result) return self.exp_id
[docs] def result( self, export: bool = False, save_location: Optional[Union[Path, str]] = None, pbar: Optional[tqdm.tqdm] = None, ) -> str: """Export the result of the experiment. Args: export (bool, optional): Whether to export the experiment. Defaults to False. save_location (Optional[Union[Path, str]], optional): The location to save the experiment. Defaults to None. pbar (Optional[tqdm.tqdm], optional): The progress bar for showing the progress of the experiment. Defaults to None. Returns: str: The ID of the experiment. """ if len(self.afterwards.result) == 0: raise ValueError("The job has not been executed yet.") assert len(self.afterwards.result) == 1, "The job has been executed more than once." set_pbar_description(pbar, "Result loading...") num = len(self.beforewards.circuit) counts, exceptions = get_counts_and_exceptions(result=self.afterwards.result[-1], num=num) if len(exceptions) > 0: if "exceptions" not in self.outfields: self.outfields["exceptions"] = {} for result_id, exception_item in exceptions.items(): self.outfields["exceptions"][result_id] = exception_item set_pbar_description(pbar, "Counts loading...") self.afterwards.counts.extend(counts) if self.is_auto_analysis: if self.is_hold_by_multimanager: set_pbar_description( pbar, "Auto running analysis will take over by " f"{self.commons.summoner_id}: " f"{self.commons.summoner_name} after all experiments are done.", ) else: set_pbar_description(pbar, "Running analysis for no input required...") self.analyze() if export: # export may be slow, consider export at finish or something if isinstance(save_location, (Path, str)): set_pbar_description(pbar, "Setup data exporting...") self.write(save_location=save_location) return self.exp_id
# remote execution def _remote_result_taking( self, counts_tmp_container: dict[int, dict[str, int]], summoner_id: str, idx_circs: list[int], retrieve_times_name: str, ) -> list[dict[str, int]]: """Take the result from remote execution. Args: counts_tmp_container (dict[int, dict[str, int]]): The counts temporary container. summoner_id (str): The summoner ID. idx_circs (list[int]): The index of circuits. retrieve_times_name (str): The retrieve times name. current (str): The current time. Returns: list[dict[str, int]]: The counts. """ if summoner_id == self.commons.summoner_id: self.afterwards.counts.clear() self.afterwards.result.clear() for idx in idx_circs: self.afterwards.counts.append(counts_tmp_container[idx]) self.commons.datetimes.add_only(retrieve_times_name) else: warnings.warn( f"Summoner ID {summoner_id} is not equal to" + f" current summoner ID {self.commons.summoner_id}. " + "The counts will not be updated.", category=QurryResetSecurityActivated, ) return self.afterwards.counts
[docs] def replace_backend(self, backend: Backend) -> None: """Replace the backend of the experiment. Args: backend (Backend): The new backend. Raises: ValueError: If the new backend is not a valid backend. ValueError: If the new backend is not a runnable backend. """ if not isinstance(backend, Backend): raise ValueError(f"Require a valid backend, but new backend: {backend} does not.") if not hasattr(backend, "run"): raise ValueError(f"Require a runnable backend, but new backend: {backend} does not.") old_backend = self.commons.backend old_backend_name = backend_name_getter(old_backend) new_backend_name = backend_name_getter(backend) self.commons.datetimes.add_serial(f"replace-{old_backend_name}-to-{new_backend_name}") self.commons = self.commons._replace(backend=backend)
def __getitem__(self, key) -> Any: if key in self.beforewards._fields: return getattr(self.beforewards, key) if key in self.afterwards._fields: return getattr(self.afterwards, key) raise KeyError( f"{key} is not a valid field of " + f"'{Before.__name__}' and '{After.__name__}'." ) # analysis
[docs] @classmethod @abstractmethod def quantities(cls) -> dict[str, Any]: """Computing specific squantity. Where should be overwritten by each construction of new measurement. """
[docs] @abstractmethod def analyze(self) -> _R: """Analyzing the example circuit results in specific method. Where should be overwritten by each construction of new measurement. If the analysis requires additional parameters, they should be passed as arguments to this method. Also, they should be defined in the :meth:`~qurry.qurrium.analysis.AnalysisPrototype.input_type` for :meth:`result` will count the input fields from the analysis to determine whether to call this method for no input required. Returns: _R: The result of the analysis. """ raise NotImplementedError("This method should be implemented.")
# show info def __hash__(self) -> int: return hash(self.commons.exp_id) @property def exp_id(self) -> str: """ID of experiment.""" return self.commons.exp_id def __repr__(self) -> str: return ( f"<{self.__name__}(exp_id={self.commons.exp_id}, {self.args}, {self.commons}, " f"unused_args_num={len(self.outfields)}, analysis_num={len(self.reports)})>" ) def _repr_no_id(self) -> str: return ( f"<{self.__name__}({self.args}, {self.commons}, " f"unused_args_num={len(self.outfields)}, analysis_num={len(self.reports)})>" ) def _repr_pretty_(self, p, cycle): if cycle: p.text( f"<{self.__name__}(exp_id={self.commons.exp_id}, {self.args}, {self.commons}, " f"unused_args_num={len(self.outfields)}, analysis_num={len(self.reports)})>" ) else: with p.group(2, f"<{self.__name__}(", ")>"): p.text(f"exp_id={self.commons.exp_id}, ") p.breakable() p.text(f"{self.args},") p.breakable() p.text(f"{self.commons},") p.breakable() p.text(f"unused_args_num={len(self.outfields)},") p.breakable() p.text(f"analysis_num={len(self.reports)})")
[docs] def statesheet(self, report_expanded: bool = False, hoshi: bool = False) -> Hoshi: """Show the state of experiment. Args: report_expanded (bool, optional): Show more infomation. Defaults to False. hoshi (bool, optional): Showing name of Hoshi. Defaults to False. Returns: Hoshi: Statesheet of experiment. """ return make_statesheet( exp_name=self.__name__, args=self.args, commons=self.commons, outfields=self.outfields, beforewards=self.beforewards, afterwards=self.afterwards, reports=self.reports, report_expanded=report_expanded, hoshi=hoshi, )
[docs] def export( self, save_location: Optional[Union[Path, str]] = None, export_transpiled_circuit: bool = False, ) -> Export: """Export the data of experiment into specific namedtuples for exporting. Args: save_location (Optional[Union[Path, str]], optional): The location to save the experiment. Defaults to None. export_transpiled_circuit (bool, optional): Whether to export the transpiled circuit as txt. Defaults to False. When set to True, the transpiled circuit will be exported as txt. Otherwise, the circuit will be not exported but circuit qasm remains. Returns: Export: A namedtuple containing the data of experiment which can be more easily to export as json file. """ save_location = create_save_location(save_location, self.commons) if self.commons.save_location != save_location: self.commons = self.commons._replace(save_location=save_location) adventures, tales = self.beforewards.export(export_transpiled_circuit) legacy = self.afterwards.export() reports, tales_reports = self.reports.export() # multi-experiment mode folder, filename = decide_folder_and_filename(self.commons, self.args) files = { "folder": folder, "qurryinfo": folder + "qurryinfo.json", "args": folder + f"args/{filename}.args.json", "advent": folder + f"advent/{filename}.advent.json", "legacy": folder + f"legacy/{filename}.legacy.json", } for k in tales: files[f"tales.{k}"] = folder + f"tales/{filename}.{k}.json" files["reports"] = folder + f"reports/{filename}.reports.json" for k in tales_reports: files[f"reports.tales.{k}"] = folder + f"tales/{filename}.{k}.reports.json" return Export( exp_id=str(self.commons.exp_id), exp_name=str(self.args.exp_name), serial=(None if self.commons.serial is None else int(self.commons.serial)), summoner_id=(None if self.commons.summoner_id else str(self.commons.summoner_id)), summoner_name=(None if self.commons.summoner_name else str(self.commons.summoner_name)), filename=str(filename), files={k: str(Path(v)) for k, v in files.items()}, args=self.args._asdict(), commons=self.commons.export(), outfields=self.outfields, adventures=adventures, legacy=legacy, tales=tales, reports=reports, tales_reports=tales_reports, )
[docs] def write( self, save_location: Optional[Union[Path, str]] = None, export_transpiled_circuit: bool = False, qurryinfo_hold_access: Optional[str] = None, multiprocess: bool = True, pbar: Optional[tqdm.tqdm] = None, ) -> tuple[str, dict[str, str]]: """Export the experiment data, if there is a previous export, then will overwrite. Args: save_location (Optional[Union[Path, str]], optional): Where to save the export content as `json` file. If `save_location == None`, then use the value in `self.commons` to be exported, if it's None too, then raise error. Defaults to None. export_transpiled_circuit (bool, optional): Whether to export the transpiled circuit as txt. Defaults to False. When set to True, the transpiled circuit will be exported as txt. Otherwise, the circuit will be not exported but circuit qasm remains. qurryinfo_hold_access (str, optional): Whether to hold the I/O of `qurryinfo`, then export by :class:`~qurry.qurrium.multimanager.multimanager.MultiManager`. It should be ONLY control by :class:`~qurry.qurrium.multimanager.multimanager.MultiManager`. Defaults to None. multiprocess (bool, optional): Whether to use multiprocessing. Defaults to `True`. pbar (Optional[tqdm.tqdm], optional): The progress bar for showing the progress of the experiment. Defaults to None. Returns: tuple[str, dict[str, str]]: The id of the experiment and the files location. """ set_pbar_description(pbar, "Preparing to export...") # experiment write export_material = self.export(save_location, export_transpiled_circuit) exp_id, files = export_material.write(multiprocess, pbar) assert "qurryinfo" in files, "qurryinfo location is not in files." # qurryinfo write real_save_location = Path(self.commons.save_location) if ( qurryinfo_hold_access == self.commons.summoner_id and self.commons.summoner_id is not None ): # if qurryinfo_hold_access is set, then export by MultiManager return exp_id, files qurryinfo_location = real_save_location / files["qurryinfo"] if os.path.exists(qurryinfo_location): with open(qurryinfo_location, "r", encoding=DEFAULT_ENCODING) as f: qurryinfo_found: dict[str, dict[str, str]] = dict(json.load(f)) qurryinfo_found[exp_id] = files quickJSON(qurryinfo_found, str(qurryinfo_location), DEFAULT_MODE) else: quickJSON({exp_id: files}, str(qurryinfo_location), DEFAULT_MODE) return exp_id, files
@classmethod def _read_core( cls, exp_id: str, file_index: dict[str, str], save_location: Union[Path, str] = Path("./"), ): """Core of read function. Args: exp_id (str): The id of the experiment to be read. file_index (dict[str, str]): The index of the experiment to be read. save_location (Union[Path, str]): The location of the experiment to be read. Raises: ValueError: 'save_location' needs to be the type of 'str' or 'Path'. FileNotFoundError: When `save_location` is not available. Returns: QurryExperiment: The experiment to be read. """ save_location = create_save_location(save_location) if not os.path.exists(save_location): raise FileNotFoundError(f"'save_location' does not exist, '{save_location}'.") reading_return_args = Commonparams.read_with_arguments( exp_id=exp_id, file_index=file_index, save_location=save_location ) exp_instance = cls( **reading_return_args, beforewards=Before.read(file_index=file_index, save_location=save_location), afterwards=After.read(file_index=file_index, save_location=save_location), reports=AnalysesContainer(), ) reports_read = exp_instance.analysis_instance.read( file_index=file_index, save_location=save_location ) exp_instance.reports.update(reports_read) return exp_instance @classmethod def _read_core_multiprocess(cls, all_arugments: tuple[str, dict[str, str], Union[Path, str]]): """Core of read function for multiprocess. Args: all_arugments (tuple[str, dict[str, str], Union[Path, str], str]): The arguments of the experiment to be read. - exp_id (str): The id of the experiment to be read. - file_index (dict[str, str]): The index of the experiment to be read. - save_location (Union[Path, str]): The location of the experiment to be read. Returns: QurryExperiment: The experiment to be read. """ return cls._read_core(*all_arugments)
[docs] @classmethod def read( cls, name_or_id: Union[Path, str], save_location: Union[Path, str] = Path("./"), ): """Read the experiment from file. Args: name_or_id (Union[Path, str]): The name or id of the experiment to be read. save_location (Union[Path, str], optional): The location of the experiment to be read. Defaults to Path('./'). Raises: ValueError: 'save_location' needs to be the type of 'str' or 'Path'. FileNotFoundError: When `save_location` is not available. Returns: list[ExperimentPrototype]: The experiment to be read. """ save_location = create_save_location(save_location) if not os.path.exists(save_location): raise FileNotFoundError(f"'save_location' does not exist, '{save_location}'.") export_location = save_location / name_or_id if not os.path.exists(export_location): raise FileNotFoundError(f"'ExportLoaction' does not exist, '{export_location}'.") qurryinfo_location = export_location / "qurryinfo.json" if not os.path.exists(qurryinfo_location): raise FileNotFoundError( f"'qurryinfo.json' does not exist at '{save_location}'. " + "It's required for loading all experiment data." ) qurryinfo: dict[str, dict[str, str]] = {} with open(qurryinfo_location, "r", encoding=DEFAULT_ENCODING) as f: qurryinfo_found: dict[str, dict[str, str]] = json.load(f) qurryinfo.update(qurryinfo_found) num_exps = len(qurryinfo) chunks_num = very_easy_chunk_size( tasks_num=num_exps, num_process=DEFAULT_POOL_SIZE, max_chunk_size=min(max(1, num_exps // DEFAULT_POOL_SIZE), 40), ) reading_pool = get_context("spawn").Pool( processes=DEFAULT_POOL_SIZE, maxtasksperchild=chunks_num * 2 ) with reading_pool as pool: exps_iterable = qurry_progressbar( pool.imap_unordered( cls._read_core_multiprocess, ( (exp_id, file_index, save_location) for exp_id, file_index in qurryinfo.items() ), ), total=num_exps, desc=f"Loading {num_exps} experiments ...", ) exps = list(exps_iterable) return exps