relx 0.1.0
A Modern C++23 Type-Safe SQL Query Builder
Loading...
Searching...
No Matches
pgsql_async_wrapper.hpp
Go to the documentation of this file.
1// Asynchronous PostgreSQL Client with C++20 Coroutines
2// This library wraps libpq's asynchronous API with C++20 coroutines and Boost.Asio
3// to provide a clean, modern interface for PostgreSQL database operations.
4
5#pragma once
6
7#include <chrono>
8#include <expected>
9#include <format>
10#include <functional>
11#include <iostream>
12#include <memory>
13#include <optional>
14#include <stdexcept>
15#include <string>
16#include <string_view>
17#include <unordered_map>
18#include <utility>
19#include <vector>
20
21#include <boost/asio.hpp>
22#include <boost/asio/awaitable.hpp>
23#include <boost/asio/co_spawn.hpp>
24#include <boost/asio/detached.hpp>
25#include <boost/asio/posix/stream_descriptor.hpp>
26#include <boost/asio/this_coro.hpp>
27#include <boost/asio/use_awaitable.hpp>
28#include <libpq-fe.h>
29
31
32// Error types
33struct PgError {
34 std::string message;
35 int error_code = 0;
36
37 static PgError from_conn(PGconn* conn) {
38 return PgError{.message = PQerrorMessage(conn) ? PQerrorMessage(conn)
39 : "Unknown PostgreSQL error",
40 .error_code = PQstatus(conn)};
41 }
42
43 static PgError from_result(PGresult* result) {
44 return PgError{.message = PQresultErrorMessage(result) ? PQresultErrorMessage(result)
45 : "Unknown PostgreSQL error",
46 .error_code = static_cast<int>(PQresultStatus(result))};
47 }
48};
49
/// Render a PgError as one line of text containing its message and numeric code.
inline std::string format_error(const PgError& error) {
  const std::string& text = error.message;
  const int code = error.error_code;
  return std::format("PostgreSQL error: {} (Code: {})", text, code);
}
56
57// Type alias for result of operations
58template <typename T>
59using PgResult = std::expected<T, PgError>;
60
61// Result class to handle PGresult
62class Result {
63private:
64 PGresult* res_ = nullptr;
65
66public:
67 Result() = default;
68
69 explicit Result(PGresult* res) : res_(res) {}
70
71 ~Result() { clear(); }
72
73 Result(const Result&) = delete;
74 Result& operator=(const Result&) = delete;
75
76 Result(Result&& other) noexcept : res_(other.res_) { other.res_ = nullptr; }
77
78 Result& operator=(Result&& other) noexcept {
79 if (this != &other) {
80 clear();
81 res_ = other.res_;
82 other.res_ = nullptr;
83 }
84 return *this;
85 }
86
87 void clear() {
88 if (res_) {
89 PQclear(res_);
90 res_ = nullptr;
91 }
92 }
93
94 bool ok() const {
95 if (!res_) {
96 return false;
97 }
98 auto status = PQresultStatus(res_);
99 return status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK;
100 }
101
102 ExecStatusType status() const { return res_ ? PQresultStatus(res_) : PGRES_FATAL_ERROR; }
103
104 const char* error_message() const {
105 return res_ ? PQresultErrorMessage(res_) : "No result available";
106 }
107
108 int rows() const { return res_ ? PQntuples(res_) : 0; }
109
110 int columns() const { return res_ ? PQnfields(res_) : 0; }
111
112 const char* field_name(int col) const { return res_ ? PQfname(res_, col) : nullptr; }
113
114 Oid field_type(int col) const { return res_ ? PQftype(res_, col) : 0; }
115
116 int field_size(int col) const { return res_ ? PQfsize(res_, col) : 0; }
117
118 int field_number(const char* name) const { return res_ ? PQfnumber(res_, name) : -1; }
119
120 bool is_null(int row, int col) const { return res_ ? PQgetisnull(res_, row, col) != 0 : true; }
121
122 const char* get_value(int row, int col) const {
123 return res_ ? PQgetvalue(res_, row, col) : nullptr;
124 }
125
126 int get_length(int row, int col) const { return res_ ? PQgetlength(res_, row, col) : 0; }
127
128 PGresult* get() const { return res_; }
129
130 operator bool() const { return ok(); }
131};
132
133// Transaction isolation level
135
136// Forward declaration
137class Connection;
138
139// Prepared statement class
// Data members and constructor of PreparedStatement. The `class PreparedStatement {`
// line itself is not present in this listing.
141private:
// Connection passed to the constructor. Held as a reference, so it cannot be re-seated later.
142 Connection& conn_;
// Statement name, exposed through name().
143 std::string name_;
// Statement text, exposed through query().
144 std::string query_;
// Flag exposed through is_prepared(); starts out false.
145 bool prepared_ = false;
146
147public:
// Stores the connection reference and takes ownership of the name and query strings.
// The constructor itself only initializes members.
148 PreparedStatement(Connection& conn, std::string name, std::string query)
149 : conn_(conn), name_(std::move(name)), query_(std::move(query)) {}
150
152
153 // Non-copyable
156
157 // Move constructible/assignable
// Initializer list and body of the move constructor. The signature line is not present in
// this listing; the member index at the end of the page gives it as
// `PreparedStatement(PreparedStatement &&other) noexcept`.
// Binds to the same Connection as `other`, steals its strings and copies its prepared flag.
159 : conn_(other.conn_), name_(std::move(other.name_)), query_(std::move(other.query_)),
160 prepared_(other.prepared_) {
// The moved-from statement no longer reports itself as prepared.
161 other.prepared_ = false;
162 }
163
// Body of the move assignment operator. The signature line is not present in this listing;
// the member index at the end of the page gives it as
// `PreparedStatement & operator=(PreparedStatement &&other) noexcept`.
// Self-assignment is a no-op.
165 if (this != &other) {
// NOTE(review): conn_ is a reference member and is not touched here, so the target keeps
// referring to its original Connection even if `other` belonged to a different one.
166 name_ = std::move(other.name_);
167 query_ = std::move(other.query_);
168 prepared_ = other.prepared_;
// The moved-from statement no longer reports itself as prepared.
169 other.prepared_ = false;
170 }
171 return *this;
172 }
173
// Read-only accessors for the stored name, query text and prepared flag.
174 const std::string& name() const { return name_; }
175 const std::string& query() const { return query_; }
176 bool is_prepared() const { return prepared_; }
177
178 // The implementation of prepare and execute will be defined in separate cpp file
// Only declarations appear here, so their behavior cannot be described from this listing.
// NOTE(review): presumably prepare() registers the statement on the server, execute() runs
// it with text parameters and deallocate() drops it — confirm against the implementation.
179 boost::asio::awaitable<PgResult<void>> prepare();
180 boost::asio::awaitable<PgResult<Result>> execute(const std::vector<std::string>& params);
181 boost::asio::awaitable<PgResult<void>> deallocate();
182
// Gives Connection access to the private members above.
183 friend class Connection;
184};
185
186// ----------------------------------------------------------------------
187// The connection class - main interface for PostgreSQL operations
188// ----------------------------------------------------------------------
// Data members of Connection. The `class Connection` header line is not present in this listing.
190private:
// io_context passed to the constructor; used when constructing socket_ in create_socket().
191 boost::asio::io_context& io_;
// libpq connection handle; null while no connection exists. Released with PQfinish() in close().
192 PGconn* conn_ = nullptr;
// Asio wrapper around the descriptor reported by PQsocket(); used for readiness waits.
193 std::unique_ptr<boost::asio::ip::tcp::socket> socket_;
// Prepared statements created through prepare_statement(), keyed by statement name.
194 std::unordered_map<std::string, std::shared_ptr<PreparedStatement>> statements_;
// Set by begin_transaction(); cleared by commit(), rollback() and close().
195 bool in_transaction_ = false;
196
// Wrap the descriptor reported by PQsocket() in an Asio TCP socket and store it in socket_.
// Returns an error with code -1 when there is no PGconn or when libpq reports a negative
// descriptor; returns an empty success value otherwise.
// NOTE(review): the descriptor is always registered with the IPv4/TCP protocol tag; confirm
// that this is acceptable for IPv6 or Unix-domain connections.
PgResult<void> create_socket() {
  using tcp = boost::asio::ip::tcp;

  if (!conn_) {
    return std::unexpected(
        PgError{.message = "Cannot create socket: no connection", .error_code = -1});
  }

  const int native_fd = PQsocket(conn_);
  if (native_fd < 0) {
    return std::unexpected(PgError{.message = "Invalid socket", .error_code = -1});
  }

  socket_ = std::make_unique<tcp::socket>(io_, tcp::v4(), native_fd);
  return PgResult<void>{};
}
211
// Push libpq's queued outgoing data to the server without blocking the caller's thread.
//
// Calls PQflush() repeatedly. A return of 0 means everything was sent (success), -1 is
// reported as a connection error, and any other value suspends the coroutine until the
// socket is writable before trying again. Socket lookup and wait failures are returned
// as PgError.
// NOTE(review): only write-readiness is awaited between attempts; check the libpq
// guidance for PQflush() == 1 to confirm read-readiness does not also need handling.
boost::asio::awaitable<PgResult<void>> flush_outgoing_data() {
  for (;;) {
    const int pending = PQflush(conn_);
    if (pending == -1) {
      co_return std::unexpected(PgError::from_conn(conn_));
    }
    if (pending == 0) {
      co_return PgResult<void>{};
    }

    // Data is still queued: wait until the socket can take more.
    auto sock = socket();
    if (!sock) {
      co_return std::unexpected(sock.error());
    }

    boost::system::error_code wait_error;
    co_await (*sock)->async_wait(
        boost::asio::ip::tcp::socket::wait_write,
        boost::asio::redirect_error(boost::asio::use_awaitable, wait_error));

    if (wait_error) {
      co_return std::unexpected(
          PgError{.message = wait_error.message(), .error_code = wait_error.value()});
    }
  }
}
239
// Wait for the in-flight command to finish and return its first PGresult.
//
// Returns a Result wrapping the first PGresult of the command, or an empty Result when
// libpq reports no result at all. Any further results are freed so the connection is
// ready for the next command. Returns a PgError when PQconsumeInput() fails, when the
// socket is not initialized, or when waiting on the socket fails.
//
// PQisBusy() is consulted before every PQgetResult() call, including the calls that
// drain trailing results. PQgetResult() may block when the connection is busy, and
// blocking here would stall the io_context thread; when busy, the coroutine suspends
// until the socket is readable.
boost::asio::awaitable<PgResult<Result>> get_query_result() {
  Result first_result;      // Handed back to the caller once the command is complete
  bool have_first = false;  // Distinguishes "nothing received yet" from later results

  while (true) {
    // Pull whatever has arrived on the socket into libpq's buffer.
    if (PQconsumeInput(conn_) == 0) {
      co_return std::unexpected(PgError::from_conn(conn_));
    }

    // Take every result that can be fetched without blocking.
    while (PQisBusy(conn_) == 0) {
      PGresult* res = PQgetResult(conn_);
      if (res == nullptr) {
        // Null marks the end of the command.
        co_return std::move(first_result);
      }
      if (have_first) {
        // Trailing result (normally none for a single query): discard it.
        PQclear(res);
      } else {
        first_result = Result(res);
        have_first = true;
      }
    }

    // Still busy: wait for the socket to become readable, then try again.
    auto socket_result = socket();
    if (!socket_result) {
      co_return std::unexpected(socket_result.error());
    }

    boost::system::error_code ec;
    co_await (*socket_result)
        ->async_wait(boost::asio::ip::tcp::socket::wait_read,
                     boost::asio::redirect_error(boost::asio::use_awaitable, ec));

    if (ec) {
      co_return std::unexpected(PgError{.message = ec.message(), .error_code = ec.value()});
    }
  }
}
277
278public:
// Creates an unconnected Connection bound to `io`. Only the io_context reference is stored;
// no PGconn or socket exists until connect() is awaited.
279 Connection(boost::asio::io_context& io) : io_(io) {}
280
282
283 // Non-copyable
// Copying is disabled: the object holds a raw PGconn* that close() releases with PQfinish().
284 Connection(const Connection&) = delete;
285 Connection& operator=(const Connection&) = delete;
286
287 // Move constructible/assignable
288 Connection(Connection&& other) noexcept
289 : io_(other.io_), conn_(other.conn_), socket_(std::move(other.socket_)),
290 statements_(std::move(other.statements_)), in_transaction_(other.in_transaction_) {
291 other.conn_ = nullptr;
292 other.in_transaction_ = false;
293 }
294
295 Connection& operator=(Connection&& other) noexcept {
296 if (this != &other) {
297 close();
298 conn_ = other.conn_;
299 socket_ = std::move(other.socket_);
300 statements_ = std::move(other.statements_);
301 in_transaction_ = other.in_transaction_;
302 other.conn_ = nullptr;
303 other.in_transaction_ = false;
304 }
305 return *this;
306 }
307
308 void close() {
309 statements_.clear();
310
311 if (socket_) {
312 socket_->close();
313 socket_.reset();
314 }
315
316 if (conn_) {
317 PQfinish(conn_);
318 conn_ = nullptr;
319 }
320
321 in_transaction_ = false;
322 }
323
// True when a PGconn exists and PQstatus() reports CONNECTION_OK for it.
324 bool is_open() const { return conn_ != nullptr && PQstatus(conn_) == CONNECTION_OK; }
325
// Value of the local transaction flag maintained by begin_transaction(), commit(),
// rollback() and close(); the server is not queried.
326 bool in_transaction() const { return in_transaction_; }
327
// Raw libpq handle (null when no connection exists). Ownership stays with this object.
328 PGconn* native_handle() { return conn_; }
329
// Body of the socket accessor. The signature line is not present in this listing; the member
// index at the end of the page gives it as `PgResult< boost::asio::ip::tcp::socket * > socket()`.
// Returns an error with code -1 until create_socket() has populated socket_.
331 if (!socket_) {
332 return std::unexpected(PgError{.message = "Socket not initialized", .error_code = -1});
333 }
// Non-owning pointer; the unique_ptr member keeps ownership.
334 return socket_.get();
335 }
336
// Open a connection described by `conninfo` without blocking the caller's thread.
//
// Any existing connection is closed first. The connection is started with
// PQconnectStart(), switched to non-blocking mode, wrapped in an Asio socket and then
// driven with PQconnectPoll(), suspending on read or write readiness as libpq requests.
// On every failure path the partially opened connection is closed and a PgError is
// returned; on success the result is empty.
boost::asio::awaitable<PgResult<void>> connect(const std::string& conninfo) {
  if (conn_ != nullptr) {
    close();
  }

  conn_ = PQconnectStart(conninfo.c_str());
  if (conn_ == nullptr) {
    co_return std::unexpected(PgError{.message = "Out of memory", .error_code = -1});
  }

  // Immediate failure, or failure to enter non-blocking mode, is reported the same way.
  if (PQstatus(conn_) == CONNECTION_BAD || PQsetnonblocking(conn_, 1) != 0) {
    auto failure = PgError::from_conn(conn_);
    close();
    co_return std::unexpected(failure);
  }

  if (auto created = create_socket(); !created) {
    close();
    co_return std::unexpected(created.error());
  }

  // Drive the handshake until libpq reports success or failure.
  for (;;) {
    const PostgresPollingStatusType phase = PQconnectPoll(conn_);

    if (phase == PGRES_POLLING_FAILED) {
      auto failure = PgError::from_conn(conn_);
      close();
      co_return std::unexpected(failure);
    }
    if (phase == PGRES_POLLING_OK) {
      break;
    }
    if (phase != PGRES_POLLING_READING && phase != PGRES_POLLING_WRITING) {
      // No wait requested: poll again straight away.
      continue;
    }

    const auto direction = (phase == PGRES_POLLING_READING)
                               ? boost::asio::ip::tcp::socket::wait_read
                               : boost::asio::ip::tcp::socket::wait_write;

    auto sock = socket();
    if (!sock) {
      close();
      co_return std::unexpected(sock.error());
    }

    boost::system::error_code wait_error;
    co_await (*sock)->async_wait(
        direction, boost::asio::redirect_error(boost::asio::use_awaitable, wait_error));

    if (wait_error) {
      close();
      co_return std::unexpected(
          PgError{.message = wait_error.message(), .error_code = wait_error.value()});
    }
  }

  // Final sanity check once polling reports completion.
  if (PQstatus(conn_) != CONNECTION_OK) {
    auto failure = PgError::from_conn(conn_);
    close();
    co_return std::unexpected(failure);
  }

  co_return PgResult<void>{};
}
431
// Run `query_text` with optional text-format parameters and return the resulting Result.
//
// Fails with code -1 if the connection is not open. Otherwise the command is queued with
// PQsendQueryParams() (parameter types inferred, text format in and out), flushed to the
// server and its result awaited. Errors from any of those steps are returned as PgError;
// a Result whose status is an error status is still returned as a value.
boost::asio::awaitable<PgResult<Result>> query(const std::string& query_text,
                                               const std::vector<std::string>& params = {}) {
  if (!is_open()) {
    co_return std::unexpected(PgError{.message = "Connection is not open", .error_code = -1});
  }

  // libpq wants an array of C strings; the pointers borrow from `params`.
  std::vector<const char*> values(params.size());
  for (std::size_t i = 0; i < params.size(); ++i) {
    values[i] = params[i].c_str();
  }

  const int queued =
      PQsendQueryParams(conn_, query_text.c_str(), static_cast<int>(values.size()),
                        // TODO allow user to customize fields with nullptr values
                        nullptr,  // param types - inferred
                        values.empty() ? nullptr : values.data(),
                        nullptr,  // param lengths - null-terminated strings
                        nullptr,  // param formats - text format
                        0         // result format - text format
      );
  if (queued == 0) {
    co_return std::unexpected(PgError::from_conn(conn_));
  }

  if (auto flushed = co_await flush_outgoing_data(); !flushed) {
    co_return std::unexpected(flushed.error());
  }

  co_return co_await get_query_result();
}
467
468 // Transaction support
469 // ------------------
470
471 // Begin transaction with specified isolation level
// Starts a transaction at the requested isolation level. The parameter line of the signature
// is not present in this listing; the member index at the end of the page gives it as
// `IsolationLevel isolation=IsolationLevel::ReadCommitted`.
// Returns an error with code -1 if the local flag says a transaction is already open, the
// transport error if the query could not be run, or the result's message and status if the
// server rejected the command. The flag is set only on success.
472 boost::asio::awaitable<PgResult<void>> begin_transaction(
474 if (in_transaction_) {
475 co_return std::unexpected(PgError{.message = "Already in a transaction", .error_code = -1});
476 }
477
478 std::string isolation_str;
// Maps the isolation level to its SQL keyword. The `case` label lines are not present in
// this listing, so which enumerator selects which string cannot be confirmed from here.
479 switch (isolation) {
481 isolation_str = "READ UNCOMMITTED";
482 break;
484 isolation_str = "READ COMMITTED";
485 break;
487 isolation_str = "REPEATABLE READ";
488 break;
490 isolation_str = "SERIALIZABLE";
491 break;
492 }
493
494 const std::string begin_cmd = "BEGIN ISOLATION LEVEL " + isolation_str;
495 auto res_result = co_await query(begin_cmd);
496
// Transport-level failure: the query could not be sent or its result could not be read.
497 if (!res_result) {
498 co_return std::unexpected(res_result.error());
499 }
500
// A result came back but its status is not OK (Result's bool conversion is ok()).
501 if (!*res_result) {
502 co_return std::unexpected(PgError{.message = res_result->error_message(),
503 .error_code = static_cast<int>(res_result->status())});
504 }
505
506 in_transaction_ = true;
507 co_return PgResult<void>{};
508 }
509
// Commit the transaction tracked by the local flag.
//
// Returns an error with code -1 if no transaction is recorded as open. Otherwise runs
// COMMIT and, only when the command succeeds, clears the flag. A transport failure or a
// non-OK result is returned as PgError and leaves the flag set.
boost::asio::awaitable<PgResult<void>> commit() {
  if (!in_transaction_) {
    co_return std::unexpected(PgError{.message = "Not in a transaction", .error_code = -1});
  }

  auto outcome = co_await query("COMMIT");
  if (!outcome) {
    co_return std::unexpected(outcome.error());
  }

  const Result& reply = *outcome;
  if (!reply.ok()) {
    co_return std::unexpected(PgError{.message = reply.error_message(),
                                      .error_code = static_cast<int>(reply.status())});
  }

  in_transaction_ = false;
  co_return PgResult<void>{};
}
530
// Roll back the transaction tracked by the local flag.
//
// Returns an error with code -1 if no transaction is recorded as open. Otherwise runs
// ROLLBACK and, only when the command succeeds, clears the flag. A transport failure or a
// non-OK result is returned as PgError and leaves the flag set.
boost::asio::awaitable<PgResult<void>> rollback() {
  if (!in_transaction_) {
    co_return std::unexpected(PgError{.message = "Not in a transaction", .error_code = -1});
  }

  auto outcome = co_await query("ROLLBACK");
  if (!outcome) {
    co_return std::unexpected(outcome.error());
  }

  const Result& reply = *outcome;
  if (!reply.ok()) {
    co_return std::unexpected(PgError{.message = reply.error_message(),
                                      .error_code = static_cast<int>(reply.status())});
  }

  in_transaction_ = false;
  co_return PgResult<void>{};
}
551
552 // Prepared statement support
553 // -------------------------
554
// Create (or reuse) a prepared statement registered under `name`.
//
// Fails with code -1 if the connection is not open. If a statement with this name is
// already registered and has the same query text, that statement is returned as is. If
// the text differs, the old statement is deallocated and unregistered first. The new
// statement is registered before prepare() is awaited and unregistered again if
// prepare() fails.
boost::asio::awaitable<PgResult<std::shared_ptr<PreparedStatement>>> prepare_statement(
    const std::string& name, const std::string& query_text) {
  if (!is_open()) {
    co_return std::unexpected(PgError{.message = "Connection is not open", .error_code = -1});
  }

  if (auto existing = statements_.find(name); existing != statements_.end()) {
    if (existing->second->query() == query_text) {
      // Same name and same text: hand back the registered statement.
      co_return existing->second;
    }

    // Same name, different text: drop the old statement before replacing it.
    auto dropped = co_await existing->second->deallocate();
    if (!dropped) {
      co_return std::unexpected(dropped.error());
    }
    statements_.erase(existing);
  }

  auto fresh = std::make_shared<PreparedStatement>(*this, name, query_text);
  statements_[name] = fresh;

  auto prepared = co_await fresh->prepare();
  if (!prepared) {
    statements_.erase(name);
    co_return std::unexpected(prepared.error());
  }

  co_return fresh;
}
591
592 // Get a prepared statement by name
// Body of the by-name statement lookup. The signature line is not present in this listing;
// the member index at the end of the page gives it as
// `PgResult< std::shared_ptr< PreparedStatement > > get_prepared_statement(const std::string &name)`.
// Consults only the local statements_ map; the server is not contacted.
594 auto it = statements_.find(name);
595 if (it != statements_.end()) {
596 return it->second;
597 }
// Unknown name: report it with code -1 and include the name in the message.
598 return std::unexpected(
599 PgError{.message = "Prepared statement not found: " + name, .error_code = -1});
600 }
601
// Look up the statement registered under `name` and execute it with `params`.
// Returns the lookup error if the name is unknown; otherwise returns whatever the
// statement's execute() produces.
boost::asio::awaitable<PgResult<Result>> execute_prepared(
    const std::string& name, const std::vector<std::string>& params = {}) {
  auto found = get_prepared_statement(name);
  if (found) {
    co_return co_await (*found)->execute(params);
  }
  co_return std::unexpected(found.error());
}
612
// Deallocate the statement registered under `name` and remove it from the local map.
// Returns the lookup error if the name is unknown and the deallocate() error if that
// call fails; in the latter case the statement stays registered.
boost::asio::awaitable<PgResult<void>> deallocate_prepared(const std::string& name) {
  auto found = get_prepared_statement(name);
  if (!found) {
    co_return std::unexpected(found.error());
  }

  if (auto dropped = co_await (*found)->deallocate(); !dropped) {
    co_return std::unexpected(dropped.error());
  }

  statements_.erase(name);
  co_return PgResult<void>{};
}
628
// Deallocate every registered prepared statement, stopping at the first failure.
// The names are copied up front because deallocate_prepared() erases entries from
// statements_ as it goes.
boost::asio::awaitable<PgResult<void>> deallocate_all_prepared() {
  std::vector<std::string> pending;
  pending.reserve(statements_.size());
  for (const auto& entry : statements_) {
    pending.push_back(entry.first);
  }

  for (const std::string& stmt_name : pending) {
    if (auto dropped = co_await deallocate_prepared(stmt_name); !dropped) {
      co_return std::unexpected(dropped.error());
    }
  }

  co_return PgResult<void>{};
}
647
648 friend class PreparedStatement;
649};
650
651} // namespace relx::pgsql_async_wrapper
Abstract base class for database connections.
boost::asio::awaitable< PgResult< void > > deallocate_prepared(const std::string &name)
PgResult< boost::asio::ip::tcp::socket * > socket()
boost::asio::awaitable< PgResult< Result > > query(const std::string &query_text, const std::vector< std::string > &params={})
boost::asio::awaitable< PgResult< void > > rollback()
PgResult< std::shared_ptr< PreparedStatement > > get_prepared_statement(const std::string &name)
boost::asio::awaitable< PgResult< void > > deallocate_all_prepared()
boost::asio::awaitable< PgResult< std::shared_ptr< PreparedStatement > > > prepare_statement(const std::string &name, const std::string &query_text)
Connection & operator=(const Connection &)=delete
boost::asio::awaitable< PgResult< void > > begin_transaction(IsolationLevel isolation=IsolationLevel::ReadCommitted)
Connection(const Connection &)=delete
boost::asio::awaitable< PgResult< void > > connect(const std::string &conninfo)
boost::asio::awaitable< PgResult< void > > commit()
boost::asio::awaitable< PgResult< Result > > execute_prepared(const std::string &name, const std::vector< std::string > &params={})
Connection & operator=(Connection &&other) noexcept
PreparedStatement(PreparedStatement &&other) noexcept
PreparedStatement & operator=(PreparedStatement &&other) noexcept
PreparedStatement(Connection &conn, std::string name, std::string query)
PreparedStatement(const PreparedStatement &)=delete
PreparedStatement & operator=(const PreparedStatement &)=delete
boost::asio::awaitable< PgResult< void > > deallocate()
boost::asio::awaitable< PgResult< Result > > execute(const std::vector< std::string > &params)
boost::asio::awaitable< PgResult< void > > prepare()
Result & operator=(const Result &)=delete
bool is_null(int row, int col) const
const char * field_name(int col) const
Result & operator=(Result &&other) noexcept
Result(const Result &)=delete
int field_number(const char *name) const
const char * get_value(int row, int col) const
int get_length(int row, int col) const
std::expected< T, PgError > PgResult
std::string format_error(const PgError &error)
Format a PgError for exception messages.
STL namespace.
pg_result PGresult
static PgError from_conn(PGconn *conn)
static PgError from_result(PGresult *result)