-
Notifications
You must be signed in to change notification settings - Fork 0
/
RealtimeStream.lua
197 lines (163 loc) · 5.17 KB
/
RealtimeStream.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
--[[
RealtimeStream.lua
Copyright (c) 2018, Xamla and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
--]]
local torch = require 'torch'
local ros = require 'ros'
local socket = require 'socket'
local convertToBigEndianReader = require 'BigEndianAdapter'
local ur = require 'ur_env'
local RealtimeStream = torch.class('RealtimeStream')
local DEFAULT_REALTIME_PORT = 30003
local STASH_SIZE = 2048
local MAX_PACKAGE_SIZE = 1060
local HEADER_SIZE = 4
local CONNECT_TIMEOUT = 1.0
local RECONNECT_WAIT = 0.1
local READ_TIMEOUT = 0.004
local RECEIVE_BUFFER_SIZE = 4096
local RealtimeStreamState = {
Disconnected = 1,
Connected = 2,
Error = 3
}
ur.RealtimeStreamState = RealtimeStreamState
local ReadState = {
HEADER = 1,
PAYLOAD = 2,
ERROR = 3
}
--- realtimeState is target that receives state updates
function RealtimeStream:__init(realtimeState, logger)
self.realtimeState = realtimeState
self.logger = logger or ur.DEFAULT_LOGGER
self.stash_offset = 0
self.stash = torch.ByteTensor(STASH_SIZE)
self.stash_reader = ros.StorageReader(self.stash:storage())
convertToBigEndianReader(self.stash_reader)
self:resetStash()
self.client = socket.tcp()
self.client:setoption('tcp-nodelay', true)
self.state = RealtimeStreamState.Disconnected
self.readTimeouts = 0
end
function RealtimeStream:connect(hostname, port)
self.client:settimeout(CONNECT_TIMEOUT)
local ok, err = self.client:connect(hostname, port or DEFAULT_REALTIME_PORT)
if not ok then
self.logger.error('Connecting failed, error: ' .. err)
self.client:close()
sys.sleep(RECONNECT_WAIT)
self.client = socket.tcp()
return false
end
self.client:settimeout(READ_TIMEOUT, 't')
self.state = RealtimeStreamState.Connected
self.readTimeouts = 0
return true
end
function RealtimeStream:getState()
return self.state
end
function RealtimeStream:getSocket()
return self.client
end
function RealtimeStream:close(abortive)
if self.client ~= nil then
if self.state == RealtimeStreamState.Connected then
self.client:shutdown('send')
if abortive then
self.client:setoption('linger', { on = true, timeout = 0 })
end
end
self.client:close()
self.client = nil
end
end
local function processReceivedBlock(self, block)
local updated = false
local r = self.stash_reader
local buf = torch.ByteTensor(torch.ByteStorage():string(block), 1, #block)
local buf_len = buf:size(1)
local offset, len = 0, 0
while offset < buf_len do
local avail = buf_len - offset
local copy_len = math.min(avail, self.remaining_bytes)
self.stash[{{self.stash_offset+1, self.stash_offset + copy_len}}] = buf[{{offset+1, offset+copy_len}}]
self.stash_offset = self.stash_offset + copy_len
offset = offset + copy_len
self.remaining_bytes = self.remaining_bytes - copy_len
if self.remaining_bytes > 0 then
return
end
if self.stream_state == ReadState.HEADER then
local len = r:readUInt32()
if len <= MAX_PACKAGE_SIZE then
self.stream_state = ReadState.PAYLOAD
self.remaining_bytes = len - 4
else
self.logger.error('[RealtimeStream] Received realtime package has invalid size.')
self.stream_state = ReadState.ERROR
return false
end
elseif self.stream_state == ReadState.PAYLOAD then
local payload_reader = ros.StorageReader(self.stash:storage())
convertToBigEndianReader(payload_reader)
self.realtimeState:read(payload_reader)
self:resetStash()
updated = true
else
error('[RealtimeStream] Unexpected read state.') -- should never happen
end
end
return updated
end
function RealtimeStream:read()
if self.state ~= RealtimeStreamState.Connected then
return false
end
local r, e, p = self.client:receive(RECEIVE_BUFFER_SIZE)
if e == 'timeout' then
r = p
end
if not r then
self.logger.error('[RealtimeStream] Receive failure: ' .. e)
self.state = RealtimeStreamState.Error
return false
end
local updated = false
if #r > 0 then
updated = processReceivedBlock(self, r)
if self.stream_state == ReadState.ERROR then
self.state = RealtimeStreamState.Error
return false
end
end
if updated then
self.readTimeouts = 0
else
self.readTimeouts = self.readTimeouts + 1
end
return updated
end
function RealtimeStream:resetStash()
self.stream_state = ReadState.HEADER
self.remaining_bytes = HEADER_SIZE
self.stash:fill(0)
self.stash_offset = 0
self.stash_reader:setOffset(0)
end
function RealtimeStream:send(msg)
return self.client:send(msg)
end