17#include <unordered_map>
21#include <boost/asio.hpp>
22#include <boost/asio/awaitable.hpp>
23#include <boost/asio/co_spawn.hpp>
24#include <boost/asio/detached.hpp>
25#include <boost/asio/posix/stream_descriptor.hpp>
26#include <boost/asio/this_coro.hpp>
27#include <boost/asio/use_awaitable.hpp>
38 return PgError{.
message = PQerrorMessage(conn) ? PQerrorMessage(conn)
39 :
"Unknown PostgreSQL error",
40 .error_code = PQstatus(conn)};
44 return PgError{.
message = PQresultErrorMessage(result) ? PQresultErrorMessage(result)
45 :
"Unknown PostgreSQL error",
46 .error_code =
static_cast<int>(PQresultStatus(result))};
54 return std::format(
"PostgreSQL error: {} (Code: {})", error.
message, error.
error_code);
76 Result(
Result&& other) noexcept : res_(other.res_) { other.res_ =
nullptr; }
98 auto status = PQresultStatus(res_);
99 return status == PGRES_COMMAND_OK ||
status == PGRES_TUPLES_OK;
102 ExecStatusType
status()
const {
return res_ ? PQresultStatus(res_) : PGRES_FATAL_ERROR; }
105 return res_ ? PQresultErrorMessage(res_) :
"No result available";
108 int rows()
const {
return res_ ? PQntuples(res_) : 0; }
110 int columns()
const {
return res_ ? PQnfields(res_) : 0; }
112 const char*
field_name(
int col)
const {
return res_ ? PQfname(res_, col) :
nullptr; }
114 Oid
field_type(
int col)
const {
return res_ ? PQftype(res_, col) : 0; }
116 int field_size(
int col)
const {
return res_ ? PQfsize(res_, col) : 0; }
118 int field_number(
const char* name)
const {
return res_ ? PQfnumber(res_, name) : -1; }
120 bool is_null(
int row,
int col)
const {
return res_ ? PQgetisnull(res_, row, col) != 0 :
true; }
123 return res_ ? PQgetvalue(res_, row, col) :
nullptr;
126 int get_length(
int row,
int col)
const {
return res_ ? PQgetlength(res_, row, col) : 0; }
130 operator bool()
const {
return ok(); }
145 bool prepared_ =
false;
159 : conn_(other.conn_), name_(std::move(other.name_)), query_(std::move(other.query_)),
160 prepared_(other.prepared_) {
161 other.prepared_ =
false;
165 if (
this != &other) {
166 name_ = std::move(other.name_);
167 query_ = std::move(other.query_);
168 prepared_ = other.prepared_;
169 other.prepared_ =
false;
174 const std::string&
name()
const {
return name_; }
175 const std::string&
query()
const {
return query_; }
179 boost::asio::awaitable<PgResult<void>>
prepare();
180 boost::asio::awaitable<PgResult<Result>>
execute(
const std::vector<std::string>& params);
191 boost::asio::io_context& io_;
193 std::unique_ptr<boost::asio::ip::tcp::socket> socket_;
194 std::unordered_map<std::string, std::shared_ptr<PreparedStatement>> statements_;
195 bool in_transaction_ =
false;
198 if (conn_ ==
nullptr) {
199 return std::unexpected(
200 PgError{.
message =
"Cannot create socket: no connection", .error_code = -1});
203 const int sock = PQsocket(conn_);
205 return std::unexpected(
PgError{.
message =
"Invalid socket", .error_code = -1});
208 socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_, boost::asio::ip::tcp::v4(), sock);
213 boost::asio::awaitable<PgResult<void>> flush_outgoing_data() {
215 const int flush_result = PQflush(conn_);
216 if (flush_result == -1) {
219 if (flush_result == 0) {
223 auto socket_result =
socket();
224 if (!socket_result) {
225 co_return std::unexpected(socket_result.error());
228 boost::system::error_code ec;
229 co_await (*socket_result)
230 ->async_wait(boost::asio::ip::tcp::socket::wait_write,
231 boost::asio::redirect_error(boost::asio::use_awaitable, ec));
234 co_return std::unexpected(
PgError{.
message = ec.message(), .error_code = ec.value()});
241 boost::asio::awaitable<PgResult<Result>> get_query_result() {
244 if (PQconsumeInput(conn_) == 0) {
249 if (!PQisBusy(conn_)) {
254 while ((res = PQgetResult(conn_)) !=
nullptr) {
258 co_return result_obj;
262 auto socket_result =
socket();
263 if (!socket_result) {
264 co_return std::unexpected(socket_result.error());
267 boost::system::error_code ec;
268 co_await (*socket_result)
269 ->async_wait(boost::asio::ip::tcp::socket::wait_read,
270 boost::asio::redirect_error(boost::asio::use_awaitable, ec));
273 co_return std::unexpected(
PgError{.
message = ec.message(), .error_code = ec.value()});
289 : io_(other.io_), conn_(other.conn_), socket_(std::move(other.socket_)),
290 statements_(std::move(other.statements_)), in_transaction_(other.in_transaction_) {
291 other.conn_ =
nullptr;
292 other.in_transaction_ =
false;
296 if (
this != &other) {
299 socket_ = std::move(other.socket_);
300 statements_ = std::move(other.statements_);
301 in_transaction_ = other.in_transaction_;
302 other.conn_ =
nullptr;
303 other.in_transaction_ =
false;
321 in_transaction_ =
false;
324 bool is_open()
const {
return conn_ !=
nullptr && PQstatus(conn_) == CONNECTION_OK; }
332 return std::unexpected(
PgError{.
message =
"Socket not initialized", .error_code = -1});
334 return socket_.get();
338 boost::asio::awaitable<PgResult<void>>
connect(
const std::string& conninfo) {
339 if (conn_ !=
nullptr) {
344 conn_ = PQconnectStart(conninfo.c_str());
345 if (conn_ ==
nullptr) {
346 co_return std::unexpected(
PgError{.
message =
"Out of memory", .error_code = -1});
350 if (PQstatus(conn_) == CONNECTION_BAD) {
353 co_return std::unexpected(error);
357 if (PQsetnonblocking(conn_, 1) != 0) {
360 co_return std::unexpected(error);
364 auto socket_result = create_socket();
365 if (!socket_result) {
367 co_return std::unexpected(socket_result.error());
372 const PostgresPollingStatusType poll_status = PQconnectPoll(conn_);
374 if (poll_status == PGRES_POLLING_FAILED) {
377 co_return std::unexpected(error);
380 if (poll_status == PGRES_POLLING_OK) {
386 if (poll_status == PGRES_POLLING_READING) {
388 auto socket_result =
socket();
389 if (!socket_result) {
391 co_return std::unexpected(socket_result.error());
394 boost::system::error_code ec;
395 co_await (*socket_result)
396 ->async_wait(boost::asio::ip::tcp::socket::wait_read,
397 boost::asio::redirect_error(boost::asio::use_awaitable, ec));
401 co_return std::unexpected(
PgError{.
message = ec.message(), .error_code = ec.value()});
403 }
else if (poll_status == PGRES_POLLING_WRITING) {
405 auto socket_result =
socket();
406 if (!socket_result) {
408 co_return std::unexpected(socket_result.error());
411 boost::system::error_code ec;
412 co_await (*socket_result)
413 ->async_wait(boost::asio::ip::tcp::socket::wait_write,
414 boost::asio::redirect_error(boost::asio::use_awaitable, ec));
418 co_return std::unexpected(
PgError{.
message = ec.message(), .error_code = ec.value()});
423 if (PQstatus(conn_) != CONNECTION_OK) {
426 co_return std::unexpected(error);
433 boost::asio::awaitable<PgResult<Result>>
query(
const std::string& query_text,
434 const std::vector<std::string>& params = {}) {
436 co_return std::unexpected(
PgError{.
message =
"Connection is not open", .error_code = -1});
440 std::vector<const char*> values;
441 values.reserve(params.size());
442 for (
const auto& param : params) {
443 values.push_back(param.c_str());
447 if (!PQsendQueryParams(conn_, query_text.c_str(),
static_cast<int>(values.size()),
450 values.size() == 0 ?
nullptr : values.data(),
459 auto flush_result =
co_await flush_outgoing_data();
461 co_return std::unexpected(flush_result.error());
465 co_return co_await get_query_result();
474 if (in_transaction_) {
475 co_return std::unexpected(
PgError{.
message =
"Already in a transaction", .error_code = -1});
478 std::string isolation_str;
481 isolation_str =
"READ UNCOMMITTED";
484 isolation_str =
"READ COMMITTED";
487 isolation_str =
"REPEATABLE READ";
490 isolation_str =
"SERIALIZABLE";
494 const std::string begin_cmd =
"BEGIN ISOLATION LEVEL " + isolation_str;
495 auto res_result =
co_await query(begin_cmd);
498 co_return std::unexpected(res_result.error());
502 co_return std::unexpected(
PgError{.
message = res_result->error_message(),
503 .error_code =
static_cast<int>(res_result->status())});
506 in_transaction_ =
true;
511 boost::asio::awaitable<PgResult<void>>
commit() {
512 if (!in_transaction_) {
513 co_return std::unexpected(
PgError{.
message =
"Not in a transaction", .error_code = -1});
516 auto res_result =
co_await query(
"COMMIT");
519 co_return std::unexpected(res_result.error());
523 co_return std::unexpected(
PgError{.
message = res_result->error_message(),
524 .error_code =
static_cast<int>(res_result->status())});
527 in_transaction_ =
false;
532 boost::asio::awaitable<PgResult<void>>
rollback() {
533 if (!in_transaction_) {
534 co_return std::unexpected(
PgError{.
message =
"Not in a transaction", .error_code = -1});
537 auto res_result =
co_await query(
"ROLLBACK");
540 co_return std::unexpected(res_result.error());
544 co_return std::unexpected(
PgError{.
message = res_result->error_message(),
545 .error_code =
static_cast<int>(res_result->status())});
548 in_transaction_ =
false;
557 const std::string& name,
const std::string& query_text) {
559 co_return std::unexpected(
PgError{.
message =
"Connection is not open", .error_code = -1});
562 auto it = statements_.find(name);
563 if (it != statements_.end()) {
565 if (it->second->query() != query_text) {
567 auto deallocate_result =
co_await it->second->deallocate();
568 if (!deallocate_result) {
569 co_return std::unexpected(deallocate_result.error());
571 statements_.erase(it);
574 co_return it->second;
579 auto stmt = std::make_shared<PreparedStatement>(*
this, name, query_text);
580 statements_[name] = stmt;
583 auto prepare_result =
co_await stmt->prepare();
584 if (!prepare_result) {
585 statements_.erase(name);
586 co_return std::unexpected(prepare_result.error());
594 auto it = statements_.find(name);
595 if (it != statements_.end()) {
598 return std::unexpected(
599 PgError{.
message =
"Prepared statement not found: " + name, .error_code = -1});
604 const std::string& name,
const std::vector<std::string>& params = {}) {
607 co_return std::unexpected(stmt_result.error());
610 co_return co_await (*stmt_result)->execute(params);
617 co_return std::unexpected(stmt_result.error());
620 auto deallocate_result =
co_await (*stmt_result)->deallocate();
621 if (!deallocate_result) {
622 co_return std::unexpected(deallocate_result.error());
625 statements_.erase(name);
631 std::vector<std::string> names;
632 names.reserve(statements_.size());
634 for (
auto& [name, _] : statements_) {
635 names.push_back(name);
638 for (
const auto& name : names) {
640 if (!deallocate_result) {
641 co_return std::unexpected(deallocate_result.error());
Abstract base class for database connections.
bool in_transaction() const
boost::asio::awaitable< PgResult< void > > deallocate_prepared(const std::string &name)
PgResult< boost::asio::ip::tcp::socket * > socket()
boost::asio::awaitable< PgResult< Result > > query(const std::string &query_text, const std::vector< std::string > ¶ms={})
boost::asio::awaitable< PgResult< void > > rollback()
PgResult< std::shared_ptr< PreparedStatement > > get_prepared_statement(const std::string &name)
boost::asio::awaitable< PgResult< void > > deallocate_all_prepared()
boost::asio::awaitable< PgResult< std::shared_ptr< PreparedStatement > > > prepare_statement(const std::string &name, const std::string &query_text)
Connection(boost::asio::io_context &io)
Connection(Connection &&other) noexcept
Connection & operator=(const Connection &)=delete
boost::asio::awaitable< PgResult< void > > begin_transaction(IsolationLevel isolation=IsolationLevel::ReadCommitted)
Connection(const Connection &)=delete
boost::asio::awaitable< PgResult< void > > connect(const std::string &conninfo)
boost::asio::awaitable< PgResult< void > > commit()
boost::asio::awaitable< PgResult< Result > > execute_prepared(const std::string &name, const std::vector< std::string > ¶ms={})
Connection & operator=(Connection &&other) noexcept
~PreparedStatement()=default
PreparedStatement(PreparedStatement &&other) noexcept
PreparedStatement & operator=(PreparedStatement &&other) noexcept
PreparedStatement(Connection &conn, std::string name, std::string query)
const std::string & name() const
PreparedStatement(const PreparedStatement &)=delete
PreparedStatement & operator=(const PreparedStatement &)=delete
const std::string & query() const
boost::asio::awaitable< PgResult< void > > deallocate()
boost::asio::awaitable< PgResult< Result > > execute(const std::vector< std::string > ¶ms)
boost::asio::awaitable< PgResult< void > > prepare()
Result & operator=(const Result &)=delete
int field_size(int col) const
bool is_null(int row, int col) const
const char * error_message() const
const char * field_name(int col) const
Result & operator=(Result &&other) noexcept
ExecStatusType status() const
Result(const Result &)=delete
Oid field_type(int col) const
int field_number(const char *name) const
const char * get_value(int row, int col) const
int get_length(int row, int col) const
Result(Result &&other) noexcept
std::expected< T, PgError > PgResult
std::string format_error(const PgError &error)
Format a PgError for exception messages.
static PgError from_conn(PGconn *conn)
static PgError from_result(PGresult *result)