relx 0.1.0
A Modern C++23 Type-Safe SQL Query Builder
Loading...
Searching...
No Matches
postgresql_async_streaming_source.hpp
Go to the documentation of this file.
1#pragma once
2
3#include "../results/streaming_result.hpp"
5
6#include <memory>
7#include <optional>
8#include <string>
9#include <vector>
10
11#include <boost/asio.hpp>
12#include <boost/asio/awaitable.hpp>
13
14// Forward declarations
15struct pg_conn;
16using PGconn = pg_conn;
17
18namespace relx::connection {
19
21template <typename T>
22struct is_awaitable : std::false_type {};
23
24template <typename T>
25struct is_awaitable<boost::asio::awaitable<T>> : std::true_type {};
26
27template <typename T>
28inline constexpr bool is_awaitable_v = is_awaitable<T>::value;
29
34public:
40 std::vector<std::string> params = {});
41
44
45 // Disable copy operations
48
49 // Allow move operations
52
55 boost::asio::awaitable<ConnectionResult<void>> initialize();
56
60 boost::asio::awaitable<std::optional<std::string>> get_next_row();
61
64 const std::vector<std::string>& get_column_names() const;
65
68 bool is_initialized() const { return initialized_; }
69
72 bool has_more_rows() const { return !finished_; }
73
76 boost::asio::awaitable<void> async_cleanup();
77
80 PostgreSQLAsyncConnection& get_connection() { return connection_; }
81
82private:
83 PostgreSQLAsyncConnection& connection_;
84 std::string sql_;
85 std::vector<std::string> params_;
86
87 std::vector<std::string> column_names_;
88 std::vector<bool> is_bytea_column_;
89 bool initialized_;
90 bool finished_;
91 bool convert_bytea_;
92
93 // Track query state
94 bool query_active_;
95
96 // Cache for the first row (since we consume it during metadata processing)
97 std::optional<std::string> first_row_cached_;
98
99 // Streaming state for proper result management
100 std::unique_ptr<struct pg_result, void (*)(struct pg_result*)> current_result_;
101 size_t current_row_index_;
102 bool has_pending_results_;
103
105 boost::asio::awaitable<ConnectionResult<void>> start_query();
106
108 void process_column_metadata_from_pg_result(struct pg_result* pg_result);
109
113 std::string format_single_row(struct pg_result* pg_result);
114
118 std::string convert_pg_bytea_to_binary(const std::string& hex_value) const;
119
121 void cleanup();
122};
123
125template <typename DataSource>
127public:
130 public:
131 async_streaming_iterator(DataSource& source, AsyncStreamingResultSet& result_set,
132 bool at_end = false)
133 : source_(source), result_set_(result_set), current_row_(), at_end_(at_end) {}
134
136 const auto& operator*() const { return current_row_; }
137
139 boost::asio::awaitable<void> advance() {
140 if (at_end_) {
141 co_return;
142 }
143
144 auto next_row_data = co_await source_.get_next_row();
145 if (next_row_data) {
146 current_row_ = result::LazyRow(std::move(*next_row_data), source_.get_column_names());
147 } else {
148 at_end_ = true;
149 // Automatically reset connection state when streaming completes
150 co_await result_set_.auto_reset_connection_state();
151 }
152 }
153
154 bool is_at_end() const { return at_end_; }
155
156 private:
157 DataSource& source_;
158 AsyncStreamingResultSet& result_set_;
159 result::LazyRow current_row_;
160 bool at_end_;
161 };
162
163 AsyncStreamingResultSet(DataSource source) : source_(std::move(source)), reset_called_(false) {}
164
167 // Call synchronous cleanup for RAII when going out of scope
168 if (!reset_called_) {
169 reset_called_ = true;
170 // Only call reset for PostgreSQL async streaming sources
171 if constexpr (std::is_same_v<DataSource, PostgreSQLAsyncStreamingSource>) {
172 source_.get_connection().reset_connection_state_sync();
173 }
174 }
175 }
176
177 // Disable copy operations to ensure single ownership
180
181 // Allow move operations
183 : source_(std::move(other.source_)), reset_called_(other.reset_called_) {
184 other.reset_called_ = true; // Prevent cleanup in moved-from object
185 }
186
188 if (this != &other) {
189 // Cleanup current object before move
190 if (!reset_called_) {
191 reset_called_ = true;
192 if constexpr (std::is_same_v<DataSource, PostgreSQLAsyncStreamingSource>) {
193 source_.get_connection().reset_connection_state_sync();
194 }
195 }
196
197 source_ = std::move(other.source_);
198 reset_called_ = other.reset_called_;
199 other.reset_called_ = true; // Prevent cleanup in moved-from object
200 }
201 return *this;
202 }
203
206
208 async_streaming_iterator end() { return async_streaming_iterator(source_, *this, true); }
209
285 template <typename Func>
286 boost::asio::awaitable<void> for_each(Func&& func) {
287 auto it = begin();
288
289 // Advance to first row
290 co_await it.advance();
291
292 while (!it.is_at_end()) {
293 // Handle different function signatures
294 using ReturnType = std::invoke_result_t<Func, decltype(*it)>;
295
296 if constexpr (is_awaitable_v<ReturnType>) {
297 // Asynchronous function - return type is boost::asio::awaitable<T>
298
299 // Extract the value type from the awaitable
300 using AwaitableValueType = typename ReturnType::value_type;
301
302 if constexpr (std::is_same_v<AwaitableValueType, bool>) {
303 // Async function returning bool - check for early termination
304 bool result = co_await func(*it);
305 if (result) {
306 // Early termination - need to manually reset connection state
308 co_return;
309 }
310 } else {
311 // Async function returning void - just await it
312 co_await func(*it);
313 }
314 } else {
315 // Synchronous function
316 if constexpr (std::is_same_v<ReturnType, bool>) {
317 // Sync function returning bool - check for early termination
318 if (func(*it)) {
319 // Early termination - need to manually reset connection state
321 co_return;
322 }
323 } else {
324 // Sync function returning void
325 func(*it);
326 }
327 }
328
329 co_await it.advance();
330 }
331
332 // Connection state reset is automatically called when iterator reaches end via advance()
333 }
334
337 boost::asio::awaitable<void> cleanup() {
338 co_await source_.async_cleanup();
340 }
341
344 boost::asio::awaitable<void> auto_reset_connection_state() {
345 if (!reset_called_) {
346 reset_called_ = true;
347 // Only call reset for PostgreSQL async streaming sources
348 if constexpr (std::is_same_v<DataSource, PostgreSQLAsyncStreamingSource>) {
349 auto reset_result = co_await source_.get_connection().reset_connection_state();
350 // Note: We don't propagate reset errors since streaming operation was successful
351 (void)reset_result; // Silence unused variable warning
352 }
353 }
354 }
355
356private:
357 DataSource source_;
358 bool reset_called_;
359};
360
366template <typename... Args>
368 PostgreSQLAsyncConnection& connection, const std::string& sql, Args&&... args) {
369 // Convert parameters to strings (similar to execute_typed)
370 std::vector<std::string> param_strings;
371 if constexpr (sizeof...(Args) > 0) {
372 param_strings.reserve(sizeof...(Args));
373
374 auto add_param = [&param_strings](auto&& param) {
375 using ParamType = std::remove_cvref_t<decltype(param)>;
376
377 if constexpr (std::is_same_v<ParamType, std::nullptr_t>) {
378 param_strings.push_back("NULL");
379 } else if constexpr (std::is_same_v<ParamType, std::string> ||
380 std::is_same_v<ParamType, const char*> ||
381 std::is_same_v<ParamType, std::string_view>) {
382 param_strings.push_back(std::string(param));
383 } else if constexpr (std::is_arithmetic_v<ParamType>) {
384 param_strings.push_back(std::to_string(param));
385 } else if constexpr (std::is_same_v<ParamType, bool>) {
386 param_strings.push_back(param ? "t" : "f");
387 } else {
388 std::ostringstream ss;
389 ss << param;
390 param_strings.push_back(ss.str());
391 }
392 };
393
394 (add_param(std::forward<Args>(args)), ...);
395 }
396
398 PostgreSQLAsyncStreamingSource(connection, sql, std::move(param_strings)));
399}
400
401} // namespace relx::connection
async_streaming_iterator(DataSource &source, AsyncStreamingResultSet &result_set, bool at_end=false)
const auto & operator*() const
Get the current row (must be called after advance())
boost::asio::awaitable< void > advance()
Advance to the next row asynchronously.
Async streaming result set that yields rows asynchronously.
AsyncStreamingResultSet & operator=(AsyncStreamingResultSet &&other) noexcept
boost::asio::awaitable< void > cleanup()
Explicitly cleanup the streaming source.
AsyncStreamingResultSet & operator=(const AsyncStreamingResultSet &)=delete
AsyncStreamingResultSet(const AsyncStreamingResultSet &)=delete
boost::asio::awaitable< void > auto_reset_connection_state()
Internal method to automatically reset connection state (called only once)
async_streaming_iterator begin()
Begin async iteration.
AsyncStreamingResultSet(AsyncStreamingResultSet &&other) noexcept
boost::asio::awaitable< void > for_each(Func &&func)
Process all rows with an async callback.
async_streaming_iterator end()
End marker for async iteration.
~AsyncStreamingResultSet()
Destructor that ensures cleanup when result set goes out of scope.
Asynchronous PostgreSQL implementation of the Connection interface.
Async PostgreSQL streaming data source for processing large result sets.
boost::asio::awaitable< ConnectionResult< void > > initialize()
Initialize the streaming query asynchronously.
~PostgreSQLAsyncStreamingSource()
Destructor that cleans up any active query.
const std::vector< std::string > & get_column_names() const
Get the column names for the result set.
bool is_initialized() const
Check if the streaming source has been initialized.
PostgreSQLAsyncStreamingSource(const PostgreSQLAsyncStreamingSource &)=delete
PostgreSQLAsyncConnection & get_connection()
Get reference to the underlying connection.
PostgreSQLAsyncStreamingSource(PostgreSQLAsyncStreamingSource &&other) noexcept
PostgreSQLAsyncStreamingSource(PostgreSQLAsyncConnection &connection, std::string sql, std::vector< std::string > params={})
Constructor with async connection and query parameters.
PostgreSQLAsyncStreamingSource & operator=(const PostgreSQLAsyncStreamingSource &)=delete
PostgreSQLAsyncStreamingSource & operator=(PostgreSQLAsyncStreamingSource &&other) noexcept
boost::asio::awaitable< std::optional< std::string > > get_next_row()
Get the next row from the result set asynchronously.
boost::asio::awaitable< void > async_cleanup()
Explicitly cleanup any active query asynchronously.
bool has_more_rows() const
Check if there are more rows available.
Lazy row that defers cell parsing until accessed.
AsyncStreamingResultSet< PostgreSQLAsyncStreamingSource > create_async_streaming_result(PostgreSQLAsyncConnection &connection, const std::string &sql, Args &&... args)
Create an async streaming result set from a PostgreSQL async connection and query.
STL namespace.
Helper trait to detect if a type is boost::asio::awaitable.