Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use postgres protocol for postgres <-> walkeeper communication #366

Merged
merged 1 commit into from
Aug 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion vendor/postgres
1 change: 1 addition & 0 deletions walkeeper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2018"

[dependencies]
regex = "1.4.5"
bincode = "1.3"
bytes = "1.0.1"
byteorder = "1.4.3"
fs2 = "0.4.3"
Expand Down
164 changes: 101 additions & 63 deletions walkeeper/src/receive_wal.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,33 @@
//! This implements the Safekeeper protocol.
//! This implements the Safekeeper protocol, picking up immediately after the "START_WAL_PUSH" message
//!
//! FIXME: better description needed here

use anyhow::{bail, Result};
use bincode::config::Options;
use bytes::{Buf, Bytes};
use log::*;
use postgres::{Client, Config, NoTls};
use serde::{Deserialize, Serialize};
use std::cmp::{max, min};
use std::fs::{self, File, OpenOptions};
use std::io::{BufReader, Read, Seek, SeekFrom, Write};
use std::net::{SocketAddr, TcpStream};
use std::io::{Read, Seek, SeekFrom, Write};
use std::net::SocketAddr;
use std::str;
use std::sync::Arc;
use std::thread;
use std::thread::sleep;
use zenith_utils::bin_ser::LeSer;
use zenith_utils::bin_ser::{self, le_coder, LeSer};
use zenith_utils::connstring::connection_host_port;
use zenith_utils::lsn::Lsn;
use zenith_utils::postgres_backend::PostgresBackend;
use zenith_utils::pq_proto::{BeMessage, FeMessage, SystemId};
use zenith_utils::zid::{ZTenantId, ZTimelineId};

use crate::replication::HotStandbyFeedback;
use crate::send_wal::SendWalHandler;
use crate::timeline::{Timeline, TimelineTools};
use crate::WalAcceptorConf;
use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ};
use zenith_utils::pq_proto::SystemId;

pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 1;
Expand Down Expand Up @@ -134,17 +138,11 @@ struct SafeKeeperResponse {
hs_feedback: HotStandbyFeedback,
}

#[derive(Debug)]
pub struct ReceiveWalConn {
pub timeline: Option<Arc<Timeline>>,
/// Postgres connection, buffered input
pub stream_in: BufReader<TcpStream>,
/// Postgres connection, output
pub stream_out: TcpStream,
/// The cached result of socket.peer_addr()
pub peer_addr: SocketAddr,
/// wal acceptor configuration
pub conf: WalAcceptorConf,
pub struct ReceiveWalConn<'pg> {
/// Postgres connection
pg_backend: &'pg mut PostgresBackend,
/// The cached result of `pg_backend.socket().peer_addr()` (roughly)
peer_addr: SocketAddr,
}

///
Expand Down Expand Up @@ -190,37 +188,64 @@ fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId, tenantid: ZT
}
}

