Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add protobuf/echo/ProtoMessageBuilder #416

Merged
merged 25 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
caf1d6d
feat: add protobuf/echo/ProtoMessageBuilder
richardapeters Sep 14, 2023
3659327
ProtoCEchoPlugin: Fix superfluous typename
richardapeters Sep 14, 2023
4850e98
protobuf/echoProtoMessageBuilder: Parse a lot of different types
richardapeters Sep 17, 2023
7ad854d
protobuf/echoProtoMessageBuilder: Parse enums, strings, extract Bufer…
richardapeters Sep 17, 2023
7af4bfd
Resolve code warnings, increase coverage
richardapeters Sep 17, 2023
7143d43
protobuf/echo: Add BufferingStreamWriter
richardapeters Sep 18, 2023
9f6c3ad
protobuf/echo: Add half of ProtoMessageSender
richardapeters Sep 18, 2023
5c9f178
protobuf/echo/ProtoMessageSender: Serialize lots of types
richardapeters Sep 21, 2023
2b44b0f
protobuf/echo/ProtoMessageSender: Fix missing template keyword
richardapeters Sep 21, 2023
50f67fa
protobuf/echo/ProtoMessageSender: Serialize lots of types
richardapeters Sep 21, 2023
f3b4b37
protobuf/protoc_echo_plugin/ProtoCEchoPlugin: Modify result of Messag…
richardapeters Sep 21, 2023
7ff3ff4
Apply suggestions from code review
richardapeters Sep 21, 2023
4d866f2
protobuf/echo/BufferingStreamReader: Make BufferingStreamReader accep…
richardapeters Sep 22, 2023
e216526
protobuf/echo/BufferingStreamReader: Remove useless comment
richardapeters Sep 22, 2023
88b5d8a
Resolve Sonar warnings
richardapeters Sep 23, 2023
67182ec
Resolve Sonar warnings
richardapeters Sep 24, 2023
62b5a07
Move BufferingSteamReader and BufferingStreamWriter to infra/stream
richardapeters Sep 25, 2023
d6cf3e4
infra/stream/test: Add tests for BufferingStreamReader and BufferingS…
richardapeters Sep 26, 2023
cf83d99
Apply suggestions from code review
richardapeters Sep 26, 2023
7ec732d
Resolve Sonar warnings
richardapeters Sep 26, 2023
3625a48
Resolve Sonar warnings
richardapeters Sep 27, 2023
a0b2451
Resolve Sonar warnings
richardapeters Sep 27, 2023
2b8cfea
Reduce duplication
richardapeters Sep 28, 2023
84d2f94
Update services/network/test_doubles/Certificates.cpp
richardapeters Sep 28, 2023
9cc8c23
Merge branch 'main' into feature/proto_message_builder
rjaegers Sep 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions infra/stream/BufferingStreamReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#include "infra/stream/BufferingStreamReader.hpp"

