relx 0.1.0
A Modern C++23 Type-Safe SQL Query Builder
Loading...
Searching...
No Matches
postgresql_streaming_source.hpp
Go to the documentation of this file.
1#pragma once
2
3#include "../results/streaming_result.hpp"
5
6#include <memory>
7#include <optional>
8#include <string>
9#include <vector>
10
11// Forward declarations
// NOTE(review): pg_conn / PGconn look like libpq's opaque connection handle,
// presumably declared here so this header does not have to include libpq
// itself -- confirm against libpq-fe.h. Neither name is used in the visible
// part of this header.
12struct pg_conn;
13using PGconn = pg_conn;
14
15namespace relx::connection {
16
22public:
27 PostgreSQLStreamingSource(PostgreSQLConnection& connection, std::string sql,
28 std::vector<std::string> params = {});
29
35 PostgreSQLStreamingSource(PostgreSQLConnection& connection, std::string sql,
36 std::vector<std::string> params, std::vector<bool> is_binary);
37
40
41 // Disable copy operations
44
45 // Allow move operations
48
52
56 std::optional<std::string> get_next_row();
57
60 const std::vector<std::string>& get_column_names() const;
61
64 bool is_initialized() const { return initialized_; }
65
68 bool has_more_rows() const { return !finished_; }
69
70private:
71 PostgreSQLConnection& connection_;
72 std::string sql_;
73 std::vector<std::string> params_;
74 std::vector<bool> is_binary_;
75 bool use_binary_;
76
77 std::vector<std::string> column_names_;
78 std::vector<bool> is_bytea_column_;
79 bool initialized_;
80 bool finished_;
81 bool convert_bytea_;
82
83 // We need to track the active query state
84 bool query_active_;
85
86 // Cache for the first row (since we consume it during metadata processing)
87 std::optional<std::string> first_row_cached_;
88
90 ConnectionResult<void> start_query();
91
94 void process_column_metadata(struct pg_result* pg_result);
95
99 std::optional<std::string> format_row(struct pg_result* pg_result);
100
104 std::string convert_pg_bytea_to_binary(const std::string& hex_value) const;
105
107 void cleanup();
108};
109
115template <typename... Args>
117 PostgreSQLConnection& connection, const std::string& sql, Args&&... args) {
118 // Convert parameters to strings (similar to execute_typed)
119 std::vector<std::string> param_strings;
120 if constexpr (sizeof...(Args) > 0) {
121 param_strings.reserve(sizeof...(Args));
122
123 auto add_param = [&param_strings](auto&& param) {
124 using ParamType = std::remove_cvref_t<decltype(param)>;
125
126 if constexpr (std::is_same_v<ParamType, std::nullptr_t>) {
127 param_strings.push_back("NULL");
128 } else if constexpr (std::is_same_v<ParamType, std::string> ||
129 std::is_same_v<ParamType, const char*> ||
130 std::is_same_v<ParamType, std::string_view>) {
131 param_strings.push_back(std::string(param));
132 } else if constexpr (std::is_arithmetic_v<ParamType>) {
133 param_strings.push_back(std::to_string(param));
134 } else if constexpr (std::is_same_v<ParamType, bool>) {
135 param_strings.push_back(param ? "t" : "f");
136 } else {
137 std::ostringstream ss;
138 ss << param;
139 param_strings.push_back(ss.str());
140 }
141 };
142
143 (add_param(std::forward<Args>(args)), ...);
144 }
145
147 PostgreSQLStreamingSource(connection, sql, std::move(param_strings)));
148}
149
150} // namespace relx::connection
PostgreSQL implementation of the Connection interface.
PostgreSQL streaming data source for processing large result sets.
PostgreSQLStreamingSource(PostgreSQLConnection &connection, std::string sql, std::vector< std::string > params={})
Constructor with connection and query parameters.
bool has_more_rows() const
Check if there are more rows available.
const std::vector< std::string > & get_column_names() const
Get the column names for the result set.
PostgreSQLStreamingSource & operator=(const PostgreSQLStreamingSource &)=delete
PostgreSQLStreamingSource(PostgreSQLStreamingSource &&other) noexcept
std::optional< std::string > get_next_row()
Get the next row from the result set.
~PostgreSQLStreamingSource()
Destructor that cleans up any active query.
ConnectionResult< void > initialize()
Initialize the streaming query.
PostgreSQLStreamingSource & operator=(PostgreSQLStreamingSource &&other) noexcept
PostgreSQLStreamingSource(const PostgreSQLStreamingSource &)=delete
bool is_initialized() const
Check if the streaming source has been initialized.
PostgreSQLStreamingSource(PostgreSQLConnection &connection, std::string sql, std::vector< std::string > params, std::vector< bool > is_binary)
Constructor with connection and binary query parameters.
Streaming result set for very large datasets.
result::StreamingResultSet< PostgreSQLStreamingSource > create_streaming_result(PostgreSQLConnection &connection, const std::string &sql, Args &&... args)
Create a streaming result set from a PostgreSQL connection and query.
std::expected< T, ConnectionError > ConnectionResult
Type alias for result of connection operations.