Source code for mescal.change_ecoinvent

import pandas as pd
from .utils import ecoinvent_unit_convention
from .filesystem_constants import DATA_DIR


def load_change_report_annex(
        v_from: str,
        v_to: str
) -> pd.DataFrame:
    """
    Load the change report annex between two versions of the ecoinvent database

    The "Qualitative Changes" sheet of the annex workbook is read, its version-specific column headers are
    renamed to version-independent ones, and two columns recording the versions are added.

    :param v_from: initial version of the ecoinvent database
    :param v_to: next version of the ecoinvent database
    :return: change report annex as a pandas DataFrame
    """
    # The "deleted" column header only carries the first two components of the version (e.g. 3.9.1 -> 3.9)
    main_version_from = '.'.join(v_from.split('.')[:2])

    annex_path = DATA_DIR / "ecoinvent_change_reports" / f"Change Report Annex v{v_from} - v{v_to}.xlsx"

    # Header in the workbook -> version-independent column name
    column_names = {
        f'Reference Product - {v_from}': 'Reference Product',
        f'Reference Product Unit - {v_from}': 'Unit',
        f'Activity Name - {v_from}': 'Activity Name',
        f'Geography - {v_from}': 'Geography',
        f'Reference Product - {v_to}': 'Reference Product - new',
        f'Reference Product Unit - {v_to}': 'Unit - new',
        f'Activity Name - {v_to}': 'Activity Name - new',
        f'Geography - {v_to}': 'Geography - new',
        f'Dataset in version {main_version_from} has been deleted': 'Deleted',
    }

    annex = pd.read_excel(
        io=annex_path,
        sheet_name="Qualitative Changes",
        usecols=list(column_names),
    )
    annex = annex.rename(columns=column_names)

    # Keep track of the version pair each row comes from
    annex['Version from'] = v_from
    annex['Version to'] = v_to

    return annex
def concatenate_change_reports(
        v_from: str,
        v_to: str
) -> pd.DataFrame:
    """
    Concatenate change reports annexes of the ecoinvent database

    One annex is loaded for every pair of consecutive versions between v_from and v_to, and the annexes are
    concatenated in chronological order.

    :param v_from: initial version of the ecoinvent database
    :param v_to: final version of the ecoinvent database
    :return: concatenated change report annex as a pandas DataFrame
    :raises ValueError: if one of the versions is not supported, or if v_from is not strictly older than v_to
    """
    ecoinvent_versions = ['3.8', '3.9', '3.9.1', '3.10', '3.10.1', '3.11']

    # Fail fast with an explicit message instead of running past the end of the list
    for label, version in (('v_from', v_from), ('v_to', v_to)):
        if version not in ecoinvent_versions:
            raise ValueError(
                f"Unsupported ecoinvent version for {label}: {version}. "
                f"Supported versions are {ecoinvent_versions}."
            )

    start = ecoinvent_versions.index(v_from)
    end = ecoinvent_versions.index(v_to)

    if start >= end:
        raise ValueError(f"v_from ({v_from}) must be an older ecoinvent version than v_to ({v_to}).")

    # One annex per pair of consecutive versions
    change_reports = [
        load_change_report_annex(older, newer)
        for older, newer in zip(ecoinvent_versions[start:end], ecoinvent_versions[start + 1:end + 1])
    ]

    return pd.concat(change_reports)
