From 369cfb335fc76809d49d63025307f7fa68ed8f1b Mon Sep 17 00:00:00 2001 From: techmetx11 Date: Sun, 21 Jul 2024 16:28:45 +0100 Subject: [PATCH] Added TCP/IP support Closes #2 --- src/consts.rs | 2 ++ src/main.rs | 93 +++++++++++++++++++++++++-------------------------- 2 files changed, 48 insertions(+), 47 deletions(-) diff --git a/src/consts.rs b/src/consts.rs index ae6a86d..94c08f5 100644 --- a/src/consts.rs +++ b/src/consts.rs @@ -2,6 +2,8 @@ use lazy_regex::{regex, Lazy}; use regex::Regex; pub static DEFAULT_SOCK_PATH: &str = "/tmp/inv_sig_helper.sock"; +pub static DEFAULT_TCP_URL: &str = "0.0.0.0:12999"; + pub static TEST_YOUTUBE_VIDEO: &str = "https://www.youtube.com/watch?v=jNQXAC9IVRw"; pub static REGEX_PLAYER_ID: &Lazy = regex!("\\/s\\/player\\/([0-9a-f]{8})"); diff --git a/src/main.rs b/src/main.rs index f6f57b2..8d7d556 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,14 +4,15 @@ mod opcode; mod player; use ::futures::StreamExt; -use consts::DEFAULT_SOCK_PATH; +use consts::{DEFAULT_SOCK_PATH, DEFAULT_TCP_URL}; use jobs::{process_decrypt_n_signature, process_fetch_update, GlobalState, JobOpcode}; use opcode::OpcodeDecoder; use player::fetch_update; use std::{env::args, sync::Arc}; use tokio::{ fs::remove_file, - net::{UnixListener, UnixStream}, + io::{AsyncReadExt, AsyncWrite}, + net::{TcpListener, UnixListener}, sync::Mutex, }; use tokio_util::codec::Framed; @@ -21,30 +22,25 @@ use crate::jobs::{ process_player_update_timestamp, }; -macro_rules! break_fail { - ($res:expr) => { - match $res { - Ok(value) => value, - Err(e) => { - println!("An error occurred while parsing the current request: {}", e); - break; +macro_rules! loop_main { + ($i:ident, $s:ident) => { + println!("Fetching player"); + match fetch_update($s.clone()).await { + Ok(()) => println!("Successfully fetched player"), + Err(x) => { + println!("Error occured while trying to fetch the player: {:?}", x); } } + loop { + let (socket, _addr) = $i.accept().await.unwrap(); + + let cloned_state = $s.clone(); + tokio::spawn(async move { + process_socket(cloned_state, socket).await; + }); + } }; } - -macro_rules! eof_fail { - ($res:expr, $stream:ident) => { - match $res { - Ok(value) => value, - Err(e) => { - println!("An error occurred while parsing the current request: {}", e); - break; - } - } - }; -} - #[tokio::main] async fn main() { let args: Vec = args().collect(); @@ -56,37 +52,40 @@ async fn main() { // have to please rust let state: Arc = Arc::new(GlobalState::new()); - let socket: UnixListener = match UnixListener::bind(socket_url) { - Ok(x) => x, - Err(x) => { - if x.kind() == std::io::ErrorKind::AddrInUse { - remove_file(socket_url).await; - UnixListener::bind(socket_url).unwrap() - } else { + if socket_url == "--tcp" { + let socket_tcp_url: &str = match args.get(2) { + Some(stringref) => stringref, + None => DEFAULT_TCP_URL, + }; + let tcp_socket = match TcpListener::bind(socket_tcp_url).await { + Ok(x) => x, + Err(x) => { println!("Error occurred while trying to bind: {}", x); return; } - } - }; - - println!("Fetching player"); - match fetch_update(state.clone()).await { - Ok(()) => println!("Successfully fetched player"), - Err(x) => { - println!("Error occured while trying to fetch the player: {:?}", x); - } - } - loop { - let (socket, _addr) = socket.accept().await.unwrap(); - - let cloned_state = state.clone(); - tokio::spawn(async move { - process_socket(cloned_state, socket).await; - }); + }; + loop_main!(tcp_socket, state); + } else { + let unix_socket = match UnixListener::bind(socket_url) { + Ok(x) => x, + Err(x) => { + if x.kind() == std::io::ErrorKind::AddrInUse { + remove_file(socket_url).await; + UnixListener::bind(socket_url).unwrap() + } else { + println!("Error occurred while trying to bind: {}", x); + return; + } + } + }; + loop_main!(unix_socket, state); } } -async fn process_socket(state: Arc, socket: UnixStream) { +async fn process_socket(state: Arc, socket: W) +where + W: AsyncReadExt + Send + AsyncWrite + 'static, +{ let decoder = OpcodeDecoder {}; let str = Framed::new(socket, decoder);