Source code for mescal.double_counting

import copy
import ast
from .database import Dataset
from .utils import random_code, _short_name_ds_type
import pandas as pd
from tqdm import tqdm
import wurst






def _double_counting_removal(
        self,
        df: pd.DataFrame,
        N: int,
        ESM_inputs: "list[str] | str" = 'all',
        db_type: str = 'esm',
        ds_type: str = 'Operation',
) -> tuple[list[list], dict, list[list]]:
    """
    Remove double-counted flows from the LCI datasets mapped to the ESM technologies or resources.

    For each row of the mapping file, the mapped activity is copied into the ESM database (only if
    db_type is 'esm'), the activities on which the removal has to be performed are identified (possibly
    through self._background_search), and the technosphere flows whose CPC category corresponds to an
    ESM input flow are set to zero. The original amount is kept in the 'comment' field of each modified
    flow. The updated list of activities is stored back in self.main_database.db_as_list.

    :param df: mapping file with input flows of each technology or resource
    :param N: number of columns of the original mapping file (the columns from index N onwards are the
        ESM flows)
    :param ESM_inputs: list of the ESM flows to perform double counting removal on, or 'all' to use the
        negative ESM flows of each technology
    :param db_type: type of database to use, either 'esm', 'esm results' or 'esm results wo dcr'
    :param ds_type: type of LCI dataset to consider, can be 'Operation', 'Construction', 'Decommission'
        or 'Resource'
    :return: list of removed flows, dictionary of removed quantities, list of activities subject to
        double counting
    :raises ValueError: if an activity selected for double-counting removal is not found in any database
    """
    # Local aliases of the database containers. They are passed to (and returned by) _background_search.
    db_as_list = self.main_database.db_as_list
    db_dict_code = self.main_database.db_as_dict_code
    db_dict_name = self.main_database.db_as_dict_name

    # Local aliases of read-only instance attributes
    esm_db_name = self.esm_db_name
    no_construction_list = self.no_construction_list
    no_decommission_list = self.no_decommission_list
    mapping_infra = self.mapping_infra
    mapping_constr = self.mapping_constr
    mapping_decom = self.mapping_decom
    no_background_search_list = self.no_background_search_list
    no_double_counting_removal_list = self.no_double_counting_removal_list
    regionalize_foregrounds = self.regionalize_foregrounds
    background_search_act = self.background_search_act

    flows_set_to_zero = []  # list of removed flows
    activities_subject_to_double_counting = []  # list of activities subject to double counting

    # Dict of removed quantities: ei_removal[tech][esm_flow] = {'amount': {unit: value}, 'count': {unit: n}}
    esm_flow_columns = list(df.iloc[:, N:].columns)
    ei_removal = {
        tech: {res: {'amount': {}, 'count': {}} for res in esm_flow_columns}
        for tech in df.Name
    }

    # Read lists as lists and not strings. ast.literal_eval raises a ValueError on values that are not
    # strings (e.g., if the column was already converted), in which case the column is left untouched.
    try:
        self.technology_compositions.Components = self.technology_compositions.Components.apply(ast.literal_eval)
    except ValueError:
        pass

    try:
        self.mapping_esm_flows_to_CPC_cat.CPC = self.mapping_esm_flows_to_CPC_cat.CPC.apply(ast.literal_eval)
    except ValueError:
        pass

    technology_compositions_dict = dict(zip(
        zip(self.technology_compositions['Name'], self.technology_compositions['Type']),
        self.technology_compositions['Components']
    ))

    # Mapping from ESM flows to CPC categories, and its inverse (from CPC categories to ESM flows)
    mapping_esm_flows_to_CPC_dict = dict(zip(
        self.mapping_esm_flows_to_CPC_cat.Flow, self.mapping_esm_flows_to_CPC_cat.CPC
    ))
    mapping_CPC_to_esm_flows_dict = {}
    for esm_flow, cpc_categories in mapping_esm_flows_to_CPC_dict.items():
        for cpc in cpc_categories:
            mapping_CPC_to_esm_flows_dict.setdefault(cpc, []).append(esm_flow)

    for i in tqdm(range(len(df))):
        tech = df['Name'].iloc[i]  # name of ESM technology

        # Initialization of the list of construction/decommission activities and corresponding CPC categories
        act_constr_list = []
        CPC_constr_list = []
        mapping_esm_flows_to_CPC_dict['OWN_CONSTRUCTION'] = []
        act_decom_list = []
        CPC_decom_list = []
        mapping_esm_flows_to_CPC_dict['OWN_DECOMMISSION'] = []

        # Removing the OWN_CONSTRUCTION/OWN_DECOMMISSION entries of the previous technology
        for esm_flows in mapping_CPC_to_esm_flows_dict.values():
            for own_flow in ('OWN_CONSTRUCTION', 'OWN_DECOMMISSION'):
                if own_flow in esm_flows:
                    esm_flows.remove(own_flow)

        # Construction activity
        if tech not in no_construction_list and ds_type == 'Operation':
            # Simple technologies (i.e., not compositions) are seen as compositions of one technology
            technology_compositions_dict.setdefault((tech, 'Construction'), [tech])

            for sub_comp in technology_compositions_dict[(tech, 'Construction')]:
                database_constr, current_code_constr = mapping_infra[
                    (mapping_infra.Name == sub_comp) & (mapping_infra.Type == 'Construction')
                ][['Database', 'Current_code']].iloc[0]

                act_constr = db_dict_code[(database_constr, current_code_constr)]
                act_constr_list.append(act_constr)

                try:
                    CPC_constr = dict(act_constr['classifications'])['CPC']
                except KeyError:
                    self.products_without_a_cpc_category.add(act_constr["reference product"])
                    self.logger.warning(f'Product {act_constr["reference product"]} has no CPC category.')
                    CPC_constr = 'None'

                CPC_constr_list.append(CPC_constr)
                mapping_esm_flows_to_CPC_dict['OWN_CONSTRUCTION'] += [CPC_constr]
                # NOTE(review): this overwrites the ESM flows previously mapped to this CPC category,
                # whereas the decommission branch below appends to them - confirm this is intended.
                mapping_CPC_to_esm_flows_dict[CPC_constr] = ['OWN_CONSTRUCTION']

        # Decommission activity
        if tech not in no_decommission_list and ds_type == 'Operation':
            # Simple technologies (i.e., not compositions) are seen as compositions of one technology
            technology_compositions_dict.setdefault((tech, 'Decommission'), [tech])

            for sub_comp in technology_compositions_dict[(tech, 'Decommission')]:
                database_decom, current_code_decom = mapping_infra[
                    (mapping_infra.Name == sub_comp) & (mapping_infra.Type == 'Decommission')
                ][['Database', 'Current_code']].iloc[0]

                act_decom = db_dict_code[(database_decom, current_code_decom)]
                act_decom_list.append(act_decom)

                try:
                    CPC_decom = dict(act_decom['classifications'])['CPC']
                except KeyError:
                    self.products_without_a_cpc_category.add(act_decom["reference product"])
                    self.logger.warning(f'Product {act_decom["reference product"]} has no CPC category.')
                    CPC_decom = 'None'

                CPC_decom_list.append(CPC_decom)
                mapping_esm_flows_to_CPC_dict['OWN_DECOMMISSION'] += [CPC_decom]

                if CPC_decom in mapping_CPC_to_esm_flows_dict:
                    if 'OWN_DECOMMISSION' not in mapping_CPC_to_esm_flows_dict[CPC_decom]:  # avoid duplicates
                        mapping_CPC_to_esm_flows_dict[CPC_decom] += ['OWN_DECOMMISSION']
                else:
                    mapping_CPC_to_esm_flows_dict[CPC_decom] = ['OWN_DECOMMISSION']

        # Main activity
        database_main = df['Database'].iloc[i]  # LCA database of the technology
        current_code_main = df['Current_code'].iloc[i]  # code in the LCA database

        # Identification of the activity in the LCA database
        act = db_dict_code[(database_main, current_code_main)]

        if db_type == 'esm':
            # Copy the activity and change the database (no new activity in the original LCA database)
            new_code = df['New_code'].iloc[i]  # new code defined previously
            new_act = copy.deepcopy(act)
            new_act['code'] = new_code
            new_act['database'] = esm_db_name
            prod_flow = Dataset(new_act).get_production_flow()
            prod_flow['code'] = new_code
            prod_flow['database'] = esm_db_name
            db_as_list.append(new_act)
            db_dict_name[
                (new_act['name'], new_act['reference product'], new_act['location'], new_act['database'])
            ] = new_act
            db_dict_code[(new_act['database'], new_act['code'])] = new_act
        else:
            new_act = act

        # List of activities to perform double counting removal on. Each entry is
        # [activity name, activity code, multiplying factor, depth in the process tree, ESM inputs].
        if tech in no_double_counting_removal_list[ds_type]:
            perform_d_c = []
        elif tech in no_background_search_list[ds_type]:
            if db_type != 'esm results wo dcr':
                new_act['comment'] = "Subject to double-counting removal. " + new_act.get('comment', '')
                perform_d_c = [[new_act['name'], new_act['code'], 1, 0, ESM_inputs]]
            else:
                perform_d_c = []
        else:
            perform_d_c, db_dict_code, db_dict_name, db_as_list = self._background_search(
                act=new_act,
                k=0,
                k_lim=self.max_depth_double_counting_search,
                amount=1,
                explore_type='market',
                ESM_inputs=ESM_inputs,
                perform_d_c=[],
                db_dict_code=db_dict_code,
                db_dict_name=db_dict_name,
                db_as_list=db_as_list,
                db_type=db_type,
            )

            if len(perform_d_c) == 0 and db_type != 'esm results wo dcr':
                # if the dataset has been identified as a market (one of the conditions was true), but none
                # of the technosphere flows correspond to the same CPC category, we consider the activity
                # itself for double-counting removal
                new_act['comment'] = "Subject to double-counting removal. " + new_act.get('comment', '')
                perform_d_c = [[new_act['name'], new_act['code'], 1, 0, ESM_inputs]]

        if db_type == 'esm':
            new_act['name'] = f'{tech}, {ds_type}'  # saving name after market identification
            prod_flow = Dataset(new_act).get_production_flow()
            prod_flow['name'] = f'{tech}, {ds_type}'

        # perform_d_c may grow during the loop (deeper background search), hence the while loop
        id_d_c = 0
        while id_d_c < len(perform_d_c):
            new_act_op_d_c_code = perform_d_c[id_d_c][1]  # activity code
            new_act_op_d_c_amount = perform_d_c[id_d_c][2]  # multiplying factor as we went down in the tree
            k_deep = perform_d_c[id_d_c][3]  # depth level in the process tree

            # The activity is first looked for in the ESM database, then in any other database
            if (esm_db_name, new_act_op_d_c_code) in db_dict_code:
                new_act_op_d_c = db_dict_code[(esm_db_name, new_act_op_d_c_code)]
            else:
                new_act_op_d_c = None
                for db_name in {a['database'] for a in db_as_list}:
                    if (db_name, new_act_op_d_c_code) in db_dict_code:
                        new_act_op_d_c = db_dict_code[(db_name, new_act_op_d_c_code)]
                if new_act_op_d_c is None:
                    raise ValueError(f"Activity not found: {new_act_op_d_c_code}")

            if ds_type in regionalize_foregrounds:
                db_as_list.remove(new_act_op_d_c)
                new_act_op_d_c = self._regionalize_activity_foreground(act=new_act_op_d_c)
                db_as_list.append(new_act_op_d_c)
                # NOTE(review): only db_dict_name is refreshed here, db_dict_code is not - confirm that
                # _regionalize_activity_foreground modifies the activity in place.
                db_dict_name[
                    (new_act_op_d_c['name'], new_act_op_d_c['reference product'], new_act_op_d_c['location'],
                     new_act_op_d_c['database'])
                ] = new_act_op_d_c

            if perform_d_c[id_d_c][4] == 'all':
                # list of inputs in the ESM (i.e., negative flows in layers_in_out)
                esm_flows_of_tech = df.iloc[:, N:].iloc[i]
                ES_inputs = list(esm_flows_of_tech[esm_flows_of_tech < 0].index)
            else:
                ES_inputs = perform_d_c[id_d_c][4]

            # CPCs corresponding to the ESM list of inputs (flattened list)
            CPC_inputs = [cpc for inp in ES_inputs for cpc in mapping_esm_flows_to_CPC_dict[inp]]

            # Creating the list containing the CPCs of all technosphere flows of the activity
            # ('None' if the provider of the flow has no CPC category)
            technosphere_inputs = Dataset(new_act_op_d_c).get_technosphere_flows()
            technosphere_inputs_CPC = []
            for exc in technosphere_inputs:
                act_flow = db_dict_code[(exc['database'], exc['code'])]
                technosphere_inputs_CPC.append(dict(act_flow.get('classifications', [])).get('CPC', 'None'))

            # Finding the indices of technosphere flows that are also in the ESM inputs
            # (i.e., flows that we want to put to zero)
            set_CPC_inputs = set(CPC_inputs)
            id_technosphere_inputs_zero = [
                idx for idx, cpc in enumerate(technosphere_inputs_CPC) if cpc in set_CPC_inputs
            ]

            if tech not in no_construction_list and ds_type == 'Operation' and perform_d_c[id_d_c][4] == 'all':
                CPC_to_zero = {technosphere_inputs_CPC[idx] for idx in id_technosphere_inputs_zero}
                if not all(cpc in CPC_to_zero for cpc in CPC_constr_list):
                    # if the construction phase was not detected via OWN_CONSTRUCTION,
                    # then we need the generic CONSTRUCTION CPC categories
                    CPC_inputs.extend(mapping_esm_flows_to_CPC_dict['CONSTRUCTION'])
                    ES_inputs.append('CONSTRUCTION')
                    set_CPC_inputs = set(CPC_inputs)
                    id_technosphere_inputs_zero = [
                        idx for idx, cpc in enumerate(technosphere_inputs_CPC) if cpc in set_CPC_inputs
                    ]

            if tech not in no_decommission_list and ds_type == 'Operation' and perform_d_c[id_d_c][4] == 'all':
                CPC_to_zero = {technosphere_inputs_CPC[idx] for idx in id_technosphere_inputs_zero}
                if not all(cpc in CPC_to_zero for cpc in CPC_decom_list):
                    # if the decommission phase was not detected via OWN_DECOMMISSION,
                    # then we need the generic DECOMMISSION CPC categories
                    CPC_inputs.extend(mapping_esm_flows_to_CPC_dict['DECOMMISSION'])
                    ES_inputs.append('DECOMMISSION')
                    set_CPC_inputs = set(CPC_inputs)
                    id_technosphere_inputs_zero = [
                        idx for idx, cpc in enumerate(technosphere_inputs_CPC) if cpc in set_CPC_inputs
                    ]

            for n in id_technosphere_inputs_zero:
                flow = technosphere_inputs[n]

                if ds_type != 'Construction' and flow['amount'] < 0 and flow['unit'] != 'unit':
                    # we keep waste flows (but not infrastructure decommissioning flows, which should be
                    # contained in the infrastructure LCI dataset)
                    continue

                # Keep track of the amount in the original activity as a comment
                old_amount = flow['amount']

                database = flow['database']
                code = flow['code']
                act_flow = db_dict_code[(database, code)]

                # ESM flows sharing the CPC category of the removed flow. The .get on 'classifications'
                # avoids a KeyError for providers without classifications, which can be selected here
                # when 'None' is among the CPC categories to remove.
                res_categories = mapping_CPC_to_esm_flows_dict.get(
                    dict(act_flow.get('classifications', [])).get('CPC', ''), ''
                )

                if ds_type == 'Construction' and flow['amount'] > 0 and 'DECOMMISSION' in res_categories:
                    # Positive flow identified as a waste flow, this is probably a mismatch
                    continue

                flows_set_to_zero.append([
                    tech,
                    ds_type,
                    new_act_op_d_c['reference product'],  # activity in which the flow is removed
                    new_act_op_d_c['name'],
                    new_act_op_d_c['location'],
                    new_act_op_d_c['database'],
                    new_act_op_d_c['code'],
                    old_amount,  # flow quantity
                    old_amount * new_act_op_d_c_amount,  # flow quantity scaled to the FU
                    flow['unit'],  # flow unit
                    act_flow['reference product'],  # removed flow
                    act_flow['name'],
                    act_flow['location'],
                    act_flow['database'],
                    act_flow['code'],
                ])

                if db_type == 'esm':
                    if 'OWN_CONSTRUCTION' in res_categories:
                        # replace construction flow input by the one added before in the ESM database
                        # (for validation purposes)
                        for idx, sub_comp in enumerate(technology_compositions_dict[(tech, 'Construction')]):
                            if code == act_constr_list[idx]['code']:
                                new_code_constr = mapping_constr[mapping_constr.Name == sub_comp]['New_code'].iloc[0]
                                flow['database'], flow['code'] = esm_db_name, new_code_constr
                                flow['name'] = f'{sub_comp}, Construction'

                    if 'OWN_DECOMMISSION' in res_categories:
                        # replace decommission flow input by the one added before in the ESM database
                        # (for validation purposes)
                        for idx, sub_comp in enumerate(technology_compositions_dict[(tech, 'Decommission')]):
                            if code == act_decom_list[idx]['code']:
                                new_code_decom = mapping_decom[mapping_decom.Name == sub_comp]['New_code'].iloc[0]
                                flow['database'], flow['code'] = esm_db_name, new_code_decom
                                flow['name'] = f'{sub_comp}, Decommission'

                # add the removed amount in the ei_removal dict for post-analysis
                for cat in res_categories:
                    if cat in ES_inputs:
                        # only adds the amount for the relevant category
                        # (e.g., ELECTRICITY_MV among [ELECTRICITY_LV, ELECTRICITY_MV, ELECTRICITY_HV]
                        # which share the same CPCs)
                        removed = ei_removal[tech][cat]
                        unit = flow['unit']
                        # old amount (e.g., GWh) multiplied by factor as we went down in the tree
                        removed['amount'][unit] = (
                            removed['amount'].get(unit, 0) + abs(old_amount * new_act_op_d_c_amount)
                        )
                        # count (i.e., number of flows put to zero)
                        removed['count'][unit] = removed['count'].get(unit, 0) + 1

                # Setting the amount to zero
                flow['comment'] = f'Original amount: {old_amount}. ' + flow.get('comment', '')
                flow['amount'] = 0

            # Go deeper in the process tree if some flows were not found.
            # This is not applied to construction and decommission datasets, which should be found in the
            # foreground inventory.
            # NOTE(review): 'DECOMMISSION' is not in the exclusion list below - confirm this is intended.
            missing_ES_inputs = []
            for cat in ES_inputs:
                if (
                        (tech in background_search_act[ds_type])
                        & (cat not in ['CONSTRUCTION', 'OWN_CONSTRUCTION', 'OWN_DECOMMISSION'])
                        # The two following conditions mean that the background search would stop when some
                        # intermediary flows have already been found for a given esm flow, but some other
                        # similar and relevant flows, further in the process tree, might also be there.
                        & ((sum(ei_removal[tech][cat]['amount'].values()) == 0)
                           | (not self.stop_background_search_when_first_flow_found))
                        & ((sum(ei_removal[tech][cat]['count'].values()) == 0)
                           | (not self.stop_background_search_when_first_flow_found))
                ):
                    missing_ES_inputs.append(cat)

            if missing_ES_inputs and k_deep <= background_search_act[ds_type][tech]:
                perform_d_c, db_dict_code, db_dict_name, db_as_list = self._background_search(
                    act=new_act_op_d_c,
                    k=k_deep,
                    k_lim=background_search_act[ds_type][tech] - 1,
                    amount=new_act_op_d_c_amount,
                    explore_type='background_removal_' + _short_name_ds_type(ds_type),
                    ESM_inputs=missing_ES_inputs,
                    perform_d_c=perform_d_c,
                    db_dict_code=db_dict_code,
                    db_dict_name=db_dict_name,
                    db_as_list=db_as_list,
                    db_type=db_type,
                )

            id_d_c += 1

        activities_subject_to_double_counting.extend(
            [[tech, ds_type, entry[0], entry[1], entry[2]] for entry in perform_d_c]
        )

    # Injecting local variables into the instance variables
    self.main_database.db_as_list = db_as_list

    return flows_set_to_zero, ei_removal, activities_subject_to_double_counting
