""" FP&A API Loader Reads generated CSV files and loads them into the Go FP&A API. Real endpoint map (from main.go): POST /api/v1/budgets ← one budget record per request PUT /api/v1/budgets/{id} ← update (not used in seeding) DELETE /api/v1/budgets/{id} ← delete (not used in seeding) POST /api/v1/actuals/ingest ← bulk actuals ingest GET /api/v1/variance ← read-only, not loaded GET /api/v1/variance/alerts ← read-only, not loaded GET /api/v1/health ← health check Load order matters: 1. Budgets first — actuals reference budget lines by category+period 2. Actuals second — ingested in bulk against existing budgets """ import csv import json import time import os import requests from typing import List, Dict, Any, Optional, Callable from dataclasses import dataclass DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data", "csv") # ── Config ──────────────────────────────────────────────────────────────────── @dataclass class LoaderConfig: base_url: str = "http://localhost:8080" batch_size: int = 50 # used for actuals ingest delay_between_batches: float = 0.05 dry_run: bool = False auth_token: Optional[str] = None DEFAULT_CONFIG = LoaderConfig() # ── CSV reading helpers ─────────────────────────────────────────────────────── INT_FIELDS = {"year", "month"} FLOAT_FIELDS = { "budget_amount", "actual_amount", "variance", "variance_pct", "product_revenue", "service_revenue", "total_revenue", "cogs_product", "cogs_service", "total_cogs", "gross_profit", "gross_margin_pct", "total_opex", "ebitda", "ebitda_margin_pct", "net_income", "cash_collected_product", "cash_collected_service", "cash_paid_opex", "cash_paid_cogs", "net_operating_cash_flow", "capex", "net_investing_cash_flow", "loan_repayment", "equity_raised", "net_financing_cash_flow", "net_change_in_cash", "closing_cash_balance", "annual_salary_budget", "actual_salary_paid_ytd", "headcount_fte", } def _coerce(row: Dict[str, str]) -> Dict[str, Any]: out = {} for k, v in row.items(): if k in INT_FIELDS: out[k] = int(v) if v else None elif k in FLOAT_FIELDS: out[k] = float(v) if v else None else: out[k] = v return out def read_csv(filename: str) -> List[Dict[str, Any]]: path = os.path.join(DATA_DIR, filename) if not os.path.exists(path): raise FileNotFoundError(f"CSV not found: {path} (run generators/generate_data.py first)") with open(path, newline="") as f: return [_coerce(row) for row in csv.DictReader(f)] # ── Payload mappers ─────────────────────────────────────────────────────────── # Each mapper takes one CSV row and returns the JSON body your Go handler expects. # Adjust field names here if your Go structs use different names. def revenue_row_to_budget(row: Dict) -> Dict: """revenue_budget_vs_actuals.csv → POST /api/v1/budgets""" return { "category": row["revenue_type"], # "Product" | "Service" "department": "Revenue", "period": row["period"], # "2023-01" "year": row["year"], "month": row["month"], "amount": row["budget_amount"], "currency": "USD", "notes": f"{row['revenue_type']} revenue budget", } def revenue_row_to_actual(row: Dict) -> Dict: """revenue_budget_vs_actuals.csv → POST /api/v1/actuals/ingest""" return { "category": row["revenue_type"], "department": "Revenue", "period": row["period"], "year": row["year"], "month": row["month"], "amount": row["actual_amount"], "currency": "USD", "source": "csv_import", } def opex_row_to_budget(row: Dict) -> Dict: """opex_budget_vs_actuals.csv → POST /api/v1/budgets""" return { "category": row["category"], # "Salaries", "Cloud Infrastructure", … "department": row["department"], "period": row["period"], "year": row["year"], "month": row["month"], "amount": row["budget_amount"], "currency": "USD", "notes": f"{row['department']} opex budget", } def opex_row_to_actual(row: Dict) -> Dict: """opex_budget_vs_actuals.csv → POST /api/v1/actuals/ingest""" return { "category": row["category"], "department": row["department"], "period": row["period"], "year": row["year"], "month": row["month"], "amount": row["actual_amount"], "currency": "USD", "source": "csv_import", } def pl_row_to_actual(row: Dict) -> Dict: """ pl_income_statement.csv → POST /api/v1/actuals/ingest The P&L is a derived/summary view; we ingest key line items as actuals. Budgets for these are already covered by revenue + opex CSVs. """ return { "category": "P&L Summary", "department": "Finance", "period": row["period"], "year": row["year"], "month": row["month"], "amount": row["net_income"], "currency": "USD", "source": "csv_import", "metadata": { "total_revenue": row["total_revenue"], "gross_profit": row["gross_profit"], "gross_margin_pct": row["gross_margin_pct"], "ebitda": row["ebitda"], "ebitda_margin_pct": row["ebitda_margin_pct"], }, } def cashflow_row_to_actual(row: Dict) -> Dict: """cash_flow.csv → POST /api/v1/actuals/ingest""" return { "category": "Cash Flow", "department": "Finance", "period": row["period"], "year": row["year"], "month": row["month"], "amount": row["net_change_in_cash"], "currency": "USD", "source": "csv_import", "metadata": { "net_operating_cash_flow": row["net_operating_cash_flow"], "net_investing_cash_flow": row["net_investing_cash_flow"], "net_financing_cash_flow": row["net_financing_cash_flow"], "closing_cash_balance": row["closing_cash_balance"], "equity_raised": row["equity_raised"], }, } def headcount_row_to_actual(row: Dict) -> Dict: """headcount_workforce.csv → POST /api/v1/actuals/ingest (active employees only)""" return { "category": "Headcount", "department": row["department"], "period": row["period"], "year": row["year"], "month": row["month"], "amount": row["actual_salary_paid_ytd"], "currency": "USD", "source": "csv_import", "metadata": { "employee_id": row["employee_id"], "role": row["role"], "status": row["status"], "fte": row["headcount_fte"], "annual_salary": row["annual_salary_budget"], }, } # ── HTTP helpers ────────────────────────────────────────────────────────────── def _headers(config: LoaderConfig) -> Dict: h = {"Content-Type": "application/json"} if config.auth_token: h["Authorization"] = f"Bearer {config.auth_token}" return h def post_one(url: str, payload: Dict, config: LoaderConfig) -> Dict: """Single POST — used for /api/v1/budgets (one record per call).""" if config.dry_run: print(f" [DRY RUN] POST {url}") print(f" {json.dumps(payload, indent=6)}") return {"status": "dry_run"} try: resp = requests.post(url, json=payload, headers=_headers(config), timeout=10) resp.raise_for_status() return resp.json() if resp.content else {"status": "ok"} except requests.exceptions.ConnectionError: return {"error": "connection_refused"} except requests.exceptions.HTTPError as e: return {"error": str(e), "status_code": resp.status_code, "body": resp.text} except Exception as e: return {"error": str(e)} def post_batch(url: str, records: List[Dict], config: LoaderConfig) -> Dict: """Batch POST — used for /api/v1/actuals/ingest.""" if config.dry_run: print(f" [DRY RUN] POST {url} ({len(records)} records)") print(f" Sample: {json.dumps(records[0], indent=6)}") return {"status": "dry_run", "count": len(records)} try: resp = requests.post(url, json={"records": records}, headers=_headers(config), timeout=30) resp.raise_for_status() return resp.json() if resp.content else {"status": "ok"} except requests.exceptions.ConnectionError: return {"error": "connection_refused"} except requests.exceptions.HTTPError as e: return {"error": str(e), "status_code": resp.status_code, "body": resp.text} except Exception as e: return {"error": str(e)} # ── Budget loader: POST /api/v1/budgets ─────────────────────────────────────── def load_budgets(config: LoaderConfig = DEFAULT_CONFIG) -> Dict: """ Loads budget rows from revenue + opex CSVs. Each row is a separate POST (budget records are individual, not batched). Deduplicated by (category, department, period) to avoid double-posting. """ url = config.base_url.rstrip("/") + "/api/v1/budgets" budget_rows = [] for filename, mapper in [ ("revenue_budget_vs_actuals.csv", revenue_row_to_budget), ("opex_budget_vs_actuals.csv", opex_row_to_budget), ]: rows = read_csv(filename) budget_rows.extend(mapper(r) for r in rows) # Deduplicate: last write wins for same category+dept+period seen = {} for row in budget_rows: key = (row["category"], row["department"], row["period"]) seen[key] = row unique = list(seen.values()) print(f"\n📋 Loading budgets → {url}") print(f" {len(unique)} unique budget lines") results = {"total": len(unique), "ok": 0, "errors": []} for i, payload in enumerate(unique, 1): result = post_one(url, payload, config) if "error" in result: results["errors"].append({**result, "payload": payload}) if len(results["errors"]) <= 3: # don't flood the console print(f" ⚠ [{i}/{len(unique)}] {result['error']}") else: results["ok"] += 1 if i % 50 == 0 or i == len(unique): print(f" ✓ {i}/{len(unique)} budgets posted") time.sleep(0.01) # light throttle for individual POSTs return results # ── Actuals loader: POST /api/v1/actuals/ingest ─────────────────────────────── def load_actuals(config: LoaderConfig = DEFAULT_CONFIG) -> Dict: """ Collects actuals from all CSV sources and bulk-ingests them. Filters headcount to Active employees only to avoid salary double-counting. """ url = config.base_url.rstrip("/") + "/api/v1/actuals/ingest" all_actuals = [] # Revenue actuals for row in read_csv("revenue_budget_vs_actuals.csv"): all_actuals.append(revenue_row_to_actual(row)) # Opex actuals for row in read_csv("opex_budget_vs_actuals.csv"): all_actuals.append(opex_row_to_actual(row)) # P&L summary actuals for row in read_csv("pl_income_statement.csv"): all_actuals.append(pl_row_to_actual(row)) # Cash flow actuals for row in read_csv("cash_flow.csv"): all_actuals.append(cashflow_row_to_actual(row)) # Headcount actuals — active employees only, one record per employee per month headcount_seen = set() for row in read_csv("headcount_workforce.csv"): if row["status"] != "Active": continue key = (row["employee_id"], row["period"]) if key in headcount_seen: continue headcount_seen.add(key) all_actuals.append(headcount_row_to_actual(row)) print(f"\n📥 Loading actuals → {url}") print(f" {len(all_actuals)} records | batch size {config.batch_size}") results = {"total": len(all_actuals), "batches": 0, "errors": []} for i in range(0, len(all_actuals), config.batch_size): batch = all_actuals[i : i + config.batch_size] result = post_batch(url, batch, config) results["batches"] += 1 if "error" in result: results["errors"].append(result) print(f" ⚠ batch {results['batches']}: {result['error']}") else: print(f" ✓ batch {results['batches']} ({len(batch)} records)") time.sleep(config.delay_between_batches) return results # ── Variance check: GET /api/v1/variance ────────────────────────────────────── def check_variance(config: LoaderConfig = DEFAULT_CONFIG, period: Optional[str] = None): """ Calls the variance report after loading to verify data landed correctly. """ url = config.base_url.rstrip("/") + "/api/v1/variance" params = {"period": period} if period else {} print(f"\n📊 Fetching variance report → {url}") if config.dry_run: print(" [DRY RUN] skipped") return {} try: resp = requests.get(url, params=params, timeout=10) resp.raise_for_status() data = resp.json() print(f" ✓ {len(data) if isinstance(data, list) else 1} variance entries returned") return data except Exception as e: print(f" ⚠ {e}") return {"error": str(e)} def check_alerts(config: LoaderConfig = DEFAULT_CONFIG): """Calls /api/v1/variance/alerts — shows any over/under budget flags.""" url = config.base_url.rstrip("/") + "/api/v1/variance/alerts" print(f"\n🚨 Fetching variance alerts → {url}") if config.dry_run: print(" [DRY RUN] skipped") return {} try: resp = requests.get(url, timeout=10) resp.raise_for_status() data = resp.json() count = len(data) if isinstance(data, list) else "?" print(f" ✓ {count} alerts") return data except Exception as e: print(f" ⚠ {e}") return {"error": str(e)} # ── Full seed run ───────────────────────────────────────────────────────────── def seed_all(config: LoaderConfig = DEFAULT_CONFIG): """ Full seed sequence: 1. Load budgets (POST /api/v1/budgets, one at a time) 2. Load actuals (POST /api/v1/actuals/ingest, batched) 3. Spot-check variance report """ print(f"\n🚀 FP&A Seed → {config.base_url}") if config.dry_run: print(" *** DRY RUN — no requests will be sent ***\n") budget_result = load_budgets(config) actuals_result = load_actuals(config) check_variance(config) check_alerts(config) b_errors = len(budget_result.get("errors", [])) a_errors = len(actuals_result.get("errors", [])) print("\n── Seed Summary ─────────────────────────────────") print(f" Budgets : {budget_result['ok']}/{budget_result['total']} ok" + (f" ⚠ {b_errors} errors" if b_errors else " ✅")) print(f" Actuals : {actuals_result['total']} records in " f"{actuals_result['batches']} batches" + (f" ⚠ {a_errors} errors" if a_errors else " ✅")) print() return {"budgets": budget_result, "actuals": actuals_result} # ── CLI ─────────────────────────────────────────────────────────────────────── if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Seed FP&A CSV data into Go API") parser.add_argument("--url", default="http://localhost:8080", help="API base URL") parser.add_argument("--batch", type=int, default=50, help="Actuals batch size") parser.add_argument("--dry-run", action="store_true", help="Print payloads, don't send") parser.add_argument("--token", default=None, help="Bearer auth token") parser.add_argument("--only", choices=["budgets", "actuals", "variance", "alerts"], help="Run only one step instead of full seed") args = parser.parse_args() cfg = LoaderConfig( base_url=args.url, batch_size=args.batch, dry_run=args.dry_run, auth_token=args.token, ) if args.only == "budgets": load_budgets(cfg) elif args.only == "actuals": load_actuals(cfg) elif args.only == "variance": print(json.dumps(check_variance(cfg), indent=2)) elif args.only == "alerts": print(json.dumps(check_alerts(cfg), indent=2)) else: seed_all(cfg)