sending and processing responses

This commit is contained in:
Andy Pack 2024-01-29 07:43:16 +00:00
parent 584dcdebde
commit 4051665b6d
Signed by: sarsoo
GPG Key ID: A55BA3536A5E0ED7
5 changed files with 111 additions and 23 deletions

View File

@ -5,6 +5,10 @@ use std::time::Duration;
use clap::Parser; use clap::Parser;
use log::{error, info, LevelFilter}; use log::{error, info, LevelFilter};
use simplelog::*; use simplelog::*;
use dnstplib::dns_socket::DNSSocket;
use dnstplib::raw_request::NetworkMessage;
use dnstplib::request_processor::RequestProcesor;
use dnstplib::response_processor::ResponseProcesor;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)] #[command(author, version, about, long_about = None)]
@ -24,18 +28,32 @@ fn main() {
let address = SocketAddr::from(([127, 0, 0, 1], 0)); let address = SocketAddr::from(([127, 0, 0, 1], 0));
match UdpSocket::bind(&address) { let mut socket = DNSSocket::new(vec!(address));
Ok(s) => { socket.bind();
loop { socket.run_tx();
info!("sending..."); let tx_channel = socket.get_tx_message_channel().unwrap();
let send_buf = "hello world"; let mut processor = ResponseProcesor::new();
let send_res = s.send_to(send_buf.as_bytes(), "127.0.0.1:5000"); processor.run();
thread::sleep(Duration::from_secs(1)); socket.run_rx(processor.get_message_channel().expect("couldn't get message processing channel"));
}
} loop {
Err(e) => error!("couldn't bind to address {}", e)
info!("sending...");
let mut send_buf = [0; 512];
send_buf[0] = 'a' as u8;
send_buf[1] = 'b' as u8;
send_buf[2] = 'c' as u8;
send_buf[3] = 'd' as u8;
tx_channel.send(Box::from(NetworkMessage {
buffer: Box::from(send_buf),
peer: "127.0.0.1:5000".parse().unwrap()
}));
thread::sleep(Duration::from_secs(1));
} }
} }

View File

@ -41,6 +41,7 @@ fn main() {
.collect(); .collect();
let mut socket = DNSSocket::new(addresses); let mut socket = DNSSocket::new(addresses);
socket.bind();
socket.run_tx(); socket.run_tx();
let mut processor = RequestProcesor::new(); let mut processor = RequestProcesor::new();

View File

@ -10,6 +10,7 @@ use crate::raw_request::{NetworkMessage, NetworkMessagePtr};
pub struct DNSSocket { pub struct DNSSocket {
addresses: Vec<SocketAddr>, addresses: Vec<SocketAddr>,
socket: Option<Box<UdpSocket>>,
rx_thread: Option<JoinHandle<()>>, rx_thread: Option<JoinHandle<()>>,
rx_thread_killer: Option<Sender<()>>, rx_thread_killer: Option<Sender<()>>,
tx_thread: Option<JoinHandle<()>>, tx_thread: Option<JoinHandle<()>>,
@ -22,6 +23,7 @@ impl DNSSocket {
{ {
DNSSocket { DNSSocket {
addresses, addresses,
socket: None,
rx_thread: None, rx_thread: None,
rx_thread_killer: None, rx_thread_killer: None,
tx_thread: None, tx_thread: None,
@ -41,11 +43,21 @@ impl DNSSocket {
// } // }
// } // }
fn bind(&mut self) -> Option<UdpSocket> pub fn bind(&mut self)
{ {
match UdpSocket::bind(&self.addresses[..]) { match UdpSocket::bind(&self.addresses[..]) {
Ok(s) => Option::from(s), Ok(s) => {
Err(_) => None self.socket = Option::from(Box::from(s));
},
Err(_) => {}
};
}
fn get_socket_clone(&mut self) -> Option<Box<UdpSocket>>
{
match &self.socket {
Some(s) => Option::from(Box::from(s.try_clone().unwrap())),
None => None
} }
} }
@ -53,7 +65,7 @@ impl DNSSocket {
{ {
let (tx, rx): (Sender<()>, Receiver<()>) = mpsc::channel(); let (tx, rx): (Sender<()>, Receiver<()>) = mpsc::channel();
self.rx_thread_killer = Some(tx); self.rx_thread_killer = Some(tx);
let socket = self.bind(); let socket = self.get_socket_clone();
self.rx_thread = Some(thread::spawn(move || { self.rx_thread = Some(thread::spawn(move || {
@ -103,18 +115,28 @@ impl DNSSocket {
let (msg_tx, msg_rx): (Sender<NetworkMessagePtr>, Receiver<NetworkMessagePtr>) = mpsc::channel(); let (msg_tx, msg_rx): (Sender<NetworkMessagePtr>, Receiver<NetworkMessagePtr>) = mpsc::channel();
self.tx_message_channel = Option::from(msg_tx); self.tx_message_channel = Option::from(msg_tx);
let socket = self.get_socket_clone();
self.tx_thread = Some(thread::spawn(move || { self.tx_thread = Some(thread::spawn(move || {
let mut cancelled = false; match socket {
while !cancelled { None => {
error!("no socket created, failed to bind to address")
for m in &msg_rx {
info!("sending [{}] to [{}]", str::from_utf8(&(*(*m).buffer)).unwrap(), (*m).peer);
} }
Some(s) => {
let mut cancelled = false;
while !cancelled {
cancelled = match rx.try_recv() { for m in &msg_rx {
Ok(_) | Err(TryRecvError::Disconnected) => true, info!("sending [{}] to [{}]", str::from_utf8(&(*(*m).buffer)).unwrap(), (*m).peer);
_ => false s.send_to(&(*m.buffer), m.peer);
}
cancelled = match rx.try_recv() {
Ok(_) | Err(TryRecvError::Disconnected) => true,
_ => false
}
}
} }
} }

View File

@ -2,4 +2,5 @@ pub mod dns_socket;
pub mod request_parser; pub mod request_parser;
mod dns_header; mod dns_header;
pub mod request_processor; pub mod request_processor;
mod raw_request; pub mod response_processor;
pub mod raw_request;

View File

@ -0,0 +1,46 @@
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::thread;
use log::info;
use std::str;
use crate::raw_request::NetworkMessagePtr;
pub struct ResponseProcesor {
message_channel: Option<Sender<NetworkMessagePtr>>
}
impl ResponseProcesor {
pub fn new() -> ResponseProcesor {
ResponseProcesor{
message_channel: None
}
}
pub fn run(&mut self)
{
let (tx, rx): (Sender<NetworkMessagePtr>, Receiver<NetworkMessagePtr>) = mpsc::channel();
self.message_channel = Some(tx);
thread::spawn(move || {
for mut m in rx
{
info!("processing: {}", str::from_utf8(&(*(*m).buffer)).unwrap());
// (*(*m).buffer).reverse();
// match sending_channel.send(m) {
// Ok(_) => {}
// Err(_) => {}
// }
}
info!("message processing thread finishing")
});
}
pub fn get_message_channel(&mut self) -> Option<Sender<NetworkMessagePtr>>
{
self.message_channel.clone()
}
}