Building Server Connections and Responses

Create a new director in the src folder called server.

Inside the server directory create mod.rs, tcp_api.rs and connection.rs

SeaORM-TODO-App/src
				|-- src/fruits_table
                |-- src/insert_values.rs
                |-- src/main.rs
                |-- src/suppliers_table
                |-- src/todos_table 
                |-- src/server
+                	|-- mod.rs
+                	|-- tcp_api.rs
+                	|-- connection.rs

Import the new submodules to src/server/mod.rs file


#![allow(unused)]
fn main() {
mod tcp_api;
mod connection;

pub use tcp_api::*;
pub use connection::*;
}

Then register the module to the src.main.rs file

// -- code snippet --

mod server;
pub use server::*;

#[async_std::main]
async fn main() -> anyhow::Result<()> {
	// -- code snippet --
	
	Ok(())
}

The TCP API

Create the commands that the tcp api will handle. In the src/server/tcp_api.rs add:

use serde::{Serialize, Deserialize};

// The commands to use to perform CRUD operations on PostgreSQL
#[derive(Debug, Serialize, Deserialize)]
pub enum Command {
    Store { username: String, todo_list: String },
    UpdateTodoList { username: String, todo_list: String },
    Get(String),
    CreateUser(String),
    ListFruits,
}

Command::Store { username: String, todo_list: String } will handle an insert operation, inserting the todo_list in the row with the column labeled by username.

Command::UpdateTodoList { username: String, todo_list: String } will handle an update operation, inserting the todo_list in the row with the column labeled by username.

Command::Get(String) will fetch the todo_list from the column username with the username in the String field.

Command::CreateUser(String) will create a new row with the String field being inserted in the username column.

Command::ListFruits will fetch all the fruits in the fruits table.

The Command enum will be deserialized by bincode crate. Add the bincode and serde crates to Cargo.toml file

$ cargo add bincode

$ cargo add serde --features derive

Add error handling capabilities incase the wrong command is invoked

File: src/server/tcp_api.rs

// -- code snippet --

#[derive(Debug)]
pub enum ServerErrors {
    InvalidCommand,
    ModelNotFound,
}

impl Error for ServerErrors {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            ServerErrors::InvalidCommand => Some(&crate::ServerErrors::InvalidCommand),
            ServerErrors::ModelNotFound => Some(&crate::ServerErrors::ModelNotFound),
        }
    }
}

impl fmt::Display for ServerErrors {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{:?}",
            match self {
                ServerErrors::InvalidCommand => "Invalid command provided",
                ServerErrors::ModelNotFound => "The result of the query is `None`",
            }
        )
    }
}

The ServerErrors::InvalidCommand is returned when the method called on a Command is invalid while the ServerErrors::ModelNotFound is returned when a Model is not found in the database.

Then implement the methods for the Command enum that will handle database operations

File: src/server/tcp_api.rs

// -- code snippet --
+ use crate::{Fruits, Suppliers, Todos, TodosActiveModel, TodosColumn, TodosModel};
+ use sea_orm::{
+     ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, ModelTrait, QueryFilter, Set,
+ };
  use serde::{Deserialize, Serialize};
+ use std::{error::Error, fmt};

// The commands to use to perform CRUD operations on PostgreSQL
#[derive(Debug, Serialize, Deserialize)]
pub enum Command {
    Store { username: String, todo_list: String },
    UpdateTodoList { username: String, todo_list: String },
    Get(String),
    CreateUser(String),
    ListFruits,
}


Implement methods to handle the commands from the api

File: src/server/tcp_api.rs

// -- code snippet --
// The commands to use to perform CRUD operations on PostgreSQL
#[derive(Debug, Serialize, Deserialize)]
pub enum Command {
    Store { username: String, todo_list: String },
    UpdateTodoList { username: String, todo_list: String },
    Get(String),
    CreateUser(String),
    ListFruits,
}

impl Command {
    pub async fn get_fruits(&self, db: &DatabaseConnection) -> anyhow::Result<Vec<u8>> {
        let fruit_models = Fruits::find().all(db).await?;
        let fruits = fruit_models
            .iter()
            .map(|fruit_model| fruit_model.fruit_name.clone())
            .collect::<Vec<String>>();

        Ok(bincode::serialize(&fruits)?)
    }

