3#include "../results/streaming_result.hpp"
11#include <boost/asio.hpp>
12#include <boost/asio/awaitable.hpp>
40 std::vector<std::string> params = {});
55 boost::asio::awaitable<ConnectionResult<void>>
initialize();
85 std::vector<std::string> params_;
87 std::vector<std::string> column_names_;
88 std::vector<bool> is_bytea_column_;
97 std::optional<std::string> first_row_cached_;
100 std::unique_ptr<
struct pg_result, void (*)(
struct pg_result*)> current_result_;
101 size_t current_row_index_;
102 bool has_pending_results_;
105 boost::asio::awaitable<ConnectionResult<void>> start_query();
108 void process_column_metadata_from_pg_result(
struct pg_result* pg_result);
113 std::string format_single_row(
struct pg_result* pg_result);
118 std::string convert_pg_bytea_to_binary(
const std::string& hex_value)
const;
125template <
typename DataSource>
133 : source_(source), result_set_(result_set), current_row_(), at_end_(at_end) {}
144 auto next_row_data =
co_await source_.get_next_row();
146 current_row_ =
result::LazyRow(std::move(*next_row_data), source_.get_column_names());
168 if (!reset_called_) {
169 reset_called_ =
true;
171 if constexpr (std::is_same_v<DataSource, PostgreSQLAsyncStreamingSource>) {
172 source_.get_connection().reset_connection_state_sync();
183 : source_(std::move(other.source_)), reset_called_(other.reset_called_) {
184 other.reset_called_ =
true;
188 if (
this != &other) {
190 if (!reset_called_) {
191 reset_called_ =
true;
192 if constexpr (std::is_same_v<DataSource, PostgreSQLAsyncStreamingSource>) {
193 source_.get_connection().reset_connection_state_sync();
197 source_ = std::move(other.source_);
198 reset_called_ = other.reset_called_;
199 other.reset_called_ =
true;
285 template <
typename Func>
286 boost::asio::awaitable<void>
for_each(Func&& func) {
290 co_await it.advance();
292 while (!it.is_at_end()) {
294 using ReturnType = std::invoke_result_t<Func,
decltype(*it)>;
296 if constexpr (is_awaitable_v<ReturnType>) {
300 using AwaitableValueType =
typename ReturnType::value_type;
302 if constexpr (std::is_same_v<AwaitableValueType, bool>) {
304 bool result =
co_await func(*it);
316 if constexpr (std::is_same_v<ReturnType, bool>) {
329 co_await it.advance();
338 co_await source_.async_cleanup();
345 if (!reset_called_) {
346 reset_called_ =
true;
348 if constexpr (std::is_same_v<DataSource, PostgreSQLAsyncStreamingSource>) {
349 auto reset_result =
co_await source_.get_connection().reset_connection_state();
366template <
typename... Args>
370 std::vector<std::string> param_strings;
371 if constexpr (
sizeof...(Args) > 0) {
372 param_strings.reserve(
sizeof...(Args));
374 auto add_param = [&param_strings](
auto&& param) {
375 using ParamType = std::remove_cvref_t<
decltype(param)>;
377 if constexpr (std::is_same_v<ParamType, std::nullptr_t>) {
378 param_strings.push_back(
"NULL");
379 }
else if constexpr (std::is_same_v<ParamType, std::string> ||
380 std::is_same_v<ParamType, const char*> ||
381 std::is_same_v<ParamType, std::string_view>) {
382 param_strings.push_back(std::string(param));
383 }
else if constexpr (std::is_arithmetic_v<ParamType>) {
384 param_strings.push_back(std::to_string(param));
385 }
else if constexpr (std::is_same_v<ParamType, bool>) {
386 param_strings.push_back(param ?
"t" :
"f");
388 std::ostringstream ss;
390 param_strings.push_back(ss.str());
394 (add_param(std::forward<Args>(args)), ...);
Iterator that reads data on demand asynchronously.
async_streaming_iterator(DataSource &source, AsyncStreamingResultSet &result_set, bool at_end=false)
const auto & operator*() const
Get the current row (must be called after advance())
boost::asio::awaitable< void > advance()
Advance to the next row asynchronously.
Async streaming result set that yields rows asynchronously.
AsyncStreamingResultSet(DataSource source)
AsyncStreamingResultSet & operator=(AsyncStreamingResultSet &&other) noexcept
boost::asio::awaitable< void > cleanup()
Explicitly cleanup the streaming source.
AsyncStreamingResultSet & operator=(const AsyncStreamingResultSet &)=delete
AsyncStreamingResultSet(const AsyncStreamingResultSet &)=delete
boost::asio::awaitable< void > auto_reset_connection_state()
Internal method to automatically reset connection state (called only once)
async_streaming_iterator begin()
Begin async iteration.
AsyncStreamingResultSet(AsyncStreamingResultSet &&other) noexcept
boost::asio::awaitable< void > for_each(Func &&func)
Process all rows with an async callback.
async_streaming_iterator end()
End marker for async iteration.
~AsyncStreamingResultSet()
Destructor that ensures cleanup when result set goes out of scope.
Asynchronous PostgreSQL implementation of the Connection interface.
Async PostgreSQL streaming data source for processing large result sets.
boost::asio::awaitable< ConnectionResult< void > > initialize()
Initialize the streaming query asynchronously.
~PostgreSQLAsyncStreamingSource()
Destructor that cleans up any active query.
const std::vector< std::string > & get_column_names() const
Get the column names for the result set.
bool is_initialized() const
Check if the streaming source has been initialized.
PostgreSQLAsyncStreamingSource(const PostgreSQLAsyncStreamingSource &)=delete
PostgreSQLAsyncConnection & get_connection()
Get reference to the underlying connection.
PostgreSQLAsyncStreamingSource(PostgreSQLAsyncStreamingSource &&other) noexcept
PostgreSQLAsyncStreamingSource(PostgreSQLAsyncConnection &connection, std::string sql, std::vector< std::string > params={})
Constructor with async connection and query parameters.
PostgreSQLAsyncStreamingSource & operator=(const PostgreSQLAsyncStreamingSource &)=delete
PostgreSQLAsyncStreamingSource & operator=(PostgreSQLAsyncStreamingSource &&other) noexcept
boost::asio::awaitable< std::optional< std::string > > get_next_row()
Get the next row from the result set asynchronously.
boost::asio::awaitable< void > async_cleanup()
Explicitly cleanup any active query asynchronously.
bool has_more_rows() const
Check if there are more rows available.
Lazy row that defers cell parsing until accessed.
AsyncStreamingResultSet< PostgreSQLAsyncStreamingSource > create_async_streaming_result(PostgreSQLAsyncConnection &connection, const std::string &sql, Args &&... args)
Create an async streaming result set from a PostgreSQL async connection and query.
constexpr bool is_awaitable_v
Helper trait to detect if a type is boost::asio::awaitable.