""" FP&A Test Data Generator Generates realistic CSV data for: Budget vs Actuals, Cash Flow, P&L, Headcount Covers 2 years (2023–2024), 4 departments, product & service revenue mix. """ import csv import random import os from datetime import date, timedelta from dataclasses import dataclass, fields, asdict from typing import List random.seed(42) # reproducible data OUTPUT_DIR = os.path.join(os.path.dirname(__file__), "..", "data", "csv") os.makedirs(OUTPUT_DIR, exist_ok=True) # ── Company config ──────────────────────────────────────────────────────────── COMPANY = "AcmeSaaS Inc." DEPARTMENTS = ["Engineering", "Sales", "Marketing", "Operations"] YEARS = [2023, 2024] # Revenue split: product (SaaS subscriptions) vs service (consulting/support) PRODUCT_REVENUE_MIX = 0.70 # 70% product (recurring SaaS) SERVICE_REVENUE_MIX = 0.30 # 30% services # Monthly growth rates (realistic SaaS-style) PRODUCT_MONTHLY_GROWTH = 0.025 # 2.5% MoM SERVICE_MONTHLY_GROWTH = 0.015 # 1.5% MoM # Base monthly revenue ($) BASE_PRODUCT_REVENUE = 180_000 BASE_SERVICE_REVENUE = 75_000 # Variance helpers — actuals deviate from budget by ±% def vary(value: float, pct: float = 0.08) -> float: """Apply random variance to simulate actuals vs budget.""" return round(value * (1 + random.uniform(-pct, pct)), 2) def months_range(years: List[int]): for year in years: for month in range(1, 13): yield year, month # ── 1. Revenue (Budget vs Actuals) ─────────────────────────────────────────── @dataclass class RevenueRow: company: str year: int month: int period: str # e.g. 
"2023-01" revenue_type: str # "Product" | "Service" budget_amount: float actual_amount: float variance: float # actual - budget variance_pct: float # variance / budget * 100 def generate_revenue(): rows = [] prod_base = BASE_PRODUCT_REVENUE svc_base = BASE_SERVICE_REVENUE for year, month in months_range(YEARS): period = f"{year}-{month:02d}" for rev_type, base in [("Product", prod_base), ("Service", svc_base)]: budget = round(base, 2) actual = vary(budget, pct=0.10) variance = round(actual - budget, 2) vpct = round((variance / budget) * 100, 2) if budget else 0 rows.append(RevenueRow(COMPANY, year, month, period, rev_type, budget, actual, variance, vpct)) # grow base each month prod_base *= (1 + PRODUCT_MONTHLY_GROWTH) svc_base *= (1 + SERVICE_MONTHLY_GROWTH) write_csv("revenue_budget_vs_actuals.csv", rows) print(f" ✓ revenue_budget_vs_actuals.csv ({len(rows)} rows)") # ── 2. Department Opex (Budget vs Actuals) ─────────────────────────────────── DEPT_BUDGETS = { # (base_monthly_opex, growth_rate) "Engineering": (95_000, 0.012), "Sales": (70_000, 0.018), "Marketing": (55_000, 0.015), "Operations": (40_000, 0.008), } OPEX_CATEGORIES = ["Salaries", "Software & Tools", "Travel", "Marketing Spend", "Cloud Infrastructure", "Contractors", "Office & Facilities"] @dataclass class OpexRow: company: str department: str year: int month: int period: str category: str budget_amount: float actual_amount: float variance: float variance_pct: float def generate_opex(): rows = [] dept_bases = {d: v[0] for d, v in DEPT_BUDGETS.items()} for year, month in months_range(YEARS): period = f"{year}-{month:02d}" for dept, (_, growth) in DEPT_BUDGETS.items(): total_budget = dept_bases[dept] # Split across categories with random weights weights = [random.uniform(0.05, 0.35) for _ in OPEX_CATEGORIES] total_w = sum(weights) weights = [w / total_w for w in weights] for cat, w in zip(OPEX_CATEGORIES, weights): budget = round(total_budget * w, 2) actual = vary(budget, pct=0.12) variance = 
round(actual - budget, 2) vpct = round((variance / budget) * 100, 2) if budget else 0 rows.append(OpexRow(COMPANY, dept, year, month, period, cat, budget, actual, variance, vpct)) dept_bases[dept] *= (1 + growth) write_csv("opex_budget_vs_actuals.csv", rows) print(f" ✓ opex_budget_vs_actuals.csv ({len(rows)} rows)") # ── 3. P&L / Income Statement ───────────────────────────────────────────────── @dataclass class PLRow: company: str year: int month: int period: str product_revenue: float service_revenue: float total_revenue: float cogs_product: float # ~25% of product rev cogs_service: float # ~45% of service rev (labor-heavy) total_cogs: float gross_profit: float gross_margin_pct: float total_opex: float ebitda: float ebitda_margin_pct: float net_income: float # after ~25% tax estimate def generate_pl(): rows = [] prod_base = BASE_PRODUCT_REVENUE svc_base = BASE_SERVICE_REVENUE dept_bases = {d: v[0] for d, v in DEPT_BUDGETS.items()} for year, month in months_range(YEARS): period = f"{year}-{month:02d}" prod_rev = vary(prod_base, 0.08) svc_rev = vary(svc_base, 0.10) total_rev = round(prod_rev + svc_rev, 2) cogs_prod = round(prod_rev * vary(0.25, 0.05), 2) cogs_svc = round(svc_rev * vary(0.45, 0.06), 2) total_cogs = round(cogs_prod + cogs_svc, 2) gross_profit = round(total_rev - total_cogs, 2) gm_pct = round((gross_profit / total_rev) * 100, 2) if total_rev else 0 total_opex = round(sum( vary(dept_bases[d], 0.10) for d in DEPARTMENTS ), 2) ebitda = round(gross_profit - total_opex, 2) ebitda_pct = round((ebitda / total_rev) * 100, 2) if total_rev else 0 net_income = round(ebitda * 0.75, 2) # rough 25% tax rows.append(PLRow( COMPANY, year, month, period, round(prod_rev, 2), round(svc_rev, 2), total_rev, cogs_prod, cogs_svc, total_cogs, gross_profit, gm_pct, total_opex, ebitda, ebitda_pct, net_income )) prod_base *= (1 + PRODUCT_MONTHLY_GROWTH) svc_base *= (1 + SERVICE_MONTHLY_GROWTH) for d, (_, g) in DEPT_BUDGETS.items(): dept_bases[d] *= (1 + g) 
write_csv("pl_income_statement.csv", rows) print(f" ✓ pl_income_statement.csv ({len(rows)} rows)") # ── 4. Cash Flow ────────────────────────────────────────────────────────────── @dataclass class CashFlowRow: company: str year: int month: int period: str # Operating cash_collected_product: float # product ARR collections (may lag revenue) cash_collected_service: float cash_paid_opex: float cash_paid_cogs: float net_operating_cash_flow: float # Investing capex: float # infra / hardware net_investing_cash_flow: float # Financing loan_repayment: float equity_raised: float net_financing_cash_flow: float # Summary net_change_in_cash: float closing_cash_balance: float def generate_cashflow(): rows = [] cash_balance = 1_200_000.0 # starting cash (seed round runway) prod_base = BASE_PRODUCT_REVENUE svc_base = BASE_SERVICE_REVENUE dept_bases = {d: v[0] for d, v in DEPT_BUDGETS.items()} for year, month in months_range(YEARS): period = f"{year}-{month:02d}" # Collections slightly lag invoicing (DSO ~30 days effect) cash_prod = vary(prod_base * 0.95, 0.06) cash_svc = vary(svc_base * 0.90, 0.08) # services collect slower opex_paid = sum(vary(dept_bases[d], 0.08) for d in DEPARTMENTS) cogs_paid = vary((prod_base * 0.25) + (svc_base * 0.45), 0.07) net_op = round(cash_prod + cash_svc - opex_paid - cogs_paid, 2) # Investing — occasional capex spikes capex = vary(8_000, 0.40) if random.random() > 0.4 else 0.0 net_inv = round(-capex, 2) # Financing — occasional loan repayment loan = vary(5_000, 0.20) if month % 3 == 0 else 0.0 equity = 0.0 if year == 2023 and month == 6: equity = 500_000.0 # Series A mid-2023 net_fin = round(equity - loan, 2) net_change = round(net_op + net_inv + net_fin, 2) cash_balance = round(cash_balance + net_change, 2) rows.append(CashFlowRow( COMPANY, year, month, period, round(cash_prod, 2), round(cash_svc, 2), round(opex_paid, 2), round(cogs_paid, 2), net_op, round(capex, 2), net_inv, round(loan, 2), round(equity, 2), net_fin, net_change, cash_balance )) 
prod_base *= (1 + PRODUCT_MONTHLY_GROWTH) svc_base *= (1 + SERVICE_MONTHLY_GROWTH) for d, (_, g) in DEPT_BUDGETS.items(): dept_bases[d] *= (1 + g) write_csv("cash_flow.csv", rows) print(f" ✓ cash_flow.csv ({len(rows)} rows)") # ── 5. Headcount & Workforce ────────────────────────────────────────────────── ROLES = { "Engineering": [("Software Engineer", 120_000), ("Senior Engineer", 160_000), ("Engineering Manager", 180_000), ("DevOps Engineer", 130_000)], "Sales": [("Account Executive", 90_000), ("Sales Manager", 140_000), ("SDR", 65_000)], "Marketing": [("Marketing Manager", 110_000), ("Content Strategist", 80_000), ("Growth Analyst", 95_000)], "Operations": [("Operations Manager", 115_000), ("Customer Success", 75_000), ("Finance Analyst", 95_000)], } @dataclass class HeadcountRow: company: str employee_id: str department: str role: str hire_date: str termination_date: str # empty if active status: str # Active | Terminated annual_salary_budget: float actual_salary_paid_ytd: float # YTD for the given year year: int month: int period: str headcount_fte: float # 1.0 full time, 0.5 contractor etc. 
def _months_paid_ytd(hire_date, term_date, year, month):
    """Return how many months of ``year`` an employee was on payroll through ``month``.

    Args:
        hire_date: ``datetime.date`` the employee was hired.
        term_date: ``datetime.date`` of termination, or ``None`` if still active.
        year: Snapshot year.
        month: Snapshot month (1–12).

    Returns:
        A non-negative month count, counting from January (or the hire month
        when hired in ``year``) and stopping before the termination month.
    """
    if hire_date.year > year:
        return 0
    first_month = 1 if hire_date.year < year else hire_date.month
    last_month = month
    if term_date is not None:
        if term_date.year < year:
            return 0  # left in an earlier year: nothing paid this year
        if term_date.year == year:
            # Terminations are dated the 1st of the month, so the leaving
            # month itself is not paid.
            last_month = min(last_month, term_date.month - 1)
    return max(0, last_month - first_month + 1)


