import taxcalc as tc
import pandas as pd
import numpy as np
import behresp
from taxcalc.utils import (
from dask import compute, delayed
import dask.multiprocessing
from collections import defaultdict
from taxbrain.utils import weighted_sum, update_policy
from taxbrain.corporate_incidence import distribute as dist_corp
from typing import Union
from paramtools import ValidationError

[docs] class TaxBrain: FIRST_BUDGET_YEAR = tc.Policy.JSON_START_YEAR LAST_BUDGET_YEAR = tc.Policy.LAST_BUDGET_YEAR # Default list of variables saved for each year DEFAULT_VARIABLES = list(set(DIST_VARIABLES).union(set(DIFF_VARIABLES))) # add dictionary to hold version of the various models VERSIONS = { "Tax-Calculator": tc.__version__, "Behavioral-Responses": behresp.__version__, } def __init__( self, start_year: int, end_year: int = LAST_BUDGET_YEAR, microdata: Union[str, dict] = "CPS", reform: Union[str, dict] = None, behavior: dict = None, assump=None, base_policy: Union[str, dict] = None, corp_revenue: Union[dict, list, np.array] = None, corp_incidence_assumptions: dict = None, verbose=False, stacked=False, ): """ Constructor for the TaxBrain class Parameters ---------- start_year: int First year in the analysis. Must be no earlier than the first year allowed in Tax-Calculator. end_year: int Last year in the analysis. Must be no later than the last year allowed in Tax-Calculator. microdata: str or dict A string in ["CPS", "PUF", "TMD"] or path to a micro-data file or a Pandas DataFrame, containing micro-data, or a dictionary containing a path to microdata, associated weights, and grow factors. If a dict, must have keys: "data", "start_year", "gfactors", "weights" reform: str or dict Individual income tax policy reform. Can be either a string pointing to a JSON reform file, or the contents of a JSON file, or a properly formatted JSON file. behavior: dict Individual behavior assumptions use by the Behavior-Response package. assump: str A string pointing to a JSON file containing user specified economic assumptions. base_policy: str or dict Individual income tax policy to use as the baseline for the analysis. This policy will be implemented in the base calculator instance as well as the reform Calculator before the user provided reform is implemented. Can either be a string pointing to a JSON reform file, the contents of a JSON file, or a properly formatted dictionary. corp_revenue: dict, list, or numpy array A set of corporate revenue estimates for a given set of years. The estimates much line up with start_year and end_year. corp_incidence_assumptions: dict A dictionary summarizing the assumptions about the distribution of the corporate income tax. See taxbrain.corporate_incidence.CI_params for an example. verbose: bool A boolean value indicated whether or not to write model progress reports. stacked: bool A boolean value indicating weather the provided reform is in the format used for stacked reform analysis Returns ------- None """ assert isinstance(start_year, int) & isinstance( end_year, int ), "Start and end years must be integers" assert start_year <= end_year, ( f"Specified end year, {end_year}, is before specified start year, " f"{start_year}." ) assert TaxBrain.FIRST_BUDGET_YEAR <= start_year, ( f"Specified start_year, {start_year}, comes before first known " f"budget year, {TaxBrain.FIRST_BUDGET_YEAR}." ) assert end_year <= TaxBrain.LAST_BUDGET_YEAR, ( f"Specified end_year, {end_year}, comes after last known " f"budget year, {TaxBrain.LAST_BUDGET_YEAR}." ) if corp_revenue: assert ( len(corp_revenue) == end_year - start_year + 1 ), f"Corporate revenue is not given for each budget year" self.microdata = microdata self.start_year = start_year self.end_year = end_year self.base_data = {yr: {} for yr in range(start_year, end_year + 1)} self.reform_data = {yr: {} for yr in range(start_year, end_year + 1)} self.corp_revenue = corp_revenue self.ci_params = corp_incidence_assumptions self.verbose = verbose self.stacked = stacked self.stacked_reforms = None # only used if stacked is true # Process user inputs early to throw any errors quickly self.params = self._process_user_mods(reform, assump) self.params["behavior"] = behavior if base_policy: base_policy = self._process_user_mods(base_policy, None) self.params["base_policy"] = base_policy["policy"] else: self.params["base_policy"] = None self.has_run = False
[docs] def run( self, varlist: list = DEFAULT_VARIABLES, client=None, num_workers=1 ): """ Run the calculators. TaxBrain will determine whether to do a static or partial equilibrium run based on the user's inputs when initializing the TaxBrain object. Parameters ---------- varlist: list variables from the microdata to be stored in each year Returns ------- None """ if not isinstance(varlist, list): msg = f"'varlist' is of type {type(varlist)}. Must be a list." raise TypeError(msg) if self.stacked: base_calc, policy, records = self._make_stacked_objects() self._stacked_run( varlist, base_calc, policy, records, client, num_workers ) del base_calc else: base_calc, reform_calc = self._make_calculators() if self.params["behavior"]: if self.verbose: print("Running dynamic simulations") self._dynamic_run( varlist, base_calc, reform_calc, client, num_workers ) else: if self.verbose: print("Running static simulations") self._static_run( varlist, base_calc, reform_calc, client, num_workers ) del base_calc, reform_calc setattr(self, "has_run", True)
[docs] def weighted_totals( self, var: str, include_total: bool = False ) -> pd.DataFrame: """ Create a pandas DataFrame that shows the weighted sum or a specified variable under the baseline policy, reform policy, and the difference between the two. Parameters ---------- var: str Variable name for variable you want the weighted total of. include_total: bool If true the returned DataFrame will include a "total" columns Returns ------- Pandas DataFrame A Pandas DataFrame with rows for the baseline total, reform total, and the difference between the two. """ base_totals = {} reform_totals = {} differences = {} for year in range(self.start_year, self.end_year + 1): base_totals[year] = ( self.base_data[year]["s006"] * self.base_data[year][var] ).sum() reform_totals[year] = ( self.reform_data[year]["s006"] * self.reform_data[year][var] ).sum() differences[year] = reform_totals[year] - base_totals[year] table = pd.DataFrame( [base_totals, reform_totals, differences], index=["Base", "Reform", "Difference"], ) if include_total: table["Total"] = table.sum(axis=1) return table
[docs] def multi_var_table( self, varlist: list, calc: str, include_total: bool = False ) -> pd.DataFrame: """ Create a Pandas DataFrame with multiple variables from the specified data source Parameters ---------- varlist: list list of variables to include in the table calc: str specify reform or base calculator data, can take either `'REFORM'` or `'BASE'` include_total: bool If true the returned DataFrame will include a "total" column Returns ------- df: Pandas DataFrame A Pandas DataFrame containing the weighted sum of each variable passed in the `varlist` argument for each year in the analysis. """ if not isinstance(varlist, list): msg = f"'varlist' is of type {type(varlist)}. Must be a list." raise TypeError(msg) if calc.upper() == "REFORM": data = self.reform_data elif calc.upper() == "BASE": data = self.base_data else: raise ValueError("'calc' must be 'base' or 'reform'") data_dict = defaultdict(list) for year in range(self.start_year, self.end_year + 1): for var in varlist: data_dict[var] += [weighted_sum(data[year], var)] df = pd.DataFrame( data_dict, index=range(self.start_year, self.end_year + 1) ) table = df.transpose() if include_total: table["Total"] = table.sum(axis=1) return table
[docs] def distribution_table( self, year: int, groupby: str, income_measure: str, calc: str, pop_quantiles: bool = False, ) -> pd.DataFrame: """ Method to create a distribution table Parameters ---------- year: int which year the distribution table data should be from groupby: str determines how the rows in the table are sorted options: 'weighted_deciles', 'standard_income_bins', 'soi_agi_bin' income_measure: str determines which variable is used to sort the rows in the table options: 'expanded_income' or 'expanded_income_baseline' calc: str which calculator to use, can take either `'REFORM'` or `'BASE'` calc: which calculator to use: base or reform pop_quantiles: bool whether or not weighted_deciles contain equal number of tax units (False) or people (True) Returns ------- table: Pandas DataFrame distribution table """ # pull desired data if calc.lower() == "base": data = self.base_data[year] elif calc.lower() == "reform": data = self.reform_data[year] else: raise ValueError("calc must be either BASE or REFORM") # minor data preparation before calling the function if pop_quantiles: data["count"] = data["s006"] * data["XTOT"] else: data["count"] = data["s006"] data["count_ItemDed"] = data["count"].where(data["c04470"] > 0.0, 0.0) data["count_StandardDed"] = data["count"].where( data["standard"] > 0.0, 0.0 ) data["count_AMT"] = data["count"].where(data["c09600"] > 0.0, 0.0) if income_measure == "expanded_income_baseline": base_income = self.base_data[year]["expanded_income"] data["expanded_income_baseline"] = base_income table = create_distribution_table( data, groupby, income_measure, pop_quantiles ) return table
[docs] def differences_table( self, year: int, groupby: str, tax_to_diff: str, pop_quantiles: bool = False, ) -> pd.DataFrame: """ Method to create a differences table Parameters ---------- year: int which year the difference table should be from groupby: str determines how the rows in the table are sorted options: 'weighted_deciles', 'standard_income_bins', 'soi_agi_bin' tax_to_diff: str which tax to take the difference of options: 'iitax', 'payrolltax', 'combined' pop_quantiles: bool whether weighted_deciles contain an equal number of tax units (False) or people (True) Returns ------- table: Pandas DataFrame differences table """ base_data = self.base_data[year] reform_data = self.reform_data[year] table = create_difference_table( base_data, reform_data, groupby, tax_to_diff, pop_quantiles ) return table
# ----- private methods ----- def _taxcalc_advance(self, calc, varlist, year, reform=False): """ This function advances the year used in Tax-Calculator, computes tax liability and rates, and saves the results to a dictionary. Args: calc (Tax-Calculator Calculator object): TC calculator varlist (list): variables to return year (int): year to begin advancing from reform (bool): whether Calculator object is for the reform policy Returns: tax_dict (dict): a dictionary of microdata with marginal tax rates and other information computed in TC """ calc.advance_to_year(year) if self.corp_revenue is not None: if reform: calc = dist_corp( calc, self.corp_revenue, year, self.start_year, self.ci_params, ) calc.calc_all() df = calc.dataframe(varlist) return df def _behresp_advance(self, base_calc, reform_calc, varlist, year): """ This function advances the year used in the Behavioral Responses model and saves the results to a dictionary. Args: calc1 (Tax-Calculator Calculator object): TC calculator year (int): year to begin advancing from Returns: tax_dict (dict): a dictionary of microdata with marginal tax rates and other information computed in TC """ base_calc.advance_to_year(year) reform_calc.advance_to_year(year) if self.corp_revenue is not None: reform_calc = dist_corp( reform_calc, self.corp_revenue, year, self.start_year, self.ci_params, ) base, reform = behresp.response( base_calc, reform_calc, self.params["behavior"], dump=True ) base_df = base[varlist] reform_df = reform[varlist] return [base_df, reform_df] def _static_run( self, varlist, base_calc, reform_calc, client, num_workers ): """ Run the calculator for a static analysis """ if "s006" not in varlist: # ensure weight is always included varlist.append("s006") lazy_values = [] for yr in range(self.start_year, self.end_year + 1): lazy_values.extend( [ delayed(self._taxcalc_advance(base_calc, varlist, yr)), delayed( self._taxcalc_advance( reform_calc, varlist, yr, reform=True ) ), ] ) if client: futures = client.compute(lazy_values, num_workers=num_workers) results = client.gather(futures) else: results = results = compute( *lazy_values, scheduler=dask.multiprocessing.get, num_workers=num_workers, ) # add results to base and reform data yr = self.start_year for i in np.arange(0, len(results), 2): self.base_data[yr] = results[i] self.reform_data[yr] = results[i + 1] yr += 1 del results def _dynamic_run( self, varlist, base_calc, reform_calc, client, num_workers ): """ Run a dynamic response """ if "s006" not in varlist: # ensure weight is always included varlist.append("s006") lazy_values = [] for yr in range(self.start_year, self.end_year + 1): lazy_values.extend( [ delayed( self._behresp_advance( base_calc, reform_calc, varlist, yr ) ) ] ) if client: futures = client.compute(lazy_values, num_workers=num_workers) results = client.gather(futures) else: results = results = compute( *lazy_values, scheduler=dask.multiprocessing.get, num_workers=num_workers, ) # add results to base and reform data for i in range(len(results)): yr = self.start_year + i self.base_data[yr] = results[i][0] self.reform_data[yr] = results[i][1] del results def _stacked_run( self, varlist, base_calc, policy, records, client, num_workers ): revenue_output = {} BW_len = self.end_year - self.start_year + 1 # run the base calc first to get baseline results lazy_values = [] for yr in range(self.start_year, self.end_year + 1): lazy_values.append( delayed(self._taxcalc_advance(base_calc, varlist, yr)) ) if client: futures = client.compute(lazy_values, num_workers=num_workers) results = client.gather(futures) else: results = results = compute( *lazy_values, scheduler=dask.multiprocessing.get, num_workers=num_workers, ) # add results to data and revenue outputs revenue_output["Baseline"] = np.zeros(BW_len) yr = self.start_year # for i in np.arange(0, len(results), 2): for i, res in enumerate(results): self.base_data[yr] = res combined = (res["combined"] * res["s006"]).sum() revenue_output["Baseline"][yr - self.start_year] = combined yr += 1 reform_list = list(self.stacked_reforms.keys()) # Loop over different provisions for k, v in self.stacked_reforms.items(): if self.verbose: print("Analyzing ", k) revenue_output[k] = np.zeros(BW_len) ref = policy.read_json_reform(v) # update Policy object with additional provisions policy.implement_reform(ref) # update Calculator object with new Policy object calc = tc.calculator.Calculator(policy=policy, records=records) # loop over each year in budget window for yr in np.arange(self.start_year, self.end_year + 1): calc.advance_to_year(yr) # change income in accordance with corp income tax # distributed across individual taxpayers if self.corp_revenue is not None: calc = dist_corp( calc, self.corp_revenue, yr, self.start_year, self.ci_params, ) # makes calculations on microdata calc.calc_all() # compute total revenue revenue_output[k][yr - self.start_year] = calc.weighted_total( "combined" ) # if we're on the last reform piece, save the data if k == reform_list[-1]: self.reform_data[yr] = calc.dataframe(varlist) df = pd.DataFrame.from_dict( revenue_output, orient="Index", columns=np.arange(self.start_year, self.start_year + BW_len), ) # Compute differences from one provision to another rev_est_tbl = df.diff() # Drop baseline revenue since reporting differences relative to baseline rev_est_tbl.drop(labels="Baseline", inplace=True) # Create totals across budget window tot_col = f"{self.start_year}-{self.end_year}" rev_est_tbl[tot_col] = rev_est_tbl[list(rev_est_tbl.columns)].sum( axis=1 ) # Create totals across provisions rev_est_tbl.loc["Total"] = rev_est_tbl.sum() # save the table as an attribute of the TaxBrain object setattr(self, "stacked_table", rev_est_tbl) def _process_user_mods(self, reform, assump): """ Logic to process user mods and set self.params """ def key_validation(actual_keys, required_keys, d_name): """ Validate keys if reform or assump is passed as a dictionary """ missing_keys = required_keys - actual_keys if missing_keys: msg = f"Required key(s) {missing_keys} missing from '{d_name}'" raise ValueError(msg) illegal_keys = actual_keys - required_keys if illegal_keys: msg = f"Illegal key(s) {illegal_keys} found in '{d_name}'" raise ValueError(msg) if isinstance(reform, dict): # If the reform is a dictionary, we'll leave it to Tax-Calculator # to catch errors in its implementation. Or, it's a stacked reform # and we will check that separately if isinstance(assump, str) or not assump: params = tc.Calculator.read_json_param_objects(None, assump) elif isinstance(assump, dict): actual_keys = set(assump.keys()) required_keys = tc.Calculator.REQUIRED_ASSUMP_KEYS key_validation(actual_keys, required_keys, "assump") params = {**assump} else: raise TypeError( "'assump' is not a string, dictionary, or None" ) params["policy"] = reform # Check stacked reforms if self.stacked: pol = tc.Policy() full_policy = {} for name, subreform in reform.items(): # assume that if the reform is a string it's a JSON reform # Otherwise the parameters are already a dictionary if isinstance(subreform, str): pol_params = tc.Policy.read_json_reform(subreform) else: m = "Reform must be valid JSON string or dictionary" assert isinstance(subreform, dict), m pol_params = subreform try: update_policy(pol, pol_params) except ValidationError as e: print(f"Validation error in {name}") raise e # update the full policy to save later full_policy = {**full_policy, **pol_params} setattr(self, "stacked_reforms", reform) params["policy"] = full_policy elif isinstance(reform, str) or not reform: if isinstance(assump, str) or not assump: params = tc.Calculator.read_json_param_objects(reform, assump) elif isinstance(assump, dict): # Check to ensure that the assumption dictionary contains # all the needed keys. Tax-Calculator will check that they # are ultimately defined correctly when attempting to # use them. actual_keys = set(assump.keys()) required_keys = tc.Calculator.REQUIRED_ASSUMP_KEYS key_validation(actual_keys, required_keys, "assump") params = tc.Calculator.read_json_param_objects(reform, None) for key in assump.keys(): params[key] = assump[key] else: raise TypeError( "'assump' is not a string, dictionary, or None" ) else: raise TypeError("'reform' is not a string, dictionary, or None") # confirm that all the expected keys are there required_keys = ( tc.Calculator.REQUIRED_ASSUMP_KEYS | tc.Calculator.REQUIRED_REFORM_KEYS ) assert set(params.keys()) == required_keys return params def _make_calculators(self): """ This function creates the baseline and reform calculators used when the `run()` method is called """ # Create two microsimulation calculators # Baseline calculator gd_base = tc.GrowDiff() gf_base = tc.GrowFactors() # apply user specified growdiff if self.params["growdiff_baseline"]: gd_base.update_growdiff(self.params["growdiff_baseline"]) gd_base.apply_to(gf_base) if self.microdata == "CPS": records = tc.Records.cps_constructor(data=None, gfactors=gf_base) elif self.microdata == "PUF": records = tc.Records( gfactors=gf_base, weights=tc.Records.PUF_WEIGHTS_FILENAME, ) elif self.microdata == "TMD": records = tc.Records.tmd_constructor( "tmd.csv", gfactors=gf_base, ) elif isinstance(self.microdata, dict): if self.microdata["growfactors"] is None: gd_base = tc.GrowDiff() gf_base = tc.GrowFactors() # apply user specified growdiff if self.params["growdiff_baseline"]: gd_base.update_growdiff(self.params["growdiff_baseline"]) gd_base.apply_to(gf_base) else: gd_base = tc.GrowDiff() gf_base = tc.GrowFactors(self.microdata["growfactors"]) # apply user specified growdiff if self.params["growdiff_baseline"]: gd_base.update_growdiff(self.params["growdiff_baseline"]) gd_base.apply_to(gf_base) records = tc.Records( self.microdata["data"], start_year=self.microdata["start_year"], gfactors=gf_base, weights=self.microdata["weights"], ) else: raise ValueError( "microdata must be 'CPS', 'PUF', 'TMD', or a dictionary" ) policy = tc.Policy(gf_base) if self.params["base_policy"]: update_policy(policy, self.params["base_policy"]) base_calc = tc.Calculator( policy=policy, records=records, verbose=self.verbose ) # Reform calculator gd_reform = tc.GrowDiff() gf_reform = tc.GrowFactors() # apply user specified growdiff if self.params["growdiff_response"]: gd_reform.update_growdiff(self.params["growdiff_response"]) gd_reform.apply_to(gf_reform) if self.microdata == "CPS": records = tc.Records.cps_constructor(data=None, gfactors=gf_reform) elif self.microdata == "PUF": records = tc.Records( gfactors=gf_reform, weights=tc.Records.PUF_WEIGHTS_FILENAME, ) elif self.microdata == "TMD": records = tc.Records.tmd_constructor( "tmd.csv", gfactors=gf_reform, ) elif isinstance(self.microdata, dict): if self.microdata["growfactors"] is None: gd_reform = tc.GrowDiff() gf_reform = tc.GrowFactors() # apply user specified growdiff if self.params["growdiff_response"]: gd_reform.update_growdiff(self.params["growdiff_response"]) gd_reform.apply_to(gf_reform) else: gd_reform = tc.GrowDiff() gf_reform = tc.GrowFactors(self.microdata["growfactors"]) # apply user specified growdiff if self.params["growdiff_response"]: gd_reform.update_growdiff(self.params["growdiff_response"]) gd_reform.apply_to(gf_reform) records = tc.Records( self.microdata["data"], start_year=self.microdata["start_year"], gfactors=gf_reform, weights=self.microdata["weights"], ) else: raise ValueError( "microdata must be 'CPS', 'PUF', 'TMD', or a dictionary" ) policy = tc.Policy(gf_reform) if self.params["base_policy"]: update_policy(policy, self.params["base_policy"]) update_policy(policy, self.params["policy"]) # Initialize Calculator reform_calc = tc.Calculator( policy=policy, records=records, verbose=self.verbose ) # delete all unneeded variables del gd_base, gd_reform, records, gf_base, gf_reform, policy return base_calc, reform_calc # TODO: update these method to allow for different microdata as above # but code becoming cumbersome, so should probably streamline in a single get calculator function def _make_stacked_objects(self): """ This method makes the base calculator and policy and records objects for stacked reforms. The difference between this and the standard _make_calcuators method is that this method only fully creates the baseline calculator. For the reform, it creates policy and records objects and implements any growth assumptions provided by the user. """ # Create two microsimulation calculators gd_base = tc.GrowDiff() gf_base = tc.GrowFactors() # apply user specified growdiff if self.params["growdiff_baseline"]: gd_base.update_growdiff(self.params["growdiff_baseline"]) gd_base.apply_to(gf_base) if self.microdata == "CPS": records = tc.Records.cps_constructor(data=None, gfactors=gf_base) elif self.microdata == "PUF": records = tc.Records( "puf.csv", gfactors=gf_base, weights=tc.Records.PUF_WEIGHTS_FILENAME, ) elif self.microdata == "TMD": records = tc.Records.tmd_constructor( "tmd.csv", gfactors=gf_base, ) elif isinstance(self.microdata, dict): if self.microdata["growfactors"] is None: gd_base = tc.GrowDiff() gf_base = tc.GrowFactors() # apply user specified growdiff if self.params["growdiff_baseline"]: gd_base.update_growdiff(self.params["growdiff_baseline"]) gd_base.apply_to(gf_base) else: gd_base = tc.GrowDiff() gf_base = tc.GrowFactors(self.microdata["growfactors"]) # apply user specified growdiff if self.params["growdiff_baseline"]: gd_base.update_growdiff(self.params["growdiff_baseline"]) gd_base.apply_to(gf_base) records = tc.Records( self.microdata["data"], start_year=self.microdata["start_year"], gfactors=gf_base, weights=self.microdata["weights"], ) else: raise ValueError( "microdata must be 'CPS', 'PUF', 'TMD', or a dictionary" ) policy = tc.Policy(gf_base) if self.params["base_policy"]: update_policy(policy, self.params["base_policy"]) base_calc = tc.Calculator( policy=policy, records=records, verbose=self.verbose ) # Reform calculator gd_reform = tc.GrowDiff() gf_reform = tc.GrowFactors() # apply user specified growdiff if self.params["growdiff_response"]: gd_reform.update_growdiff(self.params["growdiff_response"]) gd_reform.apply_to(gf_reform) if self.microdata == "CPS": reform_records = tc.Records.cps_constructor( data=None, gfactors=gf_reform ) elif self.microdata == "PUF": reform_records = tc.Records( "puf.csv", gfactors=gf_reform, weights=tc.Records.PUF_WEIGHTS_FILENAME, ) elif self.microdata == "TMD": records = tc.Records.tmd_constructor( "tmd.csv", gfactors=gf_reform, ) elif isinstance(self.microdata, dict): if self.microdata["growfactors"] is None: gd_reform = tc.GrowDiff() gf_reform = tc.GrowFactors() # apply user specified growdiff if self.params["growdiff_response"]: gd_reform.update_growdiff(self.params["growdiff_response"]) gd_reform.apply_to(gf_reform) else: gd_reform = tc.GrowDiff() gf_reform = tc.GrowFactors(self.microdata["growfactors"]) # apply user specified growdiff if self.params["growdiff_response"]: gd_reform.update_growdiff(self.params["growdiff_response"]) gd_reform.apply_to(gf_reform) reform_records = tc.Records( self.microdata["data"], start_year=self.microdata["start_year"], gfactors=gf_reform, weights=self.microdata["weights"], ) else: raise ValueError( "microdata must be 'CPS', 'PUF', 'TMD', or a dictionary" ) reform_policy = tc.Policy(gf_reform) return base_calc, reform_policy, reform_records