Source code for mescal.database

import os
import wurst
import copy
import bw2data as bd
import pandas as pd
import logging
from .caching import cache_database, load_db
from .utils import premise_changing_names, random_code
from .filesystem_constants import DIR_DATABASE_CACHE


[docs] class Dataset: """ Class to perform basic operations on LCI datasets (as dictionaries) """ def __init__(self, act: dict) -> None: """ Initialize the dataset :param act: LCI dataset """
[docs] self.act = act
[docs] self.product = act['reference product']
[docs] self.activity = act['name']
[docs] self.location = act['location']
[docs] self.database = act['database']
def __repr__(self) -> str: return f"Dataset({self.product}, {self.activity}, {self.location}, {self.database})"
[docs] def get_technosphere_flows(self) -> list[dict]: """ Get the technosphere flows of an activity :return: list of technosphere flows """ flows = [] for exc in self.act['exchanges']: if exc['type'] == 'technosphere': flows.append(exc) return flows
[docs] def get_biosphere_flows(self) -> list[dict]: """ Get the biosphere flows of an activity :return: list of biosphere flows """ flows = [] for exc in self.act['exchanges']: if exc['type'] == 'biosphere': flows.append(exc) return flows
[docs] def get_production_flow(self) -> dict or None: """ Get the production flow of an activity :return: production flow or None if it does not exist """ for exc in self.act['exchanges']: if exc['type'] == 'production': return exc return None
[docs] def get_downstream_consumers(self, db: list[dict]) -> list[dict]: """ Get the downstream consumers of an activity :param db: list of activities of the LCI database :return: list of downstream consumers """ act_code = self.act['code'] consumers = [] for a in db: ds = Dataset(a) for exc in ds.get_technosphere_flows(): if exc['code'] == act_code: consumers.append(a) continue return consumers
[docs] class Database: """ Class to perform basic operations on LCI databases. Databases can be formulated as a list of dictionaries or as a dictionary with the (database, code) tuple or the (name, product, location, database) tuple as key. """ def __init__( self, db_names: str | list[str] = None, db_as_list: list[dict] = None, create_pickle: bool = False, load_dependencies: bool = False, ): """ Initialize the database :param db_names: Name of the LCI database(s). Should be a string for a single database or a list of strings for several databases. :param db_as_list: List of dictionaries of the LCI database. :param create_pickle: if True, create a pickle file to store the database. Only used if db_names is provided. :param load_dependencies: if True, load the dependencies of the database(s). Only used if db_names is provided. """ # set up logging tool
[docs] self.logger = logging.getLogger('Database')
self.logger.setLevel(logging.INFO) self.logger.handlers = [] formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) ch.setFormatter(formatter) self.logger.addHandler(ch) self.logger.propagate = False
[docs] self.create_pickle = create_pickle
if db_as_list is not None and db_names is not None: raise ValueError('Only one of db_names or db_as_list should be provided.') if db_as_list is not None: self.load_dependencies = False self.db_as_list = db_as_list self.db_names = list(set([i['database'] for i in db_as_list])) elif db_names is not None: self.load_dependencies = load_dependencies self.db_names = db_names self.db_as_list = self.load() else: raise ValueError('Database name or list of dictionaries must be provided') def __repr__(self) -> str: return f"Database({self.db_names}) contains {len(self.db_as_list)} activities" @property
[docs] def db_as_dict_name(self) -> dict: return self.list_to_dict(key='name')
@property
[docs] def db_as_dict_code(self) -> dict: return self.list_to_dict(key='code')
@db_as_dict_name.setter def db_as_dict_name(self, db_dict_name: dict) -> None: self.db_names = list(set([i[3] for i in db_dict_name.keys()])) self.db_as_list = list(db_dict_name.values()) @db_as_dict_code.setter def db_as_dict_code(self, db_dict_code: dict) -> None: self.db_names = list(set([i[0] for i in db_dict_code.keys()])) self.db_as_list = list(db_dict_code.values()) def __add__(self, other): return Database(db_as_list=self.db_as_list + other.db_as_list) def __sub__(self, other): return Database(db_as_list=[i for i in self.db_as_list if i not in other.db_as_list]) def __len__(self): return len(self.db_as_list) from .CPC import ( _add_product_or_activity_CPC_category, add_CPC_categories, _add_CPC_categories_based_on_existing_activities, _save_mapping_between_products_and_CPC_categories, )
[docs] def load(self) -> list[dict]: """ Load or extract a single database :return: list of dictionaries of the LCI database """ if isinstance(self.db_names, list): db = self.load_multiple() elif isinstance(self.db_names, str) and self.load_dependencies: self.db_names = [self.db_names] db = self.load_multiple() elif isinstance(self.db_names, str) and not self.load_dependencies: if self.db_names not in bd.databases: raise ValueError(f"{self.db_names} is not a registered database") elif os.path.isfile(DIR_DATABASE_CACHE / f'{self.db_names}.pickle'): db = load_db(self.db_names) self.logger.info(f"Loaded {self.db_names} from pickle!") else: db = wurst.extract_brightway2_databases(self.db_names, add_identifiers=True) self.logger.info(f"Loaded {self.db_names} from brightway!") if self.create_pickle: cache_database(db, self.db_names) else: raise ValueError('Database name must be a string or a list of strings') return db
[docs] def load_multiple(self) -> list[dict]: """ Concatenates databases in a list of dictionaries :return: list of dictionaries of the concatenated databases """ db = Database(db_as_list=[]) loaded_databases = set() db_names_copy = copy.deepcopy(self.db_names) for name in db_names_copy: if name not in loaded_databases: db += Database( db_names=name, create_pickle=self.create_pickle, load_dependencies=False, # only load dependencies of the first level ) loaded_databases.add(name) if self.load_dependencies: dependencies = list(set([a['exchanges'][i]['database'] for a in db.db_as_list for i in range(len(a['exchanges']))])) for dep_db_name in dependencies: if (dep_db_name not in self.db_names) & ('biosphere' not in dep_db_name): if dep_db_name not in loaded_databases: db += Database( db_names=dep_db_name, create_pickle=self.create_pickle, load_dependencies=False, # only load dependencies of the first level ) loaded_databases.add(dep_db_name) self.db_names.append(dep_db_name) return db.db_as_list
[docs] def merge( self, main_ecoinvent_db_name: str, new_db_name: str = None, old_main_db_names: list[str] or None = None, write: bool = False, check_duplicates: bool = False ) -> None: """ Merge multiple LCI databases in one database. The list of databases should contain one main database (e.g., an ecoinvent or premise database) towards which all other databases will be relinked. If you suspect that there might be duplicated LCI datasets over the different databases, you can set check_duplicates to True to remove them. :param main_ecoinvent_db_name: name of the main database, e.g., ecoinvent or premise database :param new_db_name: name of the new merged database (only required if write is True) :param old_main_db_names: other main databases that are not in the list of databases, thus the list of databases will be unlinked from those :param write: if True, write the new database to Brightway :param check_duplicates: if True, check for duplicates in terms of (product, name, location) and remove them from exchanges and from the database :return: None """ merged_db_as_list = [] # initialize the list of dictionaries for the merged database main_ecoinvent = Database(db_as_list=[i for i in self.db_as_list if i['database'] == main_ecoinvent_db_name]) main_ecoinvent_db = main_ecoinvent.db_as_list for db_name in self.db_names: db = Database(db_as_list=[i for i in self.db_as_list if i['database'] == db_name]) if db_name != main_ecoinvent_db_name: for old_db_name in old_main_db_names: db.relink( name_database_unlink=old_db_name, database_relink_as_list=main_ecoinvent_db, ) else: db.db_as_list = main_ecoinvent_db merged_db_as_list += db.db_as_list self.db_as_list = merged_db_as_list # Checking for duplicates in terms of (product, name, location) if check_duplicates: db_dict_name = self.db_as_dict_name db_dict_name_count = {k[:3]: 0 for k in db_dict_name.keys()} for act in self.db_as_list: # counting the number of occurrences of each activity db_dict_name_count[(act['name'], act['reference product'], act['location'])] += 1 for k, v in db_dict_name_count.items(): if v > 1: # duplicates are removed from exchanges and from the database ref_act = None for db_name in self.db_names: key = (k[0], k[1], k[2], db_name) if key in db_dict_name: ref_act = db_dict_name[key] # one of the duplicates is kept for act in self.db_as_list: if ( (act['name'] == ref_act['name']) & (act['reference product'] == ref_act['reference product']) & (act['location'] == ref_act['location']) ): # if we find a duplicate if act['code'] != ref_act['code']: # it is removed if its code is not the same as the reference self.db_as_list.remove(act) else: # for other activities, we update the exchanges for exc in Dataset(act).get_technosphere_flows(): if ( (exc['name'] == ref_act['name']) & (exc['product'] == ref_act['reference product']) & (exc['location'] == ref_act['location']) ): # if we find a duplicate in the exchanges, its code is updated with the reference exc['code'] = ref_act['code'] exc['database'] = ref_act['database'] exc['input'] = (ref_act['database'], ref_act['code']) # Verification that the merged database has no dependencies apart from biosphere databases dependencies = list(set([a['exchanges'][i]['database'] for a in self.db_as_list for i in range(len(a['exchanges']))])) for dep_db_name in dependencies: if 'biosphere' in dep_db_name: pass elif main_ecoinvent_db_name in dep_db_name: pass elif dep_db_name not in self.db_names: raise ValueError(f"Database {dep_db_name} is not in the list of databases to merge") if write: if new_db_name is None: raise ValueError('The "new_db_name" argument must be provided if "write" is True') self.write_to_brightway(new_db_name)
[docs] def list_to_dict( self, key: str, database_type: str = 'technosphere' ) -> dict: """ Converts a list of dictionaries into a dictionary with the (database, code) tuple as key when key = 'code', the (name, product, location, database) tuple as key when key = 'name' for a technosphere database, or the (name, categories, database) tuple as key when key = 'name' for a biosphere database. :param key: can be 'code' or 'name' :param database_type: can be 'technosphere' or 'biosphere' :return: LCI database as a dictionary """ if key == 'code': db_as_dict = {(a['database'], a['code']): a for a in self.db_as_list} elif key == 'name': if database_type == 'technosphere': db_as_dict = {(a['name'], a['reference product'], a['location'], a['database']): a for a in self.db_as_list} elif database_type == 'biosphere': db_as_dict = {(a['name'], a['categories'], a['database']): a for a in self.db_as_list} else: raise ValueError('Database type must be either "technosphere" or "biosphere"') else: raise ValueError('Key must be either "code" or "name"') return db_as_dict
[docs] def wurst_to_brightway( self, database_type: str = 'technosphere', ) -> None: """ Adjust the database to the Brightway format :param database_type: type of database to be written, can be 'technosphere' or 'biosphere' :return: None """ for act in self.db_as_list: if database_type == 'technosphere': # only activities from technosphere databases have exchanges # Add the input and output keys in exchanges for exc in act['exchanges']: if 'input' not in exc.keys(): exc['input'] = (exc['database'], exc['code']) else: # guaranteeing consistency between input, and database and code exc['database'] = exc['input'][0] exc['code'] = exc['input'][1] if 'output' not in exc.keys(): exc['output'] = (act['database'], act['code']) # Restore parameters to Brightway2 format which allows for uncertainty and comments if "parameters" in act: act["parameters"] = { name: {"amount": amount} for name, amount in act["parameters"].items() } # Correct unit names if act['unit'] == 'ton-kilometer': act['unit'] = 'ton kilometer' if database_type == 'technosphere': # only activities from biosphere databases should have categories if "categories" in act: del act["categories"] elif database_type == 'biosphere': act['type'] = 'natural resource' if act['categories'][0] == 'natural resource' else 'emission' if "reference product" in act: del act["reference product"] if "location" in act: del act["location"]
[docs] def change_name( self, new_db_name: str, database_type: str = 'technosphere', ) -> None: """ Change the name of the database :param new_db_name: new name of the database :param database_type: type of database to be written, can be 'technosphere' or 'biosphere' :return: None """ old_dbs_name = list(set([act['database'] for act in self.db_as_list])) for act in self.db_as_list: act['database'] = new_db_name if database_type == 'technosphere': # only activities from technosphere databases have exchanges for exc in act['exchanges']: if exc['database'] in old_dbs_name: exc['database'] = new_db_name if 'input' in exc.keys(): if exc['input'][0] in old_dbs_name: exc['input'] = (new_db_name, exc['input'][1]) if 'output' in exc.keys(): if exc['output'][0] in old_dbs_name: exc['output'] = (new_db_name, exc['output'][1]) self.db_names = new_db_name
[docs] def write_to_brightway( self, new_db_name: str, database_type: str = 'technosphere', overwrite: bool = True, ) -> None: """ Write a LCI database to a Brightway project. This function will overwrite the database if it already exists. :param new_db_name: name of the brightway database to be written :param database_type: type of database to be written, can be 'technosphere' or 'biosphere' :param overwrite: if True, overwrite the database if it already exists :return: None """ if database_type not in ['technosphere', 'biosphere']: raise ValueError('Database type must be either "technosphere" or "biosphere"') if new_db_name in list(bd.databases): # if the database already exists if overwrite: del bd.databases[new_db_name] self.logger.info(f"Previous {new_db_name} will be overwritten!") else: self.logger.info(f"{new_db_name} already exists in Brightway. To overwrite it, set 'overwrite' to " f"True or delete it manually") return else: pass bw_database = bd.Database(new_db_name) bw_database.register() old_db_names = list(set([act['database'] for act in self.db_as_list])) if (len(old_db_names) > 1) | (old_db_names[0] != new_db_name): self.change_name(new_db_name, database_type) self.wurst_to_brightway(database_type) db = {(i['database'], i['code']): i for i in self.db_as_list} bw_database.write(db) self.logger.info(f"{new_db_name} written to Brightway!")
[docs] def delete(self) -> None: """ Delete a database from Brightway :return: None """ if len(self.db_names) > 1: raise ValueError('Only one database can be deleted at a time') else: if self.db_names[0] in list(bd.databases): del bd.databases[self.db_names[0]] else: raise ValueError(f"{self.db_names[0]} is not a registered database")
[docs] def get_code( self, product: str, activity: str, location: str, database: str ) -> str or None: """ Get the code of an activity :param product: product of the LCI dataset :param activity: name of the LCI dataset :param location: location of the LCI dataset :param database: name of the LCI database :return: activity code in the LCI database or None if it does not exist """ ds = self.db_as_dict_name[(activity, product, location, database)] if 'code' in ds.keys(): return ds['code'] else: return None
[docs] def test_mapping_file(self, mapping: pd.DataFrame) -> list[tuple[str, str, str, str]]: """ Test if the mapping file is correctly linked to the database :param mapping: mapping file between the LCI database and the ESM database :return: the list missing flows if any """ db_dict_name = self.db_as_dict_name missing_flows = [] for i in range(len(mapping)): act_name = mapping.Activity.iloc[i] act_prod = mapping.Product.iloc[i] act_loc = mapping.Location.iloc[i] act_database = mapping.Database.iloc[i] if (act_name, act_prod, act_loc, act_database) in db_dict_name: pass else: missing_flows.append((act_name, act_prod, act_loc, act_database)) if len(missing_flows) > 0: self.logger.warning(f'Some flows could not be found in the database: {missing_flows}') else: self.logger.info('Mapping successfully linked to the database') return missing_flows
[docs] def dependencies(self) -> list[str]: """ Get the dependencies of the database :return: list of dependencies """ dependencies = list(set([a['exchanges'][i]['database'] for a in self.db_as_list for i in range(len(a['exchanges']))])) return dependencies
[docs] def create_complementary_database( self, df_mapping: pd.DataFrame, main_db_name: str = None, complement_db_name: str = None, premise_changes: pd.DataFrame = None, write_database: bool = True, ) -> pd.DataFrame: """ Create a new database containing all activities that tare not provided in your main database :param df_mapping: dataframe with the mapping of the technologies and resources :param main_db_name: name of the main LCI database :param complement_db_name: name of the complementary LCI database :param premise_changes: file of the changes in names, products, locations, in premise regarding the mapping :param write_database: if True, write the complementary database to Brightway :return: dataframe with the mapping of the technologies and resources linked to the premise database """ if (isinstance(self.db_names, str)) | (isinstance(self.db_names, list) & (len(self.db_names) == 1)): premise_db = self premise_db_list = premise_db.db_as_list premise_db_dict_name = premise_db.db_as_dict_name premise_db_dict_code = premise_db.db_as_dict_code name_premise_db = premise_db_list[0]['database'] base_db = Database(db_names=list(df_mapping.Database.unique())) base_db_dict_name = base_db.db_as_dict_name elif isinstance(self.db_names, list) & (len(self.db_names) > 1): premise_db = Database(db_as_list=[act for act in self.db_as_list if act['database'] == main_db_name]) premise_db_list = premise_db.db_as_list premise_db_dict_name = premise_db.db_as_dict_name premise_db_dict_code = premise_db.db_as_dict_code name_premise_db = main_db_name unlinked = self.test_mapping_file(df_mapping) if len(unlinked) > 0: base_db = Database(db_names=list(df_mapping.Database.unique())) base_db_dict_name = base_db.db_as_dict_name else: base_db = self base_db_dict_name = base_db.db_as_dict_name else: raise ValueError('Database name or list of names must be provided') tech_premise = pd.DataFrame(columns=['Name', 'Type', 'Product', 'Activity', 'Location', 'Database']) complement_premise = [] for i in range(len(df_mapping)): esm_tech_name = df_mapping.Name.iloc[i] act_type = df_mapping.Type.iloc[i] product = df_mapping.Product.iloc[i] activity = df_mapping.Activity.iloc[i] region = df_mapping.Location.iloc[i] database = df_mapping.Database.iloc[i] new_activity, new_product, new_location = premise_changing_names( activity_name=activity, activity_prod=product, activity_loc=region, name_premise_db=name_premise_db, premise_db_dict_name=premise_db_dict_name, premise_changes=premise_changes, ) try: premise_db_dict_name[(new_activity, new_product, new_location, name_premise_db)] except KeyError: new_product = new_product[0].lower() + new_product[1:] new_activity = new_activity[0].lower() + new_activity[1:] try: premise_db_dict_name[(new_activity, new_product, new_location, name_premise_db)] except KeyError: self.logger.info(f"No inventory in the premise database for {esm_tech_name, act_type}") complement_premise.append((esm_tech_name, act_type)) tech_premise.loc[i] = [esm_tech_name, act_type, product, activity, new_location, database] else: tech_premise.loc[i] = [esm_tech_name, act_type, new_product, new_activity, new_location, name_premise_db] else: tech_premise.loc[i] = [esm_tech_name, act_type, new_product, new_activity, new_location, name_premise_db] for i in range(len(complement_premise)): esm_tech_name = complement_premise[i][0] act_type = complement_premise[i][1] activity = tech_premise[(tech_premise.Name == esm_tech_name) & (tech_premise.Type == act_type)].Activity.iloc[0] product = tech_premise[(tech_premise.Name == esm_tech_name) & (tech_premise.Type == act_type)].Product.iloc[0] region = tech_premise[(tech_premise.Name == esm_tech_name) & (tech_premise.Type == act_type)].Location.iloc[0] database = tech_premise[(tech_premise.Name == esm_tech_name) & (tech_premise.Type == act_type)].Database.iloc[0] act = base_db_dict_name[(activity, product, region, database)] try: premise_db_dict_name[(activity, product, region, complement_db_name)] except KeyError: new_act = copy.deepcopy(act) ds = Dataset(new_act) new_code = random_code() new_act['database'] = complement_db_name new_act['code'] = new_code prod_flow = ds.get_production_flow() prod_flow['code'] = new_code prod_flow['database'] = complement_db_name premise_db_list.append(new_act) premise_db_dict_name[ (new_act['name'], new_act['reference product'], new_act['location'], new_act['database'])] = new_act premise_db_dict_code[(new_act['database'], new_act['code'])] = new_act unlinked_activities = [i for i in premise_db_list if i['database'] == complement_db_name] while len(unlinked_activities) > 0: unlinked_activities, premise_db_list = self.relink_complement_db_to_premise_db( name_complement_db=complement_db_name, base_db_list=base_db.db_as_list, premise_db_list=premise_db_list, name_premise_db=name_premise_db) complement_db = [i for i in premise_db_list if i['database'] == complement_db_name] if (len(complement_db) > 0) & (write_database == True): Database(db_as_list=complement_db).write_to_brightway(complement_db_name) else: self.logger.info(f"The complementary database did not have to be created") tech_premise.reset_index(inplace=True, drop=True) tech_premise_adjusted = pd.DataFrame(columns=tech_premise.columns) for i in range(len(tech_premise)): esm_tech_name = tech_premise.Name.iloc[i] act_type = tech_premise.Type.iloc[i] product = tech_premise.Product.iloc[i] activity = tech_premise.Activity.iloc[i] region = tech_premise.Location.iloc[i] database = tech_premise.Database.iloc[i] if database == name_premise_db: tech_premise_adjusted.loc[i] = [esm_tech_name, act_type, product, activity, region, database] else: tech_premise_adjusted.loc[i] = [esm_tech_name, act_type, product, activity, region, complement_db_name] return tech_premise_adjusted
@staticmethod