def handle_multi_processes_ecoinvent(df: pd.DataFrame) -> pd.DataFrame:
    """
    Handle multi-processes activities in the ecoinvent change report annex

    A row whose 'Reference Product' cell contains a semicolon lists several products. Such a row is replaced by
    one row per product, each with its own unit, new product and new unit; every other column is repeated
    unchanged. Rows without a semicolon are copied as they are.

    :param df: dataframe of the concatenated change report annex
    :return: updated dataframe with multi-processes activities separated (fresh 0..n-1 index, same columns)
    """
    rows = []

    for record in df.to_dict('records'):
        if ';' not in str(record['Reference Product']):
            rows.append(record)
            continue

        products = record['Reference Product'].split(';\n')
        units = record['Unit'].split(';\n')
        new_products = _split_or_nan(record['Reference Product - new'], len(products))
        new_units = _split_or_nan(record['Unit - new'], len(units))

        # NOTE: zip stops at the shortest list, so entries without a counterpart are dropped
        for product, unit, new_product, new_unit in zip(products, units, new_products, new_units):
            rows.append({
                **record,
                'Reference Product': product,
                'Unit': unit,
                'Reference Product - new': new_product,
                'Unit - new': new_unit,
            })

    # Rows are addressed by column name, so the result does not depend on the column order of df.
    # The frame is built in one go rather than row by row.
    return pd.DataFrame(rows, columns=df.columns, dtype=object)


def _split_or_nan(value, n_items: int) -> list:
    """Split a multi-product cell into its entries, or return n_items 'nan' placeholders if the cell is missing."""
    if str(value) == 'nan':
        return ['nan'] * n_items
    return value.split(';\n')
def load_concatenated_ecoinvent_change_report(
        v_from: str,
        v_to: str
) -> pd.DataFrame:
    """
    Load the concatenated change report between two versions of the ecoinvent database

    The annexes between the two versions are concatenated, multi-product rows are split, every activity that
    is global ('GLO') both before and after the change is duplicated with 'RoW' as location, and finally only
    the rows with an actual change are kept.

    :param v_from: initial version of the ecoinvent database
    :param v_to: next version of the ecoinvent database
    :return: concatenated change report as a pandas DataFrame
    """
    df = concatenate_change_reports(v_from, v_to)
    df = handle_multi_processes_ecoinvent(df)
    df = df.reset_index(drop=True)

    # Add a copy of each global activity with RoW as location to fill missing locations in the change report.
    # Columns are addressed by name (not by position) and all copies are appended at once.
    is_global = (df['Geography'] == 'GLO') & (df['Geography - new'] == 'GLO')
    if is_global.any():
        row_copies = df[is_global].copy()
        row_copies[['Geography', 'Geography - new']] = 'RoW'
        df = pd.concat([df, row_copies], ignore_index=True)

    # Only keep rows with changes in reference product, activity name or geography
    is_unchanged = (
        (df['Reference Product - new'] == df['Reference Product'])
        & (df['Activity Name - new'] == df['Activity Name'])
        & (df['Geography - new'] == df['Geography'])
    )
    df = df.drop(df[is_unchanged].index)

    return df
# REMIND and IMAGE regions are not in the change report: activities located in one of them are looked up
# in the change report under 'RoW'
_IAM_REGIONS = frozenset({
    'CAZ', 'CHA', 'NEU', 'EUR', 'IND', 'JPN', 'LAM', 'MEA', 'OAS', 'REF', 'SSA', 'USA', 'RSAM', 'RCAM',
    'INDO', 'RSAF', 'CEU', 'SAF', 'INDIA', 'BRA', 'STAN', 'WAF', 'CHN', 'NAF', 'UKR', 'RSAS', 'RUS', 'SEAS',
    'KOR', 'JAP', 'EAF', 'TUR', 'CAN', 'MEX', 'WEU',
})


