Skip to content

Commit

Permalink
Adjusting logging and refactoring API's
Browse files Browse the repository at this point in the history
  • Loading branch information
gimesketvirtadieni committed Feb 3, 2018
1 parent c464290 commit 88943e5
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 106 deletions.
10 changes: 7 additions & 3 deletions src/SlimStreamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,14 @@ int main(int argc, char *argv[])

try
{
// TODO: parametrize ports
unsigned int commandsPort{3484};
unsigned int streamingPort{9001};

// Callbacks objects 'glue' SlimProto Streamer with TCP Command Servers
auto streamerPtr{std::make_unique<Streamer>()};
auto commandServerPtr{std::make_unique<Server>(3484, 2, createCommandCallbacks(*streamerPtr))};
auto streamingServerPtr{std::make_unique<Server>(9001, 2, createStreamingCallbacks(*streamerPtr))};
auto streamerPtr{std::make_unique<Streamer>(streamingPort)};
auto commandServerPtr{std::make_unique<Server>(commandsPort, 2, createCommandCallbacks(*streamerPtr))};
auto streamingServerPtr{std::make_unique<Server>(streamingPort, 2, createStreamingCallbacks(*streamerPtr))};

// creating Scheduler object
auto schedulerPtr{std::make_unique<Scheduler>(createPipelines(*streamerPtr))};
Expand Down
2 changes: 1 addition & 1 deletion src/slim/Pipeline.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ namespace slim
{
source.start([]
{
LOG(ERROR) << "Buffer overflow: a chunk was skipped";
LOG(ERROR) << "Buffer overflow error: a chunk was skipped";
});
}

Expand Down
22 changes: 14 additions & 8 deletions src/slim/Scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,16 @@ namespace slim
public:
explicit Scheduler(std::vector<Pipeline<Source, Destination>> p)
: pipelines{std::move(p)}
, processorProxyPtr{nullptr} {}
{
LOG(DEBUG) << LABELS{"slim"} << "Scheduler object was created (id=" << this << ")";
}

// using Rule Of Zero
~Scheduler() = default;
~Scheduler()
{
LOG(DEBUG) << LABELS{"slim"} << "Scheduler object was deleted (id=" << this << ")";
}

Scheduler(const Scheduler&) = delete; // non-copyable
Scheduler& operator=(const Scheduler&) = delete; // non-assignable
Scheduler(Scheduler&& rhs) = default;
Expand All @@ -52,18 +58,18 @@ namespace slim
{
[&]
{
LOG(DEBUG) << "Starting PCM data capture thread (id=" << this << ")";
LOG(DEBUG) << LABELS{"slim"} << "Starting PCM data capture thread (id=" << this << ")";

try
{
pipeline.start();
}
catch (const slim::Exception& error)
{
LOG(ERROR) << error;
LOG(ERROR) << LABELS{"slim"} << "Error while starting a pipeline: " << error;
}

LOG(DEBUG) << "Stopping PCM data capture thread (id=" << this << ")";
LOG(DEBUG) << LABELS{"slim"} << "Stopping PCM data capture thread (id=" << this << ")";
}
};

Expand All @@ -82,7 +88,7 @@ namespace slim
{
[&]
{
LOG(DEBUG) << "Starting streamer thread (id=" << this << ")";
LOG(DEBUG) << LABELS{"slim"} << "Starting streamer thread (id=" << this << ")";

for(auto producing{true}, available{true}; producing;)
{
Expand Down Expand Up @@ -116,7 +122,7 @@ namespace slim
}
}

LOG(DEBUG) << "Stopping streamer thread (id=" << this << ")";
LOG(DEBUG) << LABELS{"slim"} << "Stopping streamer thread (id=" << this << ")";
}
};

Expand All @@ -135,7 +141,7 @@ namespace slim
}
catch (const slim::Exception& error)
{
LOG(ERROR) << error;
LOG(ERROR) << LABELS{"slim"} << "Error while stopping a pipeline: " << error;
}
}

