import random
import string
import pandas as pd
[docs]
def ecoinvent_unit_convention(unit: str) -> str:
    """
    Translate a unit symbol into its ecoinvent spelling

    A unit that is already one of the ecoinvent spellings known to this function is returned unchanged.

    :param unit: unit symbol (e.g., 'kWh') or ecoinvent unit name (e.g., 'kilowatt hour')
    :return: the unit written following the ecoinvent convention
    :raises ValueError: if the unit is neither a known symbol nor a known ecoinvent unit name
    """
    symbol_to_ecoinvent = {
        'kg': 'kilogram',
        'kg*day': 'kilogram day',
        'kg/h': 'kilogram per hour',
        'm2': 'square meter',
        'm3': 'cubic meter',
        'MJ': 'megajoule',
        'kWh': 'kilowatt hour',
        'kW': 'kilowatt',
        'h': 'hour',
        'km': 'kilometer',
        'km*year': 'kilometer-year',
        'pkm': 'person kilometer',
        'person*km': 'person kilometer',
        'pkm/h': 'person kilometer per hour',
        'tkm': 'ton kilometer',
        'metric ton*km': 'ton kilometer',
        'tkm/h': 'ton kilometer per hour',
        'u': 'unit',
    }

    # Symbols take precedence over already-converted names
    ecoinvent_name = symbol_to_ecoinvent.get(unit)
    if ecoinvent_name is not None:
        return ecoinvent_name

    # The unit may already follow the ecoinvent convention
    if unit in symbol_to_ecoinvent.values():
        return unit

    raise ValueError(f"Unmapped unit {unit}")
[docs]
def premise_changing_names(
    activity_name: str,
    activity_prod: str,
    activity_loc: str,
    name_premise_db,
    premise_db_dict_name: dict,
    premise_changes: pd.DataFrame = None
) -> tuple[str, str, str]:
    """
    Resolve the (name, product, location) of an LCI dataset against a premise database

    The lookup is done in the following order:
    1. the dataset as given, 2. the same dataset with the "RoW" location,
    3. the renamed dataset listed in the premise changes file (if provided).
    If none of these succeed, the original name, product and location are returned.

    :param activity_name: name of the LCI dataset
    :param activity_prod: product of the LCI dataset
    :param activity_loc: location of the LCI dataset
    :param name_premise_db: name of the premise database
    :param premise_db_dict_name: dictionary of the database with (name, product, location, database) as key
    :param premise_changes: file of the premise name changes impacting the mapping, with the columns
        'Activity - old', 'Product - old', 'Location - old', 'Activity - new', 'Product - new' and 'Location - new'
    :return: the updated name, product and location of the LCI dataset
    """
    unchanged = (activity_name, activity_prod, activity_loc)

    # The dataset exists as such in the premise database
    if (*unchanged, name_premise_db) in premise_db_dict_name:
        return unchanged

    # The dataset exists with the "RoW" location
    if (activity_name, activity_prod, "RoW", name_premise_db) in premise_db_dict_name:
        return activity_name, activity_prod, "RoW"

    # No changes file to look into
    if premise_changes is None:
        return unchanged

    is_old_dataset = (
        (premise_changes['Activity - old'] == activity_name)
        & (premise_changes['Product - old'] == activity_prod)
        & (premise_changes['Location - old'] == activity_loc)
    )
    renamed = premise_changes[is_old_dataset][['Activity - new', 'Product - new', 'Location - new']].values

    if len(renamed) == 0:  # the LCI dataset is not in the premise changes file
        return unchanged

    # Only the first matching row is considered
    new_name, new_prod, new_loc = renamed[0]
    return new_name, new_prod, new_loc
[docs]
def change_year_in_name(row: pd.Series, year_from: int, year_to: int) -> pd.Series:
    """
    Substitute the year found in the 'Activity' and 'Database' entries of a mapping row

    The row is modified in place and returned.

    :param row: row of the mapping file
    :param year_from: year of the original mapping file
    :param year_to: year of the new mapping file
    :return: updated mapping row
    """
    old_year, new_year = str(year_from), str(year_to)
    for column in ('Activity', 'Database'):
        # every occurrence of the old year in the text is replaced
        row[column] = row[column].replace(old_year, new_year)
    return row
[docs]
def change_mapping_year(mapping: pd.DataFrame, year_from: int, year_to: int) -> pd.DataFrame:
    """
    Update the year in the activity and database names of every row of the mapping file

    If both years are identical, a message is printed and the mapping file is returned as is.

    :param mapping: mapping file between the LCI database and the ESM database
    :param year_from: year of the original mapping file
    :param year_to: year of the new mapping file
    :return: updated mapping file
    """
    if year_from != year_to:
        # row-wise update, the years are forwarded as extra positional arguments
        return mapping.apply(change_year_in_name, axis=1, args=(year_from, year_to))

    print(f'The mapping file is already for the year {year_to}')
    return mapping
[docs]
def random_code() -> str:
    """
    Generate a random code of 32 characters made of lowercase ASCII letters and digits

    :return: code
    """
    alphabet = string.ascii_lowercase + string.digits
    n_characters = 32
    return ''.join(random.choices(alphabet, k=n_characters))
[docs]
def expand_impact_category_levels(
    df: pd.DataFrame,
    impact_category_col: str = 'Impact_category',
) -> pd.DataFrame:
    """
    Expand the impact category levels into separate columns

    Each tuple or list found in the impact category column is spread over new columns named
    '<impact_category_col> (level i)', with i starting at 0. Shorter entries are padded with None, and
    entries that are not a tuple or a list (e.g., missing values) result in None for all levels.

    If the dataframe is empty or if the column holds no tuple or list, there is no level to expand and a
    copy of the dataframe is returned without any additional column.

    :param df: dataframe with impact category column
    :param impact_category_col: name of the impact category column
    :return: a new dataframe with the original columns followed by the expanded impact category levels
    """
    categories = df[impact_category_col]

    # default=0 covers the empty / all-missing case, for which a pandas max() would return NaN
    max_len = max(
        (len(category) for category in categories if isinstance(category, (tuple, list))),
        default=0,
    )

    if max_len == 0:
        return df.copy()

    padded_levels = [
        list(category) + [None] * (max_len - len(category))
        if isinstance(category, (tuple, list))
        else [None] * max_len
        for category in categories
    ]
    expanded = pd.DataFrame(
        padded_levels,
        index=df.index,  # same index as df, so that the concatenation aligns row by row
        columns=[f'{impact_category_col} (level {i})' for i in range(max_len)],
    )

    return pd.concat([df, expanded], axis=1)
[docs]
def _short_name_ds_type(ds_type: str) -> str:
    """
    Abbreviate the type of an LCI dataset

    :param ds_type: type of LCI dataset ('Construction', 'Decommission', 'Operation' or 'Resource')
    :return: short name of the LCI dataset type ('constr', 'decom', 'op' or 'res')
    :raises ValueError: if the type of LCI dataset is not one of the four types listed above
    """
    short_names = (
        ('Construction', 'constr'),
        ('Decommission', 'decom'),
        ('Operation', 'op'),
        ('Resource', 'res'),
    )
    for long_name, short_name in short_names:
        if ds_type == long_name:
            return short_name
    raise ValueError(f"Unknown technology type: {ds_type}")