Skip to content

Commit

Permalink
feat(kvsd_protocol): parse Message::Ping
Browse files Browse the repository at this point in the history
  • Loading branch information
ymgyt committed Nov 9, 2024
1 parent c9630e9 commit 8ca857b
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 65 deletions.
7 changes: 5 additions & 2 deletions crates/synd_kvsd_protocol/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,16 @@ where

#[cfg(test)]
mod tests {
use crate::message::Authenticate;
use crate::message::{Authenticate, Ping};

use super::*;

#[tokio::test]
async fn read_write() {
let messages = vec![Message::Authenticate(Authenticate::new("user", "pass"))];
let messages = vec![
Message::Authenticate(Authenticate::new("user", "pass")),
Message::Ping(Ping::new()),
];

let buf_size = 1024;
let (read, write) = tokio::io::duplex(buf_size);
Expand Down
2 changes: 2 additions & 0 deletions crates/synd_kvsd_protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ pub use keyvalue::{Key, KeyValue, KeyValueError, Value};
mod connection;
pub use connection::Connection;
mod message;

pub(crate) type Time = chrono::DateTime<chrono::Utc>;
76 changes: 18 additions & 58 deletions crates/synd_kvsd_protocol/src/message/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,63 +30,6 @@ pub(crate) enum Frame {
}

impl Frame {
// TODO: remove
/*
fn read(src: &mut Cursor) -> Result<Frame, FrameError> {
match src.u8()? {
prefix::MESSAGE_START => Ok(Frame::MessageStart),
prefix::FRAME_LENGTH => {
let len = src.u64()?;
Ok(Frame::Length(len))
}
prefix::MESSAGE_TYPE => MessageType::try_from(src.u8()?)
.map_err(FrameError::InvalidMessageType)
.map(Frame::MessageType),
prefix::STRING => {
#[allow(clippy::cast_possible_truncation)]
let len = src.u64()? as usize;
let n = len + spec::DELIMITER.len();
if src.remaining() < n {
return Err(FrameError::Incomplete);
}
let string = std::str::from_utf8(&src.chunk()[..len])
.map_err(|e| FrameError::Invalid(e.to_string()))?
.to_owned();
src.skip(n)?;
Ok(Frame::String(string))
}
prefix::BYTES => {
#[allow(clippy::cast_possible_truncation)]
let len = src.u64()? as usize;
let n = len + spec::DELIMITER.len();
if src.remaining() < n {
return Err(FrameError::Incomplete);
}
let value = Vec::from(&src.chunk()[..len]);
src.skip(n)?;
Ok(Frame::Bytes(value))
}
prefix::TIME => {
use chrono::{DateTime, Utc};
let line = src.line()?.to_vec();
let string =
String::from_utf8(line).map_err(|e| FrameError::Invalid(e.to_string()))?;
Ok(Frame::Time(
DateTime::parse_from_rfc3339(&string)
.map(|dt| dt.with_timezone(&Utc))
.unwrap(),
))
}
prefix::NULL => Ok(Frame::Null),
_ => unreachable!(),
}
}
*/

pub(crate) async fn write<W>(self, mut writer: W) -> Result<(), io::Error>
where
W: AsyncWriteExt + Unpin,
Expand Down Expand Up @@ -114,8 +57,10 @@ impl Frame {
writer.write_all(spec::DELIMITER).await
}
Frame::Time(val) => {
let val = val.to_rfc3339();
writer.write_u8(prefix::TIME).await?;
writer.write_all(val.to_rfc3339().as_bytes()).await?;
writer.write_u64(val.len() as u64).await?;
writer.write_all(val.as_bytes()).await?;
writer.write_all(spec::DELIMITER).await
}
Frame::Null => writer.write_u8(prefix::NULL).await,
Expand Down Expand Up @@ -149,4 +94,19 @@ impl MessageFrames {
pub(super) fn push_string(&mut self, s: impl Into<String>) {
self.0.push(Frame::String(s.into()));
}

pub(super) fn push_time(&mut self, time: Time) {
self.0.push(Frame::Time(time));
}

pub(super) fn push_time_or_null(&mut self, time: Option<Time>) {
match time {
Some(t) => self.push_time(t),
None => self.push_null(),
}
}

pub(super) fn push_null(&mut self) {
self.0.push(Frame::Null);
}
}
5 changes: 3 additions & 2 deletions crates/synd_kvsd_protocol/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub(crate) use frame::MessageFrames;
mod parse;
pub(crate) use parse::{ParseError, Parser};
mod payload;
pub use payload::authenticate::Authenticate;
pub use payload::{Authenticate, Ping};
use tokio::io::AsyncWriteExt;
mod spec;

Expand Down Expand Up @@ -54,7 +54,7 @@ impl TryFrom<u8> for MessageType {

#[derive(Debug, Clone, PartialEq)]
pub enum Message {
// Ping(Ping),
Ping(Ping),
Authenticate(Authenticate),
// Success(Success),
// Fail(Fail),
Expand All @@ -66,6 +66,7 @@ pub enum Message {
impl From<Message> for MessageFrames {
fn from(message: Message) -> Self {
match message {
Message::Ping(m) => m.into(),
Message::Authenticate(m) => m.into(),
}
}
Expand Down
62 changes: 60 additions & 2 deletions crates/synd_kvsd_protocol/src/message/parse.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::string::FromUtf8Error;

use chrono::{DateTime, Utc};
use thiserror::Error;

use crate::message::{Authenticate, Message, MessageError, MessageType};
use crate::message::{frame::prefix, Authenticate, Message, MessageError, MessageType, Ping};

#[derive(Error, Debug)]
pub enum ParseError {
Expand Down Expand Up @@ -48,7 +49,38 @@ impl Parser {
let message_type = MessageType::try_from(message_type)?;

match message_type {
MessageType::Ping => todo!(),
MessageType::Ping => {
let parse_time = |input| -> Result<(&[u8], Option<DateTime<Utc>>), ParseError> {
let (input, t) =
parse::time(input).map_err(|err| ParseError::expect(err, "time"))?;
let rfc3339 = String::from_utf8(t.to_vec())?;
let t = DateTime::parse_from_rfc3339(&rfc3339).unwrap();
Ok((input, Some(t.with_timezone(&Utc))))
};

let (input, pre) =
parse::prefix(input).map_err(|err| ParseError::expect(err, "prefix"))?;

println!("prefix {pre}");

let (input, client) = match pre {
prefix::TIME => parse_time(input)?,
prefix::NULL => (input, None),
_ => unreachable!(),
};
let (input, server) = match pre {
prefix::TIME => parse_time(input)?,
prefix::NULL => (input, None),
_ => unreachable!(),
};
Ok((
input,
Message::Ping(Ping {
client_timestamp: client,
server_timestamp: server,
}),
))
}
MessageType::Authenticate => {
let (input, (username, password)) = parse::authenticate(input)
.map_err(|err| ParseError::expect(err, "message authenticate"))?;
Expand Down Expand Up @@ -94,6 +126,10 @@ mod parse {
pair(string, string).parse(input)
}

pub(super) fn prefix(input: &[u8]) -> IResult<&[u8], u8> {
u8(input)
}

fn delimiter(input: &[u8]) -> IResult<&[u8], ()> {
map(tag(spec::DELIMITER), |_| ()).parse(input)
}
Expand All @@ -108,8 +144,15 @@ mod parse {
terminated(take(len), delimiter).parse(input)
}

pub(super) fn time(input: &[u8]) -> IResult<&[u8], &[u8]> {
let (input, len) = preceded(tag([prefix::TIME].as_slice()), u64).parse(input)?;
terminated(take(len), delimiter).parse(input)
}

#[cfg(test)]
mod tests {
use chrono::DateTime;

use crate::message::{frame::Frame, MessageType};

use super::*;
Expand Down Expand Up @@ -171,5 +214,20 @@ mod parse {
let err = string(b"").unwrap_err();
assert!(err.is_incomplete());
}

#[tokio::test]
async fn parse_time_frame() {
let mut buf = Vec::new();
let t = DateTime::from_timestamp(1000, 0).unwrap();
let f = Frame::Time(t);
f.write(&mut buf).await.unwrap();

let (remain, parsed_time) = time(buf.as_slice()).unwrap();
assert_eq!(parsed_time, t.to_rfc3339().as_bytes());
assert!(remain.is_empty());

let err = time(b"").unwrap_err();
assert!(err.is_incomplete());
}
}
}
6 changes: 5 additions & 1 deletion crates/synd_kvsd_protocol/src/message/payload/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
pub(super) mod authenticate;
mod authenticate;
pub use authenticate::Authenticate;

mod ping;
pub use ping::Ping;
36 changes: 36 additions & 0 deletions crates/synd_kvsd_protocol/src/message/payload/ping.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use crate::{
message::{MessageFrames, MessageType},
Time,
};

#[derive(Debug, Clone, PartialEq)]
pub struct Ping {
pub client_timestamp: Option<Time>,
pub server_timestamp: Option<Time>,
}

impl Ping {
pub fn new() -> Self {
Self {
client_timestamp: None,
server_timestamp: None,
}
}
}

impl Default for Ping {
fn default() -> Self {
Self::new()
}
}

impl From<Ping> for MessageFrames {
fn from(ping: Ping) -> Self {
let mut frames = MessageFrames::new(MessageType::Ping, 2);

frames.push_time_or_null(ping.client_timestamp);
frames.push_time_or_null(ping.server_timestamp);

frames
}
}

0 comments on commit 8ca857b

Please sign in to comment.