commit c2f9a6e600ad017bd5f0801f8340ff02be5b5cee Author: Xory Date: Mon Dec 22 18:33:55 2025 +0200 init: add code diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4a9b70c --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/.vscode +/frp_0.65.0_linux_amd64 +/__pycache__ diff --git a/main.py b/main.py new file mode 100644 index 0000000..6718d1e --- /dev/null +++ b/main.py @@ -0,0 +1,172 @@ +import asyncio +import itertools +import json +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Set + +from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect +from fastapi.responses import HTMLResponse, StreamingResponse +from pydantic import BaseModel + + +class OutboundPayload(BaseModel): + payload: Any + + +app = FastAPI() + +# In-memory registries +clients: Dict[int, WebSocket] = {} +client_lock = asyncio.Lock() +client_id_seq = itertools.count(1) + +sse_subscribers: Set[asyncio.Queue[str]] = set() +sse_lock = asyncio.Lock() + + +def now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def format_sse_event(data: Dict[str, Any]) -> str: + """Build a Server-Sent Event payload with an optional event name.""" + event_name = data.get("event") + payload = json.dumps(data, separators=(",", ":")) + lines: List[str] = [] + if event_name: + lines.append(f"event: {event_name}") + for line in payload.splitlines(): + lines.append(f"data: {line}") + lines.append("") + return "\n".join(lines) + "\n" + + +async def broadcast_event(event: Dict[str, Any]) -> None: + """Send an event to all SSE subscribers without blocking the websocket loop.""" + message = format_sse_event(event) + async with sse_lock: + for queue in list(sse_subscribers): + try: + queue.put_nowait(message) + except asyncio.QueueFull: + # Should not happen with the default unbounded queues, but guard anyway. + pass + + +@app.get("/", response_class=HTMLResponse) +async def index() -> HTMLResponse: + html_path = Path(__file__).parent / "webui.html" + return HTMLResponse(html_path.read_text(encoding="utf-8")) + + +@app.get("/clients") +async def list_clients() -> Dict[str, List[int]]: + async with client_lock: + return {"clients": sorted(clients.keys())} + + +@app.post("/clients/{client_id}/send") +async def send_to_client(client_id: int, body: OutboundPayload) -> Dict[str, Any]: + async with client_lock: + websocket = clients.get(client_id) + if websocket is None: + raise HTTPException(status_code=404, detail="Client not connected") + + try: + payload_text = json.dumps(body.payload) + except TypeError as exc: # noqa: BLE001 + raise HTTPException( + status_code=422, detail="Payload must be JSON serializable" + ) from exc + + try: + await websocket.send_text(payload_text) + except Exception as exc: # noqa: BLE001 + raise HTTPException( + status_code=500, detail=f"Failed to send to client {client_id}" + ) from exc + + await broadcast_event( + { + "event": "outbound", + "timestamp": now_iso(), + "client_id": client_id, + "payload": body.payload, + } + ) + return {"status": "sent"} + + +@app.get("/events") +async def sse_events() -> StreamingResponse: + queue: asyncio.Queue[str] = asyncio.Queue() + async with sse_lock: + sse_subscribers.add(queue) + + async def event_stream(): + try: + while True: + try: + message = await asyncio.wait_for(queue.get(), timeout=15) + yield message + except asyncio.TimeoutError: + # Keep connection alive + yield "event: keepalive\ndata: {}\n\n" + except asyncio.CancelledError: + # Client disconnected + pass + finally: + async with sse_lock: + sse_subscribers.discard(queue) + + return StreamingResponse(event_stream(), media_type="text/event-stream") + + +@app.websocket("/ws") +async def websocket_handler(websocket: WebSocket) -> None: + await websocket.accept() + client_id = next(client_id_seq) + async with client_lock: + clients[client_id] = websocket + + await broadcast_event( + {"event": "client_connected", "timestamp": now_iso(), "client_id": client_id} + ) + + try: + while True: + message = await websocket.receive_text() + try: + parsed = json.loads(message) + except json.JSONDecodeError: + print("json decode error") + print(message) + parsed = {"raw": message} + + await broadcast_event( + { + "event": "inbound", + "timestamp": now_iso(), + "client_id": client_id, + "payload": parsed, + } + ) + except WebSocketDisconnect: + pass + finally: + async with client_lock: + clients.pop(client_id, None) + await broadcast_event( + { + "event": "client_disconnected", + "timestamp": now_iso(), + "client_id": client_id, + } + ) + + +if __name__ == "__main__": + import uvicorn + + uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..bf21f31 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +fastapi==0.110.2 +uvicorn[standard]==0.30.1 diff --git a/webui.html b/webui.html new file mode 100644 index 0000000..41fb5a4 --- /dev/null +++ b/webui.html @@ -0,0 +1,416 @@ + + + + + + Skylink Console + + + + + + + + +
+
+
+ Clients +
+
No clients yet.
+
+
+ + +
+
+
+ +
+
+
+
+
+ Send JSON + No client selected +
+
+ + +
+
+ +
+ waiting for clients... +
+
+ +
+
+
+ Live stream + All inbound/outbound + connect/disconnect +
+ +
+
+
Streaming events will appear here.
+
+
+

Client API reference

+
    +
  • ClientInfo ➜ no arguments
  • +
  • RunCMD ➜ command: string, args: vector of strings
  • +
  • URunCMD ➜ command: string
  • +
  • URunExe ➜ path: string, args: string
  • +
  • Dnx ➜ params: subset containing
  • +
  • ⤷ url: string
  • +
  • ⤷ name: string
  • +
  • ⤷ args: vec of strings
  • +
  • ⤷ run_as_system: boolean
  • +
  • ⤷ file_type: string which is one of Executable, Powershell or Python
  • +
+
+
+
+
+
+ + + +