Expand Down
8 changes: 3 additions & 5 deletions src/slim/proto/CommandSTRM.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ namespace slim
{
public:
CommandSTRM(CommandSelection commandSelection)
: CommandSTRM{commandSelection, 0, {}} {}
: CommandSTRM{commandSelection, 0, 0, {}} {}

CommandSTRM(CommandSelection commandSelection, unsigned int samplingRate, std::string clientID)
CommandSTRM(CommandSelection commandSelection, unsigned int port, unsigned int samplingRate, std::string clientID)
{
memset(&strm, 0, sizeof(STRM));
memcpy(&strm.data.opcode, "strm", sizeof(strm.data.opcode));
Expand All @@ -84,9 +84,7 @@ namespace slim

if (strm.data.command == static_cast<char>(CommandSelection::Start))
{
// TODO: crap
((unsigned char*)(&strm.data.serverPort))[0] = 35;
((unsigned char*)(&strm.data.serverPort))[1] = 41;
strm.data.serverPort = htons(port);
std::strcpy(strm.data.httpHeader, (std::string{"GET /stream.pcm?player="} += clientID).c_str());
}

Expand Down
25 changes: 13 additions & 12 deletions src/slim/proto/CommandSession.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#pragma once

#include <slim/util/ExpandableBuffer.hpp>
#include <chrono>
#include <cstddef> // std::size_t
#include <optional>
Expand All @@ -26,7 +27,7 @@
#include "slim/proto/CommandSTAT.hpp"
#include "slim/proto/CommandSTRM.hpp"
#include "slim/proto/StreamingSession.hpp"
#include "slim/util/Buffer.hpp"
#include "slim/util/ExpandableBuffer.hpp"
#include "slim/util/ScopeGuard.hpp"


Expand All @@ -42,7 +43,7 @@ namespace slim

public:
CommandSession(ConnectionType& c, std::string id)
: connection(c)
: connection{c}
, clientID{id}
, handlersMap
{
Expand Down Expand Up @@ -87,7 +88,7 @@ namespace slim

inline void onRequest(unsigned char* buffer, std::size_t size)
{
//LOG(DEBUG) << "SlimProto onRequest size=" << size;
//LOG(DEBUG) << LABELS{"proto"} << "SlimProto onRequest size=" << size;

// adding data to the buffer
commandBuffer.append(buffer, size);
Expand Down Expand Up @@ -117,7 +118,7 @@ namespace slim
}
else
{
LOG(DEBUG) << "Unsupported SlimProto command received (header='" << s << "')";
LOG(DEBUG) << LABELS{"proto"} << "Unsupported SlimProto command received (header='" << s << "')";

//for (unsigned int i = 0; i < size; i++)
//{
Expand Down Expand Up @@ -182,15 +183,15 @@ namespace slim
}
}

void stream(unsigned int samplingRate)
void stream(unsigned int port, unsigned int samplingRate)
{
send(CommandSTRM{CommandSelection::Start, samplingRate, getClientID()});
send(CommandSTRM{CommandSelection::Start, port, samplingRate, getClientID()});
}

protected:
inline auto onDSCO(unsigned char* buffer, std::size_t size)
{
LOG(DEBUG) << "DSCO command received";
LOG(DEBUG) << LABELS{"proto"} << "DSCO command received";

return size;
}
Expand All @@ -202,7 +203,7 @@ namespace slim
// if there is enough data to process HELO message
if (CommandHELO::isEnoughData(buffer, size))
{
LOG(INFO) << "HELO command received";
LOG(INFO) << LABELS{"proto"} << "HELO command received";

// deserializing HELO command
commandHELO = CommandHELO{buffer, size};
Expand All @@ -220,7 +221,7 @@ namespace slim

inline auto onRESP(unsigned char* buffer, std::size_t size)
{
LOG(DEBUG) << "RESP command received";
LOG(DEBUG) << LABELS{"proto"} << "RESP command received";

responseReceived = true;

Expand All @@ -242,12 +243,12 @@ namespace slim
auto event{commandSTAT.getEvent()};
if (!event.compare("STMc"))
{
LOG(DEBUG) << "STMc command received";
LOG(DEBUG) << LABELS{"proto"} << "STMc command received";
connectedReceived = true;
}
else
{
LOG(DEBUG) << event << " command received";
LOG(DEBUG) << LABELS{"proto"} << event << " command received";
}
}

Expand All @@ -271,7 +272,7 @@ namespace slim
StreamingSession<ConnectionType>* streamingSessionPtr{nullptr};
bool connectedReceived{false};
bool responseReceived{false};
util::Buffer commandBuffer{std:size_t{0}, std:size_t{2048}};
util::ExpandableBuffer commandBuffer{std:size_t{0}, std:size_t{2048}};
std::optional<CommandHELO> commandHELO{std::nullopt};
std::optional<TimePoint> lastPingAt{std::nullopt};
};
Expand Down
47 changes: 27 additions & 20 deletions src/slim/proto/Streamer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ namespace slim
using TimePoint = std::chrono::time_point<std::chrono::steady_clock>;

public:
Streamer()
: timerThread{[&]
Streamer(unsigned int sp)
: streamingPort{sp}
, timerThread{[&]
{
LOG(DEBUG) << "Timer thread started";
LOG(DEBUG) << LABELS{"proto"} << "Timer thread started";

for(unsigned int counter{0}; timerRunning; counter++, std::this_thread::sleep_for(std::chrono::milliseconds{200}))
{
Expand All @@ -68,13 +69,18 @@ namespace slim
}
}

LOG(DEBUG) << "Timer thread stopped";
}} {}
LOG(DEBUG) << LABELS{"proto"} << "Timer thread stopped";
}}
{
LOG(DEBUG) << LABELS{"proto"} << "Streamer object was created (id=" << this << ")";
}

~Streamer()
{
timerRunning = false;
timerThread.join();

LOG(DEBUG) << LABELS{"proto"} << "Streamer object was deleted (id=" << this << ")";
}

Streamer(const Streamer&) = delete; // non-copyable
Expand All @@ -101,13 +107,13 @@ namespace slim
// deferring chunk transmition for at least for one quantum
streaming = false;

