Skip to content

Commit 91522e4

Browse files
authored
Merge pull request #22 from MaterializeInc/hostaddrs
Integrate upstream hostaddr/load balancing support
2 parents b759caa + ea22fd8 commit 91522e4

File tree

17 files changed

+398
-114
lines changed

17 files changed

+398
-114
lines changed

Diff for: .github/workflows/ci.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020
- uses: actions/checkout@v3
2121
- uses: sfackler/actions/rustup@master
2222
- uses: sfackler/actions/rustfmt@master
23-
23+
2424
clippy:
2525
name: clippy
2626
runs-on: ubuntu-latest
@@ -55,7 +55,7 @@ jobs:
5555
- run: docker compose up -d
5656
- uses: sfackler/actions/rustup@master
5757
with:
58-
version: 1.64.0
58+
version: 1.77.0
5959
- run: echo "version=$(rustc --version)" >> $GITHUB_OUTPUT
6060
id: rust-version
6161
- uses: actions/cache@v3

Diff for: postgres-protocol/src/types/test.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -174,15 +174,15 @@ fn ltree_str() {
174174
let mut query = vec![1u8];
175175
query.extend_from_slice("A.B.C".as_bytes());
176176

177-
assert!(matches!(ltree_from_sql(query.as_slice()), Ok(_)))
177+
assert!(ltree_from_sql(query.as_slice()).is_ok())
178178
}
179179

180180
#[test]
181181
fn ltree_wrong_version() {
182182
let mut query = vec![2u8];
183183
query.extend_from_slice("A.B.C".as_bytes());
184184

185-
assert!(matches!(ltree_from_sql(query.as_slice()), Err(_)))
185+
assert!(ltree_from_sql(query.as_slice()).is_err())
186186
}
187187

188188
#[test]
@@ -202,15 +202,15 @@ fn lquery_str() {
202202
let mut query = vec![1u8];
203203
query.extend_from_slice("A.B.C".as_bytes());
204204

205-
assert!(matches!(lquery_from_sql(query.as_slice()), Ok(_)))
205+
assert!(lquery_from_sql(query.as_slice()).is_ok())
206206
}
207207

208208
#[test]
209209
fn lquery_wrong_version() {
210210
let mut query = vec![2u8];
211211
query.extend_from_slice("A.B.C".as_bytes());
212212

213-
assert!(matches!(lquery_from_sql(query.as_slice()), Err(_)))
213+
assert!(lquery_from_sql(query.as_slice()).is_err())
214214
}
215215

216216
#[test]
@@ -230,13 +230,13 @@ fn ltxtquery_str() {
230230
let mut query = vec![1u8];
231231
query.extend_from_slice("a & b*".as_bytes());
232232

233-
assert!(matches!(ltree_from_sql(query.as_slice()), Ok(_)))
233+
assert!(ltree_from_sql(query.as_slice()).is_ok())
234234
}
235235

236236
#[test]
237237
fn ltxtquery_wrong_version() {
238238
let mut query = vec![2u8];
239239
query.extend_from_slice("a & b*".as_bytes());
240240

241-
assert!(matches!(ltree_from_sql(query.as_slice()), Err(_)))
241+
assert!(ltree_from_sql(query.as_slice()).is_err())
242242
}

