This commit is contained in:
Xory 2025-11-17 23:00:04 +02:00
parent 43a0b1c1d1
commit 7304b33b7c
5 changed files with 312 additions and 27 deletions

View file

@ -1,49 +1,129 @@
use futures_util::stream::StreamExt;
use skylink::{Command, DnxParams, PayloadType};
use futures_util::{SinkExt, stream::StreamExt};
use skylink::WsTx;
use skylink::lib::logger::{LogLevel, log};
use skylink::{LOG_PATH, WS_URL};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio_tungstenite::tungstenite::protocol::Message;
// This is the type for the SENDER half of the WebSocket stream.
// It will be the shared state.
// Some parts of this function were generated by an LLM. I'm taking note of this in case a
// weird barely detectable bug pops up, as LLMs tend to generate.
async fn websocket_handler() {
async fn websocket_handler(ws_tx: WsTx) {
use std::time::Duration;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::protocol::Message;
let url = "ws://127.0.0.1:8080";
loop {
match connect_async(url).await {
Ok(ws_stream_tuple) => {
println!("[i] Connected via websocket."); // TODO Use logger over println
let (mut ws_stream, _) = ws_stream_tuple;
while let Some(msg) = ws_stream.next().await {
match msg {
Ok(Message::Text(text)) => {
println!("{}", &text);
let connection_result = connect_async(WS_URL).await;
let ws_stream = match connection_result {
Ok((stream, _)) => {
log(
LogLevel::Info,
LOG_PATH,
"[ws] WebSocket connection established.".to_string(),
)
.await;
stream
}
Err(e) => {
log(
LogLevel::Warning,
LOG_PATH,
format!("[ws] Failed to connect: {:?}. Retrying in 5s....", e)
)
.await;
tokio::time::sleep(Duration::from_secs(5)).await;
continue; // Go to the next iteration of the loop to retry.
}
};
let (ws_send, mut ws_recv) = ws_stream.split();
{
let mut unlocked = ws_tx.lock().await;
*unlocked = Some(ws_send);
}
while let Some(msg) = ws_recv.next().await {
match msg {
// break is used to trigger reconnect.
Ok(Message::Text(text)) => {
log(
LogLevel::Debug,
LOG_PATH,
format!("[ws] Received text: {}", &text)
)
.await;
}
Ok(Message::Close(_)) => {
log(
LogLevel::Warning,
LOG_PATH,
format!("[ws] Received close frame, disconnecting.")
)
.await;
break;
}
Ok(Message::Ping(h)) => {
log(
LogLevel::Debug,
LOG_PATH,
format!("[ws] Received Ping, sending Pong")
)
.await;
let mut unlocked = ws_tx.lock().await;
dbg!(&ws_tx);
dbg!(&unlocked);
match unlocked.as_mut() {
Some(v) => {
if let Err(e) = v.send(Message::Pong(h)).await {
log(LogLevel::Error, LOG_PATH, format!("[ws] Failed to send Pong: {e}, reconnecting.")).await;
break;
}
}
Ok(Message::Close(_)) => {
println!("[i] Disconnected.");
None => {
log(LogLevel::Error, LOG_PATH, format!("[ws] Failed to respond: no sink, reconnecting.")).await;
break;
}
Err(e) => {
eprintln!("Error receiving message: {:?}", e);
break;
}
_ => {
// Ignore other message types
}
}
}
Err(e) => {
log(
LogLevel::Error,
LOG_PATH,
format!("[ws] Error receiving message: {:?}.", e),
)
.await;
break;
}
_ => { /* Ignore other message types */ }
}
Err(e) => {
eprintln!("[e] Failed to connect: {:?}", e); // TODO logger > println
{
let mut unlocked_ws_tx = ws_tx.lock().await;
*unlocked_ws_tx = None;
}
}
println!("[i] Connection lost, reconnecting in 5 seconds...");
tokio::time::sleep(Duration::from_secs(5)).await;
log(
LogLevel::Error,
LOG_PATH,
format!("[ws] Connection lost.")
)
.await;
// Reconnecting is handled by the loop
// So, we sleep to avoid spamming the server.
std::thread::sleep(std::time::Duration::from_secs(5));
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
websocket_handler().await;
let ws_tx: WsTx = Arc::new(Mutex::new(None));
let ws_tx_for_handler = Arc::clone(&ws_tx);
websocket_handler(ws_tx_for_handler).await;
Ok(())
}