relx 0.1.0
A Modern C++23 Type-Safe SQL Query Builder
Loading...
Searching...
No Matches
postgresql_async_connection.hpp
Go to the documentation of this file.
1#pragma once
2
3#include "../connection/pgsql_async_wrapper.hpp"
4#include "../results/result.hpp"
5#include "connection.hpp"
6#include "meta.hpp"
7
8#include <expected>
9#include <future>
10#include <memory>
11#include <optional>
12#include <regex>
13#include <sstream>
14#include <string>
15#include <string_view>
16#include <vector>
17
18#include <boost/asio.hpp>
19#include <boost/asio/awaitable.hpp>
20#include <boost/asio/co_spawn.hpp>
21#include <boost/asio/detached.hpp>
22#include <boost/asio/use_awaitable.hpp>
23
24namespace relx::connection {
25
28public:
33 explicit PostgreSQLAsyncConnection(boost::asio::io_context& io_context,
34 std::string connection_string);
35
39 explicit PostgreSQLAsyncConnection(boost::asio::io_context& io_context,
40 const PostgreSQLConnectionParams& params);
41
44
45 // Delete copy constructor and assignment operator
48
49 // Allow move operations
52
55 bool is_connected() const;
56
59 bool in_transaction() const;
60
61 //====================================================================
62 // The following methods provide the asynchronous awaitable interface
63 //====================================================================
64
67 boost::asio::awaitable<ConnectionResult<void>> connect();
68
71 boost::asio::awaitable<ConnectionResult<void>> disconnect();
72
77 boost::asio::awaitable<ConnectionResult<result::ResultSet>> execute_raw(
78 std::string sql, std::vector<std::string> params = {});
79
83 template <query::SqlExpr Query>
84 boost::asio::awaitable<ConnectionResult<result::ResultSet>> execute(Query query) {
85 std::string sql = query.to_sql();
86 std::vector<std::string> params = query.bind_params();
87 return execute_raw(sql, params);
88 }
89
95 template <typename T, query::SqlExpr Query>
96 boost::asio::awaitable<ConnectionResult<T>> execute(Query query) {
97 auto result_set_output = co_await execute(query);
98 if (!result_set_output) {
99 co_return std::unexpected(result_set_output.error());
100 }
101
102 const auto& result_set = *result_set_output;
103
104 if (result_set.empty()) {
105 co_return std::unexpected(ConnectionError{.message = "No results found", .error_code = -1});
106 }
107
108 // Create an instance of T
109 T obj{};
110
111 // Get the first row of results
112 const auto& row = result_set.at(0);
113
114 // Use Boost.PFR to get the tuple type that matches our struct
115 auto structure_tie = boost::pfr::structure_tie(obj);
116
117 // Make sure the number of columns matches the number of fields in the struct
118 if (result_set.column_count() != boost::pfr::tuple_size_v<std::remove_cvref_t<T>>) {
119 std::stringstream ss;
120 for (const auto& param : query.bind_params()) {
121 ss << param << ", ";
122 }
123 co_return std::unexpected(ConnectionError{
124 .message = "Column count does not match struct field count, " +
125 std::to_string(result_set.column_count()) +
126 " != " + std::to_string(boost::pfr::tuple_size_v<std::remove_cvref_t<T>>) +
127 " for struct " + typeid(T).name() + " and query " + query.to_sql() +
128 " with params " + ss.str(),
129 .error_code = -1});
130 }
131
132 // Convert each value in the result row to the appropriate type in the struct
133 try {
134 // Create a vector of values from the row
135 std::vector<std::string> values;
136 for (size_t i = 0; i < result_set.column_count(); ++i) {
137 auto cell_result = row.get_cell(i);
138 if (!cell_result) {
139 co_return std::unexpected(
140 ConnectionError{.message = "Failed to get cell value: " + cell_result.error().message,
141 .error_code = -1});
142 }
143 values.push_back((*cell_result)->raw_value());
144 }
145
146 // Apply tuple assignment from the row values to the struct fields
147 relx::connection::map_row_to_tuple(structure_tie, values);
148 } catch (const std::exception& e) {
149 co_return std::unexpected(
150 ConnectionError{.message = std::string("Failed to convert result to struct: ") + e.what(),
151 .error_code = -1});
152 }
153
154 co_return obj;
155 }
156
162 template <typename T, query::SqlExpr Query>
163 boost::asio::awaitable<ConnectionResult<std::vector<T>>> execute_many(const Query& query) {
164 auto result_set_output = co_await execute(query);
165 if (!result_set_output) {
166 co_return std::unexpected(result_set_output.error());
167 }
168
169 const auto& result_set = *result_set_output;
170
171 std::vector<T> objects;
172 objects.reserve(result_set.size());
173
174 // Check if we have at least one row to determine column count
175 if (result_set.empty()) {
176 co_return objects; // Return empty vector
177 }
178
179 // Make sure the number of columns matches the number of fields in the struct
180 if (result_set.column_count() != boost::pfr::tuple_size_v<std::remove_cvref_t<T>>) {
181 std::stringstream ss;
182 for (const auto& param : query.bind_params()) {
183 ss << param << ", ";
184 }
185 co_return std::unexpected(ConnectionError{
186 .message = "Column count does not match struct field count, " +
187 std::to_string(result_set.column_count()) +
188 " != " + std::to_string(boost::pfr::tuple_size_v<std::remove_cvref_t<T>>) +
189 " for struct " + typeid(T).name() + " and query " + query.to_sql() +
190 " with params " + ss.str(),
191 .error_code = -1});
192 }
193
194 // Process each row
195 for (size_t row_idx = 0; row_idx < result_set.size(); ++row_idx) {
196 const auto& row = result_set.at(row_idx);
197 T obj{};
198 auto structure_tie = boost::pfr::structure_tie(obj);
199
200 try {
201 // Create a vector of values from the row
202 std::vector<std::string> values;
203 for (size_t i = 0; i < result_set.column_count(); ++i) {
204 auto cell_result = row.get_cell(i);
205 if (!cell_result) {
206 co_return std::unexpected(ConnectionError{.message = "Failed to get cell value: " +
207 cell_result.error().message,
208 .error_code = -1});
209 }
210 values.push_back((*cell_result)->raw_value());
211 }
212
213 relx::connection::map_row_to_tuple(structure_tie, values);
214 objects.push_back(std::move(obj));
215 } catch (const std::exception& e) {
216 co_return std::unexpected(ConnectionError{
217 .message = std::string("Failed to convert result to struct: ") + e.what(),
218 .error_code = -1});
219 }
220 }
221
222 co_return objects;
223 }
224
228 boost::asio::awaitable<ConnectionResult<void>> begin_transaction(
230
233 boost::asio::awaitable<ConnectionResult<void>> commit_transaction();
234
237 boost::asio::awaitable<ConnectionResult<void>> rollback_transaction();
238
242
244 boost::asio::io_context& get_io_context() const { return io_context_; }
245
248 boost::asio::awaitable<ConnectionResult<void>> reset_connection_state();
249
255
256private:
257 boost::asio::io_context& io_context_;
258 std::string connection_string_;
259 std::unique_ptr<pgsql_async_wrapper::Connection> async_conn_;
260 bool is_connected_ = false;
261 bool in_transaction_ = false;
262
264 static ConnectionResult<result::ResultSet> convert_result(
265 const pgsql_async_wrapper::Result& pg_result);
266
270 static std::string convert_placeholders(const std::string& sql);
271};
272
273} // namespace relx::connection
Asynchronous PostgreSQL implementation of the Connection interface.
boost::asio::awaitable< ConnectionResult< T > > execute(Query query)
Execute a query and map results to a user-defined type asynchronously.
PostgreSQLAsyncConnection(boost::asio::io_context &io_context, std::string connection_string)
Constructor with connection parameters and io_context.
bool reset_connection_state_sync()
Reset connection state synchronously (for use in destructors)
PostgreSQLAsyncConnection & operator=(const PostgreSQLAsyncConnection &)=delete
boost::asio::awaitable< ConnectionResult< result::ResultSet > > execute(Query query)
Execute a query expression asynchronously.
boost::asio::awaitable< ConnectionResult< void > > disconnect()
Disconnect from the PostgreSQL database asynchronously.
boost::asio::awaitable< ConnectionResult< void > > begin_transaction(IsolationLevel isolation_level=IsolationLevel::ReadCommitted)
Begin a new transaction asynchronously.
boost::asio::awaitable< ConnectionResult< void > > commit_transaction()
Commit the current transaction asynchronously.
boost::asio::io_context & get_io_context() const
Get the IO context associated with this connection.
PostgreSQLAsyncConnection(const PostgreSQLAsyncConnection &)=delete
boost::asio::awaitable< ConnectionResult< result::ResultSet > > execute_raw(std::string sql, std::vector< std::string > params={})
Execute a raw SQL query with parameters asynchronously.
pgsql_async_wrapper::Connection & get_async_conn()
Get the underlying async connection wrapper.
bool in_transaction() const
Check if a transaction is currently active.
~PostgreSQLAsyncConnection()
Destructor that ensures proper cleanup.
PostgreSQLAsyncConnection(boost::asio::io_context &io_context, const PostgreSQLConnectionParams &params)
Constructor with structured connection parameters and io_context.
boost::asio::awaitable< ConnectionResult< void > > rollback_transaction()
Rollback the current transaction asynchronously.
bool is_connected() const
Check if the connection is open.
boost::asio::awaitable< ConnectionResult< std::vector< T > > > execute_many(const Query &query)
Execute a query and map results to a vector of user-defined types asynchronously.
PostgreSQLAsyncConnection(PostgreSQLAsyncConnection &&) noexcept
boost::asio::awaitable< ConnectionResult< void > > reset_connection_state()
Reset connection state after streaming operations.
boost::asio::awaitable< ConnectionResult< void > > connect()
Connect to the PostgreSQL database asynchronously.
Represents the result set from a database query.
Definition result.hpp:608
void map_row_to_tuple(Tuple &tuple, const std::vector< std::string > &row)
Helper function to map a result row to a tuple (and thus to a struct)
Definition meta.hpp:61
IsolationLevel
Transaction isolation levels.
@ ReadCommitted
Prevents dirty reads.
std::expected< T, ConnectionError > ConnectionResult
Type alias for result of connection operations.
STL namespace.
Error type for database connection operations.
Basic parameters for a PostgreSQL connection.