Malloy
Loading...
Searching...
No Matches
controller_run_result.hpp
1#pragma once
2
3#include <boost/asio/executor_work_guard.hpp>
4#include <boost/asio/io_context.hpp>
5#include <boost/asio/use_future.hpp>
6#include <spdlog/logger.h>
7
8#include <memory>
9#include <thread>
10#include <stdexcept>
11
12namespace malloy::detail
13{
14
19 {
23 std::size_t num_threads = 1;
24
28 std::shared_ptr<spdlog::logger> logger;
29
30 void
31 validate()
32 {
33 if (!logger)
34 throw std::logic_error{"invalid config: logger is null"};
35
36 if (num_threads == 0)
37 throw std::logic_error{"invalid config: cannot have 0 threads"};
38 };
39 };
40
41 template<std::movable T>
43 {
44 public:
52 controller_run_result(const controller_config& cfg, T ctrl, std::unique_ptr<boost::asio::io_context> ioc) :
53 m_io_ctx{std::move(ioc)},
54 m_workguard{m_io_ctx->get_executor()},
55 m_ctrl{std::move(ctrl)}
56 {
57 // Create the I/O context threads
58 m_io_threads.reserve(cfg.num_threads);
59 for (std::size_t i = 0; i < cfg.num_threads; i++) {
60 m_io_threads.emplace_back(
61 [m_io_ctx = m_io_ctx.get()] { // We cannot capture `this` as we may be moved from before this executes
62 assert(m_io_ctx);
63 m_io_ctx->run();
64 });
65 }
66
67 // Log
68 cfg.logger->debug("starting i/o context.");
69 }
70
72 controller_run_result(controller_run_result&&) noexcept = default;
73
74 controller_run_result& operator=(const controller_run_result&) = delete;
75 controller_run_result& operator=(controller_run_result&&) noexcept = default;
76
81 {
82 if (!m_io_ctx)
83 return; // We've been moved
84
85 // Stop the `io_context`. This will cause `run()`
86 // to return immediately, eventually destroying the
87 // `io_context` and all of the sockets in it.
88 m_io_ctx->stop();
89
90 // Tell the workguard that we no longer need its service
91 m_workguard.reset();
92
93 // Join I/O threads
94 for (auto& thread : m_io_threads)
95 thread.join();
96 }
97
101 void
103 {
104 if (!m_io_ctx)
105 throw std::logic_error{"attempt to call run() on moved from run_result_t"};
106
107 m_workguard.reset();
108 m_io_ctx->run();
109 }
110
111 private:
112 using workguard_t = boost::asio::executor_work_guard<boost::asio::io_context::executor_type>;
113
114 std::unique_ptr<boost::asio::io_context> m_io_ctx;
115 workguard_t m_workguard;
116 std::vector<std::thread> m_io_threads;
117 T m_ctrl; // This order matters, the T destructor may need access to something related to the i/o-context
118 };
119
120} // namespace malloy::detail
Definition: controller_run_result.hpp:43
controller_run_result(const controller_config &cfg, T ctrl, std::unique_ptr< boost::asio::io_context > ioc)
Definition: controller_run_result.hpp:52
void run()
Block until all queued async actions have completed.
Definition: controller_run_result.hpp:102
Definition: controller_run_result.hpp:19
std::shared_ptr< spdlog::logger > logger
Definition: controller_run_result.hpp:28
std::size_t num_threads
Definition: controller_run_result.hpp:23