From 584dcdebde64ce3ec0ba24d5ed2d75f9c7b3415a Mon Sep 17 00:00:00 2001 From: Andy Pack Date: Mon, 29 Jan 2024 00:03:43 +0000 Subject: [PATCH] passing messages to processor and back again --- dnstp-server/src/main.rs | 12 +++-- dnstp/src/dns_header.rs | 1 - dnstp/src/dns_socket.rs | 81 +++++++++++++++++++++++++++------- dnstp/src/lib.rs | 4 +- dnstp/src/raw_request.rs | 8 ++++ dnstp/src/request_processor.rs | 46 +++++++++++++++++++ 6 files changed, 132 insertions(+), 20 deletions(-) create mode 100644 dnstp/src/raw_request.rs create mode 100644 dnstp/src/request_processor.rs diff --git a/dnstp-server/src/main.rs b/dnstp-server/src/main.rs index 1d9b1c4..e67704f 100644 --- a/dnstp-server/src/main.rs +++ b/dnstp-server/src/main.rs @@ -1,12 +1,13 @@ use clap::Parser; use std::{thread}; -use log::{error, info, warn}; +use log::info; use simplelog::*; use std::fs::File; -use std::net::{SocketAddr, UdpSocket}; +use std::net::SocketAddr; use dnstplib::dns_socket::DNSSocket; +use dnstplib::request_processor::RequestProcesor; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -40,7 +41,12 @@ fn main() { .collect(); let mut socket = DNSSocket::new(addresses); - socket.run(); + socket.run_tx(); + + let mut processor = RequestProcesor::new(); + processor.run(socket.get_tx_message_channel().expect("couldn't get message transmitting channel")); + + socket.run_rx(processor.get_message_channel().expect("couldn't get message processing channel")); thread::park(); } diff --git a/dnstp/src/dns_header.rs b/dnstp/src/dns_header.rs index bbe63d0..1037704 100644 --- a/dnstp/src/dns_header.rs +++ b/dnstp/src/dns_header.rs @@ -1,4 +1,3 @@ - pub enum Direction { Request, Response } diff --git a/dnstp/src/dns_socket.rs b/dnstp/src/dns_socket.rs index 2dadeba..aff9273 100644 --- a/dnstp/src/dns_socket.rs +++ b/dnstp/src/dns_socket.rs @@ -6,11 +6,15 @@ use log::{error, info}; use std::str; use std::sync::mpsc; use std::sync::mpsc::{Receiver, Sender, TryRecvError}; +use crate::raw_request::{NetworkMessage, NetworkMessagePtr}; pub struct DNSSocket { addresses: Vec, - thread: Option>, - thread_killer: Option> + rx_thread: Option>, + rx_thread_killer: Option>, + tx_thread: Option>, + tx_message_channel: Option>, + tx_thread_killer: Option> } impl DNSSocket { @@ -18,8 +22,11 @@ impl DNSSocket { { DNSSocket { addresses, - thread: None, - thread_killer: None + rx_thread: None, + rx_thread_killer: None, + tx_thread: None, + tx_message_channel: None, + tx_thread_killer: None } } @@ -38,17 +45,17 @@ impl DNSSocket { { match UdpSocket::bind(&self.addresses[..]) { Ok(s) => Option::from(s), - Err(e) => None + Err(_) => None } } - pub fn run(&mut self) + pub fn run_rx(&mut self, message_sender: Sender) { let (tx, rx): (Sender<()>, Receiver<()>) = mpsc::channel(); - self.thread_killer = Some(tx); + self.rx_thread_killer = Some(tx); let socket = self.bind(); - self.thread = Some(thread::spawn(move || { + self.rx_thread = Some(thread::spawn(move || { match socket { None => { @@ -57,13 +64,21 @@ impl DNSSocket { Some(s) => { let mut cancelled = false; while !cancelled { - let mut buf = [0; 512]; - let res = s.recv_from(&mut buf); + let mut buf = Box::new([0; 512]); + let res = s.recv_from(&mut (*buf)); match res { - Ok(r) => { - let res_str = str::from_utf8(&buf).unwrap(); - info!("received: {}", res_str); + Ok((_, peer)) => { + let res_str = str::from_utf8(&(*buf)).unwrap(); + info!("received [{}] from [{}]", res_str, peer); + match message_sender.send(Box::new(NetworkMessage { + buffer: buf, + peer + })) + { + Ok(_) => {} + Err(_) => {} + } } Err(_) => {} } @@ -76,14 +91,50 @@ impl DNSSocket { } }; - info!("socket thread finishing") + info!("socket rx thread finishing") })); } + pub fn run_tx(&mut self) + { + let (tx, rx): (Sender<()>, Receiver<()>) = mpsc::channel(); + self.tx_thread_killer = Some(tx); + + let (msg_tx, msg_rx): (Sender, Receiver) = mpsc::channel(); + self.tx_message_channel = Option::from(msg_tx); + + self.tx_thread = Some(thread::spawn(move || { + + let mut cancelled = false; + while !cancelled { + + for m in &msg_rx { + info!("sending [{}] to [{}]", str::from_utf8(&(*(*m).buffer)).unwrap(), (*m).peer); + } + + cancelled = match rx.try_recv() { + Ok(_) | Err(TryRecvError::Disconnected) => true, + _ => false + } + } + + info!("socket tx thread finishing") + })); + } + + pub fn get_tx_message_channel(&mut self) -> Option> + { + self.tx_message_channel.clone() + } + pub fn stop(&mut self) { // if let Some(t) = &mut self.thread { - if let Some(k) = &self.thread_killer { + if let Some(k) = &self.rx_thread_killer { + k.send(()); + // t.join(); + } + if let Some(k) = &self.tx_thread_killer { k.send(()); // t.join(); } diff --git a/dnstp/src/lib.rs b/dnstp/src/lib.rs index 2b85e2e..8807e73 100644 --- a/dnstp/src/lib.rs +++ b/dnstp/src/lib.rs @@ -1,3 +1,5 @@ pub mod dns_socket; pub mod request_parser; -mod dns_header; \ No newline at end of file +mod dns_header; +pub mod request_processor; +mod raw_request; \ No newline at end of file diff --git a/dnstp/src/raw_request.rs b/dnstp/src/raw_request.rs new file mode 100644 index 0000000..07d83a5 --- /dev/null +++ b/dnstp/src/raw_request.rs @@ -0,0 +1,8 @@ +use std::net::SocketAddr; + +pub type NetworkMessagePtr = Box; + +pub struct NetworkMessage { + pub buffer: Box<[u8; 512]>, + pub peer: SocketAddr +} \ No newline at end of file diff --git a/dnstp/src/request_processor.rs b/dnstp/src/request_processor.rs new file mode 100644 index 0000000..470c3f9 --- /dev/null +++ b/dnstp/src/request_processor.rs @@ -0,0 +1,46 @@ +use std::sync::mpsc; +use std::sync::mpsc::{Receiver, Sender}; +use std::thread; +use log::info; +use std::str; +use crate::raw_request::NetworkMessagePtr; + +pub struct RequestProcesor { + message_channel: Option> +} + +impl RequestProcesor { + pub fn new() -> RequestProcesor { + RequestProcesor{ + message_channel: None + } + } + + pub fn run(&mut self, sending_channel: Sender) + { + let (tx, rx): (Sender, Receiver) = mpsc::channel(); + self.message_channel = Some(tx); + + thread::spawn(move || { + + for mut m in rx + { + info!("processing: {}", str::from_utf8(&(*(*m).buffer)).unwrap()); + + (*(*m).buffer).reverse(); + + match sending_channel.send(m) { + Ok(_) => {} + Err(_) => {} + } + } + + info!("message processing thread finishing") + }); + } + + pub fn get_message_channel(&mut self) -> Option> + { + self.message_channel.clone() + } +} \ No newline at end of file