From ccb099d62cf28963e41c04ab14b7b2b95255bece Mon Sep 17 00:00:00 2001 From: sharnoff Date: Fri, 13 Aug 2021 08:59:58 -0700 Subject: [PATCH] Rework walkeeper protocol to use libpq Most of the work here was done on the postgres side. There's more information in the commit message there. (see: https://github.com/zenithdb/postgres/commit/04cfa326a543171967c16954306f5a9dd8a470ea) On the WAL acceptor side, we're now expecting 'START_WAL_PUSH' to initialize the WAL keeper protocol. Everything else is mostly the same, with the only real difference being that protocol messages are now discrete CopyData messages sent over the postgres protocol. For the sake of documentation, the full set of these messages is: <- recv: START_WAL_PUSH query <- recv: server info from postgres (type `ServerInfo`) -> send: walkeeper info (type `SafeKeeperInfo`) <- recv: vote info (type `RequestVote`) if node id mismatch: -> send: self node id (type `NodeId`); exit -> send: confirm vote (with node id) (type `NodeId`) loop: <- recv: info and maybe WAL block (type `SafeKeeperRequest` + bytes) (break loop if done) -> send: confirm receipt (type `SafeKeeperResponse`) --- vendor/postgres | 2 +- walkeeper/Cargo.toml | 1 + walkeeper/src/receive_wal.rs | 164 +++++++++++++++++---------- walkeeper/src/replication.rs | 2 +- walkeeper/src/send_wal.rs | 4 + walkeeper/src/wal_service.rs | 36 ++---- zenith_utils/src/postgres_backend.rs | 6 +- 7 files changed, 120 insertions(+), 95 deletions(-) diff --git a/vendor/postgres b/vendor/postgres index edf91ce106a8..04cfa326a543 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit edf91ce106a8bad3c4a0baf6f9b8a7b7d9b6bcfc +Subproject commit 04cfa326a543171967c16954306f5a9dd8a470ea diff --git a/walkeeper/Cargo.toml b/walkeeper/Cargo.toml index e5187c3f6be5..4c83f1c4baf9 100644 --- a/walkeeper/Cargo.toml +++ b/walkeeper/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" [dependencies] regex = "1.4.5" +bincode = "1.3" bytes = "1.0.1" byteorder = "1.4.3" fs2 = "0.4.3" diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index e6eb9cb5c69a..9483a2c6cb19 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -1,29 +1,33 @@ -//! This implements the Safekeeper protocol. +//! This implements the Safekeeper protocol, picking up immediately after the "START_WAL_PUSH" message //! //! 
 //! FIXME: better description needed here
 
 use anyhow::{bail, Result};
+use bincode::config::Options;
+use bytes::{Buf, Bytes};
 use log::*;
 use postgres::{Client, Config, NoTls};
 use serde::{Deserialize, Serialize};
 use std::cmp::{max, min};
 use std::fs::{self, File, OpenOptions};
-use std::io::{BufReader, Read, Seek, SeekFrom, Write};
-use std::net::{SocketAddr, TcpStream};
+use std::io::{Read, Seek, SeekFrom, Write};
+use std::net::SocketAddr;
 use std::str;
 use std::sync::Arc;
 use std::thread;
 use std::thread::sleep;
-use zenith_utils::bin_ser::LeSer;
+use zenith_utils::bin_ser::{self, le_coder, LeSer};
 use zenith_utils::connstring::connection_host_port;
 use zenith_utils::lsn::Lsn;
+use zenith_utils::postgres_backend::PostgresBackend;
+use zenith_utils::pq_proto::{BeMessage, FeMessage, SystemId};
 use zenith_utils::zid::{ZTenantId, ZTimelineId};
 
 use crate::replication::HotStandbyFeedback;
+use crate::send_wal::SendWalHandler;
 use crate::timeline::{Timeline, TimelineTools};
 use crate::WalAcceptorConf;
 use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ};
-use zenith_utils::pq_proto::SystemId;
 
 pub const SK_MAGIC: u32 = 0xcafeceefu32;
 pub const SK_FORMAT_VERSION: u32 = 1;
@@ -134,17 +138,11 @@ struct SafeKeeperResponse {
     hs_feedback: HotStandbyFeedback,
 }
 
-#[derive(Debug)]
-pub struct ReceiveWalConn {
-    pub timeline: Option<Arc<Timeline>>,
-    /// Postgres connection, buffered input
-    pub stream_in: BufReader<TcpStream>,
-    /// Postgres connection, output
-    pub stream_out: TcpStream,
-    /// The cached result of socket.peer_addr()
-    pub peer_addr: SocketAddr,
-    /// wal acceptor configuration
-    pub conf: WalAcceptorConf,
+pub struct ReceiveWalConn<'pg> {
+    /// Postgres connection
+    pg_backend: &'pg mut PostgresBackend,
+    /// The cached result of `pg_backend.socket().peer_addr()` (roughly)
+    peer_addr: SocketAddr,
 }
 
 ///
@@ -190,37 +188,64 @@ fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId, tenantid: ZT
     }
 }
 
-impl ReceiveWalConn {
-    pub fn new(socket: TcpStream, conf: WalAcceptorConf) -> Result<ReceiveWalConn> {
-        let peer_addr = socket.peer_addr()?;
-        let conn = ReceiveWalConn {
-            timeline: None,
-            stream_in: BufReader::new(socket.try_clone()?),
-            stream_out: socket,
+impl<'pg> ReceiveWalConn<'pg> {
+    pub fn new(pg: &'pg mut PostgresBackend) -> Result<ReceiveWalConn<'pg>> {
+        let peer_addr = pg.get_peer_addr()?;
+        Ok(ReceiveWalConn {
+            pg_backend: pg,
             peer_addr,
-            conf,
-        };
-        Ok(conn)
+        })
     }
 
-    fn read_req<T: LeSer>(&mut self) -> Result<T> {
-        // As the trait bound implies, this always encodes little-endian.
-        Ok(T::des_from(&mut self.stream_in)?)
+    // Read and extract the bytes of a `CopyData` message from the postgres instance
+    fn read_msg_bytes(&mut self) -> Result<Bytes> {
+        match self.pg_backend.read_message()? {
+            Some(FeMessage::CopyData(bytes)) => Ok(bytes),
+            Some(msg) => bail!("expected `CopyData` message, found {:?}", msg),
+            None => bail!("connection closed unexpectedly"),
+        }
+    }
+
+    // Read the result of a `CopyData` message sent from the postgres instance
+    //
+    // As the trait bound implies, this always encodes little-endian.
+    fn read_msg<T: LeSer>(&mut self) -> Result<T> {
+        let data = self.read_msg_bytes()?;
+        // Taken directly from `LeSer::des`:
+        let value = le_coder()
+            .reject_trailing_bytes()
+            .deserialize(&data)
+            .or(Err(bin_ser::DeserializeError::BadInput))?;
+        Ok(value)
+    }
+
+    // Writes the value into a `CopyData` message sent to the postgres instance
+    fn write_msg<T: LeSer>(&mut self, value: &T) -> Result<()> {
+        let mut buf = Vec::new();
+        value.ser_into(&mut buf)?;
+        self.pg_backend.write_message(&BeMessage::CopyData(&buf))?;
+        Ok(())
     }
 
     /// Receive WAL from wal_proposer
-    pub fn run(&mut self) -> Result<()> {
+    pub fn run(&mut self, swh: &mut SendWalHandler) -> Result<()> {
+        let mut this_timeline: Option<Arc<Timeline>> = None;
+
+        // Notify the libpq client that it's allowed to send `CopyData` messages
+        self.pg_backend
+            .write_message(&BeMessage::CopyBothResponse)?;
+
         // Receive information about server
-        let server_info = self.read_req::<ServerInfo>()?;
+        let server_info = self.read_msg::<ServerInfo>()?;
         info!(
             "Start handshake with wal_proposer {} sysid {} timeline {} tenant {}",
             self.peer_addr, server_info.system_id, server_info.timeline_id, server_info.tenant_id,
         );
         // FIXME: also check that the system identifier matches
-        self.timeline.set(server_info.timeline_id)?;
-        self.timeline.get().load_control_file(&self.conf)?;
+        this_timeline.set(server_info.timeline_id)?;
+        this_timeline.get().load_control_file(&swh.conf)?;
 
-        let mut my_info = self.timeline.get().get_info();
+        let mut my_info = this_timeline.get().get_info();
 
         /* Check protocol compatibility */
         if server_info.protocol_version != SK_PROTOCOL_VERSION {
@@ -246,24 +271,24 @@ impl ReceiveWalConn {
         my_info.server.node_id = node_id;
 
         /* Calculate WAL end based on local data */
-        let (flush_lsn, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, true);
+        let (flush_lsn, timeline_id) = this_timeline.find_end_of_wal(&swh.conf.data_dir, true);
         my_info.flush_lsn = flush_lsn;
-        my_info.server.timeline = timeline;
+        my_info.server.timeline = timeline_id;
         info!(
             "find_end_of_wal in {:?}: timeline={} flush_lsn={}",
-            &self.conf.data_dir, timeline, flush_lsn
+            &swh.conf.data_dir, timeline_id, flush_lsn
         );
 
         /* Report my identifier to proposer */
-        my_info.ser_into(&mut self.stream_out)?;
+        self.write_msg(&my_info)?;
 
         /* Wait for vote request */
-        let prop = self.read_req::<RequestVote>()?;
+        let prop = self.read_msg::<RequestVote>()?;
 
         /* This is Paxos check which should ensure that only one master can perform commits */
         if prop.node_id < my_info.server.node_id {
             /* Send my node-id to inform proposer that it's candidate was rejected */
-            my_info.server.node_id.ser_into(&mut self.stream_out)?;
+            self.write_msg(&my_info.server.node_id)?;
             bail!(
                 "Reject connection attempt with term {} because my term is {}",
                 prop.node_id.term,
@@ -271,21 +296,21 @@
             );
         }
         my_info.server.node_id = prop.node_id;
-        self.timeline.get().set_info(&my_info);
+        this_timeline.get().set_info(&my_info);
         /* Need to persist our vote first */
-        self.timeline.get().save_control_file(true)?;
+        this_timeline.get().save_control_file(true)?;
 
         let mut flushed_restart_lsn = Lsn(0);
         let wal_seg_size = server_info.wal_seg_size as usize;
 
         /* Acknowledge the proposed candidate by returning it to the proposer */
-        prop.node_id.ser_into(&mut self.stream_out)?;
+        self.write_msg(&prop.node_id)?;
 
-        if self.conf.pageserver_addr.is_some() {
+        if swh.conf.pageserver_addr.is_some() {
             // Need to establish replication channel with page server.
             // Add far as replication in postgres is initiated by receiver, we should use callme mechanism
-            let conf = self.conf.clone();
-            let timelineid = self.timeline.get().timelineid;
+            let conf = swh.conf.clone();
+            let timelineid = this_timeline.get().timelineid;
             let tenantid = server_info.tenant_id;
             thread::spawn(move || {
                 request_callback(conf, timelineid, tenantid);
@@ -302,7 +327,10 @@
             let mut sync_control_file = false;
 
             /* Receive message header */
-            let req = self.read_req::<SafeKeeperRequest>()?;
+            let msg_bytes = self.read_msg_bytes()?;
+            let mut msg_reader = msg_bytes.reader();
+
+            let req = SafeKeeperRequest::des_from(&mut msg_reader)?;
             if req.sender_id != my_info.server.node_id {
                 bail!("Sender NodeId is changed");
             }
@@ -320,12 +348,20 @@
                 rec_size, start_pos, end_pos,
             );
 
-            /* Receive message body */
-            let mut inbuf = vec![0u8; rec_size];
-            self.stream_in.read_exact(&mut inbuf)?;
+            /* Receive message body (from the rest of the message) */
+            let mut buf = Vec::with_capacity(rec_size);
+            msg_reader.read_to_end(&mut buf)?;
+            assert_eq!(buf.len(), rec_size);
 
             /* Save message in file */
-            self.write_wal_file(start_pos, timeline, wal_seg_size, &inbuf)?;
+            Self::write_wal_file(
+                swh,
+                start_pos,
+                timeline_id,
+                this_timeline.get(),
+                wal_seg_size,
+                &buf,
+            )?;
 
             my_info.restart_lsn = req.restart_lsn;
             my_info.commit_lsn = req.commit_lsn;
@@ -350,7 +386,7 @@
              * when restart_lsn delta exceeds WAL segment size.
              */
             sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn;
-            self.timeline.get().save_control_file(sync_control_file)?;
+            this_timeline.get().save_control_file(sync_control_file)?;
 
             if sync_control_file {
                 flushed_restart_lsn = my_info.restart_lsn;
@@ -361,25 +397,27 @@
             let resp = SafeKeeperResponse {
                 epoch: my_info.epoch,
                 flush_lsn: end_pos,
-                hs_feedback: self.timeline.get().get_hs_feedback(),
+                hs_feedback: this_timeline.get().get_hs_feedback(),
             };
-            resp.ser_into(&mut self.stream_out)?;
+            self.write_msg(&resp)?;
 
             /*
              * Ping wal sender that new data is available.
             * FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper.
             */
-            self.timeline
+            this_timeline
                 .get()
                 .notify_wal_senders(min(req.commit_lsn, end_pos));
         }
+
         Ok(())
     }
 
     fn write_wal_file(
-        &self,
+        swh: &SendWalHandler,
         startpos: Lsn,
-        timeline: TimeLineID,
+        timeline_id: TimeLineID,
+        timeline: &Arc<Timeline>,
         wal_seg_size: usize,
         buf: &[u8],
     ) -> Result<()> {
@@ -407,16 +445,16 @@
 
             /* Open file */
             let segno = start_pos.segment_number(wal_seg_size);
-            let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
-            let wal_file_path = self
+            let wal_file_name = XLogFileName(timeline_id, segno, wal_seg_size);
+            let wal_file_path = swh
                 .conf
                 .data_dir
-                .join(self.timeline.get().timelineid.to_string())
+                .join(timeline.timelineid.to_string())
                 .join(wal_file_name.clone());
-            let wal_file_partial_path = self
+            let wal_file_partial_path = swh
                 .conf
                 .data_dir
-                .join(self.timeline.get().timelineid.to_string())
+                .join(timeline.timelineid.to_string())
                 .join(wal_file_name.clone() + ".partial");
 
             {
@@ -454,7 +492,7 @@
             wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?;
 
             // Flush file is not prohibited
-            if !self.conf.no_sync {
+            if !swh.conf.no_sync {
                 wal_file.sync_all()?;
             }
         }
diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs
index a233c8d53671..907e4868cd76 100644
--- a/walkeeper/src/replication.rs
+++ b/walkeeper/src/replication.rs
@@ -1,5 +1,5 @@
 //! This module implements the streaming side of replication protocol, starting
-//! with the "START REPLICATION" message.
+//! with the "START_REPLICATION" message.
 
 use crate::send_wal::SendWalHandler;
 use crate::timeline::{Timeline, TimelineTools};
diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs
index e10d9bdf167f..a6be98fc1a26 100644
--- a/walkeeper/src/send_wal.rs
+++ b/walkeeper/src/send_wal.rs
@@ -2,6 +2,7 @@
 //! pageserver/any other consumer.
 //!
 
+use crate::receive_wal::ReceiveWalConn;
 use crate::replication::ReplicationConn;
 use crate::timeline::{Timeline, TimelineTools};
 use crate::WalAcceptorConf;
@@ -45,6 +46,9 @@ impl postgres_backend::Handler for SendWalHandler {
         } else if query_string.starts_with(b"START_REPLICATION") {
             ReplicationConn::new(pgb).run(self, pgb, &query_string)?;
             Ok(())
+        } else if query_string.starts_with(b"START_WAL_PUSH") {
+            ReceiveWalConn::new(pgb)?.run(self)?;
+            Ok(())
         } else {
             bail!("Unexpected command {:?}", query_string);
         }
diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs
index f4e05a0bc915..33d619bcb8b2 100644
--- a/walkeeper/src/wal_service.rs
+++ b/walkeeper/src/wal_service.rs
@@ -4,11 +4,9 @@
 //!
 use anyhow::Result;
 use log::*;
-use std::io::Read;
 use std::net::{TcpListener, TcpStream};
 use std::thread;
 
-use crate::receive_wal::ReceiveWalConn;
 use crate::send_wal::SendWalHandler;
 use crate::WalAcceptorConf;
 use zenith_utils::postgres_backend::{AuthType, PostgresBackend};
@@ -37,35 +35,15 @@ pub fn thread_main(conf: WalAcceptorConf) -> Result<()> {
     }
 }
 
-/// This is run by main_loop, inside a background thread.
+/// This is run by `thread_main` above, inside a background thread.
 ///
-fn handle_socket(mut socket: TcpStream, conf: WalAcceptorConf) -> Result<()> {
+fn handle_socket(socket: TcpStream, conf: WalAcceptorConf) -> Result<()> {
     socket.set_nodelay(true)?;
 
-    // Peek at the incoming data to see what protocol is being sent.
-    let peeked = peek_u32(&mut socket)?;
-    if peeked == 0 {
-        // Consume the 4 bytes we peeked at. This protocol begins after them.
-        socket.read_exact(&mut [0u8; 4])?;
-        ReceiveWalConn::new(socket, conf)?.run()?; // internal protocol between wal_proposer and wal_acceptor
-    } else {
-        let mut conn_handler = SendWalHandler::new(conf);
-        let pgbackend = PostgresBackend::new(socket, AuthType::Trust)?;
-        // libpq replication protocol between wal_acceptor and replicas/pagers
-        pgbackend.run(&mut conn_handler)?;
-    }
-    Ok(())
-}
+    let mut conn_handler = SendWalHandler::new(conf);
+    let pgbackend = PostgresBackend::new(socket, AuthType::Trust)?;
+    // libpq replication protocol between wal_acceptor and replicas/pagers
+    pgbackend.run(&mut conn_handler)?;
 
-/// Fetch the first 4 bytes from the network (big endian), without consuming them.
-///
-/// This is used to help determine what protocol the peer is using.
-fn peek_u32(stream: &mut TcpStream) -> Result<u32> {
-    let mut buf = [0u8; 4];
-    loop {
-        let num_bytes = stream.peek(&mut buf)?;
-        if num_bytes == 4 {
-            return Ok(u32::from_be_bytes(buf));
-        }
-    }
+    Ok(())
 }
diff --git a/zenith_utils/src/postgres_backend.rs b/zenith_utils/src/postgres_backend.rs
index 8cde6fa7ce3a..d4de084442dd 100644
--- a/zenith_utils/src/postgres_backend.rs
+++ b/zenith_utils/src/postgres_backend.rs
@@ -10,7 +10,7 @@ use log::*;
 use rand::Rng;
 use serde::{Deserialize, Serialize};
 use std::io::{self, BufReader, Write};
-use std::net::{Shutdown, TcpStream};
+use std::net::{Shutdown, SocketAddr, TcpStream};
 use std::str::FromStr;
 
 pub trait Handler {
@@ -137,6 +137,10 @@ impl PostgresBackend {
         }
     }
 
+    pub fn get_peer_addr(&self) -> Result<SocketAddr> {
+        Ok(self.stream_out.peer_addr()?)
+    }
+
    pub fn take_stream_in(&mut self) -> Option<BufReader<TcpStream>> {
         self.stream_in.take()
     }
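Editor's note: for readers who want the framing in one place, the sketch below is a minimal, self-contained restatement of the CopyData framing this patch introduces. It is not code from the patch. It assumes the wire format is fixed-size little-endian bincode (mirroring what `zenith_utils::bin_ser::le_coder` is used for above), and `PgCopyStream` is a hypothetical stand-in for the `read_message`/`write_message` pair on `PostgresBackend`, not a real API in this repository.

    // Sketch only: little-endian bincode structs, one per CopyData message.
    use bincode::config::Options;
    use serde::{de::DeserializeOwned, Serialize};

    /// Hypothetical transport stand-in: receive/send one `CopyData` payload.
    trait PgCopyStream {
        fn recv_copy_data(&mut self) -> anyhow::Result<Vec<u8>>;
        fn send_copy_data(&mut self, payload: &[u8]) -> anyhow::Result<()>;
    }

    /// Assumed wire encoding: little-endian, fixed-width integers, no trailing bytes.
    fn le_options() -> impl Options {
        bincode::options()
            .with_little_endian()
            .with_fixint_encoding()
            .reject_trailing_bytes()
    }

    /// Read one protocol message: a single `CopyData` payload holding one struct.
    fn read_msg<T: DeserializeOwned>(stream: &mut impl PgCopyStream) -> anyhow::Result<T> {
        let payload = stream.recv_copy_data()?;
        Ok(le_options().deserialize(&payload)?)
    }

    /// Write one protocol message: serialize the struct and wrap it in `CopyData`.
    fn write_msg<T: Serialize>(stream: &mut impl PgCopyStream, value: &T) -> anyhow::Result<()> {
        let payload = le_options().serialize(value)?;
        stream.send_copy_data(&payload)
    }

With helpers like these, the handshake in receive_wal.rs is exactly the message sequence listed in the commit message: read `ServerInfo`, reply with `SafeKeeperInfo`, read `RequestVote`, answer with a `NodeId`, then loop over `SafeKeeperRequest`/`SafeKeeperResponse` pairs, each carried in its own CopyData message.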