"""MultiManager - Beforewards (:mod:`qurry.qurrium.multimanager.beforewards`)"""
from pathlib import Path
from collections.abc import Hashable
from typing import Literal, Union, Optional, NamedTuple, Any
from ...capsule import quickRead
from ...capsule.mori import TagList
TagListKeyable = Union[str, tuple[str, ...], Literal["_onetime"], Hashable]
"""Type of keyable in :class:`~qurry.capsule.mori.taglist.TagList`."""
EXPORTING_NAME = {
"exps_config": "exps.config",
"circuits_num": "circuitsNum",
"pending_pool": "pendingPools",
"circuits_map": "circuitsMap",
"job_id": "jobID",
"job_taglist": "job.tagList",
"index_taglist": "index.tagList",
}
"""The exporting name of :class:`Before` in V7 format."""
[docs]
class Before(NamedTuple):
"""`dataNeccessary` and `expsMultiMain` in V4 format."""
exps_config: dict[str, dict[str, Any]]
"""The dict of config of each experiments."""
circuits_num: dict[str, int]
"""The map with tags of index of experiments, which multiple experiments shared."""
pending_pool: TagList[TagListKeyable, int]
"""The pool of pending jobs, which multiple experiments shared,
it works only when executing experiments is remote.
"""
circuits_map: TagList[str, int]
"""The map of circuits of each experiments in the index of pending,
which multiple experiments shared.
"""
job_id: list[tuple[Optional[str], TagListKeyable]]
"""The list of job_id in pending, which multiple experiments shared,
it works only when executing experiments is remote.
"""
job_taglist: TagList[TagListKeyable, str]
"""The list of job id but grouped by tags, which multiple experiments shared."""
index_taglist: TagList[TagListKeyable, int]
"""The list of experiments index but grouped by tags, which multiple experiments shared."""
@staticmethod
def _exporting_name():
"""The exporting name of :class:`Before`."""
return EXPORTING_NAME
[docs]
@classmethod
def read(
cls,
export_location: Path,
file_location: Optional[dict[str, Union[str, dict[str, str]]]] = None,
version: Literal["v5", "v7"] = "v5",
):
"""Reads the data of :class:`Before` from the file.
Args:
export_location (Path): The location of exporting.
file_location (Optional[dict[str, Union[str, dict[str, str]]]): The location of file.
version (Literal["v5", "v7"], optional): The version of file. Defaults to "v5".
Returns:
Before: The data of :class:`Before`.
"""
if file_location is None:
file_location = {}
if version == "v7":
real_file_location = {k: f"{v}.json" for k, v in EXPORTING_NAME.items()}
else:
assert isinstance(file_location["exps_config"], str), "ExpsConfig must be Path"
assert isinstance(file_location["circuits_num"], str), "circuitsNum must be Path"
assert isinstance(file_location["job_id"], str), "job_id must be Path"
real_file_location = {
"exps_config": Path(file_location["exps_config"]).name,
"circuits_num": Path(file_location["circuits_num"]).name,
"job_id": Path(file_location["job_id"]).name,
}
return cls(
exps_config=quickRead(
filename=(real_file_location["exps_config"]),
save_location=export_location,
),
circuits_num=quickRead(
filename=(real_file_location["circuits_num"]),
save_location=export_location,
),
circuits_map=TagList.read(
filename=real_file_location["circuits_map"],
taglist_name="circuitsMap",
save_location=export_location,
),
pending_pool=TagList.read(
filename=real_file_location["pending_pool"],
taglist_name="pendingPools",
save_location=export_location,
),
job_id=quickRead(
filename=(real_file_location["job_id"]),
save_location=export_location,
),
job_taglist=TagList.read(
filename=real_file_location["job_taglist"],
taglist_name=("job.tagList" if version == "v7" else "tagMapExpsID"),
save_location=export_location,
),
index_taglist=TagList.read(
filename=real_file_location["index_taglist"],
taglist_name=("index.tagList" if version == "v7" else "tagMapIndex"),
save_location=export_location,
),
)