passing messages to processor and back again
This commit is contained in:
parent
d3aefea3cd
commit
584dcdebde
@ -1,12 +1,13 @@
|
||||
use clap::Parser;
|
||||
use std::{thread};
|
||||
|
||||
use log::{error, info, warn};
|
||||
use log::info;
|
||||
use simplelog::*;
|
||||
use std::fs::File;
|
||||
use std::net::{SocketAddr, UdpSocket};
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use dnstplib::dns_socket::DNSSocket;
|
||||
use dnstplib::request_processor::RequestProcesor;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
@ -40,7 +41,12 @@ fn main() {
|
||||
.collect();
|
||||
|
||||
let mut socket = DNSSocket::new(addresses);
|
||||
socket.run();
|
||||
socket.run_tx();
|
||||
|
||||
let mut processor = RequestProcesor::new();
|
||||
processor.run(socket.get_tx_message_channel().expect("couldn't get message transmitting channel"));
|
||||
|
||||
socket.run_rx(processor.get_message_channel().expect("couldn't get message processing channel"));
|
||||
|
||||
thread::park();
|
||||
}
|
||||
|
@ -1,4 +1,3 @@
|
||||
|
||||
pub enum Direction {
|
||||
Request, Response
|
||||
}
|
||||
|
@ -6,11 +6,15 @@ use log::{error, info};
|
||||
use std::str;
|
||||
use std::sync::mpsc;
|
||||
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
|
||||
use crate::raw_request::{NetworkMessage, NetworkMessagePtr};
|
||||
|
||||
pub struct DNSSocket {
|
||||
addresses: Vec<SocketAddr>,
|
||||
thread: Option<JoinHandle<()>>,
|
||||
thread_killer: Option<Sender<()>>
|
||||
rx_thread: Option<JoinHandle<()>>,
|
||||
rx_thread_killer: Option<Sender<()>>,
|
||||
tx_thread: Option<JoinHandle<()>>,
|
||||
tx_message_channel: Option<Sender<NetworkMessagePtr>>,
|
||||
tx_thread_killer: Option<Sender<()>>
|
||||
}
|
||||
|
||||
impl DNSSocket {
|
||||
@ -18,8 +22,11 @@ impl DNSSocket {
|
||||
{
|
||||
DNSSocket {
|
||||
addresses,
|
||||
thread: None,
|
||||
thread_killer: None
|
||||
rx_thread: None,
|
||||
rx_thread_killer: None,
|
||||
tx_thread: None,
|
||||
tx_message_channel: None,
|
||||
tx_thread_killer: None
|
||||
}
|
||||
}
|
||||
|
||||
@ -38,17 +45,17 @@ impl DNSSocket {
|
||||
{
|
||||
match UdpSocket::bind(&self.addresses[..]) {
|
||||
Ok(s) => Option::from(s),
|
||||
Err(e) => None
|
||||
Err(_) => None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run(&mut self)
|
||||
pub fn run_rx(&mut self, message_sender: Sender<NetworkMessagePtr>)
|
||||
{
|
||||
let (tx, rx): (Sender<()>, Receiver<()>) = mpsc::channel();
|
||||
self.thread_killer = Some(tx);
|
||||
self.rx_thread_killer = Some(tx);
|
||||
let socket = self.bind();
|
||||
|
||||
self.thread = Some(thread::spawn(move || {
|
||||
self.rx_thread = Some(thread::spawn(move || {
|
||||
|
||||
match socket {
|
||||
None => {
|
||||
@ -57,13 +64,21 @@ impl DNSSocket {
|
||||
Some(s) => {
|
||||
let mut cancelled = false;
|
||||
while !cancelled {
|
||||
let mut buf = [0; 512];
|
||||
let res = s.recv_from(&mut buf);
|
||||
let mut buf = Box::new([0; 512]);
|
||||
let res = s.recv_from(&mut (*buf));
|
||||
|
||||
match res {
|
||||
Ok(r) => {
|
||||
let res_str = str::from_utf8(&buf).unwrap();
|
||||
info!("received: {}", res_str);
|
||||
Ok((_, peer)) => {
|
||||
let res_str = str::from_utf8(&(*buf)).unwrap();
|
||||
info!("received [{}] from [{}]", res_str, peer);
|
||||
match message_sender.send(Box::new(NetworkMessage {
|
||||
buffer: buf,
|
||||
peer
|
||||
}))
|
||||
{
|
||||
Ok(_) => {}
|
||||
Err(_) => {}
|
||||
}
|
||||
}
|
||||
Err(_) => {}
|
||||
}
|
||||
@ -76,14 +91,50 @@ impl DNSSocket {
|
||||
}
|
||||
};
|
||||
|
||||
info!("socket thread finishing")
|
||||
info!("socket rx thread finishing")
|
||||
}));
|
||||
}
|
||||
|
||||
pub fn run_tx(&mut self)
|
||||
{
|
||||
let (tx, rx): (Sender<()>, Receiver<()>) = mpsc::channel();
|
||||
self.tx_thread_killer = Some(tx);
|
||||
|
||||
let (msg_tx, msg_rx): (Sender<NetworkMessagePtr>, Receiver<NetworkMessagePtr>) = mpsc::channel();
|
||||
self.tx_message_channel = Option::from(msg_tx);
|
||||
|
||||
self.tx_thread = Some(thread::spawn(move || {
|
||||
|
||||
let mut cancelled = false;
|
||||
while !cancelled {
|
||||
|
||||
for m in &msg_rx {
|
||||
info!("sending [{}] to [{}]", str::from_utf8(&(*(*m).buffer)).unwrap(), (*m).peer);
|
||||
}
|
||||
|
||||
cancelled = match rx.try_recv() {
|
||||
Ok(_) | Err(TryRecvError::Disconnected) => true,
|
||||
_ => false
|
||||
}
|
||||
}
|
||||
|
||||
info!("socket tx thread finishing")
|
||||
}));
|
||||
}
|
||||
|
||||
pub fn get_tx_message_channel(&mut self) -> Option<Sender<NetworkMessagePtr>>
|
||||
{
|
||||
self.tx_message_channel.clone()
|
||||
}
|
||||
|
||||
pub fn stop(&mut self)
|
||||
{
|
||||
// if let Some(t) = &mut self.thread {
|
||||
if let Some(k) = &self.thread_killer {
|
||||
if let Some(k) = &self.rx_thread_killer {
|
||||
k.send(());
|
||||
// t.join();
|
||||
}
|
||||
if let Some(k) = &self.tx_thread_killer {
|
||||
k.send(());
|
||||
// t.join();
|
||||
}
|
||||
|
@ -1,3 +1,5 @@
|
||||
pub mod dns_socket;
|
||||
pub mod request_parser;
|
||||
mod dns_header;
|
||||
pub mod request_processor;
|
||||
mod raw_request;
|
8
dnstp/src/raw_request.rs
Normal file
8
dnstp/src/raw_request.rs
Normal file
@ -0,0 +1,8 @@
|
||||
use std::net::SocketAddr;
|
||||
|
||||
pub type NetworkMessagePtr = Box<NetworkMessage>;
|
||||
|
||||
pub struct NetworkMessage {
|
||||
pub buffer: Box<[u8; 512]>,
|
||||
pub peer: SocketAddr
|
||||
}
|
46
dnstp/src/request_processor.rs
Normal file
46
dnstp/src/request_processor.rs
Normal file
@ -0,0 +1,46 @@
|
||||
use std::sync::mpsc;
|
||||
use std::sync::mpsc::{Receiver, Sender};
|
||||
use std::thread;
|
||||
use log::info;
|
||||
use std::str;
|
||||
use crate::raw_request::NetworkMessagePtr;
|
||||
|
||||
pub struct RequestProcesor {
|
||||
message_channel: Option<Sender<NetworkMessagePtr>>
|
||||
}
|
||||
|
||||
impl RequestProcesor {
|
||||
pub fn new() -> RequestProcesor {
|
||||
RequestProcesor{
|
||||
message_channel: None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run(&mut self, sending_channel: Sender<NetworkMessagePtr>)
|
||||
{
|
||||
let (tx, rx): (Sender<NetworkMessagePtr>, Receiver<NetworkMessagePtr>) = mpsc::channel();
|
||||
self.message_channel = Some(tx);
|
||||
|
||||
thread::spawn(move || {
|
||||
|
||||
for mut m in rx
|
||||
{
|
||||
info!("processing: {}", str::from_utf8(&(*(*m).buffer)).unwrap());
|
||||
|
||||
(*(*m).buffer).reverse();
|
||||
|
||||
match sending_channel.send(m) {
|
||||
Ok(_) => {}
|
||||
Err(_) => {}
|
||||
}
|
||||
}
|
||||
|
||||
info!("message processing thread finishing")
|
||||
});
|
||||
}
|
||||
|
||||
pub fn get_message_channel(&mut self) -> Option<Sender<NetworkMessagePtr>>
|
||||
{
|
||||
self.message_channel.clone()
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user