from bw2analyzer import ContributionAnalysis
from typing import Optional
import numpy as np
import os
import pandas as pd
import re
[docs]
class ABContributionAnalysis(ContributionAnalysis):
    """Activity Browser version of bw2analyzer.ContributionAnalysis"""

    def sort_array(
        self,
        data: np.ndarray,
        limit: float = 25,
        limit_type: str = "number",
        total: Optional[float] = None,
    ) -> np.ndarray:
        """Activity Browser version of bw2analyzer.ContributionAnalysis.sort_array.

        Should be removed once https://github.com/brightway-lca/brightway2-analyzer/pull/32 is merged.
        See PR above on why we overwrite this function.

        :param data: 1-dimensional array of contribution values
        :param limit: cut-off; a whole number of rows for ``"number"``, or a
            fraction in ``(0, 1]`` for ``"percent"`` and ``"cum_percent"``
        :param limit_type: one of ``"number"``, ``"percent"`` or ``"cum_percent"``
        :param total: reference total for the percentage modes; when missing
            (or 0) the sum of absolute values in ``data`` is used instead
        :return: 2-column array of ``(value, original index in data)`` rows,
            ordered from the largest to the smallest absolute value
        :raises ValueError: on an unknown ``limit_type``, an out-of-range
            ``limit``, or a zero total combined with ``"cum_percent"``
        """
        # A falsy total (None or 0) falls back to the absolute sum of the data.
        if not total:
            total = np.abs(data).sum()
        if total == 0 and limit_type == "cum_percent":
            raise ValueError(
                "Cumulative percentage cannot be calculated to a total of 0, use a different limit type or total")

        if limit_type not in ("number", "percent", "cum_percent"):
            raise ValueError(f"limit_type must be either 'number', 'percent' or 'cum_percent' not '{limit_type}'.")
        if limit_type in ("percent", "cum_percent"):
            if not 0 < limit <= 1:
                raise ValueError("Percentage limits > 0 and <= 1.")
        if limit_type == "number":
            if not int(limit) == limit:
                raise ValueError("Number limit must be a whole number.")
            if not 0 < limit:
                raise ValueError("Number limit must be > 0.")
            # Whole-valued floats (e.g. 5.0) pass validation but cannot be
            # used as a slice bound, so normalise to int here.
            limit = int(limit)

        # Pair every value with its position in the original array.
        results = np.hstack(
            (data.reshape((-1, 1)), np.arange(data.shape[0]).reshape((-1, 1)))
        )

        if limit_type == "number":
            # sort and cut off at limit
            return results[np.argsort(np.abs(data))[::-1]][:limit, :]
        elif limit_type == "percent":
            # identify good values, drop rest and sort
            keep = np.abs(data) >= (abs(total) * limit)
            results = results[keep, :]
            return results[np.argsort(np.abs(results[:, 0]))[::-1]]
        elif limit_type == "cum_percent":
            # if we would apply this on the 'correct' order, this would stop just before the limit,
            # we want to be on or the first step over the limit.
            results = results[np.argsort(np.abs(data))]  # sort low to high impact
            cumsum = np.cumsum(np.abs(results[:, 0])) / abs(total)
            keep = cumsum >= (1 - limit)  # find items under limit
            return results[keep, :][::-1]  # drop items under limit and set correct order