namespace infra
{
BufferingStreamReader::BufferingStreamReader(infra::BoundedDeque<uint8_t>& buffer, infra::StreamReaderWithRewinding& input)
: buffer(buffer)
, input(input)
{}

BufferingStreamReader::~BufferingStreamReader()
{
StoreRemainder();
}

void BufferingStreamReader::Extract(infra::ByteRange range, infra::StreamErrorPolicy& errorPolicy)
{
if (index != buffer.size())
{
auto from = infra::Head(buffer.contiguous_range(buffer.begin() + index), range.size());
infra::Copy(from, infra::Head(range, from.size()));
range.pop_front(from.size());
index += from.size();
richardapeters marked this conversation as resolved.
Show resolved Hide resolved

// Perhaps the deque just wrapped around, try once more
from = infra::Head(buffer.contiguous_range(buffer.begin() + index), range.size());
infra::Copy(from, infra::Head(range, from.size()));
range.pop_front(from.size());
index += from.size();
}

if (!range.empty())
{
auto from = input.ExtractContiguousRange(range.size());
infra::Copy(from, infra::Head(range, from.size()));
range.pop_front(from.size());
index += from.size();
}

errorPolicy.ReportResult(range.empty());
}

uint8_t BufferingStreamReader::Peek(infra::StreamErrorPolicy& errorPolicy)
{
auto range = PeekContiguousRange(0);

errorPolicy.ReportResult(!range.empty());

if (range.empty())
return 0;
else
return range.front();
}

infra::ConstByteRange BufferingStreamReader::ExtractContiguousRange(std::size_t max)
{
if (index < buffer.size())
{
auto from = infra::Head(buffer.contiguous_range(buffer.begin() + index), max);
index += from.size();
return from;
}

auto from = input.ExtractContiguousRange(max);
return from;
richardapeters marked this conversation as resolved.
Show resolved Hide resolved
}

infra::ConstByteRange BufferingStreamReader::PeekContiguousRange(std::size_t start)
{
if (index + start < buffer.size())
return buffer.contiguous_range(buffer.begin() + index + start);

return input.PeekContiguousRange(index + start - buffer.size());
}

bool BufferingStreamReader::Empty() const
{
return Available() == 0;
}

std::size_t BufferingStreamReader::Available() const
{
return buffer.size() + input.Available();
}

std::size_t BufferingStreamReader::ConstructSaveMarker() const
{
return index;
}

void BufferingStreamReader::Rewind(std::size_t marker)
{
if (index > buffer.size())
{
auto rewindAmount = std::min(index - marker, index - buffer.size());
input.Rewind(input.ConstructSaveMarker() - rewindAmount);
index -= rewindAmount;
}

if (marker < buffer.size())
index = marker;
}

void BufferingStreamReader::StoreRemainder()
{
std::size_t bufferDecrease = std::min(buffer.size(), index);
buffer.erase(buffer.begin(), buffer.begin() + bufferDecrease);
while (!input.Empty())
{
auto range = input.ExtractContiguousRange(std::numeric_limits<std::size_t>::max());
buffer.insert(buffer.end(), range.begin(), range.end());
}
}
}
38 changes: 38 additions & 0 deletions infra/stream/BufferingStreamReader.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#ifndef INFRA_BUFFERING_STREAM_READER_HPP
#define INFRA_BUFFERING_STREAM_READER_HPP

#include "infra/stream/InputStream.hpp"
#include "infra/util/BoundedDeque.hpp"

namespace infra
{
// Usage: Everything that is not read from the inputData is stored into the buffer upon destruction of the BufferingStreamReader
// Any data already present in the buffer is read first from the reader
class BufferingStreamReader
: public infra::StreamReaderWithRewinding
{
public:
BufferingStreamReader(infra::BoundedDeque<uint8_t>& buffer, infra::StreamReaderWithRewinding& input);
~BufferingStreamReader() override;

// Implementation of StreamReaderWithRewinding
void Extract(infra::ByteRange range, infra::StreamErrorPolicy& errorPolicy) override;
uint8_t Peek(infra::StreamErrorPolicy& errorPolicy) override;
infra::ConstByteRange ExtractContiguousRange(std::size_t max) override;
infra::ConstByteRange PeekContiguousRange(std::size_t start) override;
bool Empty() const override;
std::size_t Available() const override;
std::size_t ConstructSaveMarker() const override;
void Rewind(std::size_t marker) override;

private:
void StoreRemainder();

private:
infra::BoundedDeque<uint8_t>& buffer;
infra::StreamReaderWithRewinding& input;
std::size_t index = 0;
};
}

#endif
65 changes: 65 additions & 0 deletions infra/stream/BufferingStreamWriter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include "infra/stream/BufferingStreamWriter.hpp"

