-
Notifications
You must be signed in to change notification settings - Fork 2
/
msg_comm.hpp
125 lines (106 loc) · 2.59 KB
/
msg_comm.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#ifndef MSG_COMM_HPP
#define MSG_COMM_HPP
#include <sys/shm.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <type_traits>
#include <atomic>
#include <string>
#include <utility>
#include <cstring>
#include <iostream>
#include "ipc_lock.h"
// Per-process bookkeeping for one mapped message region. Resides in the
// user process's own memory, not in the shared segment.
struct MsgInfo
{
    // File descriptor associated with the region; -1 while none is held.
    int fd = -1;
    // Address associated with the region; nullptr while none is held.
    void *mem = nullptr;
    // Total size in shared memory
    std::size_t size = 0;
    // Name associated with the region.
    std::string name;
    // Type tag of the message payload.
    // NOTE(review): presumably matches MsgHeader::type_hash - confirm with callers.
    // Zero-initialized so a default-constructed MsgInfo never exposes an
    // indeterminate value.
    std::size_t type = 0;
};
// Bitmask registry of connections: each set bit in the 32-bit mask is one
// connection id that is currently handed out, so at most 32 ids can be live
// at once. The class performs no synchronization of its own.
// TODO: lock-free implementation
class HeaderConn
{
public:
    // Claims the lowest clear bit of the mask and returns it as a one-hot id
    // (1, 2, 4, ...). Returns 0 when all 32 bits are already taken.
    std::uint32_t GetConnectId()
    {
        // All bits set: adding one wraps around to zero.
        if ((curr_mask_ + 1) == 0)
        {
            return 0;
        }
        // mask + 1 sets the lowest zero bit (and clears the ones below it);
        // AND-ing with ~mask isolates exactly that bit.
        const std::uint32_t connected_id = ~curr_mask_ & (curr_mask_ + 1);
        curr_mask_ |= connected_id;
        return connected_id;
    }

    // Clears every bit of `id` from the mask.
    // Return the mask after removing the disconnected id
    std::uint32_t DisconnectId(std::uint32_t id)
    {
        curr_mask_ &= ~id;
        return curr_mask_;
    }

    // Returns the number of ids currently handed out (set bits in the mask).
    // Works on a local copy so that counting never alters the registry.
    std::size_t ConnCount() const
    {
        std::size_t cnt = 0;
        // Each iteration clears the lowest set bit of the copy.
        for (std::uint32_t mask = curr_mask_; mask != 0; mask &= mask - 1)
        {
            ++cnt;
        }
        return cnt;
    }

    // Returns the raw connection mask.
    std::uint32_t CurConn() const
    {
        return curr_mask_;
    }

    // Returns true when at least one bit of `rid` is set in the mask.
    bool IsConnected(std::uint32_t rid) const
    {
        return (curr_mask_ & rid) != 0;
    }

private:
    // One bit per handed-out connection id.
    std::uint32_t curr_mask_ = 0;
};
// Synchronization header that resides at the start of the shared memory
// region. Member order and types determine the in-memory layout, so they
// must not be rearranged.
struct alignas(64) MsgHeader
{
    // Indicates whether the shared memory should shut down in case of dead writer
    std::atomic_bool shut_down{false};
    Mutex mutex;
    ConditionVar cond_not_empty;
    // Internal circular buffer
    // `capacity` and `size` have no default here; whoever sets up the header
    // must assign them. `capacity` must be non-zero before IsEqualWi() or
    // IncRi() is called, because both take an index modulo `capacity`.
    std::size_t capacity;
    std::size_t size;
    std::size_t wi = 0; // TODO: not thread-safe
    // Type tag of the stored payload; not initialized here.
    std::size_t type_hash;
    // Connected readers
    HeaderConn conn;

    // Returns true when read index `ri`, wrapped to the buffer capacity,
    // equals the write index `wi`.
    bool IsEqualWi(std::size_t ri) const
    {
        return (ri % capacity) == wi;
    }

    // Advances `ri` in place to the next slot, wrapping at `capacity`, and
    // returns the updated value.
    std::size_t IncRi(std::size_t &ri)
    {
        ri = (ri + 1) % capacity;
        return ri;
    }
};
// One slot of the message buffer: raw storage for a T plus reader flags.
template <class T>
struct Item
{
    // Raw, suitably sized and aligned bytes for one T. No T is constructed
    // here; the bytes are only value-initialized.
    typename std::aligned_storage<sizeof(T), alignof(T)>::type data{};
    // Reader flags: a bit value of 0 means "already read", 1 means "not read yet".
    // NOTE(review): bit positions presumably correspond to HeaderConn ids - confirm.
    std::uint32_t rc = 0;
};
// Returns the number of bytes needed for one MsgHeader followed by `len`
// elements of `data_size` bytes each. The multiplication is not checked for
// overflow.
// Declared inline because this definition lives in a header: without it,
// including the header from more than one translation unit produces
// multiple-definition link errors.
inline std::size_t GetTotalSize(std::size_t len, std::size_t data_size)
{
    return sizeof(MsgHeader) + len * data_size;
}
#endif