Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

try to be more intelligent in WalAcceptor.stop #417

Merged
merged 1 commit into from
Aug 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions test_runner/batch_others/test_wal_acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from contextlib import closing
from multiprocessing import Process, Value
from fixtures.zenith_fixtures import ZenithPageserver, PostgresFactory
from fixtures.zenith_fixtures import WalAcceptorFactory, ZenithPageserver, PostgresFactory

pytest_plugins = ("fixtures.zenith_fixtures")

Expand Down Expand Up @@ -61,7 +61,7 @@ def test_many_timelines(zenith_cli, pageserver: ZenithPageserver, postgres: Post
# Check that dead minority doesn't prevent the commits: execute insert n_inserts
# times, with fault_probability chance of getting a wal acceptor down or up
# along the way. 2 of 3 are always alive, so the work keeps going.
def test_restarts(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory):
def test_restarts(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory: WalAcceptorFactory):
fault_probability = 0.01
n_inserts = 1000
n_acceptors = 3
Expand Down
87 changes: 51 additions & 36 deletions test_runner/fixtures/zenith_fixtures.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import getpass
import os
import pathlib
import uuid
Expand All @@ -7,9 +6,11 @@
import shutil
import signal
import subprocess
import time

from contextlib import closing
from pathlib import Path
from dataclasses import dataclass

# Type-related stuff
from psycopg2.extensions import connection as PgConnection
Expand Down Expand Up @@ -511,26 +512,27 @@ def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin:
return PgBin(test_output_dir, pg_distrib_dir)


def read_pid(path):
def read_pid(path: Path):
""" Read content of file into number """
return int(Path(path).read_text())
return int(path.read_text())


@dataclass
class WalAcceptor:
""" An object representing a running wal acceptor daemon. """
def __init__(self, wa_binpath, data_dir, port, num, auth_token: Optional[str] = None):
self.wa_binpath = wa_binpath
self.data_dir = data_dir
self.port = port
self.num = num # identifier for logging
self.auth_token = auth_token
bin_path: Path
data_dir: Path
port: int
num: int # identifier for logging
auth_token: Optional[str] = None

def start(self) -> 'WalAcceptor':
# create data directory if not exists
Path(self.data_dir).mkdir(parents=True, exist_ok=True)
self.data_dir.mkdir(parents=True, exist_ok=True)
self.pidfile.unlink(missing_ok=True)

cmd = [self.wa_binpath]
cmd.extend(["-D", self.data_dir])
cmd = [str(self.bin_path)]
cmd.extend(["-D", str(self.data_dir)])
cmd.extend(["-l", "localhost:{}".format(self.port)])
cmd.append("--daemonize")
cmd.append("--no-sync")
Expand All @@ -541,38 +543,51 @@ def start(self) -> 'WalAcceptor':
env = {'PAGESERVER_AUTH_TOKEN': self.auth_token} if self.auth_token else None
subprocess.run(cmd, check=True, env=env)

return self
# wait for wal acceptor start by checkking that pid is readable
for _ in range(3):
pid = self.get_pid()
if pid is not None:
return self
time.sleep(0.5)

raise RuntimeError("cannot get wal acceptor pid")

@property
def pidfile(self) -> Path:
return self.data_dir / "wal_acceptor.pid"

def get_pid(self) -> Optional[int]:
if not self.pidfile.exists():
return None

try:
pid = read_pid(self.pidfile)
except ValueError:
return None

return pid

def stop(self) -> 'WalAcceptor':
print('Stopping wal acceptor {}'.format(self.num))
pidfile_path = os.path.join(self.data_dir, "wal_acceptor.pid")
try:
pid = read_pid(pidfile_path)
try:
os.kill(pid, signal.SIGTERM)
except Exception:
pass # pidfile might be obsolete
# TODO: cleanup pid file on exit in wal acceptor
return self
# for _ in range(5):
# print('waiting wal acceptor {} (pid {}) to stop...', self.num, pid)
# try:
# read_pid(pidfile_path)
# except FileNotFoundError:
# return # done
# time.sleep(1)
# raise Exception('Failed to wait for wal acceptor {} shutdown'.format(self.num))
except FileNotFoundError:
pid = self.get_pid()
if pid is None:
print("Wal acceptor {} is not running".format(self.num))
return self

try:
os.kill(pid, signal.SIGTERM)
except Exception:
# TODO: cleanup pid file on exit in wal acceptor
pass # pidfile might be obsolete
return self


class WalAcceptorFactory:
""" An object representing multiple running wal acceptors. """
def __init__(self, zenith_binpath, data_dir):
self.wa_binpath = os.path.join(zenith_binpath, 'wal_acceptor')
def __init__(self, zenith_binpath: Path, data_dir: Path):
self.wa_binpath = zenith_binpath / 'wal_acceptor'
self.data_dir = data_dir
self.instances = []
self.instances: List[WalAcceptor] = []
self.initial_port = 54321

def start_new(self, auth_token: Optional[str] = None) -> WalAcceptor:
Expand All @@ -583,7 +598,7 @@ def start_new(self, auth_token: Optional[str] = None) -> WalAcceptor:
wa_num = len(self.instances)
wa = WalAcceptor(
self.wa_binpath,
os.path.join(self.data_dir, "wal_acceptor_{}".format(wa_num)),
self.data_dir / "wal_acceptor_{}".format(wa_num),
self.initial_port + wa_num,
wa_num,
auth_token,
Expand Down Expand Up @@ -613,7 +628,7 @@ def get_connstrs(self) -> str:
@zenfixture
def wa_factory(zenith_binpath: str, repo_dir: str) -> Iterator[WalAcceptorFactory]:
""" Gives WalAcceptorFactory providing wal acceptors. """
wafactory = WalAcceptorFactory(zenith_binpath, os.path.join(repo_dir, "wal_acceptors"))
wafactory = WalAcceptorFactory(Path(zenith_binpath), Path(repo_dir) / "wal_acceptors")
yield wafactory
# After the yield comes any cleanup code we need.
print('Starting wal acceptors cleanup')
Expand Down