86 lines
2.4 KiB
Python
86 lines
2.4 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import json
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
|
|
|
|
def send(proc: subprocess.Popen[bytes], payload: dict) -> None:
    """Write *payload* to the child's stdin as a single JSON line.

    The payload is serialized with ``json.dumps``, terminated with a newline,
    encoded as UTF-8, written to ``proc.stdin`` and flushed right away.
    """
    pipe = proc.stdin
    wire_bytes = f"{json.dumps(payload)}\n".encode("utf-8")
    pipe.write(wire_bytes)
    pipe.flush()
|
|
|
|
|
|
def read_line(proc: subprocess.Popen[bytes], timeout: float = 10.0) -> str:
    """Return the next non-empty line from the child's stdout, or ``""``.

    ``proc.stdout.readline()`` blocks for as long as the child is alive and
    silent, so calling it directly would make *timeout* meaningless. Each read
    therefore runs in a daemon thread that is joined against a monotonic
    deadline.

    Args:
        proc: Object whose ``stdout`` attribute provides a bytes ``readline()``.
        timeout: Maximum number of seconds to wait for a line.

    Returns:
        The line decoded as UTF-8 (undecodable bytes replaced) with surrounding
        whitespace stripped, or ``""`` if nothing arrived before the deadline.

    Raises:
        Exception: whatever ``readline()`` raised is re-raised in the caller's
            thread.

    Note:
        After a timeout the reader thread may still be blocked in
        ``readline()``; a line that arrives later is consumed by that thread
        and discarded. Callers should treat a timeout as fatal for the stream.
    """
    # Function-scope import so this block does not depend on the file header.
    import threading

    def _pump(sink: list) -> None:
        # Hand either the bytes read or the exception back to the caller.
        try:
            sink.append(proc.stdout.readline())
        except Exception as exc:  # re-raised in the calling thread below
            sink.append(exc)

    # Monotonic clock: wall-clock adjustments must not stretch or cut the wait.
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return ""

        outcome: list = []
        reader = threading.Thread(target=_pump, args=(outcome,), daemon=True)
        reader.start()
        reader.join(remaining)
        if reader.is_alive():
            # Still blocked in readline(): give up instead of hanging.
            return ""

        result = outcome[0]
        if isinstance(result, Exception):
            raise result
        if result:
            return result.decode("utf-8", errors="replace").strip()

        # Empty read means EOF for now; back off briefly and retry, but never
        # sleep past the deadline.
        time.sleep(min(0.05, max(0.0, deadline - time.monotonic())))
|
|
|
|
|
|
def _shutdown(proc: subprocess.Popen[bytes], grace: float = 5.0) -> None:
    """Terminate *proc* if it is still running and reap it."""
    if proc.poll() is not None:
        return
    proc.terminate()
    try:
        proc.wait(timeout=grace)
    except subprocess.TimeoutExpired:
        # The child did not exit within the grace period; force it down.
        proc.kill()
        proc.wait()


def _drain_stderr(
    proc: subprocess.Popen[bytes], grace: float = 5.0, limit: int = 4096
) -> str:
    """Stop the child and return up to *limit* bytes of its stderr, decoded."""
    # Terminate first: reading stderr from a live, quiet child would block.
    if proc.poll() is None:
        proc.terminate()
    try:
        _, raw = proc.communicate(timeout=grace)
    except subprocess.TimeoutExpired as exc:
        proc.kill()
        raw = exc.stderr  # partial output, may be None
    return (raw or b"")[:limit].decode("utf-8", errors="replace")


def _deliver(proc: subprocess.Popen[bytes], label: str, payload: dict) -> bool:
    """Send *payload*; on a pipe error report the child's stderr and return False."""
    try:
        send(proc, payload)
    except OSError as exc:
        sys.stderr.write(
            f"Could not send {label} ({exc}). stderr:\n" + _drain_stderr(proc) + "\n"
        )
        return False
    return True


def _exchange(
    proc: subprocess.Popen[bytes],
    label: str,
    payload: dict,
    timeout: float,
    preview: int,
) -> bool:
    """Send one request, print the first *preview* chars of the reply line."""
    if not _deliver(proc, label, payload):
        return False
    reply = read_line(proc, timeout=timeout)
    if not reply:
        sys.stderr.write(f"No {label} response. stderr:\n" + _drain_stderr(proc) + "\n")
        return False
    print(f"{label}:", reply[:preview])
    return True


def main() -> int:
    """Smoke-test the server started by ``scripts/run-gitea-mcp.py`` over stdio.

    Spawns the script with ``python3``, then sends ``initialize``, the
    ``notifications/initialized`` notification, ``tools/list`` and a
    ``tools/call`` for ``get_my_user_info`` as JSON-RPC lines on its stdin.
    The first line read back for each request is printed, truncated.

    Returns:
        0 when every request produced a reply line; 1 when a message could not
        be sent or no reply arrived in time (the child's stderr is written to
        this process's stderr in that case).
    """
    # NOTE(review): stderr is piped but only read on failure; a very chatty
    # child could fill the pipe buffer — confirm this is acceptable.
    proc = subprocess.Popen(
        ["python3", "scripts/run-gitea-mcp.py"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    try:
        init_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {"name": "smoke", "version": "0.0.0"},
            },
        }
        if not _exchange(proc, "initialize", init_request, timeout=15.0, preview=400):
            return 1

        # The MCP lifecycle requires the client to confirm initialization
        # before issuing further requests; notifications get no reply.
        initialized = {"jsonrpc": "2.0", "method": "notifications/initialized"}
        if not _deliver(proc, "notifications/initialized", initialized):
            return 1

        list_request = {"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}}
        if not _exchange(proc, "tools/list", list_request, timeout=15.0, preview=400):
            return 1

        call_request = {
            "jsonrpc": "2.0",
            "id": 3,
            "method": "tools/call",
            "params": {"name": "get_my_user_info", "arguments": {}},
        }
        if not _exchange(
            proc, "get_my_user_info", call_request, timeout=20.0, preview=500
        ):
            return 1

        return 0
    finally:
        # Always stop and reap the child so no orphan or zombie is left behind.
        _shutdown(proc)
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the smoke test and use main()'s return value as the exit status.
    sys.exit(main())
|