"""
Helper functions for the various taxbrain modules
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib.ticker as ticker
from collections import defaultdict
from typing import Union, Tuple
import taxcalc as tc
from .typing import ParamToolsAdjustment, TaxcalcReform, PlotColors
[docs]
def weighted_sum(df, var, wt="s006"):
    """
    Compute the weighted total of a single column

    Parameters
    ----------
    df: Pandas DataFrame
        data over which to compute the weighted sum
    var: str
        name of the column in df to be summed
    wt: str
        name of the column in df that holds the weights

    Returns
    -------
    float
        sum over all rows of df[var] * df[wt]
    """
    values = df[var]
    weights = df[wt]
    weighted = values * weights
    return weighted.sum()
[docs]
def distribution_plot(
    tb,
    year: int,
    figsize: Tuple[Union[int, float], Union[int, float]] = (6, 4),
    title: str = "default",
    include_text: bool = False,
):
    """
    Create a horizontal stacked bar chart to display the distributional
    change in after tax income

    For every income group the bar shows the weighted share of tax units
    whose after tax income changed by more than 5%, by 1-5%, by less than
    1%, by -1 to -5%, and by more than -5%.

    Parameters
    ----------
    tb: TaxBrain object
        TaxBrain object for analysis. Must expose ``base_data[year]`` and
        ``reform_data[year]`` with ``aftertax_income`` and ``s006`` columns
    year: int
        year to report distribution for
    figsize: tuple
        representing the size of the figure (width, height) in inches
    title: str
        title for plot. The value "default" produces
        "Percentage Change In After Tax Income - {year}"
    include_text: bool
        whether to print the percentage inside each bar segment

    Returns
    -------
    fig: Matplotlib.pyplot figure object
        distribution plot
    """

    def find_percs(data):
        """
        Find the weighted share of the records in `data` that saw their
        income change by each of the five percentage bands

        The bands are mutually exclusive and jointly exhaustive so the
        five shares of a non-empty group add up to one.
        """
        weights = data["s006"]
        pct = data["pct_change"]
        pop = weights.sum()
        if pop == 0:
            # An empty (or zero weight) group would give 0 / 0 = NaN, which
            # later makes `set_xlim` raise. Report an empty bar instead.
            return (0.0, 0.0, 0.0, 0.0, 0.0)
        large_pos_chng = weights[pct > 5].sum() / pop
        small_pos_chng = weights[(pct <= 5) & (pct > 1)].sum() / pop
        small_chng = weights[(pct <= 1) & (pct >= -1)].sum() / pop
        # `>= -5` so that a change of exactly -5% is counted here; with a
        # strict inequality it would fall between this band and the next
        small_neg_change = weights[(pct < -1) & (pct >= -5)].sum() / pop
        large_neg_change = weights[pct < -5].sum() / pop
        return (
            large_pos_chng,
            small_pos_chng,
            small_chng,
            small_neg_change,
            large_neg_change,
        )

    # extract needed data from the TaxBrain object
    ati_data = pd.DataFrame(
        {
            "base": tb.base_data[year]["aftertax_income"],
            "reform": tb.reform_data[year]["aftertax_income"],
            "s006": tb.base_data[year]["s006"],
        }
    )
    ati_data["diff"] = ati_data["reform"] - ati_data["base"]
    ati_data["pct_change"] = (ati_data["diff"] / ati_data["base"]) * 100
    ati_data = ati_data.fillna(0.0)  # fill in NaNs for graphing
    # group tuples: (low income, high income, income group name)
    # listed in the order they appear on the plot, top to bottom
    groups = [
        (-9e99, 9e99, "All"),
        (1e6, 9e99, "$1M or More"),
        (500000, 1e6, "$500K-1M"),
        (200000, 500000, "$200K-500K"),
        (100000, 200000, "$100K-200K"),
        (75000, 100000, "$75K-100K"),
        (50000, 75000, "$50K-75K"),
        (40000, 50000, "$40K-50K"),
        (30000, 40000, "$30K-40K"),
        (20000, 30000, "$20K-30K"),
        (10000, 20000, "$10K-20K"),
        (-9e99, 10000, "Less than $10K"),
    ]
    labels = []
    shares = []
    for low, high, grp in groups:
        # find income changes by group; groups are (low, high] intervals of
        # baseline after tax income
        in_group = (ati_data["base"] <= high) & (ati_data["base"] > low)
        labels.append(grp)
        shares.append(find_percs(ati_data[in_group]))
    legend_labels = [
        "Increase of > 5%",
        "Increase 1-5%",
        "Change < 1%",
        "Decrease of 1-5%",
        "Decrease > 5%",
    ]
    data = np.array(shares)
    data_cumsum = data.cumsum(axis=1)
    category_colors = plt.get_cmap("GnBu")(
        np.linspace(0.15, 0.85, data.shape[1])
    )
    fig, ax = plt.subplots(figsize=figsize)
    # barh draws the first label at the bottom; flip the axis so that
    # "All" ends up at the top of the chart
    ax.invert_yaxis()
    ax.set_xlim(0, np.sum(data, axis=1).max())
    for i, (colname, color) in enumerate(zip(legend_labels, category_colors)):
        widths = data[:, i]
        # each segment starts where the previous categories end
        starts = data_cumsum[:, i] - widths
        ax.barh(
            labels, widths, left=starts, height=0.9, label=colname, color=color
        )
        if include_text:
            # add text label, picking a text color that contrasts with the
            # segment color
            xcenters = starts + widths / 2
            r, g, b, _ = color
            text_color = "white" if r * g * b < 0.5 else "darkgrey"
            for y, (x, c) in enumerate(zip(xcenters, widths)):
                ax.text(
                    x,
                    y,
                    f"{c * 100:.1f}%",
                    ha="center",
                    va="center",
                    color=text_color,
                )
    ax.legend(bbox_to_anchor=(1, 1), loc="upper left", fontsize="small")
    ax.set_xlabel("Portion of Bin", fontweight="bold")
    # NOTE(review): the bins above are built from baseline
    # `aftertax_income`, not expanded income - confirm the intended label
    ax.set_ylabel("Expanded Income Bin", fontweight="bold")
    ax.get_xaxis().set_major_formatter(
        mpl.ticker.FuncFormatter(lambda x, p: f"{int(x * 100)}%")
    )
    if title == "default":
        title = f"Percentage Change In After Tax Income - {year}"
    ax.set_title(title)
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    ax.tick_params(axis="y", which="both", length=0, pad=15)
    return fig
[docs]
def differences_plot(
    tb,
    tax_type: str,
    figsize: Tuple[Union[int, float], Union[int, float]] = (6, 4),
    title: str = "default",
):
    """
    Create a bar chart that shows the change in total liability for a given
    tax

    Parameters
    ----------
    tb: TaxBrain object
        TaxBrain object for analysis
    tax_type: str
        tax for which to show the change in liability
        options: 'income', 'payroll', 'combined'
    figsize: tuple
        representing the size of the figure (width, height) in inches
    title: str
        title for plot. The value "default" produces
        "Change in Aggregate {tax_type} Tax Liability"

    Returns
    -------
    fig: Matplotlib.pyplot figure object
        differences plot

    Raises
    ------
    AssertionError
        if `tax_type` is not one of the accepted options
    """

    def axis_formatter(x, p):
        # format dollars as billions; the sign goes in front of the "$" and
        # the magnitude is printed without its own sign
        sign = "-" if x < 0 else ""
        return f"{sign}${abs(x) * 1e-9:,.2f}b"

    acceptable_taxes = ["income", "payroll", "combined"]
    if tax_type not in acceptable_taxes:
        # raised explicitly (instead of `assert`) so the check still runs
        # under `python -O`; the exception type is unchanged for callers
        msg = f"tax_type must be one of the following: {acceptable_taxes}"
        raise AssertionError(msg)
    # find change in each tax variable
    tax_vars = ["iitax", "payrolltax", "combined"]
    agg_base = tb.multi_var_table(tax_vars, "base")
    agg_reform = tb.multi_var_table(tax_vars, "reform")
    agg_diff = agg_reform - agg_base
    # transpose agg_diff to make plotting easier
    plot_data = agg_diff.transpose()
    # `acceptable_taxes` and `tax_vars` are parallel lists
    tax_var = tax_vars[acceptable_taxes.index(tax_type)]
    plot_data["color"] = np.where(plot_data[tax_var] < 0, "red", "blue")
    fig, ax = plt.subplots(figsize=figsize)
    ax.grid(True, axis="y", alpha=0.55)
    ax.set_axisbelow(True)
    # plot the tax that was requested (the bar heights must match the
    # variable used for the colors and the title)
    ax.bar(
        plot_data.index,
        plot_data[tax_var],
        alpha=0.55,
        color=plot_data["color"],
    )
    if title == "default":
        title = f"Change in Aggregate {tax_type.title()} Tax Liability"
    ax.set_title(title)
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    ax.get_yaxis().set_major_formatter(
        mpl.ticker.FuncFormatter(axis_formatter)
    )
    ax.xaxis.set_ticks(list(plot_data.index))
    ax.xaxis.set_major_formatter(mpl.ticker.ScalarFormatter(useOffset=False))
    return fig
[docs]
def update_policy(
    policy_obj: tc.Policy,
    reform: Union[TaxcalcReform, ParamToolsAdjustment],
    **kwargs,
):
    """
    Apply a reform to a Policy object, dispatching on the reform format.

    A reform recognized by `is_paramtools_format` is passed to
    `policy_obj.adjust`; anything else is passed to
    `policy_obj.implement_reform`.

    Parameters
    ----------
    policy_obj: Tax-Calculator Policy class object
        Policy object for tax parameterization used for analysis
    reform: str or dict
        parameters for tax policy
    **kwargs
        forwarded unchanged to whichever Policy method is called

    Returns
    -------
    None
        modifies the Policy object
    """
    apply_reform = (
        policy_obj.adjust
        if is_paramtools_format(reform)
        else policy_obj.implement_reform
    )
    apply_reform(reform, **kwargs)
[docs]
def lorenz_data(tb, year: int, var: str = "aftertax_income"):
    """
    Pull data used for the lorenz curve plot

    Records are ordered by the baseline value of `var` and grouped into
    100 bins that each hold about 1% of the weighted population. Bins whose
    weighted total is negative are treated as zero, and each curve is
    normalized by the total of the clipped bins so that it ends at one.

    Parameters
    ----------
    tb: TaxBrain class object
        TaxBrain object for analysis. Must expose ``base_data[year]`` and
        ``reform_data[year]`` with `var` and ``s006`` columns
    year: int
        year of data to use
    var: str
        name of the variable to use

    Returns
    -------
    final_data: Pandas DataFrame
        DataFrame indexed by percentile bin with the cumulative shares
        "Base", "Reform" and "Population"
    """
    data = pd.DataFrame(
        {
            "base": tb.base_data[year][var],
            "reform": tb.reform_data[year][var],
            "wt": tb.base_data[year]["s006"],
        }
    )
    data["wt_base"] = data["base"] * data["wt"]
    data["wt_reform"] = data["reform"] * data["wt"]
    data.sort_values("base", inplace=True)
    data["cwt"] = data["wt"].cumsum()
    total_wt = data["wt"].sum()
    # The running total and `sum()` may differ in the last few bits; cap at
    # one so the top record cannot fall outside the final bin
    data["percentile"] = (data["cwt"] / total_wt).clip(upper=1.0)
    # each bin has 1% of the population; linspace gives exactly 101 edges
    # ending at 1.0, which a float-step arange does not guarantee
    _bins = np.linspace(0, 1, 101)
    data["bin"] = pd.cut(data["percentile"], bins=_bins)
    gdf = data.groupby("bin", observed=False)
    # negative bin totals are floored at zero so the curve never decreases
    base = gdf["wt_base"].sum()
    base = np.where(base < 0, 0, base)
    reform = gdf["wt_reform"].sum()
    reform = np.where(reform < 0, 0, reform)
    final_data = pd.DataFrame(
        {
            # normalize by the clipped totals: dividing by the unclipped
            # sum would let the curve end above one when any bin is negative
            "Base": base.cumsum() / base.sum(),
            "Reform": reform.cumsum() / reform.sum(),
            "Population": gdf["wt"].sum().cumsum() / total_wt,
        }
    )
    return final_data
[docs]
def lorenz_curve(
    tb,
    year: int,
    var: str = "aftertax_income",
    figsize: Tuple[Union[int, float], Union[int, float]] = (6, 4),
    xlabel: str = "Cummulative Percentage of Tax Units",
    ylabel: str = "Cummulative Percentage of Income",
    base_color: PlotColors = "blue",
    base_linestyle: str = "-",
    reform_color: PlotColors = "red",
    reform_linestyle: str = "--",
    dpi: Union[int, float] = 100,
):
    """
    Generate a Lorenz Curve

    Draws the baseline and reform curves returned by `lorenz_data` together
    with a 45 degree reference line.

    Parameters
    ----------
    tb: TaxBrain class object
        TaxBrain object for analysis
    year: int
        year of data you want to use for the lorenz curve
    var: str
        name of the variable to use
    figsize: tuple
        representing the size of the figure (width, height) in inches
    xlabel: str
        x axis label
    ylabel: str
        y axis label
    base_color: str
        color used for the base line
    base_linestyle: str
        linestyle for the base line
    reform_color: str
        color used for the reform line
    reform_linestyle: str
        linestyle for the reform line
    dpi: int
        dots per inch in the figure image

    Returns
    -------
    fig: Matplotlib.pyplot figure object
        Lorenz curve figure
    """
    plot_data = lorenz_data(tb, year, var)
    # pass `dpi` through so the documented parameter takes effect
    fig, ax = plt.subplots(figsize=figsize, dpi=dpi)
    ax.plot([0, 1], [0, 1], c="black", alpha=0.5)  # 45 degree line
    ax.plot(
        plot_data["Population"],
        plot_data["Base"],
        c=base_color,
        linestyle=base_linestyle,
        label="Base",
    )
    ax.plot(
        plot_data["Population"],
        plot_data["Reform"],
        c=reform_color,
        linestyle=reform_linestyle,
        label="Reform",
    )
    ax.legend(loc="upper left")
    ax.set_xlabel(xlabel, fontweight="bold")
    ax.set_ylabel(ylabel, fontweight="bold")
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    return fig
[docs]
def volcano_plot(
    tb,
    year: int,
    y_var: str = "expanded_income",
    x_var: str = "combined",
    min_y: Union[int, float] = 0.01,
    max_y: Union[int, float] = 9e99,
    log_scale: bool = True,
    increase_color: PlotColors = "#F15FE4",
    decrease_color: PlotColors = "#41D6C2",
    dotsize: Union[int, float] = 0.75,
    alpha: float = 0.5,
    figsize: Tuple[Union[int, float], Union[int, float]] = (6, 4),
    dpi: Union[int, float] = 100,
    xlabel: str = "Change in Tax Liability",
    ylabel: str = "Expanded Income",
):
    """
    Create a volcano plot to show change in tax liability

    Each record is drawn as a dot positioned at its change in `x_var`
    (reform minus baseline) against its baseline `y_var`.

    Parameters
    ----------
    tb: TaxBrain class object
        TaxBrain object for analysis
    year: int
        year for the plot
    y_var: str
        variable on the y axis
    x_var: str
        variable on the x axis
    min_y: float
        minimum amount for the y variable to be included in the plot
    max_y: float
        maximum amount for the y variable to be included in the plot
    log_scale: bool
        whether the y-axis should use a log scale. If this is true,
        min_y must be >= 0 and records whose y value is not strictly
        positive are left out because their logarithm is undefined
    increase_color: str
        color to use for dots when x increases
    decrease_color: str
        color to use for dots when x decrease
    dotsize: int
        size of the dots in the scatter plot
    alpha: float
        attribute for transparency of the dots
    figsize: tuple
        the figure size of the plot (width, height) in inches
    dpi: int
        dots per inch in the figure
    xlabel: str
        label on the x axis
    ylabel: str
        label on the y axis

    Returns
    -------
    fig: Matplotlib.pyplot figure object
        volcano plot figure

    Raises
    ------
    ValueError
        if `log_scale` is true and `min_y` is negative
    """

    def log_axis(x, pos):
        """
        Converts y-axis log values back to dollar amounts
        """
        return f"${np.exp(x):,.0f}"

    def axis_formatter(x, pos):
        if x >= 0:
            return f"${x:,.0f}"
        else:
            return f"-${abs(x):,.0f}"

    if log_scale and min_y < 0:
        msg = "`min_y` must be >= 0 when `log_scale` is true"
        raise ValueError(msg)
    _y = tb.base_data[year][y_var]
    _x_change = tb.reform_data[year][x_var] - tb.base_data[year][x_var]
    mask = np.logical_and(_y >= min_y, _y <= max_y)
    if log_scale:
        # `min_y == 0` is allowed, but log(0) is -inf; keep only values
        # that have a finite logarithm
        mask = np.logical_and(mask, _y > 0)
    y = _y[mask]
    x_change = _x_change[mask]
    colors = [increase_color if x >= 0 else decrease_color for x in x_change]
    xformatter = ticker.FuncFormatter(axis_formatter)
    yformatter = ticker.FuncFormatter(axis_formatter)
    if log_scale:
        # the data itself is transformed, so tick labels are mapped back
        # to dollars with exp()
        yformatter = ticker.FuncFormatter(log_axis)
        y = np.log(y)
    # pass `dpi` through so the documented parameter takes effect
    fig, ax = plt.subplots(figsize=figsize, dpi=dpi)
    ax.scatter(x_change, y, c=colors, s=dotsize, alpha=alpha)
    ax.axvline(0, color="black", alpha=0.5)
    ax.grid(True, linestyle="--")
    ax.xaxis.set_major_formatter(xformatter)
    ax.xaxis.set_tick_params(rotation=25)
    ax.yaxis.set_major_formatter(yformatter)
    ax.set_xlabel(xlabel, fontweight="bold")
    ax.set_ylabel(ylabel, fontweight="bold")
    return fig
def revenue_plot(
    tb,
    tax_vars: Union[list, None] = None,
    figsize: Tuple[Union[int, float], Union[int, float]] = (6, 4),
):
    """Plot the changes in tax revenue from a given reform

    One line is drawn per tax variable for the baseline and one for the
    reform, using the yearly values returned by `tb.multi_var_table`.

    Parameters
    ----------
    tb : TaxBrain class object
        TaxBrain object for analysis
    tax_vars: list
        List of tax variables to include on the graph. Valid entries are
        "iitax", "payrolltax" and "combined"; all three are used when
        omitted
    figsize: tuple
        representing the size of the figure (width, height) in inches

    Returns
    -------
    fig: Matplotlib.pyplot figure object
        revenue plot figure

    Raises
    ------
    AssertionError
        if `tax_vars` is empty
    ValueError
        if `tax_vars` contains an unknown tax variable
    """

    def axis_formatter(x, p):
        # format dollars as billions; the sign goes in front of the "$" and
        # the magnitude is printed without its own sign
        sign = "-" if x < 0 else ""
        return f"{sign}${abs(x) * 1e-9:,.2f}"

    label_map = {
        "iitax": "Income",
        "payrolltax": "Payroll",
        "combined": "Combined",
    }
    color_map = {
        "Income: Base": "#12719e",
        "Income: Reform": "#73bfe2",
        "Payroll: Base": "#408941",
        "Payroll: Reform": "#98cf90",
        "Combined: Base": "#a4201d",
        "Combined: Reform": "#e9807d",
    }
    if tax_vars is None:
        # build a fresh list on every call rather than sharing one mutable
        # default object between calls
        tax_vars = ["iitax", "payrolltax", "combined"]
    if not tax_vars:
        # raised explicitly (instead of `assert`) so the check still runs
        # under `python -O`; the exception type is unchanged for callers
        raise AssertionError(
            "`tax_vars` must contain at least one tax variable"
        )
    for var in tax_vars:
        if var not in label_map:
            msg = (
                f"`{var}` is invalid. Valid tax variables are "
                "`iitax`, `payrolltax`, `combined`"
            )
            raise ValueError(msg)
    base_data = tb.multi_var_table(tax_vars, "base", include_total=False)
    reform_data = tb.multi_var_table(tax_vars, "reform", include_total=False)
    fig, ax = plt.subplots(figsize=figsize)
    years = base_data.columns
    for tax in tax_vars:
        base_label = f"{label_map[tax]}: Base"
        reform_label = f"{label_map[tax]}: Reform"
        ax.plot(
            years,
            base_data.loc[tax],
            label=base_label,
            color=color_map[base_label],
        )
        ax.plot(
            years,
            reform_data.loc[tax],
            label=reform_label,
            color=color_map[reform_label],
        )
    # the legend is anchored outside the axes, to the right of the plot
    ax.legend(loc="upper right", bbox_to_anchor=(1.40, 1), title="Tax Type")
    ax.set_ylabel("Tax Liability (Billions)")
    ax.set_title("Tax Liability by Year")
    # remove plot borders
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    # convert y axis to billions
    ax.get_yaxis().set_major_formatter(
        mpl.ticker.FuncFormatter(axis_formatter)
    )
    return fig