3#include "../connection/pgsql_async_wrapper.hpp"
4#include "../results/result.hpp"
18#include <boost/asio.hpp>
19#include <boost/asio/awaitable.hpp>
20#include <boost/asio/co_spawn.hpp>
21#include <boost/asio/detached.hpp>
22#include <boost/asio/use_awaitable.hpp>
34 std::string connection_string);
78 std::
string sql,
std::vector<
std::
string> params = {});
83 template <query::SqlExpr Query>
84 boost::asio::awaitable<ConnectionResult<result::ResultSet>>
execute(Query query) {
85 std::string sql = query.to_sql();
86 std::vector<std::string> params = query.bind_params();
95 template <
typename T, query::SqlExpr Query>
96 boost::asio::awaitable<ConnectionResult<T>>
execute(Query query) {
97 auto result_set_output =
co_await execute(query);
98 if (!result_set_output) {
99 co_return std::unexpected(result_set_output.error());
102 const auto& result_set = *result_set_output;
104 if (result_set.empty()) {
112 const auto& row = result_set.at(0);
115 auto structure_tie = boost::pfr::structure_tie(obj);
118 if (result_set.column_count() != boost::pfr::tuple_size_v<std::remove_cvref_t<T>>) {
119 std::stringstream ss;
120 for (
const auto& param : query.bind_params()) {
124 .
message =
"Column count does not match struct field count, " +
125 std::to_string(result_set.column_count()) +
126 " != " + std::to_string(boost::pfr::tuple_size_v<std::remove_cvref_t<T>>) +
127 " for struct " +
typeid(T).name() +
" and query " + query.to_sql() +
128 " with params " + ss.str(),
135 std::vector<std::string> values;
136 for (
size_t i = 0; i < result_set.column_count(); ++i) {
137 auto cell_result = row.get_cell(i);
139 co_return std::unexpected(
143 values.push_back((*cell_result)->raw_value());
148 }
catch (
const std::exception& e) {
149 co_return std::unexpected(
162 template <
typename T, query::SqlExpr Query>
163 boost::asio::awaitable<ConnectionResult<std::vector<T>>>
execute_many(
const Query& query) {
164 auto result_set_output =
co_await execute(query);
165 if (!result_set_output) {
166 co_return std::unexpected(result_set_output.error());
169 const auto& result_set = *result_set_output;
171 std::vector<T> objects;
172 objects.reserve(result_set.size());
175 if (result_set.empty()) {
180 if (result_set.column_count() != boost::pfr::tuple_size_v<std::remove_cvref_t<T>>) {
181 std::stringstream ss;
182 for (
const auto& param : query.bind_params()) {
186 .
message =
"Column count does not match struct field count, " +
187 std::to_string(result_set.column_count()) +
188 " != " + std::to_string(boost::pfr::tuple_size_v<std::remove_cvref_t<T>>) +
189 " for struct " +
typeid(T).name() +
" and query " + query.to_sql() +
190 " with params " + ss.str(),
195 for (
size_t row_idx = 0; row_idx < result_set.size(); ++row_idx) {
196 const auto& row = result_set.at(row_idx);
198 auto structure_tie = boost::pfr::structure_tie(obj);
202 std::vector<std::string> values;
203 for (
size_t i = 0; i < result_set.column_count(); ++i) {
204 auto cell_result = row.get_cell(i);
207 cell_result.error().message,
210 values.push_back((*cell_result)->raw_value());
214 objects.push_back(std::move(obj));
215 }
catch (
const std::exception& e) {
217 .
message = std::string(
"Failed to convert result to struct: ") + e.what(),
257 boost::asio::io_context& io_context_;
258 std::string connection_string_;
259 std::unique_ptr<pgsql_async_wrapper::Connection> async_conn_;
260 bool is_connected_ =
false;
261 bool in_transaction_ =
false;
270 static std::string convert_placeholders(
const std::string& sql);
Asynchronous PostgreSQL implementation of the Connection interface.
boost::asio::awaitable< ConnectionResult< T > > execute(Query query)
Execute a query and map results to a user-defined type asynchronously.
PostgreSQLAsyncConnection(boost::asio::io_context &io_context, std::string connection_string)
Constructor with connection parameters and io_context.
bool reset_connection_state_sync()
Reset connection state synchronously (for use in destructors)
PostgreSQLAsyncConnection & operator=(const PostgreSQLAsyncConnection &)=delete
boost::asio::awaitable< ConnectionResult< result::ResultSet > > execute(Query query)
Execute a query expression asynchronously.
boost::asio::awaitable< ConnectionResult< void > > disconnect()
Disconnect from the PostgreSQL database asynchronously.
boost::asio::awaitable< ConnectionResult< void > > begin_transaction(IsolationLevel isolation_level=IsolationLevel::ReadCommitted)
Begin a new transaction asynchronously.
boost::asio::awaitable< ConnectionResult< void > > commit_transaction()
Commit the current transaction asynchronously.
boost::asio::io_context & get_io_context() const
Get the IO context associated with this connection.
PostgreSQLAsyncConnection(const PostgreSQLAsyncConnection &)=delete
boost::asio::awaitable< ConnectionResult< result::ResultSet > > execute_raw(std::string sql, std::vector< std::string > params={})
Execute a raw SQL query with parameters asynchronously.
pgsql_async_wrapper::Connection & get_async_conn()
Get the underlying async connection wrapper.
bool in_transaction() const
Check if a transaction is currently active.
~PostgreSQLAsyncConnection()
Destructor that ensures proper cleanup.
PostgreSQLAsyncConnection(boost::asio::io_context &io_context, const PostgreSQLConnectionParams &params)
Constructor with structured connection parameters and io_context.
boost::asio::awaitable< ConnectionResult< void > > rollback_transaction()
Rollback the current transaction asynchronously.
bool is_connected() const
Check if the connection is open.
boost::asio::awaitable< ConnectionResult< std::vector< T > > > execute_many(const Query &query)
Execute a query and map results to a vector of user-defined types asynchronously.
PostgreSQLAsyncConnection(PostgreSQLAsyncConnection &&) noexcept
boost::asio::awaitable< ConnectionResult< void > > reset_connection_state()
Reset connection state after streaming operations.
boost::asio::awaitable< ConnectionResult< void > > connect()
Connect to the PostgreSQL database asynchronously.
Represents the result set from a database query.
void map_row_to_tuple(Tuple &tuple, const std::vector< std::string > &row)
Helper function to map a result row to a tuple (and thus to a struct)
IsolationLevel
Transaction isolation levels.
@ ReadCommitted
Prevents dirty reads.
std::expected< T, ConnectionError > ConnectionResult
Type alias for result of connection operations.
Error type for database connection operations.
Basic parameters for a PostgreSQL connection.