diff --git a/crates/synd_kvsd_protocol/src/connection.rs b/crates/synd_kvsd_protocol/src/connection.rs index 63b55a9..c4e2b31 100644 --- a/crates/synd_kvsd_protocol/src/connection.rs +++ b/crates/synd_kvsd_protocol/src/connection.rs @@ -129,13 +129,16 @@ where #[cfg(test)] mod tests { - use crate::message::Authenticate; + use crate::message::{Authenticate, Ping}; use super::*; #[tokio::test] async fn read_write() { - let messages = vec![Message::Authenticate(Authenticate::new("user", "pass"))]; + let messages = vec![ + Message::Authenticate(Authenticate::new("user", "pass")), + Message::Ping(Ping::new()), + ]; let buf_size = 1024; let (read, write) = tokio::io::duplex(buf_size); diff --git a/crates/synd_kvsd_protocol/src/lib.rs b/crates/synd_kvsd_protocol/src/lib.rs index 37e85e9..4d49a2a 100644 --- a/crates/synd_kvsd_protocol/src/lib.rs +++ b/crates/synd_kvsd_protocol/src/lib.rs @@ -3,3 +3,5 @@ pub use keyvalue::{Key, KeyValue, KeyValueError, Value}; mod connection; pub use connection::Connection; mod message; + +pub(crate) type Time = chrono::DateTime; diff --git a/crates/synd_kvsd_protocol/src/message/frame.rs b/crates/synd_kvsd_protocol/src/message/frame.rs index a719f24..9bfba5d 100644 --- a/crates/synd_kvsd_protocol/src/message/frame.rs +++ b/crates/synd_kvsd_protocol/src/message/frame.rs @@ -30,63 +30,6 @@ pub(crate) enum Frame { } impl Frame { - // TODO: remove - /* - fn read(src: &mut Cursor) -> Result { - match src.u8()? { - prefix::MESSAGE_START => Ok(Frame::MessageStart), - prefix::FRAME_LENGTH => { - let len = src.u64()?; - Ok(Frame::Length(len)) - } - prefix::MESSAGE_TYPE => MessageType::try_from(src.u8()?) - .map_err(FrameError::InvalidMessageType) - .map(Frame::MessageType), - prefix::STRING => { - #[allow(clippy::cast_possible_truncation)] - let len = src.u64()? as usize; - let n = len + spec::DELIMITER.len(); - if src.remaining() < n { - return Err(FrameError::Incomplete); - } - let string = std::str::from_utf8(&src.chunk()[..len]) - .map_err(|e| FrameError::Invalid(e.to_string()))? - .to_owned(); - - src.skip(n)?; - - Ok(Frame::String(string)) - } - prefix::BYTES => { - #[allow(clippy::cast_possible_truncation)] - let len = src.u64()? as usize; - let n = len + spec::DELIMITER.len(); - if src.remaining() < n { - return Err(FrameError::Incomplete); - } - let value = Vec::from(&src.chunk()[..len]); - - src.skip(n)?; - - Ok(Frame::Bytes(value)) - } - prefix::TIME => { - use chrono::{DateTime, Utc}; - let line = src.line()?.to_vec(); - let string = - String::from_utf8(line).map_err(|e| FrameError::Invalid(e.to_string()))?; - Ok(Frame::Time( - DateTime::parse_from_rfc3339(&string) - .map(|dt| dt.with_timezone(&Utc)) - .unwrap(), - )) - } - prefix::NULL => Ok(Frame::Null), - _ => unreachable!(), - } - } - */ - pub(crate) async fn write(self, mut writer: W) -> Result<(), io::Error> where W: AsyncWriteExt + Unpin, @@ -114,8 +57,10 @@ impl Frame { writer.write_all(spec::DELIMITER).await } Frame::Time(val) => { + let val = val.to_rfc3339(); writer.write_u8(prefix::TIME).await?; - writer.write_all(val.to_rfc3339().as_bytes()).await?; + writer.write_u64(val.len() as u64).await?; + writer.write_all(val.as_bytes()).await?; writer.write_all(spec::DELIMITER).await } Frame::Null => writer.write_u8(prefix::NULL).await, @@ -149,4 +94,19 @@ impl MessageFrames { pub(super) fn push_string(&mut self, s: impl Into) { self.0.push(Frame::String(s.into())); } + + pub(super) fn push_time(&mut self, time: Time) { + self.0.push(Frame::Time(time)); + } + + pub(super) fn push_time_or_null(&mut self, time: Option