from .database import Dataset, Database
from .utils import random_code, ecoinvent_unit_convention
import wurst
import pandas as pd
import ast
import copy
import bw2data as bd
from tqdm import tqdm
[docs]
def create_new_database_with_esm_results(
self,
esm_results: pd.DataFrame,
new_end_use_types: pd.DataFrame = None,
tech_to_remove_layers: pd.DataFrame = None,
return_database: bool = False,
write_database: bool = True,
remove_background_construction_flows: bool = True,
harmonize_efficiency_with_esm: bool = True,
harmonize_capacity_factor_with_esm: bool = False,
esm_results_db_name: str = None,
name_capacity_factor_difference_file: str = 'capacity_factor_differences',
write_cp_report: bool = True,
) -> Database | None:
"""
Create a new database with the ESM results
:param esm_results: results of the ESM in terms of annual production and installed capacity. It should contain the
columns 'Name', 'Production', and 'Capacity'.
:param tech_to_remove_layers: technologies to remove from the result LCI datasets
:param new_end_use_types: adapt end use types to fit the results LCI datasets mapping
:param return_database: if True, return the new database
:param write_database: if True, write the new database in the Brightway project
:param remove_background_construction_flows: if True, the new LCI datasets undergo the double-counting removal
process to remove background construction flows. It should be set to True in the context of a loop between
the ESM and LCI database, in order to not count the infrastructure impacts several times over several
time-steps. It should be set to False if the new database is meant to be shared or used as a standalone
database.
:param harmonize_efficiency_with_esm: if True, apply the efficiency correction to harmonize the LCI datasets with
the ESM assumptions
:param harmonize_capacity_factor_with_esm: if True, apply the capacity factor correction to harmonize the LCI
datasets with the ESM assumptions. It should be set to False if the background construction flows are removed.
:param esm_results_db_name: name of the new database with the ESM results
:param name_capacity_factor_difference_file: name of the file to save the capacity factor differences
:param write_cp_report: if True, save a csv file reporting capacity factors differences in the results folder
:return: database of the ESM results if return_database is True, else None
"""
if return_database is False and write_database is False:
raise ValueError('The new database should be either returned or written.')
if harmonize_capacity_factor_with_esm and remove_background_construction_flows:
raise ValueError('The harmonization of capacity factors with the ESM results cannot be performed '
'if the background construction flows are removed. Please set one of the two to False.')
if self.df_flows_set_to_zero is None:
self.df_flows_set_to_zero = pd.read_csv(f'{self.results_path_file}removed_flows_list.csv')
if self.double_counting_removal_amount is None:
self.double_counting_removal_amount = pd.read_csv(f'{self.results_path_file}double_counting_removal.csv')
if 'Current_code' not in self.mapping.columns:
self._get_original_code()
if 'New_code' not in self.mapping.columns:
self._get_new_code()
# Store frequently accessed instance variables in local variables inside a method
mapping = self.mapping
esm_location = self.esm_location
if tech_to_remove_layers is None:
self.tech_to_remove_layers = pd.DataFrame(columns=['Layers', 'Technologies'])
else:
self.tech_to_remove_layers = tech_to_remove_layers
if new_end_use_types is None:
new_end_use_types = pd.DataFrame(columns=['Name', 'Search type', 'Old', 'New'])
if esm_results_db_name is not None:
self.esm_results_db_name = esm_results_db_name
esm_results_db_name = self.esm_results_db_name
flows = mapping[mapping.Type == 'Flow']
already_done = []
perform_d_c = []
# Create the new LCI datasets from the ESM results
self.logger.info("Creating new LCI datasets from the ESM results...")
for i in tqdm(range(len(flows))):
original_activity_prod = flows.Product.iloc[i]
original_activity_name = flows.Activity.iloc[i]
original_activity_database = flows.Database.iloc[i]
if (
original_activity_name,
original_activity_prod,
esm_location,
original_activity_database
) in already_done:
pass
else:
new_perform_d_c = self._create_or_modify_activity_from_esm_results(
original_activity_prod=original_activity_prod,
original_activity_name=original_activity_name,
original_activity_database=original_activity_database,
flows=flows,
esm_results=esm_results,
new_end_use_types=new_end_use_types,
)
already_done.append((original_activity_name,
original_activity_prod,
esm_location,
original_activity_database))
perform_d_c += new_perform_d_c
# Store frequently accessed instance variables in local variables inside a method
db_as_list = self.main_database.db_as_list
# Double counting removal of the construction activities
double_counting_act = pd.DataFrame(
data=perform_d_c,
columns=['Name', 'Product', 'Activity', 'Location', 'Database', 'Current_code']
)
double_counting_act.drop_duplicates(inplace=True) # remove potential duplicates
# Adding current code to the mapping file
mapping['Current_code'] = mapping.apply(lambda row: self.main_database.get_code(
product=row['Product'],
activity=row['Activity'],
location=row['Location'],
database=row['Database']
), axis=1)
model = self.model.pivot(index='Name', columns='Flow', values='Amount').reset_index()
model.fillna(0, inplace=True)
N = double_counting_act.shape[1]
double_counting_act = pd.merge(double_counting_act, model, on='Name', how='left')
double_counting_act['CONSTRUCTION'] = double_counting_act.shape[0] * [0]
double_counting_act = self._add_technology_specifics(double_counting_act)
# Injecting local variables into the instance variables
self.mapping = mapping
self.main_database.db_as_list = db_as_list
if remove_background_construction_flows:
# Double-counting removal of background construction flows
self.logger.info("Removing double-counted construction flows...")
flows_set_to_zero, ei_removal, activities_subject_to_double_counting = self._double_counting_removal(
df=double_counting_act,
N=N,
ESM_inputs=['OWN_CONSTRUCTION', 'CONSTRUCTION'],
db_type='esm results',
)
elif (harmonize_efficiency_with_esm or harmonize_capacity_factor_with_esm) and not remove_background_construction_flows:
# if we do not want to remove construction flows, but we want to harmonize the database with the ESM,
# the background processes of markets should be recorded in the ESM results database following the
# background search algorithm of the double-counting removal procedure. However, flows are NOT removed
# during this process.
flows_set_to_zero, ei_removal, activities_subject_to_double_counting = self._double_counting_removal(
df=double_counting_act,
N=N,
ESM_inputs=['OWN_CONSTRUCTION', 'CONSTRUCTION'],
db_type='esm results wo dcr',
)
if harmonize_efficiency_with_esm:
# Add ESM database to the main database (to retrieve the corrected datasets)
if self.esm_db is not None:
esm_db = self.esm_db
else:
esm_db = Database(self.esm_db_name)
self.main_database = self.main_database + esm_db
self.logger.info("Correcting efficiency differences between ESM and LCI datasets...")
self._correct_esm_and_lca_efficiency_differences(db_type='esm results', write_efficiency_report=False)
# Remove the ESM database from the main database
self.main_database = self.main_database - esm_db
if harmonize_capacity_factor_with_esm:
self.logger.info("Correcting capacity factor differences between ESM and LCI datasets...")
self._correct_esm_and_lca_capacity_factor_differences(
esm_results=esm_results,
write_cp_report=write_cp_report,
name_capacity_factor_difference_file=name_capacity_factor_difference_file,
)
# Injecting local variables into the instance variables
self.main_database.db_as_list = db_as_list
esm_results_db = Database(
db_as_list=[act for act in self.main_database.db_as_list if act['database'] == esm_results_db_name]
)
if write_database:
esm_results_db.write_to_brightway(esm_results_db_name)
self.logger.info("Relinking the energy flows of the ESM results database to itself...")
self.connect_esm_results_to_database(
create_new_db=False,
esm_results_db_name=esm_results_db_name,
specific_db_name=esm_results_db_name,
)
if return_database:
self.main_database = self.main_database - esm_results_db
return esm_results_db
self.main_database = self.main_database - esm_results_db
if write_database:
# Modifies the written database according to specifications in tech_specifics.csv
self._modify_written_activities(db=esm_results_db, db_type='esm results')
else:
self.logger.info('The techs_specifics.csv file has not been applied because the database has not been written.')
[docs]
def connect_esm_results_to_database(
self,
create_new_db: bool = False,
new_db_name: str = None,
specific_db_name: str = None,
locations: list[str] or str = None,
update_exchanges_based_on_activity_name: bool = True,
) -> None:
"""
Connect new LCI datasets obtained from the ESM results to the main database
:param create_new_db: if True, create a new database connected to the ESM results database. If False, directly
modifies the main database.
:param new_db_name: name of the new database if create_new_db is True
:param specific_db_name: if you want to connect another database than the main database
:param locations: list of locations to be considered for connection with the ESM results database.
If None, only the ESM location is considered. If 'all', all locations are considered.
:param update_exchanges_based_on_activity_name: if True, update similar flows based on the activity and product
names, if False, update similar flows based on the product name only.
:return: None (copies and/or modifies the main database)
"""
if create_new_db is True:
if isinstance(self.main_database.db_names, list) & (len(self.main_database.db_names) > 1):
raise ValueError('The main database should contain only one database.')
elif isinstance(self.main_database.db_names, list) & (len(self.main_database.db_names) == 1):
self.main_database.db_names = self.main_database.db_names[0]
if new_db_name is not None and create_new_db is False:
raise ValueError('The new database name should be None if create_new_db is False.')
if new_db_name is None and create_new_db is True:
new_db_name = self.main_database.db_names + f'_with_esm_results_for_{self.esm_location}'
if locations is None:
locations = [self.esm_location]
esm_results_db_name = self.esm_results_db_name
# Store frequently accessed instance variables in local variables inside a method
db_dict_name = self.main_database.db_as_dict_name
if specific_db_name is not None:
specific_db = Database(db_names=specific_db_name)
db_as_list = specific_db.db_as_list
if specific_db_name == esm_results_db_name:
esm_results_db = specific_db
esm_results_db_dict_name = esm_results_db.db_as_dict_name
else:
esm_results_db = Database(db_names=esm_results_db_name)
esm_results_db_dict_name = esm_results_db.db_as_dict_name
else:
db_as_list = self.main_database.db_as_list
esm_results_db = Database(db_names=esm_results_db_name)
esm_results_db_dict_name = esm_results_db.db_as_dict_name
esm_location = self.esm_location
mapping = self.mapping
flows = mapping[mapping.Type == 'Flow']
already_done = []
# Activities of the ESM region
if locations == 'all':
activities_of_esm_region = db_as_list
else:
activities_of_esm_region = [
a for loc in locations
for a in wurst.get_many(
db_as_list,
wurst.equals('location', loc)
)
]
# Plugging the new activity in the database
for i in tqdm(range(len(flows))):
original_activity_name = flows.Activity.iloc[i]
activity_prod = flows.Product.iloc[i]
activity_database = flows.Database.iloc[i]
if (
original_activity_name,
activity_prod,
esm_location,
esm_results_db_name
) in already_done:
continue
else:
already_done.append((
original_activity_name,
activity_prod,
esm_location,
esm_results_db_name
))
if (
original_activity_name,
activity_prod,
esm_location,
esm_results_db_name,
) in esm_results_db_dict_name:
# if not, it means that the activity has not been created in the previous step
# (e.g., no production, trivial results)
new_activity = esm_results_db_dict_name[
original_activity_name,
activity_prod,
esm_location,
esm_results_db_name,
]
for act in activities_of_esm_region:
if act['name'] == original_activity_name and act['location'] == esm_location:
pass # we do not want the new activity to be an input of itself
else:
if create_new_db:
for exc in Dataset(act).get_technosphere_flows():
if (
((exc['name'] == original_activity_name)
| (exc['name'] == original_activity_name.replace('market', 'market group'))
| (not update_exchanges_based_on_activity_name))
& (exc['product'] == activity_prod)
& (exc['database'] != esm_results_db_name)
):
exc['name'] = new_activity['name']
exc['code'] = new_activity['code']
exc['database'] = esm_results_db_name
exc['input'] = (esm_results_db_name, new_activity['code'])
exc['location'] = esm_location
else:
act_bw = bd.Database(act['database']).get(act['code'])
k = 0 # exchange modification counter
for exc in [i for i in act_bw.technosphere()]:
if (
((exc['name'] == original_activity_name)
| (exc['name'] == original_activity_name.replace('market', 'market group'))
| (not update_exchanges_based_on_activity_name))
& (exc['product'] == activity_prod)
& ((exc['database'] != esm_results_db_name)
| (exc['input'][0] != esm_results_db_name))
):
k += 1
exc['name'] = new_activity['name']
exc['code'] = new_activity['code']
exc['database'] = esm_results_db_name
exc['input'] = (esm_results_db_name, new_activity['code'])
exc['location'] = esm_location
exc.save()
if k > 0: # if exchanges have been modified
act_bw.save()
# Downstream activities of the original activity, if it exists for the ESM location
for loc in locations:
if (original_activity_name, activity_prod, loc, activity_database) in db_dict_name:
original_activity = db_dict_name[
original_activity_name, activity_prod, loc, activity_database
]
downstream_consumers = Dataset(original_activity).get_downstream_consumers(db_as_list)
for act in downstream_consumers:
if act['name'] == original_activity_name and act['location'] == esm_location:
pass # we do not want the new activity to be an input of itself
else:
if create_new_db:
for exc in Dataset(act).get_technosphere_flows():
if (
(exc['name'] == original_activity_name)
& (exc['product'] == activity_prod)
& (exc['location'] == loc)
& (exc['database'] != esm_results_db_name)
):
exc['code'] = new_activity['code']
exc['database'] = esm_results_db_name
exc['input'] = (esm_results_db_name, new_activity['code'])
exc['location'] = esm_location
else:
act_bw = bd.Database(act['database']).get(act['code'])
k = 0 # exchange modification counter
for exc in [i for i in act_bw.technosphere()]:
if (
(exc['name'] == original_activity_name)
& (exc['product'] == activity_prod)
& (exc['location'] == loc)
& ((exc['database'] != esm_results_db_name)
| (exc['input'][0] != esm_results_db_name))
):
k += 1
exc['code'] = new_activity['code']
exc['database'] = esm_results_db_name
exc['input'] = (esm_results_db_name, new_activity['code'])
exc['location'] = esm_location
exc.save()
if k > 0: # if exchanges have been modified
act_bw.save()
if specific_db_name is None:
# Injecting local variables into the instance variables
self.main_database.db_as_list = db_as_list
if create_new_db:
# Write the new database
new_db = Database(db_as_list=db_as_list)
new_db = new_db - esm_results_db
new_db.write_to_brightway(new_db_name)
[docs]
def _create_or_modify_activity_from_esm_results(
self,
original_activity_prod: str,
original_activity_name: str,
original_activity_database: str,
flows: pd.DataFrame,
esm_results: pd.DataFrame,
new_end_use_types: pd.DataFrame,
) -> list[list[str]]:
"""
Create or modify an activity in the LCI database based on the ESM results
:param original_activity_prod: reference product of the original activity
:param original_activity_name: name of the original activity
:param original_activity_database: database of the original activity
:param flows: mapping file between ESM flows and LCI datasets
:param esm_results: results of the ESM in terms of annual production and installed capacity. It should contain the
columns 'Name', 'Production', and 'Capacity'.
:param new_end_use_types: adapt end use types to fit the results LCI datasets mapping
:return: list of activities to perform double counting removal
"""
# Store frequently accessed instance variables in local variables inside a method
db_dict_name = self.main_database.db_as_dict_name
db_dict_code = self.main_database.db_as_dict_code
mapping = self.mapping
esm_location = self.esm_location
db_as_list = self.main_database.db_as_list
unit_conversion = self.unit_conversion
model = self.model
esm_results_db_name = self.esm_results_db_name
tech_to_remove_layers = self.tech_to_remove_layers
# Check if the original activity is in the database for the location under study
if (original_activity_name, original_activity_prod, esm_location, original_activity_database) in db_dict_name:
original_activity = db_dict_name[
original_activity_name, original_activity_prod, esm_location, original_activity_database
]
# If not, we take a similar activity with another location and regionalize its foreground
else:
original_activity = [a for a in wurst.get_many(db_as_list, *[
wurst.equals('name', original_activity_name),
wurst.equals('reference product', original_activity_prod),
wurst.equals('database', original_activity_database)
])][0]
original_activity = self._regionalize_activity_foreground(act=original_activity)
new_code = random_code()
original_activity_unit = original_activity['unit']
prod_flow = Dataset(original_activity).get_production_flow()
prod_flow_amount = prod_flow['amount'] # can be -1 if it is a waste activity
unit_conversion['LCA'] = unit_conversion['LCA'].apply(ecoinvent_unit_convention)
unit_conversion['ESM'] = unit_conversion['ESM'].apply(ecoinvent_unit_convention)
model['Flow'] = model.apply(lambda x: _replace_mobility_end_use_type(
row=x,
new_end_use_types=new_end_use_types
), axis=1)
model = pd.merge(
left=model,
right=model[model.Amount == 1.0].drop(columns=['Amount']).rename(columns={'Flow': 'Output'}),
how='left',
on='Name'
)
act_to_flows_dict = {(flows['Product'].iloc[i], flows['Activity'].iloc[i]): list(
flows[(flows['Product'] == flows['Product'].iloc[i])
& (flows['Activity'] == flows['Activity'].iloc[i])]['Name']
) for i in range(len(flows))}
flows_list = act_to_flows_dict[(original_activity_prod, original_activity_name)]
end_use_tech_list = list(model[model.Output.isin(flows_list)].Name.unique())
if len(end_use_tech_list) == 0:
# Case where the layer has no production
return []
try:
tech_to_remove_layers['Layers'] = tech_to_remove_layers['Layers'].apply(ast.literal_eval)
tech_to_remove_layers['Technologies'] = tech_to_remove_layers['Technologies'].apply(ast.literal_eval)
except ValueError:
pass
for i in range(len(tech_to_remove_layers)):
if set(flows_list) == set(tech_to_remove_layers.Layers.iloc[i]):
for tech in tech_to_remove_layers.Technologies.iloc[i]:
end_use_tech_list.remove(tech)
else:
pass
total_amount = 0 # initialize the total amount of production
check_layers_mapping = False
for tech in end_use_tech_list:
if tech in list(esm_results.Name.unique()):
amount = sum(esm_results[esm_results.Name == tech].Production)
else: # if the technology is not in the ESM results, we assume that its production is null
amount = 0
if tech in list(
mapping[(mapping.Type == 'Operation') | (mapping.Type == 'Resource')].Name.unique()):
total_amount += amount
check_layers_mapping = True
else:
pass # if the technology is not in the mapping file, we do not consider it in the result LCI dataset
if check_layers_mapping is False:
raise ValueError(f'The layer {flows_list} does not have any technology in the mapping file.')
if total_amount == 0: # no production in the layer
return []
exchanges = []
perform_d_c = []
for tech in end_use_tech_list:
if tech in list(esm_results.Name.unique()):
if self.operation_metrics_for_all_time_steps:
amount_per_year = {}
amount = 0
for year in [y for y in self.list_of_years if y <= self.year]:
amount_per_year[year] = sum(esm_results[
(esm_results.Name == tech)
& (esm_results['Year_inst'] == year)
].Production)
amount += amount_per_year[year]
else:
amount = sum(esm_results[esm_results.Name == tech].Production)
else:
amount = 0
if amount == 0:
pass
else:
if not self.pathway:
mapping.Year = None
for year in [self.year] if not self.operation_metrics_for_all_time_steps \
else [y for y in self.list_of_years if y <= self.year]:
if self.operation_metrics_for_all_time_steps:
amount = amount_per_year[year]
if tech in list(
mapping[
((mapping.Type == 'Operation') | (mapping.Type == 'Resource'))
& (True if not self.pathway else mapping.Year == year)
].Name.unique()):
(activity_name, activity_prod, activity_database,
activity_location, activity_current_code, activity_new_code) = mapping[
(mapping.Name == tech)
& ((mapping.Type == 'Operation') | (mapping.Type == 'Resource'))
& (True if not self.pathway else mapping.Year == year)
][['Activity', 'Product', 'Database', 'Location', 'Current_code', 'New_code']].values[0]
activity = db_dict_code[activity_database, activity_current_code]
activity_unit = activity['unit']
if activity_unit != original_activity_unit:
if original_activity_prod.split(',')[0] == 'transport':
conversion_factor = unit_conversion[
(unit_conversion.Name == tech)
& (unit_conversion.ESM == original_activity_unit)
& (unit_conversion.LCA == activity_unit)
].Value.values
else:
conversion_factor = unit_conversion[
(unit_conversion.Name == original_activity_prod.split(',')[0])
& (unit_conversion.ESM == original_activity_unit)
& (unit_conversion.LCA == activity_unit)
].Value.values
if len(list(set(conversion_factor))) == 0:
raise ValueError(f'The unit conversion factor between {activity_unit} and '
f'{original_activity_unit} for {original_activity_prod.split(",")[0]} '
f'is not in the unit conversion file.')
elif len(list(set(conversion_factor))) > 1:
raise ValueError(f'Multiple possible conversion factors between {activity_unit} and '
f'{original_activity_unit} for {original_activity_prod.split(",")[0]}')
else:
amount *= conversion_factor[0]
else:
conversion_factor = 1.0
if prod_flow_amount == -1.0: # for waste activities
amount *= -1.0
# Create new activity for the new exchange (because one activity may correspond to several ESM
# technologies, which might be adjusted later)
new_act = copy.deepcopy(activity)
if self.operation_metrics_for_all_time_steps:
new_act['name'] += f' ({tech}, {year})'
if year != self.year:
Dataset(new_act).relink(
name_database_unlink = [i['main_database'].db_names for i in self.time_steps
if i['year'] == year][0],
name_database_relink = [i['main_database'].db_names for i in self.time_steps
if i['year'] == self.year][0],
database_relink_as_list = db_as_list,
except_units = ['unit'],
)
else:
new_act['name'] += f' ({tech})'
new_act['code'] = activity_new_code
new_act['database'] = esm_results_db_name
prod_flow = Dataset(new_act).get_production_flow()
prod_flow['name'] = new_act['name']
prod_flow['code'] = activity_new_code
prod_flow['database'] = esm_results_db_name
if 'Operation' in self.regionalize_foregrounds:
# Regionalize the foreground of the new activity
new_act = self._regionalize_activity_foreground(act=new_act)
db_as_list.append(new_act)
db_dict_name[(
new_act['name'],
new_act['reference product'],
new_act['location'],
new_act['database']
)] = new_act
db_dict_code[(new_act['database'], new_act['code'])] = new_act
new_exc = {
'amount': amount / total_amount,
'code': activity_new_code,
'type': 'technosphere',
'name': new_act['name'],
'product': activity_prod,
'unit': activity_unit,
'location': new_act['location'],
'database': esm_results_db_name,
'comment': f'{tech}, {conversion_factor}',
}
exchanges.append(new_exc)
if tech in list(mapping[mapping.Type == 'Operation'].Name.unique()):
# we only perform double counting removal for the operation activities
perform_d_c.append(
[tech, activity_prod, activity_name, activity_location, esm_results_db_name, activity_new_code]
)
else:
self.logger.warning(f'The technology {tech} is not in the mapping file. '
f'It cannot be considered in the result LCI dataset.')
exchanges.append(
{
'amount': prod_flow_amount,
'code': new_code,
'type': 'production',
'name': original_activity_name,
'product': original_activity_prod,
'unit': original_activity_unit,
'location': esm_location,
'database': esm_results_db_name,
}
)
total_production_amount_original_activity = 0
for exc in original_activity['exchanges']:
if exc['unit'] not in [original_activity_unit, 'unit']:
# Add flows to the new activity that are not production or construction flows
exchanges.append(exc)
if exc['type'] == 'technosphere' and exc['unit'] == original_activity_unit:
total_production_amount_original_activity += exc['amount']
losses_original_activity = total_production_amount_original_activity - 1
if losses_original_activity > 0: # add a loss coefficient
exchanges.append(
{
'amount': losses_original_activity,
'code': new_code,
'type': 'technosphere',
'name': original_activity_name,
'product': original_activity_prod,
'unit': original_activity_unit,
'location': esm_location,
'database': esm_results_db_name,
}
)
new_activity = {
'database': esm_results_db_name,
'name': original_activity_name,
'location': esm_location,
'unit': original_activity_unit,
'reference product': original_activity_prod,
'code': new_code,
'classifications': original_activity['classifications'],
'comment': f'Activity derived from the ESM results in the layers {flows_list} for {esm_location}. '
+ original_activity.get('comment', ''),
'parameters': original_activity.get('parameters', {}),
'categories': original_activity.get('categories', None),
'exchanges': exchanges,
}
db_as_list.append(new_activity)
# Injecting local variables into the instance variables
self.main_database.db_as_list = db_as_list
self.mapping = mapping
return perform_d_c
[docs]
def _correct_esm_and_lca_capacity_factor_differences(
self,
esm_results: pd.DataFrame,
name_capacity_factor_difference_file: str,
write_cp_report: bool = True,
) -> None:
"""
Correct the differences of capacity factors between ESM technologies and their operation LCI datasets
during the creation of the ESM results database. Concretely, it changes the amount of the construction input flow
in the operation LCI dataset.
:param esm_results: results of the ESM in terms of annual production and installed capacity. It should contain the
columns 'Name', 'Production', and 'Capacity'.
:param name_capacity_factor_difference_file: name of the file to save the capacity factor differences
:param write_cp_report: if True, save a csv file reporting capacity factors differences in the results folder
:return: None
"""
db_dict_name = self.main_database.db_as_dict_name
mapping = self.mapping
esm_results_db_name = self.esm_results_db_name
df_flows_set_to_zero = self.df_flows_set_to_zero
unit_conversion = self.unit_conversion
lifetime = self.lifetime
tech_to_remove_layers = self.tech_to_remove_layers
# Keep only the operational flows (others are not relevant)
df_flows_set_to_zero = df_flows_set_to_zero[df_flows_set_to_zero['Type'] == 'Operation']
# readings lists as lists and not strings
try:
self.technology_compositions.Components = self.technology_compositions.Components.apply(ast.literal_eval)
except ValueError:
pass
technology_compositions_dict = {key: value for key, value in dict(zip(
self.technology_compositions.Name, self.technology_compositions.Components
)).items()}
capacity_factor_report_list = []
for tech in df_flows_set_to_zero.Name.unique():
skip_tech = False
if not self.pathway:
mapping.Year = None
esm_results.Year = None
if not self.operation_metrics_for_all_time_steps:
esm_results.Year_inst = None
for year in [self.year] if not self.operation_metrics_for_all_time_steps \
else [y for y in self.list_of_years if y <= self.year]:
if skip_tech:
continue
if len(esm_results[
(esm_results.Name == tech)
& (True if not self.operation_metrics_for_all_time_steps else esm_results.Year_inst == year)
]) == 0:
# if the technology is not in the ESM results, we skip it
continue
elif tech in [tec for sublist in tech_to_remove_layers.Technologies for tec in sublist]:
# if the technology is in the list of technologies to remove, we skip it
continue
act_to_adapt_list = []
techno_flows_to_correct_dict = {}
if tech not in technology_compositions_dict.keys(): # if the technology is not a composition
# simple technologies are seen as compositions of one technology
technology_compositions_dict[tech] = [tech]
amount_constr_per_subcomp = {}
for sub_comp in technology_compositions_dict[tech]:
try:
unit_conversion_factor_constr = unit_conversion[
(unit_conversion.Name == sub_comp)
& (unit_conversion.Type == 'Construction')
]['Value'].iloc[0]
except IndexError:
self.logger.warning(f'No unit conversion factor for construction found for {sub_comp}. '
f'The potential capacity factor difference cannot be corrected.')
skip_tech = True
continue
if sub_comp != tech:
unit_conversion_factor_constr *= unit_conversion[
(unit_conversion.Name == tech)
& (unit_conversion.Type == 'Construction')
]['Value'].iloc[0]
lifetime_lca = lifetime[(lifetime.Name == sub_comp)]['LCA'].iloc[0]
if pd.isna(lifetime_lca):
self.logger.warning(f'No LCA lifetime for {sub_comp}. Please provide a lifetime for this technology '
f'in the lifetime csv file. Until then, the capacity factor difference cannot '
f'be corrected.')
skip_tech = True
continue
annual_production = esm_results[
(esm_results.Name == tech)
& (True if not self.operation_metrics_for_all_time_steps else esm_results.Year_inst == year)
& (True if not self.pathway else esm_results.Year == self.year)
]['Production'].iloc[0]
installed_capacity = esm_results[
(esm_results.Name == tech)
& (True if not self.operation_metrics_for_all_time_steps else esm_results.Year_inst == year)
& (True if not self.pathway else esm_results.Year == self.year)
]['Capacity'].iloc[0]
# amount_constr_esm is the amount of infrastructure unit to be used in the operation LCI dataset
# given the annual production and installed capacity results of the ESM. This value can significantly differ
# from the original value in the operation LCI dataset, due to differences in assumptions and operating modes.
amount_constr_esm = installed_capacity * unit_conversion_factor_constr / (lifetime_lca * annual_production)
amount_constr_per_subcomp[sub_comp] = amount_constr_esm
if skip_tech:
continue
df_removed_construction_flows = df_flows_set_to_zero[
(df_flows_set_to_zero.Name == tech)
& (df_flows_set_to_zero.Unit == 'unit')
& (True if not self.pathway else df_flows_set_to_zero.Year == year)
]
for idx, row in df_removed_construction_flows.iterrows():
if row['Activity'] == f'{tech}, Operation':
act_name = mapping[(mapping.Name == tech) & (mapping.Type == 'Operation')]['Activity'].iloc[0]
if self.operation_metrics_for_all_time_steps:
act_name += f' ({tech}, {year})'
else:
act_name += f' ({tech})'
if (
act_name,
row['Product'],
row['Location'],
esm_results_db_name,
) in db_dict_name:
act_to_adapt = db_dict_name[(
act_name,
row['Product'],
row['Location'],
esm_results_db_name,
)]
else: # i.e., the technology is not used in the ESM configuration
act_to_adapt = None
else:
if (
row['Activity'],
row['Product'],
row['Location'],
esm_results_db_name,
) in db_dict_name:
act_to_adapt = db_dict_name[(
row['Activity'],
row['Product'],
row['Location'],
esm_results_db_name,
)]
else: # i.e., the technology is not used in the ESM configuration
act_to_adapt = None
if act_to_adapt is not None and act_to_adapt not in act_to_adapt_list: # avoid to apply correction several times
act_to_adapt_list.append(act_to_adapt)
techno_flows_to_correct_dict[
(act_to_adapt['database'], act_to_adapt['code'])
] = []
if act_to_adapt is not None:
act_exc = db_dict_name[(
row['Removed flow activity'],
row['Removed flow product'],
row['Removed flow location'],
row['Removed flow database'],
)]
techno_flows_to_correct_dict[
(act_to_adapt['database'], act_to_adapt['code'])
] += [(act_exc['database'], act_exc['code'])]
for act in act_to_adapt_list:
for exc in Dataset(act).get_technosphere_flows():
if (exc['database'], exc['code']) in techno_flows_to_correct_dict[(act['database'], act['code'])]:
if len(set([product for product in list(mapping[(mapping.Type == 'Construction')
& (mapping.Name == sub_comp)].Product.iloc[0]
for sub_comp in technology_compositions_dict[tech])])) < len(technology_compositions_dict[tech]):
# If several subcomponents have the same product name, we have to match flows based on the
# activity name as well
match_flows_based_on_activity_name = True
else:
match_flows_based_on_activity_name = False
for sub_comp in technology_compositions_dict[tech]:
if (
(exc['product'] == mapping[(mapping.Name == sub_comp) & (mapping.Type == 'Construction')].Product.iloc[0])
& ((exc['name'] == mapping[(mapping.Name == sub_comp) & (mapping.Type == 'Construction')].Activity.iloc[0])
| (not match_flows_based_on_activity_name))
):
amount_constr_esm = amount_constr_per_subcomp[sub_comp]
amount_constr_lca = exc['amount'] # original infrastructure amount in the operation LCI dataset
if amount_constr_lca < 0 < amount_constr_esm:
amount_constr_esm *= -1 # waste flows must remain negative
exc['amount'] = amount_constr_esm # we replace the latter by the one derived from ESM results
exc['comment'] = (f'TF multiplied by {round(amount_constr_esm / amount_constr_lca, 4)} (capacity '
f'factor). ' + exc.get('comment', ''))
# if amount_constr_lca == 0:
# print(act['name'], exc['name'])
capacity_factor_report_list.append([
tech,
exc['name'],
exc['product'],
exc['location'],
exc['database'],
exc['code'],
amount_constr_lca,
amount_constr_esm,
]) # reporting capacity factors differences
act['comment'] = (f'Infrastructure flows have been harmonized with the ESM to account for capacity factor '
f'differences. ') + act.get('comment', '')
if write_cp_report:
pd.DataFrame(
data=capacity_factor_report_list,
columns=['Name', 'Product', 'Activity', 'Location', 'Database', 'Code', 'Amount LCA', 'Amount ESM'],
).to_csv(f"{self.results_path_file}{name_capacity_factor_difference_file}.csv", index=False)
@staticmethod
[docs]
def _replace_mobility_end_use_type(row: pd.Series, new_end_use_types: pd.DataFrame) -> str:
"""
Reformat the end use type of the mobility technologies
:param row: row of the model dataframe
:param new_end_use_types: adapt end use types to fit the results LCI datasets mapping
:return: updated end use type
"""
for i in range(len(new_end_use_types)):
name = new_end_use_types.Name.iloc[i]
old_eut = new_end_use_types.Old.iloc[i]
new_eut = new_end_use_types.New.iloc[i]
search_type = new_end_use_types['Search type'].iloc[i]
if search_type == 'startswith':
if (row['Name'].startswith(name)) & (old_eut in row['Flow']) & (row['Amount'] == 1.0):
return new_eut
elif search_type == 'contains':
if (name in row['Name']) & (old_eut in row['Flow']) & (row['Amount'] == 1.0):
return new_eut
elif search_type == 'equals':
if (name == row['Name']) & (old_eut in row['Flow']) & (row['Amount'] == 1.0):
return new_eut
else:
raise ValueError('The search type should be either "startswith", "contains" or "equals".')
return row['Flow']