    pub async fn store(&self, db: &DatabaseConnection) -> anyhow::Result<Vec<u8>> {
        match self {
            Self::Store {
                username,
                todo_list,
            } => {
                let todo_user = TodosActiveModel {
                    username: Set(username.to_owned()),
                    todo_list: Set(Some(todo_list.to_owned())),
                    ..Default::default()
                };
                Todos::insert(todo_user).exec(db).await?;

                Ok(bincode::serialize("INSERTED")?)
            }
            _ => Err(anyhow::Error::new(ServerErrors::InvalidCommand)),
        }
    }

    pub async fn create_new_user(&self, db: &DatabaseConnection) -> anyhow::Result<Vec<u8>> {
        match self {
            Self::CreateUser(username) => {
                let todo_user = TodosActiveModel {
                    username: Set(username.to_owned()),
                    ..Default::default()
                };
                Todos::insert(todo_user).exec(db).await?;

                Ok(bincode::serialize(&format!("CREATED_USER `{}`", username))?)
            }
            _ => Err(anyhow::Error::new(ServerErrors::InvalidCommand)),
        }
    }

    pub async fn get_user_todo(&self, db: &DatabaseConnection) -> anyhow::Result<Vec<u8>> {
        match self {
            Self::Get(user) => {
                let get_todo = Todos::find()
                    .filter(TodosColumn::Username.contains(user))
                    .one(db)
                    .await?;

                if let Some(found_todo) = get_todo {
                    Ok(bincode::serialize(&found_todo.todo_list)?)
                } else {
                    Ok(bincode::serialize(&Some("USER_NOT_FOUND"))?)
                }
            }
            _ => Err(anyhow::Error::new(ServerErrors::InvalidCommand)),
        }
    }

    pub async fn update_todo_list(&self, db: &DatabaseConnection) -> anyhow::Result<Vec<u8>> {
        match self {
            Self::UpdateTodoList {
                username,
                todo_list,
            } => {
                let found_todo: Option<TodosModel> = Todos::find()
                    .filter(TodosColumn::Username.contains(username))
                    .one(db)
                    .await?;

                match found_todo {
                    Some(todo_model) => {
                        let mut todo_model: TodosActiveModel = todo_model.into();
                        todo_model.todo_list = Set(Some(todo_list.to_owned()));
                        todo_model.update(db).await?;
                    }
                    None => return Err(anyhow::Error::new(ServerErrors::ModelNotFound)),
                };

                Ok(bincode::serialize("UPDATED_TODO")?)
            }
            _ => Err(anyhow::Error::new(ServerErrors::InvalidCommand)),
        }
    }
}

The get_fruits() method handles the Command::ListFruits command and it is responsible for fetching the list of fruits in the database.

The store() method handles the Command:: Store {..} command and it inserts the field todo_list in the username column corresponding to the username field.

The create_new_user() method handles Command::CreateUser(..) command, it creates a new user by inserting the String field data to the username column and an empty entry in the todo_list column.

The get_user_todo() method handles Command::Get(..) command. It is used mostly to check if the user in the String field exists in the username column.

The update_todo_list() method handles the Command:: UpdateTodoList {..} command and it updates the field todo_list in the username column corresponding to the username field.

The TCP API handler

The TcpStream will need to be handled. The src/server/connection.rs file contains the code for this.

File: src/server/connection.rs

// Import the necessary async versions of TcpStream and TcpListener
use crate::Command;
use async_std::{
    net::{Shutdown, SocketAddr, TcpListener, TcpStream},
    prelude::*,
    sync::Arc,
    task,
};

use sea_orm::DatabaseConnection;

const BUFFER_DATA_CAPACITY: usize = 1024 * 1024; // The todo list should not exceed 1MiB
const BUFFER_CAPACITY: usize = 64 * 1024; //64Kib

// function is called to create a new server on port 8080 localhost
pub async fn start_server(db: Arc<DatabaseConnection>) -> anyhow::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("Listening on 127.0.0.1:8080");

    while let Some(stream) = listener.incoming().next().await {
        let stream = stream?;
        let db = db.clone();

        task::spawn(async move {
            match process_stream(db, stream).await {
                Ok(addr) => {
                    println!("x → {addr:?} - DISCONNECTED")
                }
                Err(error) => {
                    eprintln!("{:?}", error);
                }
            }
        })
        .await;
    }

    Ok(())
}