impl ReceiveWalConn {
pub fn new(socket: TcpStream, conf: WalAcceptorConf) -> Result<ReceiveWalConn> {
let peer_addr = socket.peer_addr()?;
let conn = ReceiveWalConn {
timeline: None,
stream_in: BufReader::new(socket.try_clone()?),
stream_out: socket,
impl<'pg> ReceiveWalConn<'pg> {
pub fn new(pg: &'pg mut PostgresBackend) -> Result<ReceiveWalConn<'pg>> {
let peer_addr = pg.get_peer_addr()?;
Ok(ReceiveWalConn {
pg_backend: pg,
peer_addr,
conf,
};
Ok(conn)
})
}

fn read_req<T: LeSer>(&mut self) -> Result<T> {
// As the trait bound implies, this always encodes little-endian.
Ok(T::des_from(&mut self.stream_in)?)
// Read and extract the bytes of a `CopyData` message from the postgres instance
fn read_msg_bytes(&mut self) -> Result<Bytes> {
match self.pg_backend.read_message()? {
Some(FeMessage::CopyData(bytes)) => Ok(bytes),
Some(msg) => bail!("expected `CopyData` message, found {:?}", msg),
None => bail!("connection closed unexpectedly"),
}
}

// Read the result of a `CopyData` message sent from the postgres instance
//
// As the trait bound implies, this always encodes little-endian.
fn read_msg<T: LeSer>(&mut self) -> Result<T> {
let data = self.read_msg_bytes()?;
// Taken directly from `LeSer::des`:
let value = le_coder()
.reject_trailing_bytes()
.deserialize(&data)
.or(Err(bin_ser::DeserializeError::BadInput))?;
Ok(value)
}

// Writes the value into a `CopyData` message sent to the postgres instance
fn write_msg<T: LeSer>(&mut self, value: &T) -> Result<()> {
let mut buf = Vec::new();
value.ser_into(&mut buf)?;
self.pg_backend.write_message(&BeMessage::CopyData(&buf))?;
Ok(())
}

/// Receive WAL from wal_proposer
pub fn run(&mut self) -> Result<()> {
pub fn run(&mut self, swh: &mut SendWalHandler) -> Result<()> {
let mut this_timeline: Option<Arc<Timeline>> = None;

// Notify the libpq client that it's allowed to send `CopyData` messages
self.pg_backend
.write_message(&BeMessage::CopyBothResponse)?;

// Receive information about server
let server_info = self.read_req::<ServerInfo>()?;
let server_info = self.read_msg::<ServerInfo>()?;
info!(
"Start handshake with wal_proposer {} sysid {} timeline {} tenant {}",
self.peer_addr, server_info.system_id, server_info.timeline_id, server_info.tenant_id,
);
// FIXME: also check that the system identifier matches
self.timeline.set(server_info.timeline_id)?;
self.timeline.get().load_control_file(&self.conf)?;
this_timeline.set(server_info.timeline_id)?;
this_timeline.get().load_control_file(&swh.conf)?;

let mut my_info = self.timeline.get().get_info();
let mut my_info = this_timeline.get().get_info();

/* Check protocol compatibility */
if server_info.protocol_version != SK_PROTOCOL_VERSION {
Expand All @@ -246,46 +271,46 @@ impl ReceiveWalConn {
my_info.server.node_id = node_id;

/* Calculate WAL end based on local data */
let (flush_lsn, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, true);
let (flush_lsn, timeline_id) = this_timeline.find_end_of_wal(&swh.conf.data_dir, true);
my_info.flush_lsn = flush_lsn;
my_info.server.timeline = timeline;
my_info.server.timeline = timeline_id;

info!(
"find_end_of_wal in {:?}: timeline={} flush_lsn={}",
&self.conf.data_dir, timeline, flush_lsn
&swh.conf.data_dir, timeline_id, flush_lsn
);

/* Report my identifier to proposer */
my_info.ser_into(&mut self.stream_out)?;
self.write_msg(&my_info)?;

/* Wait for vote request */
let prop = self.read_req::<RequestVote>()?;
let prop = self.read_msg::<RequestVote>()?;
/* This is Paxos check which should ensure that only one master can perform commits */
if prop.node_id < my_info.server.node_id {
/* Send my node-id to inform proposer that it's candidate was rejected */
my_info.server.node_id.ser_into(&mut self.stream_out)?;
self.write_msg(&my_info.server.node_id)?;
bail!(
"Reject connection attempt with term {} because my term is {}",
prop.node_id.term,
my_info.server.node_id.term,
);
}
my_info.server.node_id = prop.node_id;
self.timeline.get().set_info(&my_info);
this_timeline.get().set_info(&my_info);
/* Need to persist our vote first */
self.timeline.get().save_control_file(true)?;
this_timeline.get().save_control_file(true)?;

let mut flushed_restart_lsn = Lsn(0);
let wal_seg_size = server_info.wal_seg_size as usize;

/* Acknowledge the proposed candidate by returning it to the proposer */
prop.node_id.ser_into(&mut self.stream_out)?;
self.write_msg(&prop.node_id)?;

if self.conf.pageserver_addr.is_some() {
if swh.conf.pageserver_addr.is_some() {
// Need to establish replication channel with page server.
// Add far as replication in postgres is initiated by receiver, we should use callme mechanism
let conf = self.conf.clone();
let timelineid = self.timeline.get().timelineid;
let conf = swh.conf.clone();
let timelineid = this_timeline.get().timelineid;
let tenantid = server_info.tenant_id;
thread::spawn(move || {
request_callback(conf, timelineid, tenantid);
Expand All @@ -302,7 +327,10 @@ impl ReceiveWalConn {
let mut sync_control_file = false;

/* Receive message header */
let req = self.read_req::<SafeKeeperRequest>()?;
let msg_bytes = self.read_msg_bytes()?;
let mut msg_reader = msg_bytes.reader();

let req = SafeKeeperRequest::des_from(&mut msg_reader)?;
if req.sender_id != my_info.server.node_id {
bail!("Sender NodeId is changed");
}
Expand All @@ -320,12 +348,20 @@ impl ReceiveWalConn {
rec_size, start_pos, end_pos,
);

/* Receive message body */
let mut inbuf = vec![0u8; rec_size];
self.stream_in.read_exact(&mut inbuf)?;
/* Receive message body (from the rest of the message) */
let mut buf = Vec::with_capacity(rec_size);
msg_reader.read_to_end(&mut buf)?;
assert_eq!(buf.len(), rec_size);

/* Save message in file */
self.write_wal_file(start_pos, timeline, wal_seg_size, &inbuf)?;
Self::write_wal_file(
swh,
start_pos,
timeline_id,
this_timeline.get(),
wal_seg_size,
&buf,
)?;

my_info.restart_lsn = req.restart_lsn;
my_info.commit_lsn = req.commit_lsn;
Expand All @@ -350,7 +386,7 @@ impl ReceiveWalConn {
* when restart_lsn delta exceeds WAL segment size.
*/
sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn;
self.timeline.get().save_control_file(sync_control_file)?;
this_timeline.get().save_control_file(sync_control_file)?;

if sync_control_file {
flushed_restart_lsn = my_info.restart_lsn;
Expand All @@ -361,25 +397,27 @@ impl ReceiveWalConn {
let resp = SafeKeeperResponse {
epoch: my_info.epoch,
flush_lsn: end_pos,
hs_feedback: self.timeline.get().get_hs_feedback(),
hs_feedback: this_timeline.get().get_hs_feedback(),
};
resp.ser_into(&mut self.stream_out)?;
self.write_msg(&resp)?;

/*
* Ping wal sender that new data is available.
* FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper.
*/
self.timeline
this_timeline
.get()
.notify_wal_senders(min(req.commit_lsn, end_pos));
}

Ok(())
}

fn write_wal_file(
&self,
swh: &SendWalHandler,
startpos: Lsn,
timeline: TimeLineID,
timeline_id: TimeLineID,
timeline: &Arc<Timeline>,
wal_seg_size: usize,
buf: &[u8],
) -> Result<()> {
Expand Down Expand Up @@ -407,16 +445,16 @@ impl ReceiveWalConn {

/* Open file */
let segno = start_pos.segment_number(wal_seg_size);
let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
let wal_file_path = self
let wal_file_name = XLogFileName(timeline_id, segno, wal_seg_size);
let wal_file_path = swh
.conf
.data_dir
.join(self.timeline.get().timelineid.to_string())
.join(timeline.timelineid.to_string())
.join(wal_file_name.clone());
let wal_file_partial_path = self
let wal_file_partial_path = swh
.conf
.data_dir
.join(self.timeline.get().timelineid.to_string())
.join(timeline.timelineid.to_string())
.join(wal_file_name.clone() + ".partial");

{
Expand Down Expand Up @@ -454,7 +492,7 @@ impl ReceiveWalConn {
wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?;

// Flush file is not prohibited
if !self.conf.no_sync {
if !swh.conf.no_sync {
wal_file.sync_all()?;
}
}
Expand Down
2 changes: 1 addition & 1 deletion walkeeper/src/replication.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! This module implements the streaming side of replication protocol, starting
//! with the "START REPLICATION" message.
//! with the "START_REPLICATION" message.

use crate::send_wal::SendWalHandler;
use crate::timeline::{Timeline, TimelineTools};
Expand Down
4 changes: 4 additions & 0 deletions walkeeper/src/send_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//! pageserver/any other consumer.
//!

use crate::receive_wal::ReceiveWalConn;
use crate::replication::ReplicationConn;
use crate::timeline::{Timeline, TimelineTools};
use crate::WalAcceptorConf;
Expand Down Expand Up @@ -45,6 +46,9 @@ impl postgres_backend::Handler for SendWalHandler {
} else if query_string.starts_with(b"START_REPLICATION") {
ReplicationConn::new(pgb).run(self, pgb, &query_string)?;
Ok(())
} else if query_string.starts_with(b"START_WAL_PUSH") {
ReceiveWalConn::new(pgb)?.run(self)?;
Ok(())
} else {
bail!("Unexpected command {:?}", query_string);
}
Expand Down
Loading