[docs]
def process_contribution_data(
        contrib_df: pd.DataFrame,
        impact_scores_df: pd.DataFrame,
        unit_conversion_df: pd.DataFrame,
        contribution_type: str = 'processes',
        saving_path: str = None,
        export_excel: bool = False,
        act_types: list[str] = None,
) -> tuple[pd.DataFrame, dict]:
    """
    Turn raw contribution analysis results into per-activity impact shares.

    Contributions are matched to the impact scores table, detail names are
    shortened to the text before their first splitting comma, scores are summed
    per shortened name, and each sum is divided by the activity's summed score.
    The unit conversion table is reduced to a mapping of (ESM, Type) to names.

    :param contrib_df: contribution analysis dataframe (processes or emissions)
    :param impact_scores_df: impact scores dataframe with total impacts
    :param unit_conversion_df: unit conversion dataframe
    :param contribution_type: Type of contribution analysis: 'processes' or 'emissions'
    :param saving_path: Output directory for Excel file (required if export_excel=True)
    :param export_excel: Whether to export comprehensive Excel file
    :param act_types: List of activity types for Excel export
    :return: Processed DataFrame with impact_share column, Unit type groups dictionary
    :raises ValueError: if contribution_type is not supported, or if
        export_excel is True while saving_path is None
    """
    if contribution_type not in ['processes', 'emissions']:
        raise ValueError("contribution_type must be 'processes' or 'emissions'")

    # The detail column holds the contributor name and depends on the analysis type
    detail_col = 'process_name' if contribution_type == 'processes' else 'ef_name'
    activity_keys = ['act_name', 'impact_category', 'act_type']

    # Keep only the columns that are needed, and align the impact score
    # column names with the contribution table so the two can be joined
    contributions = contrib_df[['act_name', 'impact_category', 'score', 'act_type', detail_col]]
    totals = impact_scores_df.rename(columns={
        'Name': 'act_name',
        'Impact_category': 'impact_category',
        'Type': 'act_type',
        'Value': 'total_impact'
    })[['act_name', 'impact_category', 'act_type', 'total_impact']]
    merged = totals.merge(contributions, on=activity_keys, how='inner')

    def leading_segment(label):
        # Split on commas that are not directly followed by a digit and keep
        # the first piece (so '1,2-...' style names stay intact)
        return re.split(r',(?!\d)', label)[0]

    merged[detail_col] = merged[detail_col].apply(leading_segment)

    # Sum the scores of rows that share the same shortened detail name
    shares = (
        merged
        .groupby(['act_name', 'act_type', 'impact_category', detail_col])
        .agg({'score': 'sum', 'total_impact': 'first'})
        .reset_index()
    )

    # The total used for the share is the sum of the listed scores per
    # activity, which replaces the total taken from impact_scores_df
    per_activity = shares.groupby(['act_name', 'act_type', 'impact_category'])
    shares['total_impact'] = per_activity['score'].transform('sum')
    shares['impact_share'] = shares['score'] / shares['total_impact']

    # Add an 'Others' row wherever more than 1% of the impact is unexplained.
    # NOTE(review): since total_impact is the sum of the listed scores, the
    # shares of a group add up to ~1, so this rarely (if ever) triggers.
    remainder_rows = []
    for (act_name, impact_category, act_type), members in shares.groupby(activity_keys):
        unexplained = 1 - members['impact_share'].sum()
        if not unexplained > 0.01:
            continue
        remainder_rows.append({
            'act_name': act_name,
            'impact_category': impact_category,
            'act_type': act_type,
            detail_col: 'Others',
            'score': None,
            'total_impact': members['total_impact'].iloc[0],
            'impact_share': unexplained
        })
    if remainder_rows:
        shares = pd.concat([shares, pd.DataFrame(remainder_rows)], ignore_index=True)

    # Build the (ESM, Type) -> [names] mapping, ignoring 'unit' ESM entries
    # and the 'Other' / 'Flow' types
    is_relevant = (
        (unit_conversion_df['ESM'] != 'unit')
        & (unit_conversion_df['Type'] != 'Other')
        & (unit_conversion_df['Type'] != 'Flow')
    )
    relevant_units = unit_conversion_df[is_relevant]
    unit_type_groups_dict = relevant_units.groupby(['ESM', 'Type'])['Name'].apply(list).to_dict()

    # Optional Excel export
    if export_excel:
        if saving_path is None:
            raise ValueError("saving_path must be provided when export_excel=True")
        if act_types is None:
            act_types = ['Construction', 'Decommission', 'Operation', 'Resource']
        _export_comprehensive_excel(
            shares,
            unit_type_groups_dict,
            saving_path,
            act_types,
            contribution_type,
            detail_col
        )

    return shares, unit_type_groups_dict
[docs]
# Characters replaced by '_' or removed when deriving an Excel sheet name.
# '/', ':', '\\', '?', '*', '[' and ']' are not allowed in sheet titles.
_SHEET_NAME_TABLE = str.maketrans({
    **dict.fromkeys('/: ,\\?*[]', '_'),
    **dict.fromkeys("()'", None),
})
_MAX_SHEET_NAME_LENGTH = 31


def _unique_sheet_name(raw_name, used_names: set) -> str:
    """Return a valid Excel sheet name for raw_name that is not yet in used_names.

    :param raw_name: value to derive the sheet name from (converted with str())
    :param used_names: lower-cased names already handed out; updated in place
    :return: sanitized name of at most 31 characters, unique within used_names
    """
    base = str(raw_name).translate(_SHEET_NAME_TABLE)[:_MAX_SHEET_NAME_LENGTH] or 'Sheet'
    candidate = base
    counter = 2
    # Sheet names are compared case-insensitively, so track them lower-cased.
    while candidate.lower() in used_names:
        suffix = f'_{counter}'
        candidate = base[:_MAX_SHEET_NAME_LENGTH - len(suffix)] + suffix
        counter += 1
    used_names.add(candidate.lower())
    return candidate


