Skip to content

Commit f959db3

Browse files
committed
dropping inserts on connection errors
1 parent e143c5d commit f959db3

File tree

2 files changed

+39
-17
lines changed

2 files changed

+39
-17
lines changed

src/influxdb-cpp-rest/influxdb_raw_db.cpp

+34-16
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,15 @@ void influxdb::raw::db::post(string_t const & query)
7474
// synchronous for now
7575
auto response = client.request(
7676
request_from(builder.to_string(), "", username, password)
77-
).get();
77+
);
7878

79-
if (response.status_code() != status_codes::OK) {
80-
throw_response(response);
79+
try {
80+
response.wait();
81+
if (response.get().status_code() != status_codes::OK) {
82+
throw_response(response.get());
83+
}
84+
} catch (const std::exception& e) {
85+
throw std::runtime_error(e.what());
8186
}
8287
}
8388

@@ -90,29 +95,42 @@ string_t influxdb::raw::db::get(string_t const & query)
9095
// synchronous for now
9196
auto response = client.request(
9297
request_from(builder.to_string(), "", username, password)
93-
).get();
94-
if (response.status_code() == status_codes::OK)
95-
{
96-
return response.extract_string().get();
97-
}
98-
else
99-
{
100-
throw_response(response);
101-
return string_t();
98+
);
99+
100+
try {
101+
response.wait();
102+
if (response.get().status_code() == status_codes::OK)
103+
{
104+
return response.get().extract_string().get();
105+
}
106+
else
107+
{
108+
throw_response(response.get());
109+
return string_t();
110+
}
111+
} catch (const std::exception& e) {
112+
throw std::runtime_error(e.what());
102113
}
103114
}
104115

105116
void influxdb::raw::db::insert(std::string const & lines)
106117
{
107-
auto response = client.request(request_from(uri_with_db, lines, username, password)).get();
108-
if (!(response.status_code() == status_codes::OK || response.status_code() == status_codes::NoContent)) {
109-
throw_response(response);
118+
auto response = client.request(request_from(uri_with_db, lines, username, password));
119+
120+
try {
121+
response.wait();
122+
if (!(response.get().status_code() == status_codes::OK || response.get().status_code() == status_codes::NoContent)) {
123+
throw_response(response.get());
124+
}
125+
} catch (const std::exception& e) {
126+
throw std::runtime_error(e.what());
110127
}
111128
}
112129

130+
// synchronous for now
113131
void influxdb::raw::db::insert_async(std::string const & lines)
114132
{
115-
client.request(request_from(uri_with_db, lines, username, password));
133+
insert(lines);
116134
}
117135

118136
void influxdb::raw::db::with_authentication(std::string const& username, std::string const& password)

src/influxdb-cpp-rest/influxdb_simple_async_api.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,11 @@ struct influxdb::async_api::simple_db::impl {
6767
.observe_on(rxcpp::synchronize_new_thread())
6868
.subscribe([this](std::shared_ptr<fmt::MemoryWriter> const& w) {
6969
if (w->size() > 0u) {
70-
db.insert_async(w->str());
70+
try {
71+
db.insert_async(w->str());
72+
} catch (const std::runtime_error& e) {
73+
std::cerr << "async_api::insert failed: " << e.what() << " -> Dropping " << w->size() << " bytes" << std::endl;
74+
}
7175
}
7276
},
7377
[](std::exception_ptr ep) {

0 commit comments

Comments
 (0)