TenantAtlas/scripts/mcp_gitea_smoke.py
ahmido abda751296 feat(058): tenant dashboard + active-runs gating (#68)
Adds a tenant-scoped dashboard page (KPIs, Needs Attention, Recent Drift Findings, Recent Operations) with polling only while active runs exist.

Guardrails: DB-only render (no outbound HTTP) + tenant isolation.

Tests: ActiveRunsTest, TenantDashboardDbOnlyTest, TenantDashboardTenantScopeTest.
Co-authored-by: Ahmed Darrazi <ahmeddarrazi@adsmac.local>
Reviewed-on: #68
2026-01-21 14:00:42 +00:00

86 lines
2.4 KiB
Python

#!/usr/bin/env python3
import json
import queue
import subprocess
import sys
import threading
import time
def send(proc: subprocess.Popen[bytes], payload: dict) -> None:
    """Deliver *payload* to the child process as a single JSON line.

    The payload is serialized with ``json.dumps``, terminated with a
    newline, encoded as UTF-8 and written to ``proc.stdin``. The pipe is
    flushed right away so the message is not left in the local buffer.

    Args:
        proc: Child process whose ``stdin`` is a writable binary pipe.
        payload: JSON-serializable message to send.
    """
    pipe = proc.stdin
    write = pipe.write
    frame = (json.dumps(payload) + "\n").encode("utf-8")
    write(frame)
    pipe.flush()
def read_line(proc: subprocess.Popen[bytes], timeout: float = 10.0) -> str:
    """Return the next line from the child's stdout, waiting at most *timeout* seconds.

    ``readline()`` on a pipe blocks until a full line or end-of-stream
    arrives, so calling it directly would let a silent child stall the
    caller forever regardless of *timeout*. The blocking read is therefore
    done in a daemon thread and this function only waits on a queue, which
    does honour the deadline.

    Args:
        proc: Child process whose ``stdout`` is a readable binary pipe.
        timeout: Maximum number of seconds to wait for a line.

    Returns:
        The line decoded as UTF-8 (undecodable bytes replaced) with
        surrounding whitespace stripped. An empty string means no usable
        line: the timeout expired, the stream ended, or the line was blank.

    Note:
        When the timeout expires the helper thread is still blocked in
        ``readline()`` and will swallow the next line the child emits.
        Treat a timeout as fatal for this stream rather than retrying.
    """
    if timeout <= 0:
        return ""

    # Resolve the bound method in the calling thread so that a missing
    # stdout pipe raises here instead of inside the helper thread.
    readline = proc.stdout.readline
    handoff = queue.Queue(maxsize=1)

    def _pump() -> None:
        try:
            handoff.put(readline())
        except (OSError, ValueError):
            # The pipe was closed while we were reading: report end-of-stream.
            handoff.put(b"")

    threading.Thread(target=_pump, name="read_line", daemon=True).start()
    try:
        raw = handoff.get(timeout=timeout)
    except queue.Empty:
        return ""
    return raw.decode("utf-8", errors="replace").strip()
def _stop_and_collect_stderr(
    proc: subprocess.Popen[bytes], limit: int = 4096, grace: float = 5.0
) -> str:
    """Stop the child, reap it, and return the first *limit* bytes of its stderr as text."""
    if proc.poll() is None:
        proc.terminate()
    try:
        # communicate() drains and closes the pipes and waits for the child,
        # so nothing is left unreaped and no read can block past *grace*.
        _, captured = proc.communicate(timeout=grace)
    except subprocess.TimeoutExpired:
        proc.kill()
        try:
            _, captured = proc.communicate(timeout=grace)
        except subprocess.TimeoutExpired as stuck:
            # Something still holds the pipes open; keep whatever arrived.
            captured = stuck.stderr
    return (captured or b"")[:limit].decode("utf-8", errors="replace")


def main() -> int:
    """Smoke-test the Gitea MCP server over stdio.

    Starts ``scripts/run-gitea-mcp.py`` (a relative path, so the working
    directory must contain ``scripts/``), then performs three JSON-RPC
    exchanges in order: ``initialize``, ``tools/list`` and a ``tools/call``
    of ``get_my_user_info``. A truncated copy of each reply is printed to
    stdout. If a reply is missing, the start of the server's stderr is
    written to this process's stderr.

    Returns:
        0 when every request received a reply, 1 otherwise.
    """
    # (label, message, seconds to wait for the reply, reply characters echoed).
    # A wait of None marks a notification, for which no reply is read.
    script = (
        (
            "initialize",
            {
                "jsonrpc": "2.0",
                "id": 1,
                "method": "initialize",
                "params": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {},
                    "clientInfo": {"name": "smoke", "version": "0.0.0"},
                },
            },
            15.0,
            400,
        ),
        (
            # The MCP lifecycle requires this notification after the
            # initialize reply and before any further request.
            "notifications/initialized",
            {"jsonrpc": "2.0", "method": "notifications/initialized"},
            None,
            0,
        ),
        (
            "tools/list",
            {"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}},
            15.0,
            400,
        ),
        (
            "get_my_user_info",
            {
                "jsonrpc": "2.0",
                "id": 3,
                "method": "tools/call",
                "params": {"name": "get_my_user_info", "arguments": {}},
            },
            20.0,
            500,
        ),
    )

    # NOTE: stderr is a pipe that is only drained at shutdown, so a server
    # that logs more than the pipe can hold would stall mid-session.
    proc = subprocess.Popen(
        ["python3", "scripts/run-gitea-mcp.py"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    unanswered = None
    try:
        for label, message, wait, shown in script:
            try:
                send(proc, message)
            except BrokenPipeError:
                # The server already exited; report it like a missing reply
                # so its stderr is shown instead of a bare traceback.
                unanswered = label
                break
            if wait is None:
                continue
            reply = read_line(proc, timeout=wait)
            if not reply:
                unanswered = label
                break
            print(f"{label}:", reply[:shown])
    finally:
        # Stop the server before touching stderr: reading the pipe while the
        # child is still alive could block indefinitely.
        server_stderr = _stop_and_collect_stderr(proc)

    if unanswered is not None:
        sys.stderr.write(f"No {unanswered} response. stderr:\n" + server_stderr + "\n")
        return 1
    return 0
# Script entry point: run the smoke test and use its result as the exit status.
if __name__ == "__main__":
    sys.exit(main())