import pandas as pd
import ast
from .database import Dataset
from pathlib import Path
[docs]
def _correct_esm_and_lca_efficiency_differences(
        self,
        return_efficiency_report: bool = False,
        write_efficiency_report: bool = True,
        db_type: str = 'esm',
) -> None or pd.DataFrame:
    """
    Correct the efficiency differences between ESM technologies and their operation LCI datasets. This method can be
    used during the creation of the ESM database and during the creation of the ESM results database.

    For 'esm' and 'validation', an efficiency ratio (LCA efficiency / ESM efficiency) is derived for each
    (technology, flow) pair from the operation flows recorded during double-counting removal. For 'esm results', the
    ratios are taken from self.efficiency_differences_report or, when it is None, read from
    efficiency_differences.csv. Except for 'validation', the activities whose removed flows belong to the CPC
    categories of the technology's ESM flows are then passed to _adapt_flows_to_efficiency_difference.

    :param return_efficiency_report: if True, return the efficiency differences pandas DataFrame
    :param write_efficiency_report: if True, write the efficiency differences in a csv file
    :param db_type: type of database to use for the efficiency correction, can be either 'esm', 'esm results' or
        'validation'
    :return: None or pandas DataFrame if return_efficiency_report is True
    :raises ValueError: if db_type is not supported, if some ESM resources/flows have no unit conversion factor, or
        if no input conversion factor is found for a (product, LCA unit, ESM unit) combination
    """
    # Fail fast on an unsupported db_type instead of running the whole pipeline and only (maybe) failing deep
    # inside the correction loop.
    supported_db_types = ('esm', 'esm results', 'validation')
    if db_type not in supported_db_types:
        raise ValueError(f'db_type must be one of {supported_db_types}, got {db_type!r}')

    # Local aliases for frequently accessed instance attributes
    db_dict_code = self.main_database.db_as_dict_code
    db_dict_name = self.main_database.db_as_dict_name
    mapping = self.mapping
    efficiency = self.efficiency
    unit_conversion = self.unit_conversion
    mapping_esm_flows_to_CPC_cat = self.mapping_esm_flows_to_CPC_cat
    esm_results_db_name = self.esm_results_db_name

    # Load the double-counting removal outputs from disk if they are not in memory yet
    if self.df_flows_set_to_zero is None:
        self.df_flows_set_to_zero = pd.read_csv(f'{self.results_path_file}removed_flows_list.csv')
    if self.double_counting_removal_amount is None:
        self.double_counting_removal_amount = pd.read_csv(f'{self.results_path_file}double_counting_removal.csv')

    # Keep only the operational flows (others are not relevant for efficiency correction)
    removed_flows = self.df_flows_set_to_zero
    removed_flows = removed_flows[removed_flows['Type'] == 'Operation']
    double_counting_removal_amount = self.double_counting_removal_amount
    double_counting_removal_amount = double_counting_removal_amount[
        double_counting_removal_amount['Type'] == 'Operation'
    ]

    if db_type == 'validation':
        # In validation mode the efficiency table is built from the double-counting removal amounts
        efficiency = double_counting_removal_amount[['Name', 'Flow', 'Unit', 'Amount']].copy(deep=True)
        efficiency.drop(efficiency[efficiency['Flow'].isin([
            'CONSTRUCTION', 'OWN_CONSTRUCTION',
            'DECOMMISSION', 'OWN_DECOMMISSION',
            'TRANSPORT_FUEL', 'PROCESS_FUEL',
        ])].index, inplace=True)
        # Each flow becomes a one-element list of its string form. Building the list directly (rather than
        # formatting "['...']" and parsing it back) also works for names containing quote characters.
        efficiency['Flow'] = efficiency['Flow'].apply(lambda flow: [f'{flow}'])
    else:
        # The 'Flow' column may hold string representations of lists. When it already holds lists,
        # ast.literal_eval raises ValueError and the column is deliberately left as is.
        try:
            efficiency['Flow'] = efficiency['Flow'].apply(ast.literal_eval)
        except ValueError:
            pass

    if db_type in ('esm', 'validation'):
        if db_type == 'esm':
            # NOTE: until the explode() call below, `efficiency` is the same object as self.efficiency, so the
            # column added and the rows dropped here also affect self.efficiency.
            # Get in unit of the input flow in the LCI dataset
            efficiency['LCA input unit'] = efficiency.apply(
                self._get_lca_input_flow_unit_or_product,
                axis=1,
                output_type='unit',
                removed_flows=removed_flows,
            )
            efficiency.drop(efficiency[efficiency['LCA input unit'].isnull()].index, inplace=True)
            efficiency = efficiency.explode(column=['LCA input unit'], ignore_index=True)
        else:
            efficiency.rename(columns={'Unit': 'LCA input unit'}, inplace=True)

        # Get the name of the reference product of the input flow in the LCI dataset
        efficiency['LCA input product'] = efficiency.apply(
            self._get_lca_input_flow_unit_or_product,
            axis=1,
            output_type='product',
            removed_flows=removed_flows,
        )
        efficiency.drop(efficiency[efficiency['LCA input product'].isnull()].index, inplace=True)

        # Get the quantity of the input flow in the LCI dataset, expressed in the LCA unit
        if db_type == 'esm':
            efficiency['LCA input quantity (LCA unit)'] = efficiency.apply(
                self._get_lca_input_quantity,
                axis=1,
                double_counting_removal_amount=double_counting_removal_amount,
            )
        else:
            efficiency.rename(columns={'Amount': 'LCA input quantity (LCA unit)'}, inplace=True)

        # Get the quantity and the unit of the input flow in the ESM
        efficiency['ESM input quantity (ESM unit)'] = efficiency.apply(
            self._get_esm_input_quantity,
            axis=1,
        )
        efficiency['ESM input unit'] = efficiency.apply(
            self._get_esm_input_unit,
            axis=1,
        )

        if len(self.resources_without_unit_conversion_factor) > 0:
            raise ValueError(
                f'The following ESM resources/flows do not have a unit conversion factor in the unit conversion file: '
                f'{self.resources_without_unit_conversion_factor}. Please add them to the unit conversion file.'
            )

        # Derive the efficiency in the ESM (inputs are scaled w.r.t. a unit output flow)
        efficiency['ESM efficiency'] = 1 / efficiency['ESM input quantity (ESM unit)']

        # Add output flow units from the ESM and the LCI dataset, and the corresponding conversion factor
        efficiency = efficiency.merge(
            right=unit_conversion[unit_conversion.Type == 'Operation'][['Name', 'Value', 'LCA', 'ESM']],
            how='left',
            on='Name',
        )
        efficiency.rename(
            columns={'Value': 'Output conversion factor', 'LCA': 'LCA output unit', 'ESM': 'ESM output unit'},
            inplace=True,
        )

        # Add the input flow units from the ESM and the LCI dataset, and the corresponding conversion factor
        efficiency = efficiency.merge(
            right=unit_conversion[unit_conversion.Type == 'Other'][['Name', 'Value', 'LCA', 'ESM']],
            how='left',
            left_on=['LCA input product', 'LCA input unit', 'ESM input unit'],
            right_on=['Name', 'LCA', 'ESM'],
            suffixes=('', '_to_remove'),
        )
        efficiency.drop(columns=['Name_to_remove', 'LCA'], inplace=True)
        efficiency.rename(columns={'Value': 'Input conversion factor'}, inplace=True)
        # Fill in identical units and MJ <-> kWh; other rows keep the factor coming from the merge above
        efficiency['Input conversion factor'] = efficiency.apply(self._basic_unit_conversion, axis=1)

        rows_without_factor = efficiency[efficiency['Input conversion factor'].isna()][
            ['LCA input product', 'LCA input unit', 'ESM input unit']
        ].values.tolist()
        missing_units = list({tuple(x) for x in rows_without_factor})  # unique combinations only
        if missing_units:
            raise ValueError(f'No conversion factor found for the following units (product, unit from, unit to): '
                             f'{missing_units}')

        efficiency['LCA input quantity (ESM unit)'] = (
            (efficiency['Output conversion factor'] * efficiency['LCA input quantity (LCA unit)'])
            / efficiency['Input conversion factor']
        )

        # To handle the case where multiple flows, with potentially different units, are within the same ESM flow
        # category, we aggregate those flows to compute the efficiency
        # (lists are not hashable, so 'Flow' is converted to a string for the groupby/merge and back afterwards)
        efficiency['Flow'] = efficiency['Flow'].apply(lambda x: str(x) if isinstance(x, list) else x)
        efficiency = efficiency.merge(
            efficiency.groupby(['Name', 'Flow'])['LCA input quantity (ESM unit)'].sum().reset_index(),
            how='left',
            on=['Name', 'Flow'],
            suffixes=('', ' aggregated'),
        )
        efficiency['Flow'] = efficiency['Flow'].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else x)

        efficiency['LCA efficiency'] = 1 / efficiency['LCA input quantity (ESM unit) aggregated']
        efficiency['efficiency_ratio'] = efficiency['LCA efficiency'] / efficiency['ESM efficiency']

    else:  # db_type == 'esm results'
        # Reuse the efficiency ratios computed previously (in memory, or from the csv report)
        if self.efficiency_differences_report is None:
            efficiency = pd.read_csv(f'{self.results_path_file}efficiency_differences.csv')
        else:
            efficiency = self.efficiency_differences_report
        efficiency['Flow'] = efficiency['Flow'].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else x)

        if self.operation_metrics_for_all_time_steps:
            # Convert the 'Flow' column to string to be able to merge
            self.efficiency['Flow'] = self.efficiency['Flow'].apply(lambda x: str(x) if isinstance(x, list) else x)
            efficiency['Flow'] = efficiency['Flow'].apply(lambda x: str(x) if isinstance(x, list) else x)
            # Store the ratio of the current year in a year-specific column of self.efficiency
            self.efficiency = pd.merge(self.efficiency, efficiency[['Name', 'Flow', 'efficiency_ratio']],
                                       on=['Name', 'Flow'], how='left')
            self.efficiency.rename(columns={'efficiency_ratio': f'efficiency_ratio ({self.year})'}, inplace=True)
            # Convert the 'Flow' column back to list
            self.efficiency['Flow'] = self.efficiency['Flow'].apply(
                lambda x: ast.literal_eval(x) if isinstance(x, str) else x)
            efficiency = self.efficiency

    if db_type != 'validation':
        # Year-specific activities and ratio columns are only used for the ESM results database
        per_year = bool(self.operation_metrics_for_all_time_steps) and db_type == 'esm results'
        years = [y for y in self.list_of_years if y <= self.year] if per_year else [self.year]
        removed_flow_columns = ['Database', 'Code', 'Removed flow database', 'Removed flow code']

        for row_idx, (tech, flows_list) in enumerate(zip(efficiency['Name'], efficiency['Flow'])):
            # The following do not depend on the year, hence they are computed once per technology
            CPC_list = []  # list of CPC categories corresponding to the fuel flow(s) of the technology
            for flow in flows_list:
                CPC_list += mapping_esm_flows_to_CPC_cat[
                    mapping_esm_flows_to_CPC_cat['Flow'] == flow
                ]['CPC'].values[0]
            # flows removed during double counting removal
            tech_removed_flows = removed_flows[removed_flows.Name == tech][removed_flow_columns]

            for year in years:
                act_to_adapt_list = []  # there might be several activities to adapt for one technology (markets)
                techno_flows_to_correct_dict = {}  # (database, code) of an activity -> removed flows to rescale

                for main_act_database, main_act_code, removed_act_database, removed_act_code in \
                        tech_removed_flows.itertuples(index=False, name=None):
                    act_exc = db_dict_code[removed_act_database, removed_act_code]

                    # Only the removed flows that are fuel flows of the technology (matching CPC category)
                    # trigger an adjustment
                    if 'classifications' not in act_exc:
                        continue
                    classifications = dict(act_exc['classifications'])
                    if 'CPC' not in classifications or classifications['CPC'] not in CPC_list:
                        continue

                    main_act = db_dict_code[main_act_database, main_act_code]
                    if db_type == 'esm':
                        act_to_adapt = main_act
                    else:  # 'esm results'
                        if main_act['name'] == f'{tech}, Operation':
                            # Name of the technology's operation activity in the ESM results database
                            act_name = mapping[
                                (mapping.Name == tech)
                                & (mapping.Type == 'Operation')
                            ]['Activity'].iloc[0]
                            act_name += f' ({tech}, {year})' if per_year else f' ({tech})'
                        else:
                            act_name = main_act['name']
                        # If the technology is not in the ESM results database, it means that its annual
                        # production was null. Thus, we do not need to correct its efficiency (None).
                        act_to_adapt = db_dict_name.get((
                            act_name,
                            main_act['reference product'],
                            main_act['location'],
                            esm_results_db_name,
                        ))

                    if act_to_adapt is None:
                        continue
                    act_key = (act_to_adapt['database'], act_to_adapt['code'])
                    if act_to_adapt not in act_to_adapt_list:
                        # in case there are several fuel flows in the same activity
                        act_to_adapt_list.append(act_to_adapt)
                        techno_flows_to_correct_dict[act_key] = []
                    techno_flows_to_correct_dict[act_key].append((act_exc['database'], act_exc['code']))

                if not act_to_adapt_list and db_type == 'esm':
                    self.logger.warning(
                        f'No flow of type(s) {flows_list} found for {tech}. The efficiency of this technology '
                        f'cannot be adjusted.'
                    )

                ratio_column = f'efficiency_ratio ({year})' if per_year else 'efficiency_ratio'
                for act in act_to_adapt_list:
                    efficiency_ratio = efficiency[ratio_column].iloc[row_idx]
                    techno_flows_to_correct = techno_flows_to_correct_dict[(act['database'], act['code'])]
                    self._adapt_flows_to_efficiency_difference(act, efficiency_ratio, techno_flows_to_correct)

    if write_efficiency_report:
        # saving the efficiency differences in a csv file
        Path(self.results_path_file).mkdir(parents=True, exist_ok=True)  # Create the folder if it does not exist
        efficiency.to_csv(f'{self.results_path_file}efficiency_differences.csv', index=False)

    if return_efficiency_report:
        # returning the efficiency differences pandas DataFrame
        return efficiency
