tester work

This commit is contained in:
samantha42
2026-03-20 22:21:47 +01:00
parent 7e7c3f6bf4
commit 6ad5df839b
3 changed files with 183 additions and 579 deletions

View File

@@ -94,7 +94,7 @@ const (
) )
type VarianceLine struct { type VarianceLine struct {
GLCode string `json:"gl_code"` GLCode string `json:"gl_account_id"`
GLDescription string `json:"gl_description"` GLDescription string `json:"gl_description"`
GLType GLAccountType `json:"gl_type"` GLType GLAccountType `json:"gl_type"`
Budget float64 `json:"budget"` Budget float64 `json:"budget"`

View File

@@ -1,428 +0,0 @@
"""
FP&A API Loader
Reads generated CSV files and loads them into the Go FP&A API.
Real endpoint map (from main.go):
POST /api/v1/budgets ← one budget record per request
PUT /api/v1/budgets/{id} ← update (not used in seeding)
DELETE /api/v1/budgets/{id} ← delete (not used in seeding)
POST /api/v1/actuals/ingest ← bulk actuals ingest
GET /api/v1/variance ← read-only, not loaded
GET /api/v1/variance/alerts ← read-only, not loaded
GET /api/v1/health ← health check
Load order matters:
1. Budgets first — actuals reference budget lines by category+period
2. Actuals second — ingested in bulk against existing budgets
"""
import csv
import json
import time
import os
import requests
from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass
DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data", "csv")
# ── Config ────────────────────────────────────────────────────────────────────
@dataclass
class LoaderConfig:
base_url: str = "http://localhost:8080"
batch_size: int = 50 # used for actuals ingest
delay_between_batches: float = 0.05
dry_run: bool = False
auth_token: Optional[str] = None
DEFAULT_CONFIG = LoaderConfig()
# ── CSV reading helpers ───────────────────────────────────────────────────────
INT_FIELDS = {"year", "month"}
FLOAT_FIELDS = {
"budget_amount", "actual_amount", "variance", "variance_pct",
"product_revenue", "service_revenue", "total_revenue",
"cogs_product", "cogs_service", "total_cogs",
"gross_profit", "gross_margin_pct", "total_opex",
"ebitda", "ebitda_margin_pct", "net_income",
"cash_collected_product", "cash_collected_service",
"cash_paid_opex", "cash_paid_cogs", "net_operating_cash_flow",
"capex", "net_investing_cash_flow", "loan_repayment",
"equity_raised", "net_financing_cash_flow",
"net_change_in_cash", "closing_cash_balance",
"annual_salary_budget", "actual_salary_paid_ytd", "headcount_fte",
}
def _coerce(row: Dict[str, str]) -> Dict[str, Any]:
out = {}
for k, v in row.items():
if k in INT_FIELDS:
out[k] = int(v) if v else None
elif k in FLOAT_FIELDS:
out[k] = float(v) if v else None
else:
out[k] = v
return out
def read_csv(filename: str) -> List[Dict[str, Any]]:
path = os.path.join(DATA_DIR, filename)
if not os.path.exists(path):
raise FileNotFoundError(f"CSV not found: {path} (run generators/generate_data.py first)")
with open(path, newline="") as f:
return [_coerce(row) for row in csv.DictReader(f)]
# ── Payload mappers ───────────────────────────────────────────────────────────
# Each mapper takes one CSV row and returns the JSON body your Go handler expects.
# Adjust field names here if your Go structs use different names.
def revenue_row_to_budget(row: Dict) -> Dict:
"""revenue_budget_vs_actuals.csv → POST /api/v1/budgets"""
return {
"category": row["revenue_type"], # "Product" | "Service"
"department": "Revenue",
"period": row["period"], # "2023-01"
"year": row["year"],
"month": row["month"],
"amount": row["budget_amount"],
"currency": "USD",
"notes": f"{row['revenue_type']} revenue budget",
}
def revenue_row_to_actual(row: Dict) -> Dict:
"""revenue_budget_vs_actuals.csv → POST /api/v1/actuals/ingest"""
return {
"category": row["revenue_type"],
"department": "Revenue",
"period": row["period"],
"year": row["year"],
"month": row["month"],
"amount": row["actual_amount"],
"currency": "USD",
"source": "csv_import",
}
def opex_row_to_budget(row: Dict) -> Dict:
"""opex_budget_vs_actuals.csv → POST /api/v1/budgets"""
return {
"category": row["category"], # "Salaries", "Cloud Infrastructure", …
"department": row["department"],
"period": row["period"],
"year": row["year"],
"month": row["month"],
"amount": row["budget_amount"],
"currency": "USD",
"notes": f"{row['department']} opex budget",
}
def opex_row_to_actual(row: Dict) -> Dict:
"""opex_budget_vs_actuals.csv → POST /api/v1/actuals/ingest"""
return {
"category": row["category"],
"department": row["department"],
"period": row["period"],
"year": row["year"],
"month": row["month"],
"amount": row["actual_amount"],
"currency": "USD",
"source": "csv_import",
}
def pl_row_to_actual(row: Dict) -> Dict:
"""
pl_income_statement.csv → POST /api/v1/actuals/ingest
The P&L is a derived/summary view; we ingest key line items as actuals.
Budgets for these are already covered by revenue + opex CSVs.
"""
return {
"category": "P&L Summary",
"department": "Finance",
"period": row["period"],
"year": row["year"],
"month": row["month"],
"amount": row["net_income"],
"currency": "USD",
"source": "csv_import",
"metadata": {
"total_revenue": row["total_revenue"],
"gross_profit": row["gross_profit"],
"gross_margin_pct": row["gross_margin_pct"],
"ebitda": row["ebitda"],
"ebitda_margin_pct": row["ebitda_margin_pct"],
},
}
def cashflow_row_to_actual(row: Dict) -> Dict:
"""cash_flow.csv → POST /api/v1/actuals/ingest"""
return {
"category": "Cash Flow",
"department": "Finance",
"period": row["period"],
"year": row["year"],
"month": row["month"],
"amount": row["net_change_in_cash"],
"currency": "USD",
"source": "csv_import",
"metadata": {
"net_operating_cash_flow": row["net_operating_cash_flow"],
"net_investing_cash_flow": row["net_investing_cash_flow"],
"net_financing_cash_flow": row["net_financing_cash_flow"],
"closing_cash_balance": row["closing_cash_balance"],
"equity_raised": row["equity_raised"],
},
}
def headcount_row_to_actual(row: Dict) -> Dict:
"""headcount_workforce.csv → POST /api/v1/actuals/ingest (active employees only)"""
return {
"category": "Headcount",
"department": row["department"],
"period": row["period"],
"year": row["year"],
"month": row["month"],
"amount": row["actual_salary_paid_ytd"],
"currency": "USD",
"source": "csv_import",
"metadata": {
"employee_id": row["employee_id"],
"role": row["role"],
"status": row["status"],
"fte": row["headcount_fte"],
"annual_salary": row["annual_salary_budget"],
},
}
# ── HTTP helpers ──────────────────────────────────────────────────────────────
def _headers(config: LoaderConfig) -> Dict:
h = {"Content-Type": "application/json"}
if config.auth_token:
h["Authorization"] = f"Bearer {config.auth_token}"
return h
def post_one(url: str, payload: Dict, config: LoaderConfig) -> Dict:
"""Single POST — used for /api/v1/budgets (one record per call)."""
if config.dry_run:
print(f" [DRY RUN] POST {url}")
print(f" {json.dumps(payload, indent=6)}")
return {"status": "dry_run"}
try:
resp = requests.post(url, json=payload, headers=_headers(config), timeout=10)
resp.raise_for_status()
return resp.json() if resp.content else {"status": "ok"}
except requests.exceptions.ConnectionError:
return {"error": "connection_refused"}
except requests.exceptions.HTTPError as e:
return {"error": str(e), "status_code": resp.status_code, "body": resp.text}
except Exception as e:
return {"error": str(e)}
def post_batch(url: str, records: List[Dict], config: LoaderConfig) -> Dict:
"""Batch POST — used for /api/v1/actuals/ingest."""
if config.dry_run:
print(f" [DRY RUN] POST {url} ({len(records)} records)")
print(f" Sample: {json.dumps(records[0], indent=6)}")
return {"status": "dry_run", "count": len(records)}
try:
resp = requests.post(url, json={"records": records},
headers=_headers(config), timeout=30)
resp.raise_for_status()
return resp.json() if resp.content else {"status": "ok"}
except requests.exceptions.ConnectionError:
return {"error": "connection_refused"}
except requests.exceptions.HTTPError as e:
return {"error": str(e), "status_code": resp.status_code, "body": resp.text}
except Exception as e:
return {"error": str(e)}
# ── Budget loader: POST /api/v1/budgets ───────────────────────────────────────
def load_budgets(config: LoaderConfig = DEFAULT_CONFIG) -> Dict:
"""
Loads budget rows from revenue + opex CSVs.
Each row is a separate POST (budget records are individual, not batched).
Deduplicated by (category, department, period) to avoid double-posting.
"""
url = config.base_url.rstrip("/") + "/api/v1/budgets"
budget_rows = []
for filename, mapper in [
("revenue_budget_vs_actuals.csv", revenue_row_to_budget),
("opex_budget_vs_actuals.csv", opex_row_to_budget),
]:
rows = read_csv(filename)
budget_rows.extend(mapper(r) for r in rows)
# Deduplicate: last write wins for same category+dept+period
seen = {}
for row in budget_rows:
key = (row["category"], row["department"], row["period"])
seen[key] = row
unique = list(seen.values())
print(f"\n📋 Loading budgets → {url}")
print(f" {len(unique)} unique budget lines")
results = {"total": len(unique), "ok": 0, "errors": []}
for i, payload in enumerate(unique, 1):
result = post_one(url, payload, config)
if "error" in result:
results["errors"].append({**result, "payload": payload})
if len(results["errors"]) <= 3: # don't flood the console
print(f" ⚠ [{i}/{len(unique)}] {result['error']}")
else:
results["ok"] += 1
if i % 50 == 0 or i == len(unique):
print(f"{i}/{len(unique)} budgets posted")
time.sleep(0.01) # light throttle for individual POSTs
return results
# ── Actuals loader: POST /api/v1/actuals/ingest ───────────────────────────────
def load_actuals(config: LoaderConfig = DEFAULT_CONFIG) -> Dict:
"""
Collects actuals from all CSV sources and bulk-ingests them.
Filters headcount to Active employees only to avoid salary double-counting.
"""
url = config.base_url.rstrip("/") + "/api/v1/actuals/ingest"
all_actuals = []
# Revenue actuals
for row in read_csv("revenue_budget_vs_actuals.csv"):
all_actuals.append(revenue_row_to_actual(row))
# Opex actuals
for row in read_csv("opex_budget_vs_actuals.csv"):
all_actuals.append(opex_row_to_actual(row))
# P&L summary actuals
for row in read_csv("pl_income_statement.csv"):
all_actuals.append(pl_row_to_actual(row))
# Cash flow actuals
for row in read_csv("cash_flow.csv"):
all_actuals.append(cashflow_row_to_actual(row))
# Headcount actuals — active employees only, one record per employee per month
headcount_seen = set()
for row in read_csv("headcount_workforce.csv"):
if row["status"] != "Active":
continue
key = (row["employee_id"], row["period"])
if key in headcount_seen:
continue
headcount_seen.add(key)
all_actuals.append(headcount_row_to_actual(row))
print(f"\n📥 Loading actuals → {url}")
print(f" {len(all_actuals)} records | batch size {config.batch_size}")
results = {"total": len(all_actuals), "batches": 0, "errors": []}
for i in range(0, len(all_actuals), config.batch_size):
batch = all_actuals[i : i + config.batch_size]
result = post_batch(url, batch, config)
results["batches"] += 1
if "error" in result:
results["errors"].append(result)
print(f" ⚠ batch {results['batches']}: {result['error']}")
else:
print(f" ✓ batch {results['batches']} ({len(batch)} records)")
time.sleep(config.delay_between_batches)
return results
# ── Variance check: GET /api/v1/variance ──────────────────────────────────────
def check_variance(config: LoaderConfig = DEFAULT_CONFIG, period: Optional[str] = None):
"""
Calls the variance report after loading to verify data landed correctly.
"""
url = config.base_url.rstrip("/") + "/api/v1/variance"
params = {"period": period} if period else {}
print(f"\n📊 Fetching variance report → {url}")
if config.dry_run:
print(" [DRY RUN] skipped")
return {}
try:
resp = requests.get(url, params=params, timeout=10)
resp.raise_for_status()
data = resp.json()
print(f"{len(data) if isinstance(data, list) else 1} variance entries returned")
return data
except Exception as e:
print(f"{e}")
return {"error": str(e)}
def check_alerts(config: LoaderConfig = DEFAULT_CONFIG):
"""Calls /api/v1/variance/alerts — shows any over/under budget flags."""
url = config.base_url.rstrip("/") + "/api/v1/variance/alerts"
print(f"\n🚨 Fetching variance alerts → {url}")
if config.dry_run:
print(" [DRY RUN] skipped")
return {}
try:
resp = requests.get(url, timeout=10)
resp.raise_for_status()
data = resp.json()
count = len(data) if isinstance(data, list) else "?"
print(f"{count} alerts")
return data
except Exception as e:
print(f"{e}")
return {"error": str(e)}
# ── Full seed run ─────────────────────────────────────────────────────────────
def seed_all(config: LoaderConfig = DEFAULT_CONFIG):
"""
Full seed sequence:
1. Load budgets (POST /api/v1/budgets, one at a time)
2. Load actuals (POST /api/v1/actuals/ingest, batched)
3. Spot-check variance report
"""
print(f"\n🚀 FP&A Seed → {config.base_url}")
if config.dry_run:
print(" *** DRY RUN — no requests will be sent ***\n")
budget_result = load_budgets(config)
actuals_result = load_actuals(config)
check_variance(config)
check_alerts(config)
b_errors = len(budget_result.get("errors", []))
a_errors = len(actuals_result.get("errors", []))
print("\n── Seed Summary ─────────────────────────────────")
print(f" Budgets : {budget_result['ok']}/{budget_result['total']} ok"
+ (f"{b_errors} errors" if b_errors else ""))
print(f" Actuals : {actuals_result['total']} records in "
f"{actuals_result['batches']} batches"
+ (f"{a_errors} errors" if a_errors else ""))
print()
return {"budgets": budget_result, "actuals": actuals_result}
# ── CLI ───────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Seed FP&A CSV data into Go API")
parser.add_argument("--url", default="http://localhost:8080", help="API base URL")
parser.add_argument("--batch", type=int, default=50, help="Actuals batch size")
parser.add_argument("--dry-run", action="store_true", help="Print payloads, don't send")
parser.add_argument("--token", default=None, help="Bearer auth token")
parser.add_argument("--only", choices=["budgets", "actuals", "variance", "alerts"],
help="Run only one step instead of full seed")
args = parser.parse_args()
cfg = LoaderConfig(
base_url=args.url,
batch_size=args.batch,
dry_run=args.dry_run,
auth_token=args.token,
)
if args.only == "budgets":
load_budgets(cfg)
elif args.only == "actuals":
load_actuals(cfg)
elif args.only == "variance":
print(json.dumps(check_variance(cfg), indent=2))
elif args.only == "alerts":
print(json.dumps(check_alerts(cfg), indent=2))
else:
seed_all(cfg)

View File

@@ -136,19 +136,24 @@ BUDGET_VERSION = "v1" # matches BudgetVersion enum in your Go service
DEPARTMENT_IDS: dict = {} DEPARTMENT_IDS: dict = {}
GL_ACCOUNTS: dict = {} # category name → {"id": int, "code": str} GL_ACCOUNTS: dict = {} # category name → {"id": int, "code": str}
# ── Department definitions ────────────────────────────────────────────────────
# Matches model.Department: id, code, name, cost_center (no active field)
_DEPARTMENT_DEFS = [ _DEPARTMENT_DEFS = [
# code, name, cost_center — matches departments table exactly {"code": "REV", "name": "Revenue", "cost_center": "CC-100"},
{"code": "REV", "name": "Revenue", "cost_center": "CC-100", "active": True}, {"code": "ENG", "name": "Engineering", "cost_center": "CC-200"},
{"code": "ENG", "name": "Engineering", "cost_center": "CC-200", "active": True}, {"code": "SAL", "name": "Sales", "cost_center": "CC-300"},
{"code": "SAL", "name": "Sales", "cost_center": "CC-300", "active": True}, {"code": "MKT", "name": "Marketing", "cost_center": "CC-400"},
{"code": "MKT", "name": "Marketing", "cost_center": "CC-400", "active": True}, {"code": "OPS", "name": "Operations", "cost_center": "CC-500"},
{"code": "OPS", "name": "Operations", "cost_center": "CC-500", "active": True}, {"code": "FIN", "name": "Finance", "cost_center": "CC-600"},
{"code": "FIN", "name": "Finance", "cost_center": "CC-600", "active": True},
] ]
# ── GL account definitions ────────────────────────────────────────────────────
# Matches model.GLAccount: id, code, description, type (GLAccountType), favour_high
# GLAccountType values: "revenue" | "cogs" | "opex" | "capex" | "headcount"
# favour_high=True → beating budget is favourable (revenue accounts)
_GL_ACCOUNT_DEFS = [ _GL_ACCOUNT_DEFS = [
# code, description, type (revenue|cogs|opex|capex|headcount), favour_high
# favour_high=True means over-budget is good (i.e. revenue beating target)
# Revenue # Revenue
{"code": "4000", "description": "SaaS Subscription Revenue", "type": "revenue", "favour_high": True}, {"code": "4000", "description": "SaaS Subscription Revenue", "type": "revenue", "favour_high": True},
{"code": "4100", "description": "Professional Services Revenue","type": "revenue", "favour_high": True}, {"code": "4100", "description": "Professional Services Revenue","type": "revenue", "favour_high": True},
@@ -161,7 +166,7 @@ _GL_ACCOUNT_DEFS = [
{"code": "6200", "description": "Travel and Expenses", "type": "opex", "favour_high": False}, {"code": "6200", "description": "Travel and Expenses", "type": "opex", "favour_high": False},
{"code": "6300", "description": "Marketing and Paid Media", "type": "opex", "favour_high": False}, {"code": "6300", "description": "Marketing and Paid Media", "type": "opex", "favour_high": False},
{"code": "6400", "description": "Cloud Infrastructure", "type": "opex", "favour_high": False}, {"code": "6400", "description": "Cloud Infrastructure", "type": "opex", "favour_high": False},
{"code": "6500", "description": "Contractors and Freelancers","type": "opex", "favour_high": False}, {"code": "6500", "description": "Contractors and Freelancers", "type": "opex", "favour_high": False},
{"code": "6600", "description": "Office and Facilities", "type": "opex", "favour_high": False}, {"code": "6600", "description": "Office and Facilities", "type": "opex", "favour_high": False},
# Capex # Capex
{"code": "7000", "description": "Capital Expenditure", "type": "capex", "favour_high": False}, {"code": "7000", "description": "Capital Expenditure", "type": "capex", "favour_high": False},
@@ -169,7 +174,9 @@ _GL_ACCOUNT_DEFS = [
{"code": "9200", "description": "Headcount Cost", "type": "headcount", "favour_high": False}, {"code": "9200", "description": "Headcount Cost", "type": "headcount", "favour_high": False},
] ]
# Maps CSV category names → GL codes so the seeder can find the right account # ── CSV category → GL code mapping ───────────────────────────────────────────
# Translates CSV column values into the gl_code field used by IngestActualsRequest
_CSV_CATEGORY_TO_GL_CODE = { _CSV_CATEGORY_TO_GL_CODE = {
"Product": "4000", "Product": "4000",
"Service": "4100", "Service": "4100",
@@ -180,20 +187,21 @@ _CSV_CATEGORY_TO_GL_CODE = {
"Cloud Infrastructure": "6400", "Cloud Infrastructure": "6400",
"Contractors": "6500", "Contractors": "6500",
"Office & Facilities": "6600", "Office & Facilities": "6600",
"P&L Summary": "4000", # roll up to revenue GL for summary lines "P&L Summary": "4000",
"Cash Flow": "7000", "Cash Flow": "7000",
"Headcount": "9200", "Headcount": "9200",
} }
def seed_reference_data() -> Optional[str]: def seed_reference_data() -> Optional[str]:
""" """
POST all departments and GL accounts to the API and store returned IDs. POST all departments and GL accounts to the API and cache returned IDs.
Returns an error string on first failure, or None on success.
Safe to call multiple times — Go side uses ON CONFLICT(code) upsert.
Populates: Populates at runtime:
DEPARTMENT_IDS — {"Engineering": 3, ...} keyed by department name DEPARTMENT_IDS — {"Engineering": 3, ...} keyed by dept name
GL_ACCOUNTS — {"4000": {"id": 1, "code": "4000"}, ...} keyed by GL code GL_ACCOUNTS — {"4000": {"id": 1, "code": "4000"}} keyed by GL code
Safe to call multiple times — Go side uses ON CONFLICT(code) upsert.
Returns an error string on first failure, None on success.
""" """
for defn in _DEPARTMENT_DEFS: for defn in _DEPARTMENT_DEFS:
status, body = http_post(f"{API_BASE}/api/v1/departments", defn) status, body = http_post(f"{API_BASE}/api/v1/departments", defn)
@@ -204,27 +212,23 @@ def seed_reference_data() -> Optional[str]:
for defn in _GL_ACCOUNT_DEFS: for defn in _GL_ACCOUNT_DEFS:
status, body = http_post(f"{API_BASE}/api/v1/gl-accounts", defn) status, body = http_post(f"{API_BASE}/api/v1/gl-accounts", defn)
if status not in (200, 201): if status not in (200, 201):
return f"POST /gl-accounts '{defn['code']}' failed ({status}): {body}" return f"POST /gl-accounts code='{defn['code']}' failed ({status}): {body}"
GL_ACCOUNTS[defn["code"]] = {"id": body["id"], "code": defn["code"]} GL_ACCOUNTS[defn["code"]] = {"id": body["id"], "code": defn["code"]}
return None # success return None
def gl(category: str) -> dict: def gl(category: str) -> dict:
""" """Resolve a CSV category name to {"id": int, "code": str} via _CSV_CATEGORY_TO_GL_CODE."""
Resolve a CSV category name to a seeded GL account dict {"id": int, "code": str}.
Looks up via _CSV_CATEGORY_TO_GL_CODE then into the runtime GL_ACCOUNTS map.
"""
code = _CSV_CATEGORY_TO_GL_CODE.get(category) code = _CSV_CATEGORY_TO_GL_CODE.get(category)
if code is None: if code is None:
raise AssertionError( raise AssertionError(
f"No GL code mapping for CSV category '{category}'. " f"No GL code for CSV category '{category}'. Add it to _CSV_CATEGORY_TO_GL_CODE."
f"Add it to _CSV_CATEGORY_TO_GL_CODE."
) )
acct = GL_ACCOUNTS.get(code) acct = GL_ACCOUNTS.get(code)
if acct is None: if acct is None:
raise AssertionError( raise AssertionError(
f"GL code '{code}' (for '{category}') not in runtime map — " f"GL '{code}' (for '{category}') missing — call seed_reference_data() first. "
f"was seed_reference_data() called? Known codes: {list(GL_ACCOUNTS.keys())}" f"Known: {list(GL_ACCOUNTS.keys())}"
) )
return acct return acct
@@ -288,12 +292,11 @@ def suite_revenue() -> Suite:
assert_true(err is None, err or "seed_reference_data failed") assert_true(err is None, err or "seed_reference_data failed")
s.run("POST /api/v1/departments + /api/v1/gl-accounts (seed FK parents)", seed_refs) s.run("POST /api/v1/departments + /api/v1/gl-accounts (seed FK parents)", seed_refs)
# ── Step 3: POST budgets ────────────────────────────────────────────────── # ── Step 3: POST budgets (CreateBudgetRequest) ───────────────────────────
# One CreateBudgetRequest per CSV row. # version uses BudgetVersion consts: "original" | "forecast_1" | "forecast_2"
# We only post the first period (2023-01) for both types to keep the # department_id and gl_account_id are the integer IDs returned by seed step.
# test focused; the loader handles the full dataset.
budget_ids: dict[str, int] = {} # key: "revenue_type:period" → returned id budget_ids: dict = {} # "revenue_type:period" → returned Budget.id
def post_product_budget(): def post_product_budget():
row = next(r for r in rows row = next(r for r in rows
@@ -302,7 +305,7 @@ def suite_revenue() -> Suite:
payload = { payload = {
"fiscal_year": fy, "fiscal_year": fy,
"fiscal_period": fp, "fiscal_period": fp,
"version": BUDGET_VERSION, "version": "original", # model.VersionOriginal
"department_id": DEPARTMENT_IDS["Revenue"], "department_id": DEPARTMENT_IDS["Revenue"],
"gl_account_id": gl("Product")["id"], "gl_account_id": gl("Product")["id"],
"amount": float(row["budget_amount"]), "amount": float(row["budget_amount"]),
@@ -313,11 +316,10 @@ def suite_revenue() -> Suite:
status, body = http_post(f"{API_BASE}/api/v1/budgets", payload) status, body = http_post(f"{API_BASE}/api/v1/budgets", payload)
assert_true(status in (200, 201), assert_true(status in (200, 201),
f"POST /budgets failed ({status}): {body}") f"POST /budgets failed ({status}): {body}")
budget_id = body.get("id") assert_true(body.get("id") is not None, "Budget response missing 'id'")
assert_true(budget_id is not None, "Response missing 'id' field") budget_ids["Product:2023-01"] = body["id"]
budget_ids["Product:2023-01"] = budget_id
s.run("POST /api/v1/budgets — Product revenue 2023-01", post_product_budget) s.run("POST /api/v1/budgets — Product revenue 2023-01 (version=original)", post_product_budget)
def post_service_budget(): def post_service_budget():
row = next(r for r in rows row = next(r for r in rows
@@ -326,7 +328,7 @@ def suite_revenue() -> Suite:
payload = { payload = {
"fiscal_year": fy, "fiscal_year": fy,
"fiscal_period": fp, "fiscal_period": fp,
"version": BUDGET_VERSION, "version": "original",
"department_id": DEPARTMENT_IDS["Revenue"], "department_id": DEPARTMENT_IDS["Revenue"],
"gl_account_id": gl("Service")["id"], "gl_account_id": gl("Service")["id"],
"amount": float(row["budget_amount"]), "amount": float(row["budget_amount"]),
@@ -337,118 +339,151 @@ def suite_revenue() -> Suite:
status, body = http_post(f"{API_BASE}/api/v1/budgets", payload) status, body = http_post(f"{API_BASE}/api/v1/budgets", payload)
assert_true(status in (200, 201), assert_true(status in (200, 201),
f"POST /budgets failed ({status}): {body}") f"POST /budgets failed ({status}): {body}")
budget_ids["Service:2023-01"] = body.get("id") assert_true(body.get("id") is not None, "Budget response missing 'id'")
budget_ids["Service:2023-01"] = body["id"]
s.run("POST /api/v1/budgets — Service revenue 2023-01", post_service_budget) s.run("POST /api/v1/budgets — Service revenue 2023-01 (version=original)", post_service_budget)
# ── Step 4: POST actuals ────────────────────────────────────────────────── # ── Step 4: POST actuals (IngestActualsRequest) ───────────────────────────
# Ingest actual amounts for the same period so variance can be computed. # IngestActualsRequest uses dept_code + gl_code strings (not integer IDs).
# The Go service resolves them to IDs internally.
ingested_actuals: list[dict] = []
def post_actuals(): def post_actuals():
period_rows = [r for r in rows if r["period"] == "2023-01"] period_rows = [r for r in rows if r["period"] == "2023-01"]
records = []
for r in period_rows: for r in period_rows:
fy, fp = period_to_fiscal(r["period"]) fy, fp = period_to_fiscal(r["period"])
account = gl(r["revenue_type"]) account = gl(r["revenue_type"])
records.append({ # IngestActualsRequest shape — matches model exactly
payload = {
"fiscal_year": fy, "fiscal_year": fy,
"fiscal_period": fp, "fiscal_period": fp,
"department_id": DEPARTMENT_IDS["Revenue"], "dept_code": "REV", # Department.Code for Revenue
"gl_account_id": account["id"], "gl_code": account["code"], # GLAccount.Code e.g. "4000"
"gl_code": account["code"],
"amount": float(r["actual_amount"]), "amount": float(r["actual_amount"]),
"currency": "USD", "currency": "USD",
"source": "test_suite_csv", "source": "test_suite_csv",
}) }
status, body = http_post(f"{API_BASE}/api/v1/actuals/ingest", payload)
status, body = http_post(f"{API_BASE}/api/v1/actuals/ingest",
{"records": records})
assert_true(status in (200, 201), assert_true(status in (200, 201),
f"POST /actuals/ingest failed ({status}): {body}") f"POST /actuals/ingest gl={account['code']} failed ({status}): {body}")
# Store returned actuals for later assertions
returned = body.get("actuals") or body.get("records") or []
ingested_actuals.extend(returned)
s.run("POST /api/v1/actuals/ingest — Product + Service 2023-01", post_actuals) s.run("POST /api/v1/actuals/ingest — Product + Service 2023-01", post_actuals)
# ── Step 5: GET variance and verify numbers ─────────────────────────────── # ── Step 5: GET variance and verify VarianceReport shape ──────────────────
# Response: VarianceReport { department, fiscal_year, fiscal_period, version,
# currency, lines: []VarianceLine, total_budget, total_actual, total_variance, total_variance_pct }
# VarianceLine: { gl_account_id (actually gl_code string), gl_description, gl_type,
# budget, actual, variance_abs, variance_pct, status, currency }
variance_data: list[dict] = [] report: dict = {}
def fetch_variance(): def fetch_variance():
url = f"{API_BASE}/api/v1/variance?fiscal_year=2023&fiscal_period=1" url = (f"{API_BASE}/api/v1/variance"
f"?fiscal_year=2023&fiscal_period=1&dept_code=REV&version=original")
status, body = http_get(url) status, body = http_get(url)
assert_true(status == 200, f"GET /variance failed ({status}): {body}") assert_true(status == 200, f"GET /variance failed ({status}): {body}")
# API may return a list directly or wrapped in a key
items = body if isinstance(body, list) else body.get("variance") or body.get("data") or []
assert_true(len(items) > 0, "Variance response is empty — data may not have landed")
variance_data.extend(items)
s.run("GET /api/v1/variance returns entries for 2023-01", fetch_variance) # Response is a VarianceReport object (not a list)
assert_true(isinstance(body, dict), f"Expected VarianceReport object, got: {type(body)}")
assert_true("lines" in body,
f"VarianceReport missing 'lines' key. Got keys: {list(body.keys())}")
assert_true(len(body["lines"]) > 0, "VarianceReport.lines is empty")
report.update(body)
def verify_product_variance(): s.run("GET /api/v1/variance returns VarianceReport for REV dept 2023-01", fetch_variance)
def verify_report_top_level():
assert_eq(report.get("fiscal_year"), 2023, "fiscal_year mismatch")
assert_eq(report.get("fiscal_period"), 1, "fiscal_period mismatch")
assert_eq(report.get("version"), "original", "version mismatch")
assert_true(report.get("department") is not None, "department field missing")
assert_true(report.get("currency") is not None, "currency field missing")
assert_true(report.get("total_budget") is not None, "total_budget field missing")
assert_true(report.get("total_actual") is not None, "total_actual field missing")
s.run("VarianceReport top-level fields present", verify_report_top_level)
def verify_variance_lines():
lines = report.get("lines", [])
# VarianceLine fields: gl_account_id (gl_code string), gl_description,
# gl_type, budget, actual, variance_abs, variance_pct, status, currency
for line in lines:
assert_true("gl_account_id" in line, f"VarianceLine missing gl_account_id: {line}")
assert_true("gl_description" in line, f"VarianceLine missing gl_description: {line}")
assert_true("gl_type" in line, f"VarianceLine missing gl_type: {line}")
assert_true("budget" in line, f"VarianceLine missing budget: {line}")
assert_true("actual" in line, f"VarianceLine missing actual: {line}")
assert_true("variance_abs" in line, f"VarianceLine missing variance_abs: {line}")
assert_true("status" in line, f"VarianceLine missing status: {line}")
# status must be a valid VarianceStatus
assert_true(line["status"] in ("favourable", "unfavourable", "on_budget"),
f"Unknown VarianceStatus: {line['status']}")
s.run("VarianceLine fields and status values valid", verify_variance_lines)
def verify_product_line():
csv_row = next(r for r in rows csv_row = next(r for r in rows
if r["revenue_type"] == "Product" and r["period"] == "2023-01") if r["revenue_type"] == "Product" and r["period"] == "2023-01")
expected_budget = float(csv_row["budget_amount"]) expected_budget = float(csv_row["budget_amount"])
expected_actual = float(csv_row["actual_amount"]) expected_actual = float(csv_row["actual_amount"])
expected_variance = float(csv_row["variance"]) expected_var = float(csv_row["variance"])
# Find the matching variance entry by GL account id # VarianceLine.gl_account_id holds the GL code string (e.g. "4000")
entry = next( line = next((l for l in report["lines"]
(v for v in variance_data if l.get("gl_account_id") == gl("Product")["code"]), None)
if v.get("gl_account_id") == gl("Product")["id"] assert_true(line is not None,
or v.get("gl_code") == gl("Product")["code"]), f"No VarianceLine for Product (gl_code={gl('Product')['code']}). "
None f"Lines: {[l.get('gl_account_id') for l in report['lines']]}")
)
assert_true(entry is not None,
f"No variance entry found for Product (GL {gl('Product')['id']})")
api_budget = float(entry.get("budget_amount") or entry.get("budget") or 0) assert_true(abs(float(line["budget"]) - expected_budget) < 1.0,
api_actual = float(entry.get("actual_amount") or entry.get("actual") or 0) f"budget mismatch: got {line['budget']} expected {expected_budget:.2f}")
api_variance = float(entry.get("variance") or entry.get("variance_amount") or 0) assert_true(abs(float(line["actual"]) - expected_actual) < 1.0,
f"actual mismatch: got {line['actual']} expected {expected_actual:.2f}")
assert_true(abs(float(line["variance_abs"]) - abs(expected_var)) < 1.0,
f"variance_abs mismatch: got {line['variance_abs']} expected {abs(expected_var):.2f}")
# Revenue account: favour_high=True, so over-actual should be favourable
if float(csv_row["actual_amount"]) >= float(csv_row["budget_amount"]):
assert_eq(line["status"], "favourable",
f"Revenue over-budget should be favourable, got {line['status']}")
assert_true(abs(api_budget - expected_budget) < 1.0, s.run("VarianceLine values match CSV for Product 2023-01", verify_product_line)
f"Budget mismatch: API={api_budget} CSV={expected_budget}")
assert_true(abs(api_actual - expected_actual) < 1.0,
f"Actual mismatch: API={api_actual} CSV={expected_actual}")
assert_true(abs(api_variance - expected_variance) < 1.0,
f"Variance mismatch: API={api_variance} CSV={expected_variance}")
s.run("Variance amounts match CSV for Product 2023-01", verify_product_variance) def verify_totals():
# total_budget and total_actual should be sum of the lines
line_budget_sum = sum(float(l["budget"]) for l in report["lines"])
line_actual_sum = sum(float(l["actual"]) for l in report["lines"])
assert_true(abs(float(report["total_budget"]) - line_budget_sum) < 1.0,
f"total_budget {report['total_budget']} != sum of lines {line_budget_sum:.2f}")
assert_true(abs(float(report["total_actual"]) - line_actual_sum) < 1.0,
f"total_actual {report['total_actual']} != sum of lines {line_actual_sum:.2f}")
def verify_service_variance(): s.run("VarianceReport totals equal sum of lines", verify_totals)
csv_row = next(r for r in rows
if r["revenue_type"] == "Service" and r["period"] == "2023-01")
expected_budget = float(csv_row["budget_amount"])
expected_actual = float(csv_row["actual_amount"])
expected_variance = float(csv_row["variance"])
entry = next( # ── Step 6: GET /variance/alerts and verify AlertThreshold shape ──────────
(v for v in variance_data # AlertThreshold: { gl_code, description, budget, actual, variance_pct, status, department }
if v.get("gl_account_id") == gl("Service")["id"]
or v.get("gl_code") == gl("Service")["code"]),
None
)
assert_true(entry is not None,
f"No variance entry found for Service (GL {gl('Service')['id']})")
api_budget = float(entry.get("budget_amount") or entry.get("budget") or 0) def check_alerts():
api_actual = float(entry.get("actual_amount") or entry.get("actual") or 0) url = f"{API_BASE}/api/v1/variance/alerts?fiscal_year=2023&fiscal_period=1"
api_variance = float(entry.get("variance") or entry.get("variance_amount") or 0) status, body = http_get(url)
assert_true(status == 200, f"GET /variance/alerts failed ({status}): {body}")
assert_true(abs(api_budget - expected_budget) < 1.0, alerts = body if isinstance(body, list) else body.get("alerts", [])
f"Budget mismatch: API={api_budget} CSV={expected_budget}") # Alerts may be empty if nothing breaches threshold — just validate shape if any exist
assert_true(abs(api_actual - expected_actual) < 1.0, for alert in alerts:
f"Actual mismatch: API={api_actual} CSV={expected_actual}") assert_true("gl_code" in alert, f"AlertThreshold missing gl_code: {alert}")
assert_true(abs(api_variance - expected_variance) < 1.0, assert_true("description" in alert, f"AlertThreshold missing description: {alert}")
f"Variance mismatch: API={api_variance} CSV={expected_variance}") assert_true("budget" in alert, f"AlertThreshold missing budget: {alert}")
assert_true("actual" in alert, f"AlertThreshold missing actual: {alert}")
assert_true("variance_pct" in alert, f"AlertThreshold missing variance_pct: {alert}")
assert_true("status" in alert, f"AlertThreshold missing status: {alert}")
assert_true("department" in alert, f"AlertThreshold missing department: {alert}")
assert_true(alert["status"] in ("favourable", "unfavourable", "on_budget"),
f"Unknown alert status: {alert['status']}")
s.run("Variance amounts match CSV for Service 2023-01", verify_service_variance) s.run("GET /api/v1/variance/alerts returns valid AlertThreshold list", check_alerts)
# ── Step 6: update a budget and verify variance shifts ──────────────────── # ── Step 7: update budget to forecast_1 and verify variance shifts ────────
# PUT /api/v1/budgets/{id} with version="forecast_1" (model.VersionForecast1)
def update_and_reverify(): def update_and_reverify():
budget_id = budget_ids.get("Product:2023-01") budget_id = budget_ids.get("Product:2023-01")
@@ -457,7 +492,7 @@ def suite_revenue() -> Suite:
csv_row = next(r for r in rows csv_row = next(r for r in rows
if r["revenue_type"] == "Product" and r["period"] == "2023-01") if r["revenue_type"] == "Product" and r["period"] == "2023-01")
revised_amount = float(csv_row["budget_amount"]) * 1.10 # +10% revision revised_amount = float(csv_row["budget_amount"]) * 1.10 # +10% reforecast
fy, fp = period_to_fiscal("2023-01") fy, fp = period_to_fiscal("2023-01")
status, body = http_put( status, body = http_put(
@@ -465,36 +500,33 @@ def suite_revenue() -> Suite:
{ {
"fiscal_year": fy, "fiscal_year": fy,
"fiscal_period": fp, "fiscal_period": fp,
"version": BUDGET_VERSION, "version": "forecast_1", # model.VersionForecast1
"department_id": DEPARTMENT_IDS["Revenue"], "department_id": DEPARTMENT_IDS["Revenue"],
"gl_account_id": gl("Product")["id"], "gl_account_id": gl("Product")["id"],
"amount": revised_amount, "amount": revised_amount,
"currency": "USD", "currency": "USD",
"notes": "revised +10% mid-cycle", "notes": "Q1 reforecast +10%",
"created_by": "test_suite", "created_by": "test_suite",
} }
) )
assert_true(status in (200, 204), assert_true(status in (200, 204),
f"PUT /budgets/{budget_id} failed ({status}): {body}") f"PUT /budgets/{budget_id} failed ({status}): {body}")
# Re-fetch variance — the variance amount should now reflect the new budget # Re-fetch with version=forecast_1 — budget should reflect revision
url = f"{API_BASE}/api/v1/variance?fiscal_year=2023&fiscal_period=1" url = (f"{API_BASE}/api/v1/variance"
f"?fiscal_year=2023&fiscal_period=1&dept_code=REV&version=forecast_1")
status2, body2 = http_get(url) status2, body2 = http_get(url)
assert_true(status2 == 200, f"Re-fetch variance failed ({status2})") assert_true(status2 == 200, f"Re-fetch forecast_1 variance failed ({status2}): {body2}")
items = body2 if isinstance(body2, list) else body2.get("variance") or body2.get("data") or [] lines = body2.get("lines", []) if isinstance(body2, dict) else []
entry = next( line = next((l for l in lines
(v for v in items if v.get("gl_account_id") == gl("Product")["id"] if l.get("gl_account_id") == gl("Product")["code"]), None)
or v.get("gl_code") == gl("Product")["code"]), assert_true(line is not None,
None "Product line missing from forecast_1 variance report after update")
) assert_true(abs(float(line["budget"]) - revised_amount) < 1.0,
assert_true(entry is not None, "Product variance entry missing after budget update") f"forecast_1 budget not reflected: got {line['budget']} expected {revised_amount:.2f}")
api_budget = float(entry.get("budget_amount") or entry.get("budget") or 0) s.run("PUT /api/v1/budgets/{id} version=forecast_1 — variance shifts correctly", update_and_reverify)
assert_true(abs(api_budget - revised_amount) < 1.0,
f"Updated budget not reflected: API={api_budget} expected={revised_amount:.2f}")
s.run("PUT /api/v1/budgets/{id} — variance reflects revised budget", update_and_reverify)
return s return s