def background_double_counting_removal(
        self,
        new_db_name: "str | None" = None,
        write_database: bool = True,
) -> None:
    """
    Performs double-counting removal in the background inventory. Concretely, flows included in the ESM
    end-use demands (e.g., energy flows in the ESM geographical scope) are removed from the technosphere
    matrix. This step is needed if the ESM end-use demands include the production and operation of new
    infrastructures.

    For every activity located in self.esm_location, the technosphere flows whose provider has a CPC
    category mapped to one of self.esm_end_use_demands are set to zero (the original amount is kept in the
    'comment' field of the flow). A report of the removed flows is saved as
    background_double_counting_report.csv in self.results_path_file.

    :param new_db_name: name of the new database to write, if None, a default name is used
        (<esm_db_name>_adjusted_for_double_counting)
    :param write_database: if True, writes the new database in Brightway
    :return: None
    """
    if new_db_name is None:
        new_db_name = f'{self.esm_db_name}_adjusted_for_double_counting'

    db_as_list = self.main_database.db_as_list
    db_as_dict_code = self.main_database.db_as_dict_code
    mapping_esm_flows_to_CPC_cat = self.mapping_esm_flows_to_CPC_cat

    activities_of_esm_region = list(wurst.get_many(
        db_as_list,
        wurst.equals('location', self.esm_location)
    ))

    # CPC categories of the ESM end-use demands. The CPC column holds string representations of lists
    # when freshly read, but _double_counting_removal may already have converted it to actual lists
    # (ast.literal_eval raises a ValueError on non-string values), so only strings are parsed.
    end_use_cpc_entries = mapping_esm_flows_to_CPC_cat[
        mapping_esm_flows_to_CPC_cat.Flow.isin(self.esm_end_use_demands)
    ].CPC
    cpc_set = {
        cpc
        for entry in end_use_cpc_entries
        for cpc in (ast.literal_eval(entry) if isinstance(entry, str) else entry)
    }

    double_counting_report = []

    for act in tqdm(activities_of_esm_region):
        for flow in Dataset(act).get_technosphere_flows():
            act_flow = db_as_dict_code[(flow['database'], flow['code'])]  # provider of the flow

            classifications = dict(act_flow.get('classifications', []))
            if 'CPC' not in classifications or classifications['CPC'] not in cpc_set:
                continue

            # Keep track of the amount in the original activity as a comment
            old_amount = flow['amount']
            flow['comment'] = f'Original amount: {old_amount}. ' + flow.get('comment', '')
            flow['amount'] = 0  # Setting the amount to zero

            double_counting_report.append([
                act['name'],
                act['reference product'],
                act['location'],
                act['database'],
                act['code'],
                old_amount,
                flow['unit'],
                act_flow['name'],
                act_flow['reference product'],
                act_flow['location'],
                act_flow['database'],
                act_flow['code'],
            ])

    double_counting_report = pd.DataFrame(double_counting_report, columns=[
        'Activity name',
        'Activity reference product',
        'Activity location',
        'Activity database',
        'Activity code',
        'Removed amount',
        'Unit',
        'Removed flow name',
        'Removed flow reference product',
        'Removed flow location',
        'Removed flow database',
        'Removed flow code',
    ])

    # Injecting local variables into the instance variables
    self.main_database.db_as_list = db_as_list

    double_counting_report.to_csv(f'{self.results_path_file}background_double_counting_report.csv', index=False)

    if write_database:
        self.main_database.write_to_brightway(new_db_name)