@staticmethod
[docs]
def _adapt_flows_to_efficiency_difference(
        act: dict,
        efficiency_ratio: float,
        techno_flows_to_correct: list[tuple[str, str]],
) -> dict:
    """
    Adapt the flows of an activity to correct the efficiency difference between ESM and LCA

    All biosphere flows, and the technosphere flows listed in techno_flows_to_correct, have their amount multiplied
    by efficiency_ratio. A note is prepended to the comment of each modified exchange and of the activity itself.
    The exchanges and the activity are modified in place.

    :param act: LCI dataset to adapt
    :param efficiency_ratio: ratio between the LCA and ESM efficiencies
    :param techno_flows_to_correct: list of (database, code) tuples to correct among the technosphere flows
    :return: the adapted LCI dataset
    """
    rounded_ratio = round(efficiency_ratio, 4)  # only used in the comments, amounts use the exact ratio

    for exc in Dataset(act).get_biosphere_flows():
        exc['amount'] *= efficiency_ratio
        # Trailing space so the note does not run into the existing comment (as for technosphere flows below)
        exc['comment'] = f'EF multiplied by {rounded_ratio} (efficiency). ' + exc.get('comment', '')

    for exc in Dataset(act).get_technosphere_flows():
        if (exc['database'], exc['code']) in techno_flows_to_correct:
            exc['amount'] *= efficiency_ratio
            exc['comment'] = f'TF multiplied by {rounded_ratio} (efficiency). ' + exc.get('comment', '')

    act['comment'] = (f'Biosphere and fuel flows adjusted by a factor {rounded_ratio} to correct '
                      f'efficiency difference between ESM and LCA. ' + act.get('comment', ''))
    return act
@staticmethod
[docs]
def _basic_unit_conversion(row: pd.Series) -> float or None:
    """
    Return the input conversion factor of a row, handling the trivial unit pairs directly

    :param row: row providing the 'LCA input unit', 'ESM input unit' and 'Input conversion factor' entries
    :return: 1.0 if both units are equal, 3.6 from megajoule to kilowatt hour, 1 / 3.6 from kilowatt hour to
        megajoule, otherwise the row's existing 'Input conversion factor'
    """
    megajoule_names = ('megajoule', 'MJ')
    kilowatt_hour_names = ('kilowatt hour', 'kWh')

    lca_unit = row['LCA input unit']
    esm_unit = row['ESM input unit']

    if lca_unit == esm_unit:
        return 1.0
    if lca_unit in megajoule_names and esm_unit in kilowatt_hour_names:
        return 3.6
    if lca_unit in kilowatt_hour_names and esm_unit in megajoule_names:
        return 1 / 3.6
    # No built-in rule for this pair: keep whatever factor the row already carries
    return row['Input conversion factor']