def generate_headcount():
    """Write ``headcount_workforce.csv`` with one row per employee per month.

    Seeds 2–4 employees per department (hired during 2022), then walks every
    month: possibly hires one person, possibly terminates one, and snapshots
    every employee hired so far. Terminated employees keep appearing with
    status ``Terminated``, but their year-to-date salary stops accruing at
    their termination date.
    """
    rows = []
    emp_id = 1000
    employees = []

    # Seed initial employees at start of 2023
    for dept, role_list in ROLES.items():
        # Start with 2-4 per department
        count = random.randint(2, 4)
        for _ in range(count):
            role, salary = random.choice(role_list)
            hire_date = date(2022, random.randint(1, 12), random.randint(1, 28))
            employees.append({
                "id": f"EMP{emp_id}",
                "dept": dept,
                "role": role,
                "salary": salary,
                "hire_date": hire_date,
                "term_date": None,
                "fte": 1.0,
            })
            emp_id += 1

    for year, month in months_range(YEARS):
        period = f"{year}-{month:02d}"
        current = date(year, month, 1)

        # Random hiring each month
        if random.random() > 0.55:
            dept = random.choice(DEPARTMENTS)
            role, salary = random.choice(ROLES[dept])
            employees.append({
                "id": f"EMP{emp_id}",
                "dept": dept,
                "role": role,
                "salary": salary,
                "hire_date": current,
                "term_date": None,
                "fte": random.choice([1.0, 1.0, 1.0, 0.5]),  # mostly FT
            })
            emp_id += 1

        # Occasional attrition (never drop to 6 or fewer active employees)
        active = [e for e in employees if e["term_date"] is None]
        if len(active) > 6 and random.random() > 0.85:
            leaver = random.choice(active)
            leaver["term_date"] = current

        # Snapshot each employee for this month
        for emp in employees:
            if emp["hire_date"] > current:
                continue  # not hired yet

            term_date = emp["term_date"]
            is_active = term_date is None or term_date > current
            status = "Active" if is_active else "Terminated"

            months_in_year = _months_paid_ytd(emp["hire_date"], term_date, year, month)
            # ±2% payroll jitter, drawn unrounded: rounding the multiplier to
            # 2 decimals would leave only five possible values.
            jitter = 1 + random.uniform(-0.02, 0.02)
            ytd_paid = round((emp["salary"] / 12) * months_in_year * jitter, 2)

            rows.append(HeadcountRow(
                COMPANY, emp["id"], emp["dept"], emp["role"],
                str(emp["hire_date"]),
                str(term_date) if term_date else "",
                status,
                emp["salary"],
                ytd_paid,
                year, month, period,
                emp["fte"],
            ))

    write_csv("headcount_workforce.csv", rows)
    print(f" ✓ headcount_workforce.csv ({len(rows)} rows)")


