Skip to content

Commit

Permalink
support ranges
Browse files Browse the repository at this point in the history
  • Loading branch information
Nevermore1994 authored and rolf.tan committed Nov 27, 2023
1 parent 757785a commit ab4e4ad
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 63 deletions.
138 changes: 92 additions & 46 deletions Channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ concept IsSender = requires(T&& t) {
std::forward<T>(t)->done();
};

template <typename T>
concept IsReceiver = std::ranges::range<T> && requires(T&& t) {
typename T::ValueType;
std::forward<T>(t).tryReceive();
};

template <IsSender S>
struct SenderAdaptorClosure {
private:
Expand All @@ -58,8 +64,8 @@ struct SenderAdaptorClosure {

template <std::ranges::viewable_range InputView>
friend auto operator|(InputView &&lhs, SenderAdaptorClosure<S> self) {
self.sender_->send(lhs);
return std::ranges::empty_view<int>();
auto res = self.sender_->send(std::forward<InputView>(lhs));
return std::ranges::single_view<bool>(res);
}
};

Expand All @@ -78,21 +84,14 @@ class Sender;
template <typename T>
class Receiver;

template <typename T>
struct DestroyReceiver {
void operator()(Receiver<T>* receiver) {
delete receiver;
}
};

template <typename T>
using SenderPtr = std::unique_ptr<Sender<T>>;

template <typename T>
using SenderRefPtr = std::shared_ptr<Sender<T>>;

template <typename T>
using ReceiverPtr = std::unique_ptr<Receiver<T>, decltype(DestroyReceiver<T>())>;
using ReceiverPtr = std::unique_ptr<Receiver<T>>;

enum class ChannelEventType {
Unknown,
Expand All @@ -113,9 +112,7 @@ class Channel {
auto p = std::shared_ptr<Channel<T>>(new Channel<T>(), [](Channel<T>* channel) {
delete channel;
});
return {std::make_unique<Sender<T>>(p),
ReceiverPtr<T>(new Receiver<T>(p), DestroyReceiver<T>())
};
return { std::make_unique<Sender<T>>(p), std::make_unique<Receiver<T>>(p) };
}

inline auto done() noexcept -> void {
Expand Down Expand Up @@ -317,21 +314,20 @@ template <typename T>
class ReceiverIterator;

template <typename T>
class Receiver final {
private:
friend class DestroyReceiver<T>;
class ConstReceiverIterator;

template <typename T>
class Receiver final : public std::ranges::view_interface<Receiver<T>>{
private:
friend class ReceiverIterator<T>;

friend class ConstReceiverIterator<T>;
private:
class ReceiverImpl {
public:
friend class Channel<T>;

friend class ReceiverIterator<T>;

friend class DestroyReceiver<T>;

explicit ReceiverImpl(std::shared_ptr<Channel<T>> channel)
: channel_(channel) {

Expand All @@ -341,9 +337,23 @@ class Receiver final {
channel_->done();
}

ReceiverImpl& operator=(const ReceiverImpl&) = delete;
inline auto operator=(const ReceiverImpl& src) noexcept -> ReceiverImpl& {
channel_ = src.channel_;
}

ReceiverImpl(const ReceiverImpl& src) noexcept
: channel_(src.channel_) {

}

inline auto operator=(ReceiverImpl&& src) noexcept -> ReceiverImpl& {
channel_ = std::move(src.channel_);
}

ReceiverImpl(const ReceiverImpl&) = delete;
ReceiverImpl(ReceiverImpl&& src) noexcept
: channel_(std::move(src.channel_)) {

}

inline auto receive() noexcept -> std::expected<T, ChannelEventType> {
std::unique_lock<std::mutex> lock(channel_->mutex_);
Expand Down Expand Up @@ -387,25 +397,39 @@ class Receiver final {
std::shared_ptr<Channel<T>> channel_;
};//end of class ReceiverImpl

private:
std::shared_ptr<ReceiverImpl> impl_;

private:
~Receiver() = default;

public:
using ValueType = T;
explicit Receiver(std::shared_ptr<Channel<T>> channel)
: impl_(std::make_shared<ReceiverImpl>(channel)) {

}

Receiver<T>& operator=(const Receiver<T>&) = delete;
~Receiver() noexcept = default;

Receiver(const Receiver<T>&) = delete;
inline auto operator=(const Receiver<T>& src) -> Receiver<T>& {
impl_ = src.impl_;
}

Receiver(const Receiver<T>& src) noexcept
: impl_(src.impl_) {

}

inline auto operator=(Receiver<T>&& src) noexcept -> Receiver<T>& {
impl_ = std::move(src.impl_);
}

Receiver(Receiver<T>&& src) noexcept
: impl_(std::move(src.impl_)) {

}

inline auto begin() noexcept -> ReceiverIterator<T> {
return ReceiverIterator<T>(impl_, ChannelEventType::Unknown);
auto it = ReceiverIterator<T>(impl_, ChannelEventType::Unknown);
it.value_ = impl_->receive();
return it;
}

inline auto end() noexcept -> ReceiverIterator<T> {
Expand All @@ -423,24 +447,22 @@ class Receiver final {
inline auto tryReceiveAll() noexcept -> std::list<T> {
return impl_->tryReceiveAll();
}

}; //end of class Receiver

template <typename T>
class ReceiverIterator {
friend class Receiver<T>;
protected:
std::shared_ptr<typename Receiver<T>::ReceiverImpl> receiverImpl_;
std::expected<T, ChannelEventType> value_;
private:
inline auto getValue() noexcept -> std::expected<T, ChannelEventType>& {
if (!value_.has_value() && value_.error() == ChannelEventType::Unknown) {
value_ = std::move(receiverImpl_->receive());
}
return value_;
}
mutable std::expected<T, ChannelEventType> value_;

public:
using iterator_category = std::input_iterator_tag;
using value_type = std::expected<T, ChannelEventType>;
using value_type = T;
using difference_type = std::ptrdiff_t;
using pointer = T*;
using reference = T&;

explicit ReceiverIterator(std::shared_ptr<typename Receiver<T>::ReceiverImpl> receiverImpl,
ChannelEventType eventType = ChannelEventType::Unknown)
Expand All @@ -449,34 +471,58 @@ class ReceiverIterator {

}

ReceiverIterator(const ReceiverIterator&) = delete;
ReceiverIterator()
: receiverImpl_(nullptr)
, value_(std::unexpected(ChannelEventType::Closed)) {

}

template <typename = std::enable_if_t<std::is_copy_constructible_v<T>>>
explicit ReceiverIterator(const ReceiverIterator& src) noexcept
: receiverImpl_(src.receiverImpl_)
, value_(src.value_) {

ReceiverIterator& operator=(const ReceiverIterator&) = delete;
}

ReceiverIterator(ReceiverIterator&& src) noexcept
template <typename = std::enable_if_t<std::is_copy_assignable_v<T>>>
ReceiverIterator& operator=(const ReceiverIterator& src) noexcept {
receiverImpl_ = src.receiverImpl_;
value_ = src.value_;
return *this;
}

template <typename = std::enable_if_t<std::is_move_constructible_v<T>>>
explicit ReceiverIterator(ReceiverIterator&& src) noexcept
: receiverImpl_(std::move(src.receiverImpl_))
, value_(std::move(src.value_)) {

}

template <typename = std::enable_if_t<std::is_move_assignable_v<T>>>
inline auto operator=(ReceiverIterator&& src) noexcept -> ReceiverIterator& {
receiverImpl_ = std::move(src.receiverImpl_);
value_ = std::move(src.value_);
return *this;
}

inline auto operator*() noexcept -> std::expected<T, ChannelEventType>& {
return getValue();
inline auto operator*() const noexcept -> reference {
return *value_;
}

inline auto operator++() noexcept -> ReceiverIterator& {
getValue();
value_ = std::move(receiverImpl_->receive());
return *this;
}

inline auto operator==(const ReceiverIterator& iter) noexcept -> bool {
const auto& lvalue = getValue();
inline auto operator++(int) noexcept -> ReceiverIterator {
auto temp = std::move(*this);
*this = ReceiverIterator(receiverImpl_);
this->value_ = receiverImpl_->receive();
return temp;
}

inline auto operator==(const ReceiverIterator& iter) const noexcept -> bool {
const auto& lvalue = value_;
const auto& rvalue = iter.value_;
if (lvalue.has_value() && rvalue.has_value()) {
return &lvalue.value() == &rvalue.value();
Expand All @@ -487,7 +533,7 @@ class ReceiverIterator {
return lvalue.error() == rvalue.error();
}

inline auto operator!=(const ReceiverIterator& iter) noexcept -> bool {
inline auto operator!=(const ReceiverIterator& iter) const noexcept -> bool {
return !operator==(iter);
}
}; //end of class ReceiverIterator
Expand Down
48 changes: 33 additions & 15 deletions example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
#include <random>
#include <thread>
#include "Channel.hpp"

using namespace Async;
using namespace std::string_literals;

int32_t randomCommon(int32_t min, int32_t max) {
std::random_device rd;
Expand Down Expand Up @@ -37,29 +39,40 @@ struct People {
, id(id_) {

}
People() = default;
};


int main() {
using type = std::unordered_map<int, People>;
auto [sp, rp] = Channel<People>::create();
std::thread t([rp = std::move(rp)]{
//block
for (auto& res : *rp) {
auto& people = *res;
std::cout << " receive interval:" << (timestamp() - people.timestamp)
<< "ns, age:" << people.age << ", id:" << people.id << std::endl;
}
// for (;;) {
// auto res = rp->tryReceive(); // No blocking
// auto res = rp->receive(); // block
// auto res = rp->tryReceiveAll(); //No blocking
std::thread t([rp = std::move(rp)] {
// //can use for range
// for (auto& people : *rp) {
// std::cout << " receive interval:" << (timestamp() - people.timestamp)
// << "ns, age:" << people.age << ", id:" << people.id << std::endl;
// }

//can use ranges
// auto func = [](auto& ex) {
// return ex.id % 2 == 0;
// };
// for (const auto& people : *rp | std::views::filter(func)) {
// std::cout << " receive interval:" << (timestamp() - people.timestamp) << "ns, age:" << people.age << ", id:" << people.id << std::endl;
// }

//can use STL algorithm
// std::vector<People> values;
// std::move(rp->begin(), rp->end(), std::back_inserter(values));
// for(auto& people: values) {
// std::cout << " receive interval:" << (timestamp() - people.timestamp) << "ns, age:" << people.age << ", id:"
// << people.id << std::endl;
// }
});

//send single message
sp->send(People{timestamp(), randomAge(), randomId()});
sp->send(People{timestamp(), randomAge(), randomId()});
sp << People{timestamp(), randomAge(), randomId()};
sp << (People{timestamp(), randomAge(), randomId()});
sp << People{timestamp(), randomAge(), randomId()} << (People{timestamp(), randomAge(), randomId()});

//send multi message
std::vector<People> peoples;
Expand All @@ -73,8 +86,13 @@ int main() {
for(int i = 0; i < 10; i++) {
peoples.emplace_back(timestamp(), randomAge(), randomId());
}

//can use ranges
sp << (peoples | std::views::take(3)); // << higher priority than |
peoples | std::views::drop(4) | SenderView(sp); // SenderView
for (bool p : peoples | std::views::drop(4) | SenderView(sp)){
//p = true, send success
std::cout << "send ranges:" << std::boolalpha << p << std::endl;
};
sp->done();
t.join();
return 0;
Expand Down
4 changes: 2 additions & 2 deletions tests/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ TEST(ChannelTest, Range) {
std::thread t1([rp = std::move(rp)]{
type value = 0;
for(auto res : *rp) {
EXPECT_EQ(*res, value);
EXPECT_EQ(res, value);
value++;
}
});
Expand Down Expand Up @@ -234,7 +234,7 @@ TEST(ChannelTest, ImplicitConversion) {
std::thread t1([rp = std::move(rp)]{
int value = 0;
for(auto res : *rp) {
EXPECT_EQ(res.value().value, value);
EXPECT_EQ(res.value, value);
value++;
}
});
Expand Down

0 comments on commit ab4e4ad

Please sign in to comment.