diff --git a/docker/conf-workers/supervisord.conf.j2 b/docker/conf-workers/supervisord.conf.j2
deleted file mode 100644
index 0de2c6143b5a..000000000000
--- a/docker/conf-workers/supervisord.conf.j2
+++ /dev/null
@@ -1,41 +0,0 @@
-# This file contains the base config for supervisord, as part of ../Dockerfile-workers.
-# configure_workers_and_start.py uses and amends to this file depending on the workers
-# that have been selected.
-[supervisord]
-nodaemon=true
-user=root
-
-[program:nginx]
-command=/usr/sbin/nginx -g "daemon off;"
-priority=500
-stdout_logfile=/dev/stdout
-stdout_logfile_maxbytes=0
-stderr_logfile=/dev/stderr
-stderr_logfile_maxbytes=0
-username=www-data
-autorestart=true
-
-[program:redis]
-command=/usr/bin/redis-server /etc/redis/redis.conf --daemonize no
-priority=1
-stdout_logfile=/dev/stdout
-stdout_logfile_maxbytes=0
-stderr_logfile=/dev/stderr
-stderr_logfile_maxbytes=0
-username=redis
-autorestart=true
-
-[program:synapse_main]
-command=/usr/local/bin/python -m synapse.app.homeserver --config-path="{{ main_config_path }}" --config-path=/conf/workers/shared.yaml
-priority=10
-# Log startup failures to supervisord's stdout/err
-# Regular synapse logs will still go in the configured data directory
-stdout_logfile=/dev/stdout
-stdout_logfile_maxbytes=0
-stderr_logfile=/dev/stderr
-stderr_logfile_maxbytes=0
-autorestart=unexpected
-exitcodes=0
-
-# Additional process blocks
-{{ worker_config }}
\ No newline at end of file
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index 1d22a4d5719d..2814783f49c4 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -26,9 +26,16 @@
 # in the project's README), this script may be run multiple times, and functionality should
 # continue to work if so.
 
+import json
 import os
+import random
+import socket
+import string
 import subprocess
 import sys
+from time import sleep
+from typing import Dict, List, Optional
+from urllib.request import urlopen
 
 import jinja2
 import yaml
@@ -168,21 +175,6 @@
 }
 
 # Templates for sections that may be inserted multiple times in config files
-SUPERVISORD_PROCESS_CONFIG_BLOCK = """
-[program:synapse_{name}]
-command=/usr/local/bin/python -m {app} \
-    --config-path="{config_path}" \
-    --config-path=/conf/workers/shared.yaml \
-    --config-path=/conf/workers/{name}.yaml
-autorestart=unexpected
-priority=500
-exitcodes=0
-stdout_logfile=/dev/stdout
-stdout_logfile_maxbytes=0
-stderr_logfile=/dev/stderr
-stderr_logfile_maxbytes=0
-"""
-
 NGINX_LOCATION_CONFIG_BLOCK = """
 location ~* {endpoint} {{
     proxy_pass {upstream};
@@ -206,7 +198,7 @@ def log(txt: str):
     Args:
         txt: The text to log.
     """
-    print(txt)
+    print(txt, file=sys.stderr)
 
 
 def error(txt: str):
@@ -301,7 +293,54 @@ def generate_base_homeserver_config():
     subprocess.check_output(["/usr/local/bin/python", "/start.py", "migrate_config"])
 
 
-def generate_worker_files(environ, config_path: str, data_dir: str):
+def get_worker_configs(environ, config_path: str) -> Dict[str, List[dict]]:
+    # Read the desired worker configuration from the environment
+    worker_types = environ.get("SYNAPSE_WORKER_TYPES")
+    if worker_types is None:
+        # No workers, just the main process
+        return {}
+
+    # The resulting configurations.
+    worker_configs = {}
+
+    # Start worker ports from this arbitrary port
+    worker_port = 18009
+
+    # A counter of worker_type -> int. Used for determining the name for a given
+    # worker type when generating its config file, as each worker's name is just
+    # worker_type + instance #
+    worker_type_counter = {}
+
+    for worker_type in worker_types.split(","):
+        worker_type = worker_type.strip()
+
+        worker_config = WORKERS_CONFIG.get(worker_type)
+        if worker_config:
+            worker_config = worker_config.copy()
+        else:
+            log(worker_type + " is an unknown worker type! It will be ignored")
+            continue
+
+        new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1
+        worker_type_counter[worker_type] = new_worker_count
+
+        # Name workers by their type concatenated with an incrementing number
+        # e.g. federation_reader1
+        worker_name = worker_type + str(new_worker_count)
+        worker_config.update(
+            {"name": worker_name, "port": worker_port, "config_path": config_path}
+        )
+
+        worker_configs.setdefault(worker_type, []).append(worker_config)
+
+        worker_port += 1
+
+    return worker_configs
+
+
+def generate_worker_files(
+    environ, config_path: str, worker_configs: Dict[str, List[dict]], data_dir: str
+) -> None:
     """Read the desired list of workers from environment variables and generate shared
     homeserver, nginx and supervisord configs.
 
@@ -366,103 +405,69 @@ def generate_worker_files(environ, config_path: str, data_dir: str):
     # spun up. To be placed in /etc/nginx/conf.d.
     nginx_locations = {}
 
-    # Read the desired worker configuration from the environment
-    worker_types = environ.get("SYNAPSE_WORKER_TYPES")
-    if worker_types is None:
-        # No workers, just the main process
-        worker_types = []
-    else:
-        # Split type names by comma
-        worker_types = worker_types.split(",")
-
     # Create the worker configuration directory if it doesn't already exist
     os.makedirs("/conf/workers", exist_ok=True)
 
-    # Start worker ports from this arbitrary port
-    worker_port = 18009
-
-    # A counter of worker_type -> int. Used for determining the name for a given
-    # worker type when generating its config file, as each worker's name is just
-    # worker_type + instance #
-    worker_type_counter = {}
-
     # For each worker type specified by the user, create config values
-    for worker_type in worker_types:
-        worker_type = worker_type.strip()
+    for worker_type, worker_configs in worker_configs.items():
+        worker_type_total_count = len(worker_configs)
 
-        worker_config = WORKERS_CONFIG.get(worker_type)
-        if worker_config:
-            worker_config = worker_config.copy()
-        else:
-            log(worker_type + " is an unknown worker type! It will be ignored")
-            continue
-
-        new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1
-        worker_type_counter[worker_type] = new_worker_count
-
-        # Name workers by their type concatenated with an incrementing number
-        # e.g. federation_reader1
-        worker_name = worker_type + str(new_worker_count)
-        worker_config.update(
-            {"name": worker_name, "port": worker_port, "config_path": config_path}
-        )
+        for worker_config in worker_configs:
+            worker_name = worker_config["name"]
+            worker_port = worker_config["port"]
 
-        # Update the shared config with any worker-type specific options
-        shared_config.update(worker_config["shared_extra_conf"])
+            # Update the shared config with any worker-type specific options
+            shared_config.update(worker_config["shared_extra_conf"])
 
-        # Check if more than one instance of this worker type has been specified
-        worker_type_total_count = worker_types.count(worker_type)
-        if worker_type_total_count > 1:
-            # Update the shared config with sharding-related options if necessary
-            add_sharding_to_shared_config(
-                shared_config, worker_type, worker_name, worker_port
-            )
-
-        # Enable the worker in supervisord
-        supervisord_config += SUPERVISORD_PROCESS_CONFIG_BLOCK.format_map(worker_config)
-
-        # Add nginx location blocks for this worker's endpoints (if any are defined)
-        for pattern in worker_config["endpoint_patterns"]:
-            # Determine whether we need to load-balance this worker
+            # Check if more than one instance of this worker type has been specified
             if worker_type_total_count > 1:
-                # Create or add to a load-balanced upstream for this worker
-                nginx_upstreams.setdefault(worker_type, set()).add(worker_port)
-
-                # Upstreams are named after the worker_type
-                upstream = "http://" + worker_type
-            else:
-                upstream = "http://localhost:%d" % (worker_port,)
-
-            # Note that this endpoint should proxy to this upstream
-            nginx_locations[pattern] = upstream
-
-        # Write out the worker's logging config file
-
-        # Check whether we should write worker logs to disk, in addition to the console
-        extra_log_template_args = {}
-        if environ.get("SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK"):
-            extra_log_template_args["LOG_FILE_PATH"] = "{dir}/logs/{name}.log".format(
-                dir=data_dir, name=worker_name
+                # Update the shared config with sharding-related options if necessary
+                add_sharding_to_shared_config(
+                    shared_config, worker_type, worker_name, worker_port
+                )
+
+            # Add nginx location blocks for this worker's endpoints (if any are defined)
+            for pattern in worker_config["endpoint_patterns"]:
+                # Determine whether we need to load-balance this worker
+                if worker_type_total_count > 1:
+                    # Create or add to a load-balanced upstream for this worker
+                    nginx_upstreams.setdefault(worker_type, set()).add(worker_port)
+
+                    # Upstreams are named after the worker_type
+                    upstream = "http://" + worker_type
+                else:
+                    upstream = "http://localhost:%d" % (worker_port,)
+
+                # Note that this endpoint should proxy to this upstream
+                nginx_locations[pattern] = upstream
+
+            # Write out the worker's logging config file
+
+            # Check whether we should write worker logs to disk, in addition to the console
+            extra_log_template_args = {}
+            if environ.get("SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK"):
+                extra_log_template_args[
+                    "LOG_FILE_PATH"
+                ] = "{dir}/logs/{name}.log".format(dir=data_dir, name=worker_name)
+
+            # Render and write the file
+            log_config_filepath = "/conf/workers/{name}.log.config".format(
+                name=worker_name
+            )
+            convert(
+                "/conf/log.config",
+                log_config_filepath,
+                worker_name=worker_name,
+                **extra_log_template_args,
             )
 
-        # Render and write the file
-        log_config_filepath = "/conf/workers/{name}.log.config".format(name=worker_name)
-        convert(
-            "/conf/log.config",
-            log_config_filepath,
-            worker_name=worker_name,
-            **extra_log_template_args,
-        )
-
-        # Then a worker config file
-        convert(
-            "/conf/worker.yaml.j2",
-            "/conf/workers/{name}.yaml".format(name=worker_name),
-            **worker_config,
-            worker_log_config_filepath=log_config_filepath,
-        )
-
-        worker_port += 1
+            # Then a worker config file
+            convert(
+                "/conf/worker.yaml.j2",
+                "/conf/workers/{name}.yaml".format(name=worker_name),
+                **worker_config,
+                worker_log_config_filepath=log_config_filepath,
+            )
 
     # Build the nginx location config blocks
     nginx_location_config = ""
@@ -502,26 +507,67 @@ def generate_worker_files(environ, config_path: str, data_dir: str):
         upstream_directives=nginx_upstream_config,
     )
 
-    # Supervisord config
-    convert(
-        "/conf/supervisord.conf.j2",
-        "/etc/supervisor/conf.d/supervisord.conf",
-        main_config_path=config_path,
-        worker_config=supervisord_config,
-    )
-
     # Ensure the logging directory exists
     log_dir = data_dir + "/logs"
     if not os.path.exists(log_dir):
         os.mkdir(log_dir)
 
 
-def start_supervisord():
-    """Starts up supervisord which then starts and monitors all other necessary processes
-
-    Raises:
-        CalledProcessError if calling start.py return a non-zero exit code.
-    """
-    subprocess.run(["/usr/bin/supervisord"], stdin=subprocess.PIPE)
+def start_process(
+    args: List[str], env: Optional[Dict[str, str]] = None
+) -> subprocess.Popen:
+    """Start the given command as a subprocess, logging which process is starting.
+
+    Returns:
+        The `subprocess.Popen` handle for the newly started process.
+    """
+    proc_name = args[0].rsplit("/", 1)[1]
+    log(f"Starting {proc_name}")
+
+    return subprocess.Popen(
+        args, stdout=None, stderr=subprocess.STDOUT, env=env
+    )
+
+
+def start_process_and_await_notify(args: List[str]):
+    """
+    This method binds a listener to a newly created unix socket and waits for
+    `READY=1` to be received. The socket address is passed to the new process
+    via the `NOTIFY_SOCKET` environment variable.
+
+    This is basically a noddy implementation of the `sd_notify` mechanism.
+    """
+
+    # Create a random abstract socket name. Abstract sockets start with a null
+    # byte.
+    random_id = "".join(random.choice(string.ascii_uppercase) for _ in range(20))
+    path = f"\0sytest-{random_id}.sock"
+
+    # We replace null byte with '@' to allow us to pass it in via env. (This is
+    # as per the sd_notify spec).
+    path_env = path.replace("\0", "@")
+    env = {
+        "NOTIFY_SOCKET": path_env,
+    }
+
+    # We need to set this up *before* we start the process as we need to bind the
+    # socket before starting the process.
+    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
+        sock.bind(path)
+
+        start_process(args, env)
+
+        # Listen until we receive a `READY=1` notification.
+        for _ in range(10):
+            # Payloads are a newline-separated list of variable assignments.
+            dgram = sock.recv(1024)
+
+            for line in dgram.split(b"\n"):
+                if line == b"READY=1":
+                    return
+
+            sleep(0.01)
+        else:
+            raise RuntimeError("Process died without becoming ready")
 
 
 def main(args, environ):
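The listener above is only half of the handshake: the launched process has to send `READY=1` back on the socket named by `NOTIFY_SOCKET` once it is serving. Synapse's own sd_notify support is what answers this in practice; the snippet below is a minimal, hypothetical stand-in showing the shape of that reply (the `@` prefix marks an abstract socket and is converted back to a leading null byte, mirroring the listener above).

```python
# Illustrative sketch only (not Synapse's actual code): report readiness to the
# NOTIFY_SOCKET set up by start_process_and_await_notify() above.
import os
import socket


def notify_ready() -> None:
    addr = os.environ.get("NOTIFY_SOCKET")
    if not addr:
        # Not launched under the notify mechanism; nothing to do.
        return

    # '@' denotes an abstract socket; convert it back to a leading null byte.
    if addr.startswith("@"):
        addr = "\0" + addr[1:]

    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
        sock.connect(addr)
        # One datagram, in the newline-separated key=value format the listener expects.
        sock.send(b"READY=1")


if __name__ == "__main__":
    notify_ready()
```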
@@ -538,20 +584,67 @@ def main(args, environ):
         log("Generating base homeserver config")
         generate_base_homeserver_config()
 
+    worker_configs_by_type = get_worker_configs(environ, config_path)
+
     # This script may be run multiple times (mostly by Complement, see note at top of file).
     # Don't re-configure workers in this instance.
     mark_filepath = "/conf/workers_have_been_configured"
     if not os.path.exists(mark_filepath):
         # Always regenerate all other config files
-        generate_worker_files(environ, config_path, data_dir)
+        generate_worker_files(environ, config_path, worker_configs_by_type, data_dir)
 
         # Mark workers as being configured
         with open(mark_filepath, "w") as f:
             f.write("")
 
-    # Start supervisord, which will start Synapse, all of the configured worker
-    # processes, redis, nginx etc. according to the config we created above.
-    start_supervisord()
+    # Start the processes, this is done in tiers to ensure successful startup:
+    #
+    # 0. Caddy & Postgres (these are started by the OS already).
+    # 1. Redis
+    # 2. Synapse main
+    # 3. Synapse workers
+    # 4. nginx
+    #
+    # Once nginx starts, the image begins responding to HTTP requests, so
+    # everything else must already be started and configured by that point.
+    # start_process()
+    start_process(
+        ["/usr/bin/redis-server", "/etc/redis/redis.conf", "--daemonize", "no"]
+    )
+
+    start_process_and_await_notify(
+        [
+            "/usr/local/bin/python",
+            "-m",
+            "synapse.app.homeserver",
+            f"--config-path={config_path}",
+            "--config-path=/conf/workers/shared.yaml",
+        ]
+    )
+
+    # TODO These could be started in parallel.
+    for worker_configs in worker_configs_by_type.values():
+        for worker_config in worker_configs:
+            name = worker_config["name"]
+            app = worker_config["app"]
+
+            start_process_and_await_notify(
+                [
+                    "/usr/local/bin/python",
+                    "-m",
+                    app,
+                    f"--config-path={config_path}",
+                    "--config-path=/conf/workers/shared.yaml",
+                    f"--config-path=/conf/workers/{name}.yaml",
+                ]
+            )
+
+    start_process(["/usr/sbin/nginx", "-g", "daemon off;"])
+
+    # TODO Should we monitor processes and restart?
+    with urlopen("http://localhost:8080/_matrix/client/versions") as f:
+        data = json.load(f)
+        print(data)
 
 
 if __name__ == "__main__":
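To make the naming and port-allocation scheme in `get_worker_configs` concrete, here is a stripped-down sketch. The helper name and the stubbed `WORKERS_CONFIG` are hypothetical; only the naming and port logic mirrors the script above.

```python
# Illustrative sketch: how a SYNAPSE_WORKER_TYPES value maps to worker names
# and ports, following the logic of get_worker_configs() above.
from typing import Dict, List

WORKERS_CONFIG: Dict[str, dict] = {"federation_reader": {}, "synchrotron": {}}  # stub


def assign_names_and_ports(worker_types: str, first_port: int = 18009) -> List[dict]:
    configs: List[dict] = []
    counters: Dict[str, int] = {}
    port = first_port
    for worker_type in worker_types.split(","):
        worker_type = worker_type.strip()
        if worker_type not in WORKERS_CONFIG:
            continue  # unknown worker types are ignored, as in the real script
        counters[worker_type] = counters.get(worker_type, 0) + 1
        # Names are worker_type plus an instance number, e.g. federation_reader2
        configs.append({"name": worker_type + str(counters[worker_type]), "port": port})
        port += 1
    return configs


print(assign_names_and_ports("federation_reader,federation_reader,synchrotron"))
# [{'name': 'federation_reader1', 'port': 18009},
#  {'name': 'federation_reader2', 'port': 18010},
#  {'name': 'synchrotron1', 'port': 18011}]
```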
" (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r, batch_size=%r)", update_name, @@ -417,7 +417,7 @@ def create_index_sqlite(conn: Connection) -> None: async def updater(progress, batch_size): if runner is not None: - logger.info("Adding index %s to %s", index_name, table) + logger.debug("Adding index %s to %s", index_name, table) await self.db_pool.runWithConnection(runner) await self._end_background_update(update_name) return 1 diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 61392b9639c3..85eeb1394693 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -404,7 +404,7 @@ def _upgrade_existing_database( for v in range(start_ver, SCHEMA_VERSION + 1): if not is_worker: - logger.info("Applying schema deltas for v%d", v) + logger.debug("Applying schema deltas for v%d", v) cur.execute("DELETE FROM schema_version") cur.execute( @@ -412,7 +412,7 @@ def _upgrade_existing_database( (v, True), ) else: - logger.info("Checking schema deltas for v%d", v) + logger.debug("Checking schema deltas for v%d", v) # We need to search both the global and per data store schema # directories for schema updates. @@ -505,7 +505,7 @@ def _upgrade_existing_database( raise PrepareDatabaseException( UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path ) - logger.info("Applying schema %s", relative_path) + logger.debug("Applying schema %s", relative_path) executescript(cur, absolute_path) elif ext == specific_engine_extension and root_name.endswith(".sql"): # A .sql file specific to our engine; just read and execute it