# ── CSV writer ────────────────────────────────────────────────────────────────
def write_csv(filename: str, rows: list):
    """Write dataclass ``rows`` to ``OUTPUT_DIR/filename`` as CSV.

    The header is taken from the dataclass fields of the first row. Nothing
    is written (and no file is created) when ``rows`` is empty.

    Args:
        filename: Name of the CSV file to create inside ``OUTPUT_DIR``.
        rows: Dataclass instances, all of the same type.
    """
    if not rows:
        return
    path = os.path.join(OUTPUT_DIR, filename)
    column_names = [field.name for field in fields(rows[0])]
    # Explicit encoding keeps the output identical across platforms.
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=column_names)
        writer.writeheader()
        writer.writerows(asdict(r) for r in rows)


# ── Entry point ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
    print(f"\n🏗 Generating FP&A test data for {COMPANY}")
    # Month count is derived from YEARS so the banner stays correct if the
    # covered period changes.
    print(f" Periods : {YEARS[0]}-01 → {YEARS[-1]}-12 ({len(YEARS) * 12} months)")
    print(f" Depts : {', '.join(DEPARTMENTS)}\n")

    generate_revenue()
    generate_opex()
    generate_pl()
    generate_cashflow()
    generate_headcount()

    print(f"\n✅ All CSV files written to: {os.path.abspath(OUTPUT_DIR)}\n")