From 4051665b6dfd08a13752647f3fa9622b97bfddfb Mon Sep 17 00:00:00 2001 From: Andy Pack Date: Mon, 29 Jan 2024 07:43:16 +0000 Subject: [PATCH] sending and processing responses --- dnstp-client/src/main.rs | 38 ++++++++++++++++++++------- dnstp-server/src/main.rs | 1 + dnstp/src/dns_socket.rs | 46 ++++++++++++++++++++++++--------- dnstp/src/lib.rs | 3 ++- dnstp/src/response_processor.rs | 46 +++++++++++++++++++++++++++++++++ 5 files changed, 111 insertions(+), 23 deletions(-) create mode 100644 dnstp/src/response_processor.rs diff --git a/dnstp-client/src/main.rs b/dnstp-client/src/main.rs index 8b821a0..d7e6fdb 100644 --- a/dnstp-client/src/main.rs +++ b/dnstp-client/src/main.rs @@ -5,6 +5,10 @@ use std::time::Duration; use clap::Parser; use log::{error, info, LevelFilter}; use simplelog::*; +use dnstplib::dns_socket::DNSSocket; +use dnstplib::raw_request::NetworkMessage; +use dnstplib::request_processor::RequestProcesor; +use dnstplib::response_processor::ResponseProcesor; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -24,18 +28,32 @@ fn main() { let address = SocketAddr::from(([127, 0, 0, 1], 0)); - match UdpSocket::bind(&address) { - Ok(s) => { - loop { + let mut socket = DNSSocket::new(vec!(address)); + socket.bind(); + socket.run_tx(); - info!("sending..."); + let tx_channel = socket.get_tx_message_channel().unwrap(); - let send_buf = "hello world"; - let send_res = s.send_to(send_buf.as_bytes(), "127.0.0.1:5000"); + let mut processor = ResponseProcesor::new(); + processor.run(); - thread::sleep(Duration::from_secs(1)); - } - } - Err(e) => error!("couldn't bind to address {}", e) + socket.run_rx(processor.get_message_channel().expect("couldn't get message processing channel")); + + loop { + + info!("sending..."); + + let mut send_buf = [0; 512]; + send_buf[0] = 'a' as u8; + send_buf[1] = 'b' as u8; + send_buf[2] = 'c' as u8; + send_buf[3] = 'd' as u8; + + tx_channel.send(Box::from(NetworkMessage { + buffer: Box::from(send_buf), + peer: "127.0.0.1:5000".parse().unwrap() + })); + + thread::sleep(Duration::from_secs(1)); } } diff --git a/dnstp-server/src/main.rs b/dnstp-server/src/main.rs index e67704f..70a3838 100644 --- a/dnstp-server/src/main.rs +++ b/dnstp-server/src/main.rs @@ -41,6 +41,7 @@ fn main() { .collect(); let mut socket = DNSSocket::new(addresses); + socket.bind(); socket.run_tx(); let mut processor = RequestProcesor::new(); diff --git a/dnstp/src/dns_socket.rs b/dnstp/src/dns_socket.rs index aff9273..1d11626 100644 --- a/dnstp/src/dns_socket.rs +++ b/dnstp/src/dns_socket.rs @@ -10,6 +10,7 @@ use crate::raw_request::{NetworkMessage, NetworkMessagePtr}; pub struct DNSSocket { addresses: Vec, + socket: Option>, rx_thread: Option>, rx_thread_killer: Option>, tx_thread: Option>, @@ -22,6 +23,7 @@ impl DNSSocket { { DNSSocket { addresses, + socket: None, rx_thread: None, rx_thread_killer: None, tx_thread: None, @@ -41,11 +43,21 @@ impl DNSSocket { // } // } - fn bind(&mut self) -> Option + pub fn bind(&mut self) { match UdpSocket::bind(&self.addresses[..]) { - Ok(s) => Option::from(s), - Err(_) => None + Ok(s) => { + self.socket = Option::from(Box::from(s)); + }, + Err(_) => {} + }; + } + + fn get_socket_clone(&mut self) -> Option> + { + match &self.socket { + Some(s) => Option::from(Box::from(s.try_clone().unwrap())), + None => None } } @@ -53,7 +65,7 @@ impl DNSSocket { { let (tx, rx): (Sender<()>, Receiver<()>) = mpsc::channel(); self.rx_thread_killer = Some(tx); - let socket = self.bind(); + let socket = self.get_socket_clone(); self.rx_thread = Some(thread::spawn(move || { @@ -103,18 +115,28 @@ impl DNSSocket { let (msg_tx, msg_rx): (Sender, Receiver) = mpsc::channel(); self.tx_message_channel = Option::from(msg_tx); + let socket = self.get_socket_clone(); + self.tx_thread = Some(thread::spawn(move || { - let mut cancelled = false; - while !cancelled { - - for m in &msg_rx { - info!("sending [{}] to [{}]", str::from_utf8(&(*(*m).buffer)).unwrap(), (*m).peer); + match socket { + None => { + error!("no socket created, failed to bind to address") } + Some(s) => { + let mut cancelled = false; + while !cancelled { - cancelled = match rx.try_recv() { - Ok(_) | Err(TryRecvError::Disconnected) => true, - _ => false + for m in &msg_rx { + info!("sending [{}] to [{}]", str::from_utf8(&(*(*m).buffer)).unwrap(), (*m).peer); + s.send_to(&(*m.buffer), m.peer); + } + + cancelled = match rx.try_recv() { + Ok(_) | Err(TryRecvError::Disconnected) => true, + _ => false + } + } } } diff --git a/dnstp/src/lib.rs b/dnstp/src/lib.rs index 8807e73..ac736ed 100644 --- a/dnstp/src/lib.rs +++ b/dnstp/src/lib.rs @@ -2,4 +2,5 @@ pub mod dns_socket; pub mod request_parser; mod dns_header; pub mod request_processor; -mod raw_request; \ No newline at end of file +pub mod response_processor; +pub mod raw_request; \ No newline at end of file diff --git a/dnstp/src/response_processor.rs b/dnstp/src/response_processor.rs new file mode 100644 index 0000000..5c3b56d --- /dev/null +++ b/dnstp/src/response_processor.rs @@ -0,0 +1,46 @@ +use std::sync::mpsc; +use std::sync::mpsc::{Receiver, Sender}; +use std::thread; +use log::info; +use std::str; +use crate::raw_request::NetworkMessagePtr; + +pub struct ResponseProcesor { + message_channel: Option> +} + +impl ResponseProcesor { + pub fn new() -> ResponseProcesor { + ResponseProcesor{ + message_channel: None + } + } + + pub fn run(&mut self) + { + let (tx, rx): (Sender, Receiver) = mpsc::channel(); + self.message_channel = Some(tx); + + thread::spawn(move || { + + for mut m in rx + { + info!("processing: {}", str::from_utf8(&(*(*m).buffer)).unwrap()); + + // (*(*m).buffer).reverse(); + + // match sending_channel.send(m) { + // Ok(_) => {} + // Err(_) => {} + // } + } + + info!("message processing thread finishing") + }); + } + + pub fn get_message_channel(&mut self) -> Option> + { + self.message_channel.clone() + } +} \ No newline at end of file