// sqlx_postgres/connection/establish.rs
use crate::HashMap;
use crate::common::StatementCache;
use crate::connection::{sasl, stream::PgStream};
use crate::error::Error;
use crate::io::StatementId;
use crate::message::{
Authentication, BackendKeyData, BackendMessageFormat, Password, ReadyForQuery, Startup,
};
use crate::{PgConnectOptions, PgConnection};
// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.3
// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.11
impl PgConnection {
    /// Opens a stream to the server described by `options` and drives the
    /// start-up exchange until the server reports `ReadyForQuery`.
    ///
    /// The start-up message always carries `DateStyle`, `client_encoding` and
    /// `TimeZone`; `extra_float_digits`, `application_name` and `options` are
    /// appended only when they are set on `options`.
    ///
    /// `process_id` and `secret_key` are taken from the `BackendKeyData`
    /// message; if the server never sends one before `ReadyForQuery`, both
    /// stay at their initial value of `0`.
    ///
    /// # Errors
    ///
    /// Propagates any error from connecting, writing, flushing, receiving or
    /// decoding a message. Returns a protocol error when the server requests
    /// an authentication method not handled here, or sends a message other
    /// than `Authentication`, `BackendKeyData` or `ReadyForQuery` during
    /// start-up.
    pub(crate) async fn establish(options: &PgConnectOptions) -> Result<Self, Error> {
        // Upgrade to TLS if we were asked to and the server supports it
        let mut stream = PgStream::connect(options).await?;

        // To begin a session, a frontend opens a connection to the server
        // and sends a startup message.
        let mut params = vec![
            // Sets the display format for date and time values,
            // as well as the rules for interpreting ambiguous date input values.
            ("DateStyle", "ISO, MDY"),
            // Sets the client-side encoding (character set).
            // <https://www.postgresql.org/docs/devel/multibyte.html#MULTIBYTE-CHARSET-SUPPORTED>
            ("client_encoding", "UTF8"),
            // Sets the time zone for displaying and interpreting time stamps.
            ("TimeZone", "UTC"),
        ];

        if let Some(ref extra_float_digits) = options.extra_float_digits {
            params.push(("extra_float_digits", extra_float_digits));
        }

        if let Some(ref application_name) = options.application_name {
            params.push(("application_name", application_name));
        }

        // Bound under a distinct name so the `options` parameter is not shadowed.
        if let Some(ref startup_options) = options.options {
            params.push(("options", startup_options));
        }

        stream.write(Startup {
            username: Some(&options.username),
            database: options.database.as_deref(),
            params: &params,
        })?;

        stream.flush().await?;

        // The server then uses this information and the contents of
        // its configuration files (such as pg_hba.conf) to determine whether the connection is
        // provisionally acceptable, and what additional
        // authentication is required (if any).

        let mut process_id = 0;
        let mut secret_key = 0;
        // Assigned exactly once, on the `ReadyForQuery` arm that exits the loop.
        let transaction_status;

        loop {
            let message = stream.recv().await?;

            match message.format {
                BackendMessageFormat::Authentication => match message.decode()? {
                    Authentication::Ok => {
                        // the authentication exchange is successfully completed
                        // do nothing; no more information is required to continue
                    }

                    Authentication::CleartextPassword => {
                        // The frontend must now send a [PasswordMessage] containing the
                        // password in clear-text form.
                        // A missing password is sent as the empty string.
                        stream
                            .send(Password::Cleartext(
                                options.password.as_deref().unwrap_or_default(),
                            ))
                            .await?;
                    }

                    Authentication::Md5Password(body) => {
                        // The frontend must now send a [PasswordMessage] containing the
                        // password (with user name) encrypted via MD5, then encrypted again
                        // using the 4-byte random salt specified in the
                        // [AuthenticationMD5Password] message.
                        stream
                            .send(Password::Md5 {
                                username: &options.username,
                                password: options.password.as_deref().unwrap_or_default(),
                                salt: body.salt,
                            })
                            .await?;
                    }

                    Authentication::Sasl(body) => {
                        sasl::authenticate(&mut stream, options, body).await?;
                    }

                    method => {
                        return Err(err_protocol!(
                            "unsupported authentication method: {:?}",
                            method
                        ));
                    }
                },

                BackendMessageFormat::BackendKeyData => {
                    // provides secret-key data that the frontend must save if it wants to be
                    // able to issue cancel requests later
                    let data: BackendKeyData = message.decode()?;

                    process_id = data.process_id;
                    secret_key = data.secret_key;
                }

                BackendMessageFormat::ReadyForQuery => {
                    // start-up is completed. The frontend can now issue commands
                    transaction_status = message.decode::<ReadyForQuery>()?.transaction_status;

                    break;
                }

                _ => {
                    return Err(err_protocol!(
                        "establish: unexpected message: {:?}",
                        message.format
                    ));
                }
            }
        }

        Ok(PgConnection {
            stream,
            process_id,
            secret_key,
            transaction_status,
            transaction_depth: 0,
            pending_ready_for_query_count: 0,
            next_statement_id: StatementId::NAMED_START,
            cache_statement: StatementCache::new(options.statement_cache_capacity),
            cache_type_oid: HashMap::new(),
            cache_type_info: HashMap::new(),
            cache_elem_type_to_array: HashMap::new(),
            log_settings: options.log_settings.clone(),
        })
    }
}