namespace infra
{
BufferingStreamWriter::BufferingStreamWriter(infra::BoundedDeque<uint8_t>& buffer, infra::StreamWriter& output)
: buffer(buffer)
, output(output)
{
LoadRemainder();
}

void BufferingStreamWriter::Insert(infra::ConstByteRange range, infra::StreamErrorPolicy& errorPolicy)
{
auto first = infra::Head(range, output.Available());
output.Insert(first, errorPolicy);
index += first.size();
range.pop_front(first.size());

buffer.insert(buffer.end(), range.begin(), range.end());
index += range.size();
}

std::size_t BufferingStreamWriter::Available() const
{
return output.Available() + buffer.max_size() - buffer.size();
}

std::size_t BufferingStreamWriter::ConstructSaveMarker() const
{
return index;
}

std::size_t BufferingStreamWriter::GetProcessedBytesSince(std::size_t marker) const
{
return index - marker;
}

[[noreturn]] infra::ByteRange BufferingStreamWriter::SaveState(std::size_t marker)
{
std::abort();
}

[[noreturn]] void BufferingStreamWriter::RestoreState(infra::ByteRange range)
{
std::abort();
}

[[noreturn]] infra::ByteRange BufferingStreamWriter::Overwrite(std::size_t marker)
{
std::abort();
}

void BufferingStreamWriter::LoadRemainder()
{
infra::StreamErrorPolicy errorPolicy;
auto from = infra::Head(buffer.contiguous_range(buffer.begin()), output.Available());
output.Insert(from, errorPolicy);
buffer.erase(buffer.begin(), buffer.begin() + from.size());
from = infra::Head(buffer.contiguous_range(buffer.begin()), output.Available());
output.Insert(from, errorPolicy);
buffer.erase(buffer.begin(), buffer.begin() + from.size());

index = buffer.size();
}
}
36 changes: 36 additions & 0 deletions infra/stream/BufferingStreamWriter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#ifndef INFRA_BUFFERING_STREAM_WRITER_HPP
#define INFRA_BUFFERING_STREAM_WRITER_HPP

#include "infra/stream/OutputStream.hpp"
#include "infra/util/BoundedDeque.hpp"

namespace infra
{
// Usage: Any data that does not fit into the output stream is written to the buffer
// Any data already present in the buffer is written to the output stream upon construction of BufferingStreamWriter
class BufferingStreamWriter
: public infra::StreamWriter
{
public:
BufferingStreamWriter(infra::BoundedDeque<uint8_t>& buffer, infra::StreamWriter& output);

// Implementation of StreamWriter
void Insert(infra::ConstByteRange range, infra::StreamErrorPolicy& errorPolicy) override;
std::size_t Available() const override;
std::size_t ConstructSaveMarker() const override;
std::size_t GetProcessedBytesSince(std::size_t marker) const override;
[[noreturn]] infra::ByteRange SaveState(std::size_t marker) override;
[[noreturn]] void RestoreState(infra::ByteRange range) override;
[[noreturn]] infra::ByteRange Overwrite(std::size_t marker) override;

private:
void LoadRemainder();

private:
infra::BoundedDeque<uint8_t>& buffer;
infra::StreamWriter& output;
std::size_t index = 0;
};
}

#endif
4 changes: 4 additions & 0 deletions infra/stream/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ target_sources(infra.stream PRIVATE
BoundedVectorInputStream.hpp
BoundedVectorOutputStream.cpp
BoundedVectorOutputStream.hpp
BufferingStreamReader.cpp
BufferingStreamReader.hpp
BufferingStreamWriter.cpp
BufferingStreamWriter.hpp
ByteInputStream.cpp
ByteInputStream.hpp
ByteOutputStream.cpp
Expand Down
2 changes: 2 additions & 0 deletions infra/stream/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ target_sources(infra.stream_test PRIVATE
StreamMock.hpp
TestBoundedDequeInputStream.cpp
TestBoundedVectorOutputStream.cpp
TestBufferingStreamReader.cpp
TestBufferingStreamWriter.cpp
TestByteInputStream.cpp
TestByteOutputStream.cpp
TestCountingInputStream.cpp
Expand Down
Loading
Loading