def _build_category_sheet(
        df_cat: pd.DataFrame,
        unit_type_groups_dict: dict,
        esm_keys: list,
        act_types: list[str],
        detail_col: str,
) -> Optional[pd.DataFrame]:
    """Build the sheet table for the rows of one impact category, or None if it would be empty.

    :param df_cat: rows of the results dataframe belonging to one impact category
    :param unit_type_groups_dict: Dictionary mapping (ESM, Type) to list of technology names
    :param esm_keys: sorted ESM keys to iterate over
    :param act_types: List of activity types to include
    :param detail_col: Column name for process or emission details
    :return: table ready to be written, or None when no row matches
    """
    # Lump contributions of at most 5% together into one 'Others' row per activity
    others = df_cat[df_cat['impact_share'] <= 0.05].groupby(
        ['act_name', 'act_type', 'impact_category']
    )['impact_share'].sum().reset_index()
    others[detail_col] = 'Others'
    df_cat = df_cat[df_cat['impact_share'] > 0.05]
    if not others.empty:
        df_cat = pd.concat([df_cat, others], ignore_index=True)
    if df_cat.empty:
        return None

    columns = ['act_type', 'esm_group', 'act_name', detail_col, 'impact_share_pct']
    # Blank row used as a visual separator after every (act_type, esm) group
    blank_row = pd.DataFrame([{
        'act_type': '',
        'esm_group': '',
        'act_name': '',
        detail_col: '',
        'impact_share_pct': None
    }])

    sheet_parts = []
    for at in act_types:
        for esm in esm_keys:
            tech_names = unit_type_groups_dict.get((esm, at), [])
            sub = df_cat[(df_cat['act_type'] == at) & (df_cat['act_name'].isin(tech_names))]
            if sub.empty:
                continue
            sub = sub.copy()
            sub['esm_group'] = esm
            # Express the share as a percentage value
            sub['impact_share_pct'] = sub['impact_share'] * 100
            sheet_parts.append(sub[columns])
            sheet_parts.append(blank_row)

    if not sheet_parts:
        return None
    sheet_df = pd.concat(sheet_parts, ignore_index=True)
    return sheet_df.rename(columns={'impact_share_pct': 'Impact Share (%)'})


def _export_comprehensive_excel(
        df: pd.DataFrame,
        unit_type_groups_dict: dict,
        saving_path: str,
        act_types: list[str],
        contribution_type: str,
        detail_col: str,
) -> None:
    """
    Internal function to export comprehensive Excel file.

    One sheet is written per impact category that has matching rows. Within a
    sheet, rows are grouped by activity type and ESM group, with a blank row
    after each group. Sheet names are sanitized, cut to 31 characters and made
    unique. If no category yields any rows, no file is written.

    :param df: DataFrame with contribution analysis results
    :param unit_type_groups_dict: Dictionary mapping (ESM, Type) to list of technology names
    :param saving_path: Output directory for Excel file
    :param act_types: List of activity types to include in Excel export
    :param contribution_type: Type of contribution analysis: 'processes' or 'emissions'
    :param detail_col: Column name for process or emission details ('process_name' or 'ef_name')
    :return: None
    """
    os.makedirs(saving_path, exist_ok=True)

    # Set filename based on contribution type
    if contribution_type == 'processes':
        filename = 'contribution_analysis_processes_results.xlsx'
    else:
        filename = 'contribution_analysis_emissions_results.xlsx'
    output_path = os.path.join(saving_path, filename)

    # The ESM keys do not depend on the impact category, so compute them once
    esm_keys = sorted({esm for esm, typ in unit_type_groups_dict if typ in act_types})

    # Build every sheet before opening the writer: a workbook cannot be saved
    # without at least one sheet, so the file is only created when needed.
    sheets = {}
    used_names = set()
    for impact_category in df['impact_category'].unique().tolist():
        sheet_df = _build_category_sheet(
            df[df['impact_category'] == impact_category],
            unit_type_groups_dict,
            esm_keys,
            act_types,
            detail_col,
        )
        if sheet_df is None:
            continue
        sheets[_unique_sheet_name(impact_category, used_names)] = sheet_df

    if not sheets:
        print(f"No contribution data to export, Excel file not written: {output_path}")
        return

    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
        for sheet_name, sheet_df in sheets.items():
            sheet_df.to_excel(writer, sheet_name=sheet_name, index=False)
            worksheet = writer.sheets[sheet_name]
            # Worksheet columns are 1-based; row 1 holds the header
            share_column = sheet_df.columns.get_loc('Impact Share (%)') + 1
            # Format the Impact Share column as percentage with 1 decimal
            for row in range(2, len(sheet_df) + 2):
                cell = worksheet.cell(row=row, column=share_column)
                if isinstance(cell.value, (int, float)):
                    cell.number_format = '0.0"%"'

    print(f"Comprehensive Excel saved to: {output_path}")
    print(f"Created {len(sheets)} sheets")