def update_mapping_file(
        mapping: pd.DataFrame,
        change_report: pd.DataFrame,
        unit_to_change: list = None
) -> tuple[pd.DataFrame, int, list]:
    """
    Update the mapping file with the concatenated change report

    Every row of the mapping whose (product, activity, location) appears in the change report gets the new
    product, activity and location of the first matching change report row. The other rows are kept as they
    are. The input mapping is not modified.

    :param mapping: mapping between the LCI datasets and the ESM technologies
    :param change_report: concatenated change report between two versions of the ecoinvent database
    :param unit_to_change: list of unit changes detected so far; new unit changes are appended to it
    :return: updated mapping (with a fresh 0..n-1 index), number of changes, list of unit changes where each
        entry is [(tech name, tech type), (product, activity, location), old unit, new unit]
    :raises ValueError: if a mapped activity has been deleted without replacement in the change report
    """
    if unit_to_change is None:
        unit_to_change = []

    key_columns = ['Reference Product', 'Activity Name', 'Geography']
    value_columns = ['Activity Name - new', 'Reference Product - new', 'Geography - new',
                     'Unit', 'Unit - new', 'Deleted']

    # (product, activity, location) -> values of the FIRST change report row with that key.
    # Built once so that each mapping row costs one dictionary lookup instead of a full scan of the report.
    changes = {}
    for record in change_report[key_columns + value_columns].itertuples(index=False, name=None):
        changes.setdefault(record[:3], record[3:])

    updated_mapping = mapping.reset_index(drop=True).copy()
    counter = 0

    mapping_rows = zip(mapping['Name'], mapping['Type'], mapping['Product'], mapping['Activity'],
                       mapping['Location'])

    for i, (tech_name, tech_type, activity_prod, activity_name, activity_geo) in enumerate(mapping_rows):

        if activity_geo in _IAM_REGIONS:
            activity_geo = 'RoW'

        change = changes.get((activity_prod, activity_name, activity_geo))
        if change is None:
            continue  # nothing changed for this activity, the row is kept as it is

        counter += 1
        activity_name_new, activity_prod_new, activity_geo_new, unit, unit_new, deleted = change

        # A deleted dataset has no new name. The check is on the NEW name: the old one has just been matched
        # in the change report. It is done before the unit handling, which is pointless for a deleted dataset.
        if str(activity_name_new) == 'nan' and deleted == 1:
            raise ValueError(
                f"Activity {activity_prod} - {activity_name} - {activity_geo} has been deleted in the last "
                f"ecoinvent version and should be replaced.")

        # Switch to ecoinvent database standard unit convention
        unit = ecoinvent_unit_convention(unit)
        unit_new = ecoinvent_unit_convention(unit_new)

        if unit != unit_new:
            print(f"WARNING: unit changed for {activity_prod} - {activity_name} - {activity_geo}")
            unit_to_change.append(
                [(tech_name, tech_type), (activity_prod, activity_name, activity_geo), unit, unit_new])

        # Columns are addressed by name, so the column order of the mapping does not matter
        updated_mapping.at[i, 'Product'] = activity_prod_new
        updated_mapping.at[i, 'Activity'] = activity_name_new
        updated_mapping.at[i, 'Location'] = activity_geo_new

        print(tech_name, tech_type)
        print(f"Old: {activity_prod} - {activity_name} - {activity_geo}")
        print(f"New: {activity_prod_new} - {activity_name_new} - {activity_geo_new}")

    return updated_mapping, counter, unit_to_change
def change_database_name_in_mapping_file(
        row: pd.Series,
        version_from: str,
        version_to: str,
        name_complementary_db: str = None
) -> pd.Series:
    """
    Change the name of the database in the mapping file

    For a row of a regular database, the initial version is replaced by the target version in the database
    name. For a row of the complementary database, the database name is derived from the kind of truck found
    in the activity name. The row is modified in place and returned.

    :param row: row of the mapping file
    :param version_from: initial version of the ecoinvent database
    :param version_to: target version of the ecoinvent database
    :param name_complementary_db: name of the complementary database
    :return: updated row
    :raises ValueError: if the row is in the complementary database but no truck kind is found in its activity
    """
    if row['Database'] != name_complementary_db:
        # Regular database: only the version in its name changes
        row['Database'] = row['Database'].replace(version_from, version_to)
        return row

    # Carculator truck databases, the first kind found in the activity name wins
    for truck_kind in ('urban delivery', 'regional delivery', 'long haul'):
        if truck_kind in row['Activity']:
            row['Database'] = f'{truck_kind}_truck'
            return row

    raise ValueError(f"{row['Name']} is in the complementary database and its database could not be updated.")