def validation_double_counting(
        self,
        esm_results: "pd.DataFrame | None" = None,
        return_validation_report: bool = True,
        save_validation_report: bool = False,
) -> "pd.DataFrame | None":
    """
    Generate a validation report for the double-counting removal process: comparison of quantities
    removed in LCI datasets vs quantities in ESM flows. LCI datasets quantities are converted in ESM units
    (both in terms of inputs and outputs, i.e., quantities of input fuels per functional unit). If an ESM
    results dataframe is provided, the input flows are aggregated to compare the system's primary energy
    use.

    When esm_results is provided, the per-technology report is read from validation_double_counting.csv in
    self.results_path_file, i.e., this method must have been run before without esm_results and with
    save_validation_report set to True.

    :param esm_results: dataframe containing the annual production of each technology in the ESM. It must
        contain the columns 'Name' and 'Production', and it can possibly contain the 'Run' and 'Year'
        columns too. If provided, the system's primary energy use will be compared.
    :param return_validation_report: if True, returns a DataFrame with the validation report
        (double-counting removal or primary energy use, depending on whether esm_results is provided).
    :param save_validation_report: if True, saves the validation report as a CSV file in
        self.results_path_file.
    :return: None or DataFrame with the validation report if return_validation_report is True
    """
    if esm_results is None:
        df = self._correct_esm_and_lca_efficiency_differences(
            write_efficiency_report=False,
            return_efficiency_report=True,
            db_type='validation',
        )

        # Strip the list brackets and quotes around the flow name. regex=False is explicit because
        # "['" is not a valid regular expression and the default of `regex` depends on the pandas version.
        df['Flow'] = (
            df['Flow'].astype('string')
            .str.replace("['", "", regex=False)
            .str.replace("']", "", regex=False)
        )

        df['Input difference (ESM unit)'] = (
            df['ESM input quantity (ESM unit)'] - df['LCA input quantity (ESM unit) aggregated']
        )
        # NOTE(review): the difference is computed with the aggregated LCA quantity, but the percentage is
        # relative to the non-aggregated one (the system-level report below uses the aggregated one) -
        # confirm this is intended.
        df['Input difference (%)'] = df.apply(
            lambda row: 100 * row['Input difference (ESM unit)'] / row['LCA input quantity (ESM unit)']
            if row['LCA input quantity (ESM unit)'] != 0 else None,
            axis=1
        )

        df = df[[
            'Name',
            'Flow',
            'LCA input product',
            'ESM input quantity (ESM unit)',
            'LCA input quantity (ESM unit)',
            'LCA input quantity (ESM unit) aggregated',
            'Input difference (ESM unit)',
            'Input difference (%)',
            'LCA input quantity (LCA unit)',
            'ESM input unit',
            'LCA input unit',
            'Input conversion factor',
            'ESM output unit',
            'LCA output unit',
            'Output conversion factor',
        ]]

        if save_validation_report:
            df.to_csv(f'{self.results_path_file}validation_double_counting.csv', index=False)

        return df if return_validation_report else None

    # System-level comparison, based on the per-technology report saved previously
    df = pd.read_csv(f'{self.results_path_file}validation_double_counting.csv')

    quantity_columns = ['ESM input quantity (ESM unit)', 'LCA input quantity (ESM unit) aggregated']
    id_columns = ['Name']
    group_by_columns = ['Flow']

    if 'Year' in df.columns and 'Year' in esm_results.columns:
        id_columns.append('Year')
        group_by_columns.append('Year')

    if 'Run' in esm_results.columns:
        group_by_columns.append('Run')

    # Scale the quantities per functional unit by the annual production of each technology
    df_tot = df.merge(esm_results, on=id_columns)
    for col in quantity_columns:
        df_tot[col] *= df_tot['Production']

    df_tot = df_tot[group_by_columns + ['Name'] + quantity_columns].drop_duplicates()  # avoid double-counting aggregated flows

    # Only the quantity columns are summed (the 'Name' column holds strings)
    df_tot = df_tot.groupby(group_by_columns)[quantity_columns].sum().reset_index()

    df_tot['Input difference'] = (
        df_tot['ESM input quantity (ESM unit)'] - df_tot['LCA input quantity (ESM unit) aggregated']
    )
    df_tot['Input difference (%)'] = df_tot.apply(
        lambda row: (row['Input difference'] / row['LCA input quantity (ESM unit) aggregated']) * 100
        if row['LCA input quantity (ESM unit) aggregated'] != 0 else None,
        axis=1
    )

    if save_validation_report:
        df_tot.to_csv(f'{self.results_path_file}validation_double_counting_system.csv', index=False)

    return df_tot if return_validation_report else None