server_node/server/
client.rsuse super::Event;
use crate::{config::Config, console};
use protocol::{
    CborRecvError, DhcpClientMessage, DhcpServerMessage, MacAddr, RecvCbor, RecvError, SendCbor,
};
use std::{
    io::ErrorKind,
    net::{Ipv4Addr, TcpListener, TcpStream},
    sync::{
        mpsc::{self, Sender},
        Arc,
    },
};
use thread_pool::ThreadPool;
pub fn listen_clients(
    listener: &TcpListener,
    config: &Arc<Config>,
    server_tx: &Sender<Event>,
    thread_pool: &ThreadPool,
) {
    for stream in listener.incoming() {
        match stream {
            Ok(mut stream) => {
                let config = Arc::clone(config);
                let tx = server_tx.clone();
                thread_pool
                    .execute(move || serve_client(&mut stream, &config, &tx))
                    .expect("Thread pool cannot spawn threads");
            }
            Err(e) => console::error!(&e, "Accepting new client connection failed"),
        }
    }
}
fn serve_client(stream: &mut TcpStream, config: &Arc<Config>, server_tx: &Sender<Event>) {
    stream
        .set_read_timeout(Some(config.client_timeout))
        .expect("Can't set stream read timeout");
    let result = stream.recv();
    match result {
        Ok(DhcpClientMessage::Discover { mac_address }) => {
            handle_discover(stream, server_tx, mac_address);
        }
        Ok(DhcpClientMessage::Request { mac_address, ip }) => {
            handle_renew(stream, server_tx, mac_address, ip);
        }
        Err(e) => console::error!(&e, "Could not receive request from the client"),
    }
}
fn handle_discover(stream: &mut TcpStream, server_tx: &Sender<Event>, mac_address: MacAddr) {
    #[cfg(debug_assertions)]
    assert!(matches!(stream.read_timeout(), Ok(Some(_))));
    let (tx, rx) = mpsc::channel();
    server_tx
        .send(Event::LeaseRequest { mac_address, tx })
        .expect(
            "Invariant violated: server_rx has been dropped before joining client listener thread",
        );
    let Ok(offer) = rx.recv() else {
        if let Err(e) = stream.send(&DhcpServerMessage::Nack) {
            console::error!(&e, "Could not reply with Nack to the client");
        }
        return;
    };
    if let Err(e) = stream.send(&DhcpServerMessage::Offer(offer.into())) {
        console::error!(&e, "Could not send offer to the client");
        return;
    }
    match stream.recv() {
        Ok(DhcpClientMessage::Request { mac_address, ip }) => {
            let (tx, rx) = mpsc::channel();
            server_tx
                    .send(Event::ConfirmRequest {
                        mac_address,
                        ip,
                        tx,
                    })
                    .expect("Invariant violated: server_rx has been dropped before joining client listener thread");
            match rx.recv() {
                Ok(committed) => {
                    if committed {
                        if let Err(e) = stream.send(&DhcpServerMessage::Ack) {
                            console::error!(&e, "Could not send Ack to the client");
                        }
                    } else if let Err(e) = stream.send(&DhcpServerMessage::Nack) {
                        console::error!(&e, "Could not send Nack to the client");
                    }
                }
                Err(e) => {
                    console::error!(&e, "Could not commit lease");
                }
            }
        }
        Ok(message) => {
            console::warning!(
                "Client didn't follow protocol!\nExpected: Request, got: {message:?}"
            );
        }
        Err(ref error) => {
            if let CborRecvError::Receive(RecvError::Io(io_error)) = error {
                match io_error.kind() {
                    ErrorKind::WouldBlock | ErrorKind::TimedOut => {
                        console::error!(
                            error,
                            "Client didn't follow Discover with Request within {:?}",
                            stream.read_timeout().ok().flatten().expect(
                                "handle_discover called without read timeout set on stream"
                            ),
                        );
                    }
                    _ => console::error!(error, "Could not receive client reply"),
                }
            } else {
                console::error!(error, "Could not receive client reply");
            }
        }
    }
}
fn handle_renew(
    stream: &mut TcpStream,
    server_tx: &Sender<Event>,
    mac_address: MacAddr,
    ip: Ipv4Addr,
) {
    #[cfg(debug_assertions)]
    assert!(matches!(stream.read_timeout(), Ok(Some(_))));
    let (tx, rx) = mpsc::channel();
    server_tx
        .send(Event::ConfirmRequest {
            mac_address,
            ip,
            tx,
        })
        .expect(
            "Invariant violated: server_rx has been dropped before joining client listener thread",
        );
    if rx.recv().unwrap_or(false) {
        if let Err(e) = stream.send(&DhcpServerMessage::Ack) {
            console::error!(&e, "Could not send Ack to the client");
        }
    } else if let Err(e) = stream.send(&DhcpServerMessage::Nack) {
        console::error!(&e, "Could not send Nack to the client");
    }
}