def change_ecoinvent_version_mapping(
        mapping: pd.DataFrame,
        v_from: str,
        v_to: str,
        name_complementary_db: str = None
) -> tuple[pd.DataFrame, [tuple[str, str], tuple[str, str, str], str, str]]:
    """
    Change the version of the ecoinvent database in the mapping file

    The change report between the two versions is applied to the mapping repeatedly, until a pass does not
    change any row anymore. The database name of every row is then updated.

    :param mapping: mapping between the LCI datasets and the ESM technologies
    :param v_from: initial version of the ecoinvent database
    :param v_to: target version of the ecoinvent database
    :param name_complementary_db: name of the complementary database
    :return: updated mapping, list of tuples with unit changes
    """
    change_report = load_concatenated_ecoinvent_change_report(v_from, v_to)

    current_mapping = mapping
    unit_changes = None  # no unit change detected before the first pass

    # An updated activity may itself appear in the change report, hence the repeated passes
    while True:
        current_mapping, n_changes, unit_changes = update_mapping_file(
            current_mapping, change_report, unit_changes
        )
        if n_changes <= 0:
            break

    def rename_database(row):
        return change_database_name_in_mapping_file(row, v_from, v_to, name_complementary_db)

    current_mapping = current_mapping.apply(rename_database, axis=1)

    return current_mapping, unit_changes
def update_unit_conversion_file(
        unit_conversion: pd.DataFrame,
        unit_changes: list,
        new_unit_conversion_factors: dict
) -> pd.DataFrame:
    """
    Adapt the unit conversion file according to the possible unit changes in the mapping file

    For every unit change, the rows of the technology are removed from the unit conversion file and a single
    new row with the new conversion factor, the new LCA unit and the unchanged ESM unit is added. The input
    dataframe is not modified.

    :param unit_conversion: file with unit conversion factors
    :param unit_changes: list of unit changes, each entry being
        [(tech name, tech type), (product, activity, location), old unit, new unit]
    :param new_unit_conversion_factors: dictionary with new unit conversion factors, mapping
        (tech name, tech type) to (new value, new comment)
    :return: updated unit conversion file
    :raises ValueError: if a technology is missing from the unit conversion file, if its current LCA unit is
        not the old unit of the change, or if no new conversion factor is provided for it
    """
    for change in unit_changes:
        tech, old_unit, new_unit = change[0], change[2], change[3]
        tech_name, tech_type = tech[0], tech[1]

        is_tech = (unit_conversion['Name'] == tech_name) & (unit_conversion['Type'] == tech_type)
        current_units = unit_conversion.loc[is_tech, ['ESM', 'LCA']]

        if current_units.empty:
            raise ValueError(f"No row found in the unit conversion file for {tech_name} - {tech_type}")

        unit_esm, unit_lca = current_units.values[0]

        if unit_lca != old_unit:
            raise ValueError(f'LCA unit for {tech_name} - {tech_type} '
                             f'is not the same as the one in the mapping file. {unit_lca} != {old_unit}')

        if tech not in new_unit_conversion_factors:
            raise ValueError(f"Missing new unit conversion factor for {tech}")

        new_value = new_unit_conversion_factors[tech][0]
        new_comment = new_unit_conversion_factors[tech][1]

        # delete current row
        unit_conversion = unit_conversion.drop(unit_conversion[is_tech].index)

        # add new row (the maximum of an empty index is NaN, so the first label is set explicitly)
        new_label = unit_conversion.index.max() + 1 if len(unit_conversion) > 0 else 0
        unit_conversion.loc[new_label] = [
            tech_name,
            tech_type,
            new_value,
            new_unit,
            unit_esm,
            new_comment,
        ]

    return unit_conversion