Diff for: postgres-types/src/chrono_04.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl ToSql for NaiveDateTime {
4040
impl<'a> FromSql<'a> for DateTime<Utc> {
4141
fn from_sql(type_: &Type, raw: &[u8]) -> Result<DateTime<Utc>, Box<dyn Error + Sync + Send>> {
4242
let naive = NaiveDateTime::from_sql(type_, raw)?;
43-
Ok(DateTime::from_utc(naive, Utc))
43+
Ok(DateTime::from_naive_utc_and_offset(naive, Utc))
4444
}
4545

4646
accepts!(TIMESTAMPTZ);
@@ -111,7 +111,7 @@ impl<'a> FromSql<'a> for NaiveDate {
111111
let jd = types::date_from_sql(raw)?;
112112
base()
113113
.date()
114-
.checked_add_signed(Duration::days(i64::from(jd)))
114+
.checked_add_signed(Duration::try_days(i64::from(jd)).unwrap())
115115
.ok_or_else(|| "value too large to decode".into())
116116
}
117117

Diff for: postgres/src/config.rs

+33
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::connection::Connection;
66
use crate::Client;
77
use log::info;
88
use std::fmt;
9+
use std::net::IpAddr;
910
use std::path::Path;
1011
use std::str::FromStr;
1112
use std::sync::Arc;
@@ -43,6 +44,19 @@ use tokio_postgres::{Error, Socket};
4344
/// path to the directory containing Unix domain sockets. Otherwise, it is treated as a hostname. Multiple hosts
4445
/// can be specified, separated by commas. Each host will be tried in turn when connecting. Required if connecting
4546
/// with the `connect` method.
47+
/// * `hostaddr` - Numeric IP address of host to connect to. This should be in the standard IPv4 address format,
48+
/// e.g., 172.28.40.9. If your machine supports IPv6, you can also use those addresses.
49+
/// If this parameter is not specified, the value of `host` will be looked up to find the corresponding IP address,
50+
/// - or if host specifies an IP address, that value will be used directly.
51+
/// Using `hostaddr` allows the application to avoid a host name look-up, which might be important in applications
52+
/// with time constraints. However, a host name is required for verify-full SSL certificate verification.
53+
/// Specifically:
54+
/// * If `hostaddr` is specified without `host`, the value for `hostaddr` gives the server network address.
55+
/// The connection attempt will fail if the authentication method requires a host name;
56+
/// * If `host` is specified without `hostaddr`, a host name lookup occurs;
57+
/// * If both `host` and `hostaddr` are specified, the value for `hostaddr` gives the server network address.
58+
/// The value for `host` is ignored unless the authentication method requires it,
59+
/// in which case it will be used as the host name.
4660
/// * `port` - The port to connect to. Multiple ports can be specified, separated by commas. The number of ports must be
4761
/// either 1, in which case it will be used for all hosts, or the same as the number of hosts. Defaults to 5432 if
4862
/// omitted or the empty string.
@@ -74,6 +88,10 @@ use tokio_postgres::{Error, Socket};
7488
/// ```
7589
///
7690
/// ```not_rust
91+
/// host=host1,host2,host3 port=1234,,5678 hostaddr=127.0.0.1,127.0.0.2,127.0.0.3 user=postgres target_session_attrs=read-write
92+
/// ```
93+
///
94+
/// ```not_rust
7795
/// host=host1,host2,host3 port=1234,,5678 user=postgres target_session_attrs=read-write
7896
/// ```
7997
///
@@ -250,6 +268,7 @@ impl Config {
250268
///
251269
/// Multiple hosts can be specified by calling this method multiple times, and each will be tried in order. On Unix
252270
/// systems, a host starting with a `/` is interpreted as a path to a directory containing Unix domain sockets.
271+
/// There must be either no hosts, or the same number of hosts as hostaddrs.
253272
pub fn host(&mut self, host: &str) -> &mut Config {
254273
self.config.host(host);
255274
self
@@ -260,6 +279,11 @@ impl Config {
260279
self.config.get_hosts()
261280
}
262281

282+
/// Gets the hostaddrs that have been added to the configuration with `hostaddr`.
283+
pub fn get_hostaddrs(&self) -> &[IpAddr] {
284+
self.config.get_hostaddrs()
285+
}
286+
263287
/// Adds a Unix socket host to the configuration.
264288
///
265289
/// Unlike `host`, this method allows non-UTF8 paths.
@@ -272,6 +296,15 @@ impl Config {
272296
self
273297
}
274298

299+
/// Adds a hostaddr to the configuration.
300+
///
301+
/// Multiple hostaddrs can be specified by calling this method multiple times, and each will be tried in order.
302+
/// There must be either no hostaddrs, or the same number of hostaddrs as hosts.
303+
pub fn hostaddr(&mut self, hostaddr: IpAddr) -> &mut Config {
304+
self.config.hostaddr(hostaddr);
305+
self
306+
}
307+
275308
/// Adds a port to the configuration.
276309
///
277310
/// Multiple ports can be specified by calling this method multiple times. There must either be no ports, in which

Diff for: tokio-postgres/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ serde = { version = "1.0", optional = true }
5959
socket2 = { version = "0.5", features = ["all"] }
6060
tokio = { version = "1.27", features = ["io-util"] }
6161
tokio-util = { version = "0.7", features = ["codec"] }
62+
rand = "0.8.5"
6263

6364
[dev-dependencies]
6465
futures-executor = "0.3"
@@ -78,7 +79,6 @@ eui48-04 = { version = "0.4", package = "eui48" }
7879
eui48-1 = { version = "1.0", package = "eui48" }
7980
geo-types-06 = { version = "0.6", package = "geo-types" }
8081
geo-types-07 = { version = "0.7", package = "geo-types" }
81-
serde-1 = { version = "1.0", package = "serde" }
8282
serde_json-1 = { version = "1.0", package = "serde_json" }
8383
smol_str-01 = { version = "0.1", package = "smol_str" }
8484
uuid-08 = { version = "0.8", package = "uuid" }

Diff for: tokio-postgres/src/cancel_query.rs

+6-10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::client::SocketConfig;
2-
use crate::config::{Host, SslMode};
2+
use crate::config::SslMode;
33
use crate::tls::MakeTlsConnect;
44
use crate::{cancel_query_raw, connect_socket, Error, Socket};
55
use std::io;
@@ -24,24 +24,20 @@ where
2424
}
2525
};
2626

27-
let hostname = match &config.host {
28-
Host::Tcp(host) => &**host,
29-
// postgres doesn't support TLS over unix sockets, so the choice here doesn't matter
30-
#[cfg(unix)]
31-
Host::Unix(_) => "",
32-
};
3327
let tls = tls
34-
.make_tls_connect(hostname)
28+
.make_tls_connect(config.hostname.as_deref().unwrap_or(""))
3529
.map_err(|e| Error::tls(e.into()))?;
30+
let has_hostname = config.hostname.is_some();
3631

3732
let socket = connect_socket::connect_socket(
38-
&config.host,
33+
&config.addr,
3934
config.port,
4035
config.connect_timeout,
4136
config.tcp_user_timeout,
4237
config.keepalive.as_ref(),
4338
)
4439
.await?;
4540

46-
cancel_query_raw::cancel_query_raw(socket, ssl_mode, tls, process_id, secret_key).await
41+
cancel_query_raw::cancel_query_raw(socket, ssl_mode, tls, has_hostname, process_id, secret_key)
42+
.await
4743
}

Diff for: tokio-postgres/src/cancel_query_raw.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@ pub async fn cancel_query_raw<S, T>(
99
stream: S,
1010
mode: SslMode,
1111
tls: T,
12+
has_hostname: bool,
1213
process_id: i32,
1314
secret_key: i32,
1415
) -> Result<(), Error>
1516
where
1617
S: AsyncRead + AsyncWrite + Unpin,
1718
T: TlsConnect<S>,
1819
{
19-
let mut stream = connect_tls::connect_tls(stream, mode, tls).await?;
20+
let mut stream = connect_tls::connect_tls(stream, mode, tls, has_hostname).await?;
2021

2122
let mut buf = BytesMut::new();
2223
frontend::cancel_request(process_id, secret_key, &mut buf);

Diff for: tokio-postgres/src/cancel_token.rs

+1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ impl CancelToken {
5555
stream,
5656
self.ssl_mode,
5757
tls,
58+
true,
5859
self.process_id,
5960
self.secret_key,
6061
)

Diff for: tokio-postgres/src/client.rs

+14-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
use crate::codec::{BackendMessages, FrontendMessage};
2-
#[cfg(feature = "runtime")]
3-
use crate::config::Host;
42
use crate::config::SslMode;
53
use crate::connection::{Request, RequestMessages};
64
use crate::copy_both::CopyBothDuplex;
@@ -29,6 +27,10 @@ use postgres_protocol::message::{backend::Message, frontend};
2927
use postgres_types::BorrowToSql;
3028
use std::collections::HashMap;
3129
use std::fmt;
30+
#[cfg(feature = "runtime")]
31+
use std::net::IpAddr;
32+
#[cfg(feature = "runtime")]
33+
use std::path::PathBuf;
3234
use std::sync::Arc;
3335
use std::task::{Context, Poll};
3436
#[cfg(feature = "runtime")]
@@ -155,13 +157,22 @@ impl InnerClient {
155157
#[cfg(feature = "runtime")]
156158
#[derive(Clone)]
157159
pub(crate) struct SocketConfig {
158-
pub host: Host,
160+
pub addr: Addr,
161+
pub hostname: Option<String>,
159162
pub port: u16,
160163
pub connect_timeout: Option<Duration>,
161164
pub tcp_user_timeout: Option<Duration>,
162165
pub keepalive: Option<KeepaliveConfig>,
163166
}
164167

168+
#[cfg(feature = "runtime")]
169+
#[derive(Clone)]
170+
pub(crate) enum Addr {
171+
Tcp(IpAddr),
172+
#[cfg(unix)]
173+
Unix(PathBuf),
174+
}
175+
165176
/// An asynchronous PostgreSQL client.
166177
///
167178
/// The client is one half of what is returned when a connection is established. Users interact with the database

0 commit comments

Comments
 (0)