Malloy
Loading...
Searching...
No Matches
connection.hpp
1#pragma once
2
3#include "connection_t.hpp"
4#include "../websocket/connection.hpp"
5#include "../../core/http/request.hpp"
6#include "../../core/http/generator.hpp"
7
8#include <boost/asio/dispatch.hpp>
9#include <boost/beast/core.hpp>
10#include <boost/beast/http.hpp>
11#include <boost/beast/http/detail/type_traits.hpp>
12#include <spdlog/logger.h>
13
14#include <algorithm>
15#include <cstdlib>
16#include <filesystem>
17#include <memory>
18#include <optional>
19#include <string>
20#include <concepts>
21#include <vector>
22
23namespace spdlog
24{
25 class logger;
26}
27
28namespace malloy::server::http
29{
30
39 template<class Derived>
41 {
42 public:
44 public std::enable_shared_from_this<request_generator>
45 {
46 friend class connection;
47
48 public:
49 using h_parser_t = std::unique_ptr<boost::beast::http::request_parser<boost::beast::http::empty_body>>;
50 using header_t = boost::beast::http::request_header<>;
51
52 [[nodiscard]]
53 header_t&
54 header() { return m_header; }
55
56 [[nodiscard]]
57 const header_t&
58 header() const { return m_header; }
59
60 template<typename Body, std::invocable<malloy::http::request<Body>&&> Callback, typename SetupCb>
61 auto
62 body(Callback&& done, SetupCb&& setup)
63 {
64 using namespace boost::beast::http;
65 using body_t = std::decay_t<Body>;
66 auto parser = std::make_shared<boost::beast::http::request_parser<body_t>>(std::move(*m_parser));
67 parser->get().base() = m_header;
68 std::invoke(setup, parser->get().body());
69
70 boost::beast::http::async_read(
71 m_parent->derived().m_stream, m_buffer, *parser,
72 [_ = m_parent,
73 done = std::forward<Callback>(done),
74 p = parser, this_ = this->shared_from_this(),
75 this
76 ](const auto& ec, auto) {
77 if (ec && m_parent->m_logger) { // TODO: see #40
78 m_parent->m_logger->error("failed to read http request body: '{}'", ec.message());
79 return;
80 }
81 done(malloy::http::request<Body>{p->release()});
82 }
83 );
84 }
85
86 template<typename Body, std::invocable<malloy::http::request<Body>&&> Callback>
87 auto
88 body(Callback&& done)
89 {
90 return body<Body>(std::forward<Callback>(done), [](auto){});
91 }
92
93 private:
94 request_generator(h_parser_t hparser, header_t header, std::shared_ptr<connection> parent, boost::beast::flat_buffer buff) :
95 m_buffer{ std::move(buff) },
96 m_parser{ std::move(hparser) },
97 m_header{ std::move(header) },
98 m_parent{ std::move(parent) }
99 {
100 assert(m_parent); // ToDo: Should this be BOOST_ASSERT?
101 }
102
103 boost::beast::flat_buffer m_buffer;
104 h_parser_t m_parser;
105 header_t m_header;
106 std::shared_ptr<connection> m_parent;
107 };
108
112 // ToDo: Can the connections be moved? At least there is an inconsistency between WS and HTTP connection passing. Intentional?
114 {
115 public:
117 using http_conn_t = const connection_t&;
118 using path = std::filesystem::path;
119 using req_t = std::shared_ptr<request_generator>;
120
121 virtual
122 ~handler() = default;
123
124 virtual
125 void
126 websocket(const path& root, const req_t& req, const std::shared_ptr<malloy::server::websocket::connection>&) = 0;
127
128 virtual
129 void
130 http(const path& root, const req_t& req, http_conn_t) = 0;
131 };
132
136 struct config
137 {
138 std::uint64_t request_body_limit = 100'000'000;
139 std::string agent_string;
140 };
141
145 struct config cfg;
146
156 std::shared_ptr<spdlog::logger> logger,
157 boost::beast::flat_buffer buffer,
158 std::shared_ptr<handler> router,
159 std::shared_ptr<const std::filesystem::path> http_doc_root
160 ) :
161 m_buffer(std::move(buffer)),
162 m_logger(std::move(logger)),
163 m_doc_root(std::move(http_doc_root)),
164 m_router(std::move(router))
165 {
166 // Sanity check logger
167 if (!m_logger)
168 throw std::runtime_error("did not receive a valid logger instance.");
169
170 // Sanity check router
171 if (!m_router)
172 throw std::runtime_error("did not receive a valid router instance.");
173 }
174
182 [[nodiscard]]
183 std::shared_ptr<spdlog::logger>
184 logger() const noexcept
185 {
186 return m_logger;
187 }
188
197 template<bool isRequest, class Body, class Fields>
198 void
199 do_write(boost::beast::http::message<isRequest, Body, Fields>&& msg)
200 {
201 // The lifetime of the message has to extend
202 // for the duration of the async operation so
203 // we use a shared_ptr to manage it.
204 auto sp = std::make_shared<boost::beast::http::message<isRequest, Body, Fields>>(std::move(msg));
205
206 // Store a type-erased version of the shared
207 // pointer in the class to keep it alive.
208 m_response = sp;
209
210 // Write the response
211 boost::beast::http::async_write(
212 derived().m_stream,
213 *sp,
214 boost::beast::bind_front_handler(
215 &connection::on_write,
216 derived().shared_from_this(),
217 sp->need_eof()
218 )
219 );
220 }
221
222 void
223 do_read()
224 {
225 m_logger->trace("do_read()");
226
227 // Construct a new parser for each message
228 m_parser = std::make_unique<std::decay_t<decltype(*m_parser)>>();
229
230 // Apply a reasonable limit to the allowed size
231 // of the body in bytes to prevent abuse.
232 m_parser->body_limit(cfg.request_body_limit);
233
234 // Set the timeout.
235 boost::beast::get_lowest_layer(derived().stream()).expires_after(std::chrono::seconds(30));
236
237 // Read a request using the parser-oriented interface
238 boost::beast::http::async_read_header(
239 derived().m_stream,
240 m_buffer,
241 *m_parser,
242 boost::beast::bind_front_handler(
243 &connection::on_read,
244 derived().shared_from_this()
245 )
246 );
247 }
248
249 protected:
250 boost::beast::flat_buffer m_buffer;
251
252 void
253 report_err(malloy::error_code ec, std::string_view context)
254 {
255 m_logger->error("{}: {} (code: {})", context, ec.message(), ec.value());
256 }
257
258 private:
259 friend class request_generator;
260
261 std::shared_ptr<spdlog::logger> m_logger;
262 std::shared_ptr<const std::filesystem::path> m_doc_root;
263 std::shared_ptr<handler> m_router;
264 std::shared_ptr<void> m_response;
265
266 // Pointer to allow handoff to generator since it cannot be copied or moved
267 typename request_generator::h_parser_t m_parser;
268
274 [[nodiscard]]
275 Derived&
276 derived()
277 {
278 return static_cast<Derived&>(*this);
279 }
280
281 void
282 on_read(boost::beast::error_code ec, std::size_t bytes_transferred)
283 {
284 m_logger->trace("on_read(): bytes read: {}", bytes_transferred);
285
286 // This means the connection was closed
287 if (ec == boost::beast::http::error::end_of_stream || ec == boost::beast::error::timeout)
288 return do_close();
289
290 // Check for errors
291 if (ec) {
292 m_logger->error("on_read(): {}", ec.message());
293 return;
294 }
295
296 // Get the header
297 auto header = m_parser->get().base();
298
299 // Parse the request into something more useful from hereon
300 auto gen = std::shared_ptr<request_generator>{new request_generator{ std::move(m_parser), std::move(header), derived().shared_from_this(), std::move(m_buffer) }}; // Private ctor
301
302 // Check if this is a WS request
303 if (boost::beast::websocket::is_upgrade(gen->header())) {
304 m_logger->info("upgrading HTTP connection to WS connection");
305
306 // Create a websocket connection, transferring ownership
307 // of both the socket and the HTTP request.
308 boost::beast::get_lowest_layer(derived().stream()).expires_never();
309 auto ws_connection = server::websocket::connection::make(
310 m_logger,
311 malloy::websocket::stream{derived().release_stream()},
312 cfg.agent_string
313 );
314
315 // Hand over to router
316 m_router->websocket(*m_doc_root, gen, ws_connection);
317 }
318
319 // This is an HTTP request
320 else {
321 // Hand over to router
322 m_router->http(*m_doc_root, std::move(gen), derived().shared_from_this());
323 }
324 }
325
326 void
327 on_write(bool close, boost::beast::error_code ec, std::size_t bytes_transferred)
328 {
329 m_logger->trace("on_write(): bytes written: {}", bytes_transferred);
330
331 // Check for errors
332 if (ec) {
333 m_logger->error("on_write(): {}", ec.message());
334 return;
335 }
336
337 if (close) {
338 // This means we should close the connection, usually because
339 // the response indicated the "Connection: close" semantic.
340 return do_close();
341 }
342
343 // We're done with the response so delete it
344 m_response = { };
345
346 // Read another request
347 do_read();
348 }
349
353 void
354 do_close()
355 {
356 m_logger->trace("do_close()");
357
358 // Let the derived class handle this
359 derived().do_close();
360
361 // At this point the connection is closed gracefully
362 m_logger->info("HTTP connection closed gracefully");
363 }
364 };
365
366}
Definition: request.hpp:19
Definition: connection.hpp:114
Definition: connection.hpp:41
void do_write(boost::beast::http::message< isRequest, Body, Fields > &&msg)
Definition: connection.hpp:199
connection(std::shared_ptr< spdlog::logger > logger, boost::beast::flat_buffer buffer, std::shared_ptr< handler > router, std::shared_ptr< const std::filesystem::path > http_doc_root)
Definition: connection.hpp:155
std::shared_ptr< spdlog::logger > logger() const noexcept
Definition: connection.hpp:184
Definition: router.hpp:104
static std::shared_ptr< connection > make(const std::shared_ptr< spdlog::logger > logger, stream &&ws, const std::string &agent_string)
Construct a new connection object.
Definition: connection.hpp:106
Websocket stream. May use TLS.
Definition: stream.hpp:50
boost::beast::basic_stream< boost::asio::ip::tcp, boost::asio::any_io_executor, RatePolicy > stream
Definition: stream.hpp:22
boost::beast::error_code error_code
Error code used to signify errors without throwing. Truthy means it holds an error.
Definition: error.hpp:9
Definition: connection.hpp:137
std::string agent_string
Agent string to use, set by the controller.
Definition: connection.hpp:139