3#include "request_result.hpp"
4#include "../type_traits.hpp"
5#include "../../core/mp.hpp"
6#include "../../core/type_traits.hpp"
7#include "../../core/http/request.hpp"
8#include "../../core/http/response.hpp"
9#include "../../core/http/type_traits.hpp"
11#include <boost/asio/strand.hpp>
12#include <boost/asio.hpp>
13#include <boost/beast/core.hpp>
14#include <spdlog/logger.h>
20namespace malloy::client::http
31 concepts::response_filter Filter
36 connection(std::shared_ptr<spdlog::logger> logger,
const std::uint64_t body_limit) :
37 m_logger(std::move(logger))
41 throw std::invalid_argument(
"no valid logger provided.");
44 m_parser.body_limit(body_limit);
52 boost::asio::awaitable< request_result<Filter> >
59 auto executor =
co_await boost::asio::this_coro::executor;
62 auto resolver = boost::asio::ip::tcp::resolver{ executor };
63 const auto [ec1, results] =
co_await resolver.async_resolve(
64 req.base()[malloy::http::field::host],
65 std::to_string(req.
port()),
66 boost::asio::as_tuple(boost::asio::use_awaitable)
73 set_stream_timeout(std::chrono::seconds(30));
74 co_await boost::beast::get_lowest_layer(derived().stream()).async_connect(results, boost::asio::redirect_error(boost::asio::use_awaitable, ec2));
79 co_await derived().hook_connected();
82 set_stream_timeout(std::chrono::seconds(30));
83 co_await boost::beast::http::async_write(derived().stream(), req);
93 auto bodies = filter.body_for(m_parser.get().base());
94 auto resp =
co_await std::visit(
96 using body_t = std::decay_t<
decltype(body)>;
98 auto parser = std::make_shared<boost::beast::http::response_parser<body_t>>(std::move(m_parser));
99 filter.setup_body(parser->get().base(), parser->get().body());
101 boost::beast::flat_buffer buffer;
103 co_await boost::beast::http::async_read(
107 boost::asio::use_awaitable
117 set_stream_timeout(std::chrono::seconds(30));
119 boost::beast::get_lowest_layer(derived().stream()).socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
124 if (ec && ec != boost::beast::errc::not_connected)
125 m_logger->error(
"shutdown: {}", ec.message());
139 std::shared_ptr<spdlog::logger> m_logger;
141 template<
typename Rep,
typename Period>
143 set_stream_timeout(
const std::chrono::duration<Rep, Period> duration)
148 boost::beast::get_lowest_layer(derived().stream()).expires_after(duration);
152 boost::beast::http::response_parser<boost::beast::http::empty_body> m_parser;
159 return static_cast<Derived&
>(*this);
Definition: connection.hpp:34
Definition: request.hpp:19
constexpr std::uint16_t port() const noexcept
Definition: request.hpp:143
Definition: response.hpp:22
Definition: type_traits.hpp:9
unwrap_variant< to_responses< bodies_for_t< Filter > > > filter_resp_t
Resolves to the type that must be taken in callbacks handling responses for Filter.
Definition: mp.hpp:67
boost::beast::error_code error_code
Error code used to signify errors without throwing. Truthy means it holds an error.
Definition: error.hpp:9
Definition: request_result.hpp:16
malloy::mp::filter_resp_t< Filter > response
Definition: request_result.hpp:30
malloy::error_code error_code
Definition: request_result.hpp:20