# Source code for ccvm_simulators.ccvmplotlib.problem_metadata.boxqp_metadata
from ccvm_simulators.ccvmplotlib.problem_metadata.problem_metadata import (
ProblemType,
ProblemMetadata,
)
from ccvm_simulators.ccvmplotlib.utils.sampleTTSmetric import SampleTTSMetric
import numpy as np
import pandas as pd
import json_stream
[docs]
class BoxQPMetadata(ProblemMetadata):
    """BoxQP problem-specific metadata class.

    The problem-specific metadata class inherited from the ``ProblemMetadata``
    parent class for the BoxQP problem. It takes BoxQP problem result data and
    generates plotting data.
    """

    def __init__(self, problem: ProblemType) -> None:
        """BoxQP Metadata class object constructor.

        The constructor defines variables that are specific to the BoxQP
        problem and are used to generate plotting data.

        Args:
            problem (ProblemType): A problem type.
        """
        super().__init__(problem)
        # Sorted, unique problem sizes found in the ingested metadata.
        self.__problem_size_list: list[int] = []
        # Keys of the "solution_performance" entry of the first ingested record.
        self.__percent_gap_list: list[str] = []
        # Second level of the plotting columns; "success_prob" is not a
        # numeric percentile and is filtered out where percentiles are needed.
        self.__percentile_list: list[str] = ["25", "50", "75", "success_prob"]
        self.__batch_size: int = 0
        # One row per ingested "result_metadata" record (flattened).
        self.__df: pd.DataFrame = pd.DataFrame()

    def __flatten_dict(self, result: dict) -> dict:
        """Flatten a nested dictionary by one level.

        Entries of nested mappings are lifted to the top level under their own
        (inner) key; the outer key is discarded. If an inner key collides with
        a key that was already stored, the later value overwrites the earlier
        one.

        Args:
            result (dict): Result in the nested dictionary.

        Returns:
            dict: Flattened dictionary.
        """
        nested_types = (dict, json_stream.base.TransientStreamingJSONObject)
        flattened_dict = {}
        for outer_key, outer_value in result.items():
            if isinstance(outer_value, nested_types):
                for inner_key, inner_value in outer_value.items():
                    flattened_dict[inner_key] = inner_value
            else:
                flattened_dict[outer_key] = outer_value
        return flattened_dict

    def __new_plotting_frame(self) -> pd.DataFrame:
        """Create the empty frame shared by the plot-data generators.

        Returns:
            pd.DataFrame: A frame without data, indexed by problem size, whose
            columns are the product of the percent-gap keys and the entries of
            the percentile list.
        """
        return pd.DataFrame(
            index=pd.Index(self.__problem_size_list, name="Problem Size (N)"),
            columns=pd.MultiIndex.from_product(
                [self.__percent_gap_list, self.__percentile_list],
                names=["Optimality Type", "Percentile"],
            ),
        )

    def ingest_metadata(self, metadata_filepath: str) -> None:
        """A method to ingest raw metadata.

        Take a file path to metadata and convert them into a pandas.DataFrame.
        Records are appended to the data that was already ingested, so the
        method may be called for several files.

        Args:
            metadata_filepath (str): A file path to metadata.
        """
        # The file is read twice on purpose: the JSON is consumed as a stream,
        # so the first pass (which only looks at the first record) cannot be
        # rewound to iterate over every record afterwards.

        # First pass: populate the percent gap list.
        with open(metadata_filepath, "r", encoding="utf-8") as metadata_file:
            data_stream = json_stream.load(metadata_file)
            for key in data_stream["result_metadata"][0]["solution_performance"]:
                # Skip keys that are already known so that ingesting more than
                # once does not create duplicated plotting columns.
                if key not in self.__percent_gap_list:
                    self.__percent_gap_list.append(key)

        # Second pass: collect every record, then build the frame in one go
        # instead of concatenating a one-row frame per record.
        with open(metadata_filepath, "r", encoding="utf-8") as metadata_file:
            data_stream = json_stream.load(metadata_file)
            rows = [
                self.__flatten_dict(data) for data in data_stream["result_metadata"]
            ]

        new_rows_df = pd.DataFrame(rows)
        if self.__df.empty:
            self.__df = new_rows_df
        else:
            self.__df = pd.concat([self.__df, new_rows_df], ignore_index=True)

        # The batch size is taken from the first record only.
        self.__batch_size = self.__df["batch_size"].iloc[0]
        self.__problem_size_list = sorted(self.__df["problem_size"].unique().tolist())

    def generate_plot_data(
        self,
        metric_func: callable,
    ) -> pd.DataFrame:
        """Calculate the metric-to-solution vs problem size for every gap and
        percentile.

        Args:
            metric_func (callable): A callback function that is used when
                calculating the metrics either to determine the `machine_time`
                or the `machine_energy`, which are used when computing the TTS
                or ETS, respectively. It is called with the keyword arguments
                `dataframe` and `problem_size`.

        Returns:
            pd.DataFrame: A frame indexed by problem size with
            (optimality type, percentile) columns. Only the numeric percentile
            columns are filled; the "success_prob" columns are left empty.
        """
        plotting_df = self.__new_plotting_frame()
        numeric_percentiles = [
            percentile
            for percentile in self.__percentile_list
            if percentile != "success_prob"
        ]

        for problem_size in self.__problem_size_list:
            # Both values depend only on the problem size, so they are computed
            # once per size rather than once per (gap, percentile) pair.
            matching_df = self.__df.loc[self.__df["problem_size"] == problem_size]
            metric_value = metric_func(
                dataframe=matching_df, problem_size=problem_size
            )

            for percent_gap in self.__percent_gap_list:
                success_prob = matching_df[percent_gap].values
                # Fraction of the matching records with a non-zero success
                # probability.
                frac_solved = (success_prob > 0).mean()

                for percentile in numeric_percentiles:
                    # A new sampler with the same seed is built for every cell,
                    # exactly as before; presumably this keeps each bootstrap
                    # independent of the evaluation order -- confirm against
                    # SampleTTSMetric before sharing one instance.
                    sampler = SampleTTSMetric(
                        tau_attribute="time",
                        percentile=int(percentile),
                        seed=1,
                        num_bootstraps=100,
                    )
                    if frac_solved < (int(percentile) / 100):
                        # Too few records were solved for this percentile.
                        r99 = np.inf
                    else:
                        r99_distribution = sampler.calc_R99_distribution(
                            success_probabilities=success_prob,
                            num_repeats=self.__batch_size,
                        )
                        r99 = np.mean(r99_distribution)

                    plotting_df.at[problem_size, (percent_gap, percentile)] = (
                        metric_value * r99
                    )
        return plotting_df

    def generate_success_prob_plot_data(self) -> pd.DataFrame:
        """Calculate the success probability vs problem size for every gap.

        Returns:
            pd.DataFrame: A frame indexed by problem size with
            (optimality type, percentile) columns. Only the "success_prob"
            columns are filled, with the mean success probability of the
            records matching the problem size.
        """
        plotting_df = self.__new_plotting_frame()

        for problem_size in self.__problem_size_list:
            matching_df = self.__df.loc[self.__df["problem_size"] == problem_size]
            for percent_gap in self.__percent_gap_list:
                # Values are converted one by one because the column may hold
                # non-float objects.
                success_probs = [
                    float(success_prob)
                    for success_prob in matching_df[percent_gap].values
                ]
                plotting_df.at[problem_size, (percent_gap, "success_prob")] = (
                    np.mean(success_probs)
                )
        return plotting_df