Skip to content

Commit

Permalink
refactor(kvsd_protocol): define own Cursor instead of type alias
Browse files Browse the repository at this point in the history
  • Loading branch information
ymgyt committed Oct 1, 2024
1 parent b97f282 commit 0200f74
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 105 deletions.
14 changes: 12 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ either = { version = "1.13.0" }
fake = { version = "2.10.0", features = ["derive", "chrono"] }
fdlimit = { version = "0.3.0", default-features = false }
feed-rs = { version = "1.5", default-features = false }
futures = { version = "0.3.30" }
futures-util = { version = "0.3.30", default-features = false }
graphql_client = { version = "0.13.0", default-features = false }
headers = { version = "0.4.0" }
Expand Down
6 changes: 3 additions & 3 deletions crates/synd_kvsd_protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ name = "synd-kvsd-protocol"
version = "0.1.0"

[dependencies]
# TODO: use latest
atoi = { version = "0.3.3" }
atoi = { version = "2.0.0" }
bytes = { workspace = true }
chrono = { workspace = true }
futures = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["net"] }
tokio = { workspace = true, features = ["net", "time", "io-util"] }

[lints]
workspace = true
Expand Down
22 changes: 11 additions & 11 deletions crates/synd_kvsd_protocol/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use std::{
io::{self, Cursor},
io::{self},
time::Duration,
};

use bytes::{Buf as _, BytesMut};
use futures::TryFutureExt as _;
use thiserror::Error;
use tokio::{
io::{AsyncRead, AsyncReadExt as _, AsyncWrite, BufWriter},
net::TcpStream,
time::error::Elapsed,
};

use crate::message::{FrameError, Message, MessageError, MessageFrames};
use crate::message::{Cursor, FrameError, Message, MessageError, MessageFrames};

#[derive(Error, Debug)]
pub enum ConnectionError {
Expand Down Expand Up @@ -72,10 +73,9 @@ where
&mut self,
duration: Duration,
) -> Result<Option<Message>, ConnectionError> {
match tokio::time::timeout(duration, self.read_message()).await {
Ok(read_result) => read_result,
Err(elapsed) => Err(ConnectionError::read_timeout(elapsed)),
}
tokio::time::timeout(duration, self.read_message())
.map_err(ConnectionError::read_timeout)
.await?
}

pub async fn read_message(&mut self) -> Result<Option<Message>, ConnectionError> {
Expand Down Expand Up @@ -111,14 +111,14 @@ where
fn parse_message_frames(&mut self) -> Result<Option<MessageFrames>, ConnectionError> {
use FrameError::Incomplete;

let mut buf = Cursor::new(&self.buffer[..]);
let mut cursor = Cursor::new(&self.buffer[..]);

match MessageFrames::check_parse(&mut buf) {
match MessageFrames::check_parse(&mut cursor) {
Ok(()) => {
#[allow(clippy::cast_possible_truncation)]
let len = buf.position() as usize;
buf.set_position(0);
let message_frames = MessageFrames::parse(&mut buf)
let len = cursor.position() as usize;
cursor.set_position(0);
let message_frames = MessageFrames::parse(&mut cursor)
.map_err(ConnectionError::parse_message_frames)?;
self.buffer.advance(len);

Expand Down
108 changes: 108 additions & 0 deletions crates/synd_kvsd_protocol/src/message/cursor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use std::io;

use bytes::Buf;

use crate::message::{spec, FrameError};

pub(crate) struct Cursor<'a> {
cursor: io::Cursor<&'a [u8]>,
}

impl<'a> Cursor<'a> {
pub(crate) fn new(buf: &'a [u8]) -> Self {
Self {
cursor: io::Cursor::new(buf),
}
}

pub(crate) fn position(&self) -> u64 {
self.cursor.position()
}

pub(crate) fn set_position(&mut self, pos: u64) {
self.cursor.set_position(pos);
}

pub(super) fn skip(&mut self, n: usize) -> Result<(), FrameError> {
if self.cursor.remaining() < n {
Err(FrameError::Incomplete)
} else {
self.cursor.advance(n);
Ok(())
}
}

pub(super) fn remaining(&self) -> usize {
self.cursor.remaining()
}

pub(super) fn chunk(&self) -> &[u8] {
self.cursor.chunk()
}

pub(super) fn u8(&mut self) -> Result<u8, FrameError> {
if self.cursor.has_remaining() {
Ok(self.cursor.get_u8())
} else {
Err(FrameError::Incomplete)
}
}

pub(super) fn u64(&mut self) -> Result<u64, FrameError> {
let line = self.line()?;
atoi::atoi::<u64>(line).ok_or_else(|| FrameError::Invalid("invalid u64".into()))
}

/// Return the buffer up to the line delimiter.
/// If the line delimiter is not found within the buffer, return [`FrameError::Incomplete`].
/// When the line delimiter is found, set the cursor position to the next position after the line delimiter
/// so that subsequent reads do not need to be aware of the line delimiter.
pub(super) fn line(&mut self) -> Result<&'a [u8], FrameError> {
let slice = *self.cursor.get_ref();
#[allow(clippy::cast_possible_truncation)]
let start = self.cursor.position() as usize;
let end = slice.len() - (spec::DELIMITER.len() - 1);

for i in start..end {
if &slice[i..i + spec::DELIMITER.len()] == spec::DELIMITER {
self.cursor.set_position((i + spec::DELIMITER.len()) as u64);
return Ok(&slice[start..i]);
}
}

Err(FrameError::Incomplete)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn u8() {
let buf = [128, 129];
let mut cursor = Cursor::new(&buf[..]);
assert_eq!(cursor.u8(), Ok(128));
assert_eq!(cursor.u8(), Ok(129));
assert_eq!(cursor.u8(), Err(FrameError::Incomplete));
assert_eq!(cursor.u8(), Err(FrameError::Incomplete));
}

#[test]
fn line() {
let buf = [b'x', b'x', b'\r', b'\n', b'y'];
let mut cursor = Cursor::new(&buf[..]);
assert_eq!(cursor.line(), Ok([b'x', b'x'].as_slice()));
assert_eq!(cursor.line(), Err(FrameError::Incomplete));
assert_eq!(cursor.line(), Err(FrameError::Incomplete));
}

#[test]
fn skip() {
let buf = [b'_', b'_', b'a'];
let mut cursor = Cursor::new(&buf[..]);
assert_eq!(cursor.skip(2), Ok(()));
assert_eq!(cursor.u8(), Ok(b'a'));
assert_eq!(cursor.skip(1), Err(FrameError::Incomplete));
}
}
Loading

0 comments on commit 0200f74

Please sign in to comment.