aboutsummaryrefslogtreecommitdiff
path: root/src/client/websocketpp_client.cpp
blob: 519934371264bd399733298850a535a735c97074 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#include <nlohmann/json.hpp>
#include <websocketpp/client.hpp>
#include <websocketpp/config/asio_client.hpp>

#include "web_socket_client.hpp"

using nlohmann::json;
using std::error_code;
using std::function;
using std::lock_guard;
using std::make_tuple;
using std::mutex;
using std::string;
using std::tuple;
using std::unordered_map;

namespace client
{
/**
 * @brief An implementation of the `IWebSocketClient` interface that uses the WebSocket++ library.
 */
class WebsocketppClient : public IWebSocketClient
{
public:
    void start() override
    { 
        this->_client.init_asio();
        this->_client.start_perpetual();
    };

    void stop() override
    {
        this->_client.stop_perpetual();
        this->_client.stop();
    };

    void openConnection(string uri) override
    {
        error_code error;
        websocketpp_client::connection_ptr connection = this->_client.get_connection(uri, error);

        if (error.value() == -1)    
        {
            // PLOG_ERROR << "Error connecting to relay " << relay << ": " << error.message();
        }

        // Configure the connection here via the connection pointer.
        connection->set_fail_handler([this, uri](auto handle) {
            // PLOG_ERROR << "Error connecting to relay " << relay << ": Handshake failed.";
            lock_guard<mutex> lock(this->_propertyMutex);
            if (this->isConnected(uri))
            {
                this->_connectionHandles.erase(uri);
            }
        });

        lock_guard<mutex> lock(this->_propertyMutex);
        this->_connectionHandles[uri] = connection->get_handle();
        this->_client.connect(connection);
    };

    bool isConnected(string uri) override
    {
        lock_guard<mutex> lock(this->_propertyMutex);
        return this->_connectionHandles.find(uri) != this->_connectionHandles.end();
    };

    tuple<string, bool> send(string message, string uri) override
    {
        error_code error;

        // Make sure the connection isn't closed from under us.
        lock_guard<mutex> lock(this->_propertyMutex);
        this->_client.send(
            this->_connectionHandles[uri],
            message,
            websocketpp::frame::opcode::text,
            error);

        if (error.value() == -1)    
        {
            // PLOG_ERROR << "Error publishing event to relay " << relay << ": " << error.message();
            return make_tuple(uri, false);
        }

        return make_tuple(uri, true);
    };

    void receive(string uri, function<void(const string&, const string&)> messageHandler) override
    {
        this->_client.set_message_handler([this, messageHandler](
            websocketpp::connection_hdl connectionHandle,
            websocketpp_client::message_ptr message)
        {
            json jarr = json::array();
            string payload = message->get_payload();

            jarr.parse(payload);
            string messageType = jarr[0];
            
            if (messageType == "EVENT")
            {
                string subscriptionId = jarr[1];
                string messageContents = jarr[2].dump();
                messageHandler(subscriptionId, messageContents);
            };

            // TODO: Add support for other message types.
        });
    };

    void closeConnection(string uri) override
    {
        lock_guard<mutex> lock(this->_propertyMutex);

        websocketpp::connection_hdl handle = this->_connectionHandles[uri];
        this->_client.close(
            handle,
            websocketpp::close::status::going_away,
            "_client requested close.");
        
        this->_connectionHandles.erase(uri);
    };

private:
    typedef websocketpp::client<websocketpp::config::asio_client> websocketpp_client;
    typedef unordered_map<string, websocketpp::connection_hdl>::iterator connection_hdl_iterator;

    websocketpp_client _client;
    unordered_map<string, websocketpp::connection_hdl> _connectionHandles;
    mutex _propertyMutex;

    void onMessage(websocketpp::connection_hdl handle, websocketpp_client::message_ptr message)
    {
    };
};
} // namespace client