diff --git a/vendor/postgres b/vendor/postgres index edf91ce106a8..04cfa326a543 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit edf91ce106a8bad3c4a0baf6f9b8a7b7d9b6bcfc +Subproject commit 04cfa326a543171967c16954306f5a9dd8a470ea diff --git a/walkeeper/Cargo.toml b/walkeeper/Cargo.toml index e5187c3f6be5..4c83f1c4baf9 100644 --- a/walkeeper/Cargo.toml +++ b/walkeeper/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" [dependencies] regex = "1.4.5" +bincode = "1.3" bytes = "1.0.1" byteorder = "1.4.3" fs2 = "0.4.3" diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index e6eb9cb5c69a..9483a2c6cb19 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -1,29 +1,33 @@ -//! This implements the Safekeeper protocol. +//! This implements the Safekeeper protocol, picking up immediately after the "START_WAL_PUSH" message //! //! FIXME: better description needed here use anyhow::{bail, Result}; +use bincode::config::Options; +use bytes::{Buf, Bytes}; use log::*; use postgres::{Client, Config, NoTls}; use serde::{Deserialize, Serialize}; use std::cmp::{max, min}; use std::fs::{self, File, OpenOptions}; -use std::io::{BufReader, Read, Seek, SeekFrom, Write}; -use std::net::{SocketAddr, TcpStream}; +use std::io::{Read, Seek, SeekFrom, Write}; +use std::net::SocketAddr; use std::str; use std::sync::Arc; use std::thread; use std::thread::sleep; -use zenith_utils::bin_ser::LeSer; +use zenith_utils::bin_ser::{self, le_coder, LeSer}; use zenith_utils::connstring::connection_host_port; use zenith_utils::lsn::Lsn; +use zenith_utils::postgres_backend::PostgresBackend; +use zenith_utils::pq_proto::{BeMessage, FeMessage, SystemId}; use zenith_utils::zid::{ZTenantId, ZTimelineId}; use crate::replication::HotStandbyFeedback; +use crate::send_wal::SendWalHandler; use crate::timeline::{Timeline, TimelineTools}; use crate::WalAcceptorConf; use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ}; -use zenith_utils::pq_proto::SystemId; pub const SK_MAGIC: u32 = 0xcafeceefu32; pub const SK_FORMAT_VERSION: u32 = 1; @@ -134,17 +138,11 @@ struct SafeKeeperResponse { hs_feedback: HotStandbyFeedback, } -#[derive(Debug)] -pub struct ReceiveWalConn { - pub timeline: Option>, - /// Postgres connection, buffered input - pub stream_in: BufReader, - /// Postgres connection, output - pub stream_out: TcpStream, - /// The cached result of socket.peer_addr() - pub peer_addr: SocketAddr, - /// wal acceptor configuration - pub conf: WalAcceptorConf, +pub struct ReceiveWalConn<'pg> { + /// Postgres connection + pg_backend: &'pg mut PostgresBackend, + /// The cached result of `pg_backend.socket().peer_addr()` (roughly) + peer_addr: SocketAddr, } /// @@ -190,37 +188,64 @@ fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId, tenantid: ZT } } -impl ReceiveWalConn { - pub fn new(socket: TcpStream, conf: WalAcceptorConf) -> Result { - let peer_addr = socket.peer_addr()?; - let conn = ReceiveWalConn { - timeline: None, - stream_in: BufReader::new(socket.try_clone()?), - stream_out: socket, +impl<'pg> ReceiveWalConn<'pg> { + pub fn new(pg: &'pg mut PostgresBackend) -> Result> { + let peer_addr = pg.get_peer_addr()?; + Ok(ReceiveWalConn { + pg_backend: pg, peer_addr, - conf, - }; - Ok(conn) + }) } - fn read_req(&mut self) -> Result { - // As the trait bound implies, this always encodes little-endian. - Ok(T::des_from(&mut self.stream_in)?) + // Read and extract the bytes of a `CopyData` message from the postgres instance + fn read_msg_bytes(&mut self) -> Result { + match self.pg_backend.read_message()? { + Some(FeMessage::CopyData(bytes)) => Ok(bytes), + Some(msg) => bail!("expected `CopyData` message, found {:?}", msg), + None => bail!("connection closed unexpectedly"), + } + } + + // Read the result of a `CopyData` message sent from the postgres instance + // + // As the trait bound implies, this always encodes little-endian. + fn read_msg(&mut self) -> Result { + let data = self.read_msg_bytes()?; + // Taken directly from `LeSer::des`: + let value = le_coder() + .reject_trailing_bytes() + .deserialize(&data) + .or(Err(bin_ser::DeserializeError::BadInput))?; + Ok(value) + } + + // Writes the value into a `CopyData` message sent to the postgres instance + fn write_msg(&mut self, value: &T) -> Result<()> { + let mut buf = Vec::new(); + value.ser_into(&mut buf)?; + self.pg_backend.write_message(&BeMessage::CopyData(&buf))?; + Ok(()) } /// Receive WAL from wal_proposer - pub fn run(&mut self) -> Result<()> { + pub fn run(&mut self, swh: &mut SendWalHandler) -> Result<()> { + let mut this_timeline: Option> = None; + + // Notify the libpq client that it's allowed to send `CopyData` messages + self.pg_backend + .write_message(&BeMessage::CopyBothResponse)?; + // Receive information about server - let server_info = self.read_req::()?; + let server_info = self.read_msg::()?; info!( "Start handshake with wal_proposer {} sysid {} timeline {} tenant {}", self.peer_addr, server_info.system_id, server_info.timeline_id, server_info.tenant_id, ); // FIXME: also check that the system identifier matches - self.timeline.set(server_info.timeline_id)?; - self.timeline.get().load_control_file(&self.conf)?; + this_timeline.set(server_info.timeline_id)?; + this_timeline.get().load_control_file(&swh.conf)?; - let mut my_info = self.timeline.get().get_info(); + let mut my_info = this_timeline.get().get_info(); /* Check protocol compatibility */ if server_info.protocol_version != SK_PROTOCOL_VERSION { @@ -246,24 +271,24 @@ impl ReceiveWalConn { my_info.server.node_id = node_id; /* Calculate WAL end based on local data */ - let (flush_lsn, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, true); + let (flush_lsn, timeline_id) = this_timeline.find_end_of_wal(&swh.conf.data_dir, true); my_info.flush_lsn = flush_lsn; - my_info.server.timeline = timeline; + my_info.server.timeline = timeline_id; info!( "find_end_of_wal in {:?}: timeline={} flush_lsn={}", - &self.conf.data_dir, timeline, flush_lsn + &swh.conf.data_dir, timeline_id, flush_lsn ); /* Report my identifier to proposer */ - my_info.ser_into(&mut self.stream_out)?; + self.write_msg(&my_info)?; /* Wait for vote request */ - let prop = self.read_req::()?; + let prop = self.read_msg::()?; /* This is Paxos check which should ensure that only one master can perform commits */ if prop.node_id < my_info.server.node_id { /* Send my node-id to inform proposer that it's candidate was rejected */ - my_info.server.node_id.ser_into(&mut self.stream_out)?; + self.write_msg(&my_info.server.node_id)?; bail!( "Reject connection attempt with term {} because my term is {}", prop.node_id.term, @@ -271,21 +296,21 @@ impl ReceiveWalConn { ); } my_info.server.node_id = prop.node_id; - self.timeline.get().set_info(&my_info); + this_timeline.get().set_info(&my_info); /* Need to persist our vote first */ - self.timeline.get().save_control_file(true)?; + this_timeline.get().save_control_file(true)?; let mut flushed_restart_lsn = Lsn(0); let wal_seg_size = server_info.wal_seg_size as usize; /* Acknowledge the proposed candidate by returning it to the proposer */ - prop.node_id.ser_into(&mut self.stream_out)?; + self.write_msg(&prop.node_id)?; - if self.conf.pageserver_addr.is_some() { + if swh.conf.pageserver_addr.is_some() { // Need to establish replication channel with page server. // Add far as replication in postgres is initiated by receiver, we should use callme mechanism - let conf = self.conf.clone(); - let timelineid = self.timeline.get().timelineid; + let conf = swh.conf.clone(); + let timelineid = this_timeline.get().timelineid; let tenantid = server_info.tenant_id; thread::spawn(move || { request_callback(conf, timelineid, tenantid); @@ -302,7 +327,10 @@ impl ReceiveWalConn { let mut sync_control_file = false; /* Receive message header */ - let req = self.read_req::()?; + let msg_bytes = self.read_msg_bytes()?; + let mut msg_reader = msg_bytes.reader(); + + let req = SafeKeeperRequest::des_from(&mut msg_reader)?; if req.sender_id != my_info.server.node_id { bail!("Sender NodeId is changed"); } @@ -320,12 +348,20 @@ impl ReceiveWalConn { rec_size, start_pos, end_pos, ); - /* Receive message body */ - let mut inbuf = vec![0u8; rec_size]; - self.stream_in.read_exact(&mut inbuf)?; + /* Receive message body (from the rest of the message) */ + let mut buf = Vec::with_capacity(rec_size); + msg_reader.read_to_end(&mut buf)?; + assert_eq!(buf.len(), rec_size); /* Save message in file */ - self.write_wal_file(start_pos, timeline, wal_seg_size, &inbuf)?; + Self::write_wal_file( + swh, + start_pos, + timeline_id, + this_timeline.get(), + wal_seg_size, + &buf, + )?; my_info.restart_lsn = req.restart_lsn; my_info.commit_lsn = req.commit_lsn; @@ -350,7 +386,7 @@ impl ReceiveWalConn { * when restart_lsn delta exceeds WAL segment size. */ sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn; - self.timeline.get().save_control_file(sync_control_file)?; + this_timeline.get().save_control_file(sync_control_file)?; if sync_control_file { flushed_restart_lsn = my_info.restart_lsn; @@ -361,25 +397,27 @@ impl ReceiveWalConn { let resp = SafeKeeperResponse { epoch: my_info.epoch, flush_lsn: end_pos, - hs_feedback: self.timeline.get().get_hs_feedback(), + hs_feedback: this_timeline.get().get_hs_feedback(), }; - resp.ser_into(&mut self.stream_out)?; + self.write_msg(&resp)?; /* * Ping wal sender that new data is available. * FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper. */ - self.timeline + this_timeline .get() .notify_wal_senders(min(req.commit_lsn, end_pos)); } + Ok(()) } fn write_wal_file( - &self, + swh: &SendWalHandler, startpos: Lsn, - timeline: TimeLineID, + timeline_id: TimeLineID, + timeline: &Arc, wal_seg_size: usize, buf: &[u8], ) -> Result<()> { @@ -407,16 +445,16 @@ impl ReceiveWalConn { /* Open file */ let segno = start_pos.segment_number(wal_seg_size); - let wal_file_name = XLogFileName(timeline, segno, wal_seg_size); - let wal_file_path = self + let wal_file_name = XLogFileName(timeline_id, segno, wal_seg_size); + let wal_file_path = swh .conf .data_dir - .join(self.timeline.get().timelineid.to_string()) + .join(timeline.timelineid.to_string()) .join(wal_file_name.clone()); - let wal_file_partial_path = self + let wal_file_partial_path = swh .conf .data_dir - .join(self.timeline.get().timelineid.to_string()) + .join(timeline.timelineid.to_string()) .join(wal_file_name.clone() + ".partial"); { @@ -454,7 +492,7 @@ impl ReceiveWalConn { wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?; // Flush file is not prohibited - if !self.conf.no_sync { + if !swh.conf.no_sync { wal_file.sync_all()?; } } diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index a233c8d53671..907e4868cd76 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -1,5 +1,5 @@ //! This module implements the streaming side of replication protocol, starting -//! with the "START REPLICATION" message. +//! with the "START_REPLICATION" message. use crate::send_wal::SendWalHandler; use crate::timeline::{Timeline, TimelineTools}; diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index e10d9bdf167f..a6be98fc1a26 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -2,6 +2,7 @@ //! pageserver/any other consumer. //! +use crate::receive_wal::ReceiveWalConn; use crate::replication::ReplicationConn; use crate::timeline::{Timeline, TimelineTools}; use crate::WalAcceptorConf; @@ -45,6 +46,9 @@ impl postgres_backend::Handler for SendWalHandler { } else if query_string.starts_with(b"START_REPLICATION") { ReplicationConn::new(pgb).run(self, pgb, &query_string)?; Ok(()) + } else if query_string.starts_with(b"START_WAL_PUSH") { + ReceiveWalConn::new(pgb)?.run(self)?; + Ok(()) } else { bail!("Unexpected command {:?}", query_string); } diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index f4e05a0bc915..33d619bcb8b2 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -4,11 +4,9 @@ //! use anyhow::Result; use log::*; -use std::io::Read; use std::net::{TcpListener, TcpStream}; use std::thread; -use crate::receive_wal::ReceiveWalConn; use crate::send_wal::SendWalHandler; use crate::WalAcceptorConf; use zenith_utils::postgres_backend::{AuthType, PostgresBackend}; @@ -37,35 +35,15 @@ pub fn thread_main(conf: WalAcceptorConf) -> Result<()> { } } -/// This is run by main_loop, inside a background thread. +/// This is run by `thread_main` above, inside a background thread. /// -fn handle_socket(mut socket: TcpStream, conf: WalAcceptorConf) -> Result<()> { +fn handle_socket(socket: TcpStream, conf: WalAcceptorConf) -> Result<()> { socket.set_nodelay(true)?; - // Peek at the incoming data to see what protocol is being sent. - let peeked = peek_u32(&mut socket)?; - if peeked == 0 { - // Consume the 4 bytes we peeked at. This protocol begins after them. - socket.read_exact(&mut [0u8; 4])?; - ReceiveWalConn::new(socket, conf)?.run()?; // internal protocol between wal_proposer and wal_acceptor - } else { - let mut conn_handler = SendWalHandler::new(conf); - let pgbackend = PostgresBackend::new(socket, AuthType::Trust)?; - // libpq replication protocol between wal_acceptor and replicas/pagers - pgbackend.run(&mut conn_handler)?; - } - Ok(()) -} + let mut conn_handler = SendWalHandler::new(conf); + let pgbackend = PostgresBackend::new(socket, AuthType::Trust)?; + // libpq replication protocol between wal_acceptor and replicas/pagers + pgbackend.run(&mut conn_handler)?; -/// Fetch the first 4 bytes from the network (big endian), without consuming them. -/// -/// This is used to help determine what protocol the peer is using. -fn peek_u32(stream: &mut TcpStream) -> Result { - let mut buf = [0u8; 4]; - loop { - let num_bytes = stream.peek(&mut buf)?; - if num_bytes == 4 { - return Ok(u32::from_be_bytes(buf)); - } - } + Ok(()) } diff --git a/zenith_utils/src/postgres_backend.rs b/zenith_utils/src/postgres_backend.rs index 8cde6fa7ce3a..d4de084442dd 100644 --- a/zenith_utils/src/postgres_backend.rs +++ b/zenith_utils/src/postgres_backend.rs @@ -10,7 +10,7 @@ use log::*; use rand::Rng; use serde::{Deserialize, Serialize}; use std::io::{self, BufReader, Write}; -use std::net::{Shutdown, TcpStream}; +use std::net::{Shutdown, SocketAddr, TcpStream}; use std::str::FromStr; pub trait Handler { @@ -137,6 +137,10 @@ impl PostgresBackend { } } + pub fn get_peer_addr(&self) -> Result { + Ok(self.stream_out.peer_addr()?) + } + pub fn take_stream_in(&mut self) -> Option> { self.stream_in.take() }