LOG(INFO) << "Initialize streaming";
LOG(INFO) << LABELS{"proto"} << "Initialize streaming";

// assigning new sampling rate and start streaming
samplingRate = sr;
for (auto& entry : commandSessions)
{
entry.second->stream(samplingRate);
entry.second->stream(streamingPort, samplingRate);
}
}

Expand All @@ -126,9 +132,9 @@ namespace slim
{
if (missingSessionsTotal)
{
LOG(WARNING) << "Could not defer chunk processing due to reached threashold";
LOG(WARNING) << LABELS{"proto"} << "Could not defer chunk processing due to reached threashold";
}
LOG(INFO) << "Start streaming";
LOG(INFO) << LABELS{"proto"} << "Start streaming";

streaming = true;

Expand All @@ -139,7 +145,7 @@ namespace slim
{
if (missingSessionsTotal)
{
LOG(DEBUG) << "Deferring chunk transmition due to missing HTTP sessions";
LOG(DEBUG) << LABELS{"proto"} << "Deferring chunk transmition due to missing HTTP sessions";
}

// TODO: implement cruise control; for now sleep is good enough
Expand All @@ -165,7 +171,7 @@ namespace slim
// if there are command sessions without relevant HTTP session
if (counter > 0)
{
LOG(WARNING) << "Current chunk transmition was skipped for " << counter << " client(s)";
LOG(WARNING) << LABELS{"proto"} << "Current chunk transmition was skipped for " << counter << " client(s)";
}
}

Expand All @@ -174,7 +180,7 @@ namespace slim

void onHTTPClose(ConnectionType& connection)
{
LOG(INFO) << "HTTP close callback";
LOG(INFO) << LABELS{"proto"} << "HTTP close callback";

auto streamingSessionPtr{removeSession(streamingSessions, connection)};
auto clientID{streamingSessionPtr->getClientID()};
Expand All @@ -195,7 +201,7 @@ namespace slim
session.onRequest(buffer, receivedSize);
}))
{
LOG(INFO) << "New HTTP session request received";
LOG(INFO) << LABELS{"proto"} << "New HTTP session request received";

try
{
Expand All @@ -213,7 +219,7 @@ namespace slim
throw slim::Exception("Missing client ID in HTTP request");
}

LOG(INFO) << "Client ID was parsed from HTTP request (clientID=" << clientID.value() << ")";
LOG(INFO) << LABELS{"proto"} << "Client ID was parsed from HTTP request (clientID=" << clientID.value() << ")";

// if there a SlimProto connection found that originated this HTTP request
auto commandSessionPtr{findCommandSession(clientID.value()).value_or(nullptr)};
Expand All @@ -231,7 +237,7 @@ namespace slim
}
catch (const slim::Exception& error)
{
LOG(ERROR) << "Incorrect HTTP session request: " << error.what();
LOG(ERROR) << LABELS{"proto"} << "Incorrect HTTP session request: " << error.what();
connection.stop();
}
}
Expand Down Expand Up @@ -266,7 +272,7 @@ namespace slim
// enable streaming for this session if required
if (streaming)
{
sessionPtr->stream(samplingRate);
sessionPtr->stream(streamingPort, samplingRate);
}

// saving command session in the map
Expand All @@ -275,7 +281,7 @@ namespace slim
}
catch (const slim::Exception& error)
{
LOG(ERROR) << "Error while processing SlimProto command: " << error.what();
LOG(ERROR) << LABELS{"proto"} << "Error while processing SlimProto command: " << error.what();
connection.stop();
}
}
Expand All @@ -302,12 +308,12 @@ namespace slim

// saving session in a map; using pointer to a relevant connection as an ID
sessions[&connection] = std::move(sessionPtr);
LOG(DEBUG) << LABELS{"slim"} << "New session was added (id=" << s << ", sessions=" << sessions.size() << ")";
LOG(DEBUG) << LABELS{"proto"} << "New session was added (id=" << s << ", sessions=" << sessions.size() << ")";
}
else
{
s = (*found).second.get();
LOG(INFO) << "Session already exists";
LOG(INFO) << LABELS{"proto"} << "Session already exists";
}

return *s;
Expand Down Expand Up @@ -378,13 +384,14 @@ namespace slim
{
sessionPtr = std::move((*found).second);
sessions.erase(found);
LOG(DEBUG) << LABELS{"slim"} << "Session was removed (id=" << sessionPtr.get() << ", sessions=" << sessions.size() << ")";
LOG(DEBUG) << LABELS{"proto"} << "Session was removed (id=" << sessionPtr.get() << ", sessions=" << sessions.size() << ")";
}

return std::move(sessionPtr);
}

private:
unsigned int streamingPort;
SessionsMap<CommandSession<ConnectionType>> commandSessions;
SessionsMap<StreamingSession<ConnectionType>> streamingSessions;
bool streaming{false};
Expand Down
Loading

0 comments on commit 88943e5

Please sign in to comment.