Source code for mescal.decommission

from .utils import random_code
import pandas as pd
import ast


[docs] def _add_decommission_datasets( self, add_decom_ds_to_db: bool = True, ) -> None: """ This method aggregates the EoL flows that have been identified during the double-counting removal step in a decommission dataset. This is applied to every technology that has not already a decommission adataset. The mapping, unit conversion, and technology compositions files are updated accordingly. :param add_decom_ds_to_db: If True, decommission datasets are added to the main database. This argument is True when decommission datasets are created during the create_esm_database method. This argument is False there is a need to recover decommission datasets codes from the ESM database (outside the create_esm_database method). :return: None """ if self.df_flows_set_to_zero is None: self.df_flows_set_to_zero = pd.read_csv(f'{self.results_path_file}removed_flows_list.csv') # Store frequently accessed instance variables in local variables inside a method df = self.df_flows_set_to_zero mapping_constr = self.mapping_constr mapping_decom = self.mapping_decom unit_conversion = self.unit_conversion technology_compositions = self.technology_compositions esm_db_name = self.esm_db_name db_as_list = self.main_database.db_as_list db_dict_code = self.main_database.db_as_dict_code # readings lists as lists and not strings try: self.technology_compositions.Components = self.technology_compositions.Components.apply(ast.literal_eval) except ValueError: pass comp_to_tech = {} for _, row in technology_compositions.iterrows(): if row.Type == 'Construction': for comp in row.Components: comp_to_tech[comp] = row.Name new_mapping_data = [] new_technology_compositions_dict = {} df_removed_decom = df[ (df.Type == 'Construction') & (df['Amount (scaled to the FU)'] < 0) # waste flows only ] for tech in df_removed_decom.Name.unique(): # iterate over construction technologies if tech in mapping_decom.Name.unique(): continue # skip technologies that already have a decommission dataset if tech in comp_to_tech: parent_tech = comp_to_tech[tech] if parent_tech in mapping_decom.Name.unique(): continue # skip components of technologies that already have a decommission dataset else: if parent_tech in new_technology_compositions_dict: new_technology_compositions_dict[parent_tech] += [tech] else: new_technology_compositions_dict[parent_tech] = [tech] act_constr_code, act_constr_database = mapping_constr[mapping_constr.Name == tech][['Current_code', 'Database']].iloc[0] act_constr = db_dict_code[(act_constr_database, act_constr_code)] if add_decom_ds_to_db: df_removed_decom_tech = df_removed_decom[(df_removed_decom.Name == tech)].reset_index(drop=True) new_code = random_code() exchanges = [{ 'amount': -1, 'code': new_code, 'type': 'production', 'name': f'{tech}, Decommission', 'product': f'used {act_constr["reference product"]}', 'unit': act_constr['unit'], 'location': act_constr['location'], 'database': esm_db_name, }] for i in range(len(df_removed_decom_tech)): exchanges.append( { 'amount': df_removed_decom_tech['Amount (scaled to the FU)'].iloc[i], 'code': df_removed_decom_tech['Removed flow code'].iloc[i], 'type': 'technosphere', 'name': df_removed_decom_tech['Removed flow activity'].iloc[i], 'product': df_removed_decom_tech['Removed flow product'].iloc[i], 'unit': df_removed_decom_tech['Unit'].iloc[i], 'location': df_removed_decom_tech['Removed flow location'].iloc[i], 'database': df_removed_decom_tech['Removed flow database'].iloc[i], } ) new_activity = { 'database': esm_db_name, 'name': f'{tech}, Decommission', 'location': act_constr['location'], 'unit': act_constr['unit'], 'reference product': f'used {act_constr["reference product"]}', 'code': new_code, 'classifications': [('CPC', '39990: Other wastes n.e.c.')], 'comment': f'Activity derived from the aggregation of waste flows in ({act_constr["reference product"]} ' f'- {act_constr["name"]} - {act_constr["location"]})', 'parameters': {}, 'exchanges': exchanges, } db_as_list.append(new_activity) else: # Recover decommission datasets new codes from ESM database new_code = [i['code'] for i in self.esm_db.db_as_list if i['name'] == f'{tech}, Decommission'][0] new_mapping_data.append([ tech, 'Decommission', f'used {act_constr["reference product"]}', f'{tech}, Decommission', act_constr["location"], esm_db_name, None, new_code, ]) new_mapping = pd.DataFrame( data=new_mapping_data, columns=['Name', 'Type', 'Product', 'Activity', 'Location', 'Database', 'Current_code', 'New_code'] ) new_unit_conversion = unit_conversion[ (unit_conversion.Name.isin([tech for tech in new_mapping.Name.unique()])) & (unit_conversion.Type == 'Construction') ] new_unit_conversion['Type'] = 'Decommission' new_unit_conversion['Value'] *= -1 # waste flows new_unit_conversion_comp = unit_conversion[ (unit_conversion.Name.isin([tech for tech in new_technology_compositions_dict.keys()])) & (unit_conversion.Type == 'Construction') ] new_unit_conversion_comp['Type'] = 'Decommission' # no change to 'Value' because the -1 factor is already in the unit conversion factors of subcomponents new_technology_compositions = pd.DataFrame({ "Name": list(new_technology_compositions_dict.keys()), "Components": list(new_technology_compositions_dict.values()), }) new_technology_compositions['Type'] = 'Decommission' self.added_decom_to_input_data = True # Injecting local variables into the instance variables self.main_database.db_as_list = db_as_list self.mapping = pd.concat([self.mapping, new_mapping], ignore_index=True) self.unit_conversion = pd.concat([unit_conversion, new_unit_conversion, new_unit_conversion_comp], ignore_index=True) self.technology_compositions = pd.concat([technology_compositions, new_technology_compositions], ignore_index=True)