async fn process_stream(
    db: Arc<DatabaseConnection>,
    mut stream: TcpStream,
) -> anyhow::Result<SocketAddr> {
    let peer = stream.peer_addr()?;
    println!("← → {peer:?} - CONNECTED");
    let mut buffer = [0u8; BUFFER_CAPACITY];
    let mut command_buffer: Vec<u8> = Vec::new();
    let bytes_read = stream.read(&mut buffer).await?;
    while bytes_read != 0 {
        if command_buffer.len() > BUFFER_DATA_CAPACITY {
            handle_response(&mut stream, b"BUFFER_CAPACITY_EXCEEDED_1MiB".to_vec()).await?;
        }

        // Check if the current stream is less than the buffer capacity, if so all data has been received
        if buffer[..bytes_read].len() < BUFFER_CAPACITY {
            // Ensure that the data is appended before being deserialized by bincode
            command_buffer.append(&mut buffer[..bytes_read].to_owned());
            let dbop_result = process_database_op(&db, &command_buffer).await?;
            handle_response(&mut stream, dbop_result).await?;
            break;
        }
        // Append data to buffer
        command_buffer.append(&mut buffer[..bytes_read].to_owned());
    }

    let peer = stream.peer_addr()?;
    //Shutdown the TCP address
    stream.shutdown(Shutdown::Both)?;
    // Terminate the stream if the client terminates the connection by sending 0 bytes
    return Ok(peer);
}

async fn handle_response(stream: &mut TcpStream, reponse_data: Vec<u8>) -> anyhow::Result<()> {
    stream.write_all(&reponse_data).await?;

    stream.flush().await?;

    Ok(())
}

async fn process_database_op(
    db: &DatabaseConnection,
    command_buffer: &[u8],
) -> anyhow::Result<Vec<u8>> {
    let command: Command = bincode::deserialize(command_buffer)?;

    let db_op = match command {
        Command::Get(..) => command.get_user_todo(db).await,
        Command::CreateUser(..) => command.create_new_user(db).await,
        Command::ListFruits => command.get_fruits(db).await,
        Command::ListSuppliers => command.get_suppliers(db).await,
        Command::Store { .. } => command.store(db).await,
        Command::UpdateTodoList { .. } => command.update_todo_list(db).await,
        Command::DeleteUser(..) => command.delete_user(db).await,
    };

    match db_op {
        Ok(value) => Ok(value),
        Err(error) => Ok(bincode::serialize(&error.to_string())?),
    }
}

Here, the BUFFER_DATA_CAPACITY caps the TODO list data at 1MiB and limits the buffer capacity for the TCP stream using BUFFER_CAPACITY capped at 64KiB.

The start_server() function creates a TcpListener at port 8080 localhost IP 127.0.0.1. It accepts a database connection inside an Arc<DatabaseConnection> for thread safety when we spawn a task to handle the stream. Each TcpStream is handled asynchronously using a async::task::spawn() method.

stream.read(&mut buffer).await?; reads the stream. The while loop loops until the stream returns a 0_usize indicating the connection has been closed by the peer and if data has been received, it checks if the data has exceeded the

BUFFER_DATA_CAPACITY of 1MiB, if not it decodes the buffer using bincode and passes the data to the process_database_op() function which matches the deserialized Command and calls the appropriate method which in turn performs the database operation, encodes the result of the database operation and writes it back to the peer using the handle_response() function.

Start the server

Lastly, inside the src/main.rs file, start the server

File: src/main.rs

use async_std::sync::Arc;
use sea_orm::{
    sea_query::{Alias, ColumnDef, ForeignKey, ForeignKeyAction, Table},
    ConnectionTrait, Database, DbBackend,
};

// -- code snippet --

#[async_std::main]
async fn main() -> anyhow::Result<()> {

	// -- code snippet --
	
	
    insert_fruits(&db).await?;
    insert_suppliers(&db).await?;

+	start_server(db).await?;

    Ok(())
}

Run the program using cargo run. It print the following to the terminal

$ Running `/media/su43/IGIED-01/Rust-Projects/SeaQL/SeaORM-TODO-App/target/debug/todo-server`
`CREATE TABLE fruits` "Operation Successful"
`CREATE TABLE suppliers` "Operation Successful"
`CREATE TABLE todos` "Operation Successful"
INSERTED FRUITS: InsertResult { last_insert_id: 1 }
INSERTED SUPPLIERS: InsertResult { last_insert_id: 1 }
Listening on 127.0.0.1:8080

The server is now listening on 127.0.0.1:8080 for incoming TcpStreams.

Next, we build the client.