Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactoring and update multiplier in real time #3948

Merged
merged 5 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 37 additions & 30 deletions backend/ee/onyx/background/celery/tasks/beat_schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,44 @@

from onyx.background.celery.tasks.beat_schedule import BEAT_EXPIRES_DEFAULT
from onyx.background.celery.tasks.beat_schedule import (
cloud_tasks_to_schedule as base_cloud_tasks_to_schedule,
beat_system_tasks as base_beat_system_tasks,
)
from onyx.background.celery.tasks.beat_schedule import (
tasks_to_schedule as base_tasks_to_schedule,
beat_task_templates as base_beat_task_templates,
)
from onyx.background.celery.tasks.beat_schedule import generate_cloud_tasks
from onyx.background.celery.tasks.beat_schedule import (
get_tasks_to_schedule as base_get_tasks_to_schedule,
)
from onyx.configs.constants import ONYX_CLOUD_CELERY_TASK_PREFIX
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryTask
from shared_configs.configs import MULTI_TENANT

ee_cloud_tasks_to_schedule = [
{
"name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_autogenerate-usage-report",
"task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR,
"schedule": timedelta(days=30),
"options": {
"priority": OnyxCeleryPriority.HIGHEST,
"expires": BEAT_EXPIRES_DEFAULT,
},
"kwargs": {
"task_name": OnyxCeleryTask.AUTOGENERATE_USAGE_REPORT_TASK,
},
},
{
"name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-ttl-management",
"task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR,
"schedule": timedelta(hours=1),
"options": {
"priority": OnyxCeleryPriority.HIGHEST,
"expires": BEAT_EXPIRES_DEFAULT,
ee_beat_system_tasks: list[dict] = []

ee_beat_task_templates: list[dict] = []
ee_beat_task_templates.extend(
[
{
"name": "autogenerate-usage-report",
"task": OnyxCeleryTask.AUTOGENERATE_USAGE_REPORT_TASK,
"schedule": timedelta(days=30),
"options": {
"priority": OnyxCeleryPriority.MEDIUM,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
"kwargs": {
"task_name": OnyxCeleryTask.CHECK_TTL_MANAGEMENT_TASK,
{
"name": "check-ttl-management",
"task": OnyxCeleryTask.CHECK_TTL_MANAGEMENT_TASK,
"schedule": timedelta(hours=1),
"options": {
"priority": OnyxCeleryPriority.MEDIUM,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
},
]
]
)

ee_tasks_to_schedule: list[dict] = []

Expand All @@ -65,9 +67,14 @@
]


def get_cloud_tasks_to_schedule() -> list[dict[str, Any]]:
return ee_cloud_tasks_to_schedule + base_cloud_tasks_to_schedule
def get_cloud_tasks_to_schedule(beat_multiplier: float) -> list[dict[str, Any]]:
beat_system_tasks = ee_beat_system_tasks + base_beat_system_tasks
beat_task_templates = ee_beat_task_templates + base_beat_task_templates
cloud_tasks = generate_cloud_tasks(
beat_system_tasks, beat_task_templates, beat_multiplier
)
return cloud_tasks


def get_tasks_to_schedule() -> list[dict[str, Any]]:
return ee_tasks_to_schedule + base_tasks_to_schedule
return ee_tasks_to_schedule + base_get_tasks_to_schedule()
138 changes: 88 additions & 50 deletions backend/onyx/background/celery/apps/beat.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,56 @@
from datetime import timedelta
from typing import Any
from typing import cast

from celery import Celery
from celery import signals
from celery.beat import PersistentScheduler # type: ignore
from celery.signals import beat_init
from celery.utils.log import get_task_logger

import onyx.background.celery.apps.app_base as app_base
from onyx.background.celery.tasks.beat_schedule import CLOUD_BEAT_MULTIPLIER_DEFAULT
from onyx.configs.constants import ONYX_CLOUD_REDIS_RUNTIME
from onyx.configs.constants import ONYX_CLOUD_TENANT_ID
from onyx.configs.constants import POSTGRES_CELERY_BEAT_APP_NAME
from onyx.db.engine import get_all_tenant_ids
from onyx.db.engine import SqlEngine
from onyx.utils.logger import setup_logger
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.utils.variable_functionality import fetch_versioned_implementation
from shared_configs.configs import IGNORED_SYNCING_TENANT_LIST
from shared_configs.configs import MULTI_TENANT

logger = setup_logger(__name__)
task_logger = get_task_logger(__name__)

celery_app = Celery(__name__)
celery_app.config_from_object("onyx.background.celery.configs.beat")


class DynamicTenantScheduler(PersistentScheduler):
"""This scheduler is useful because we can dynamically adjust task generation rates
through it."""

RELOAD_INTERVAL = 60

def __init__(self, *args: Any, **kwargs: Any) -> None:
logger.info("Initializing DynamicTenantScheduler")
super().__init__(*args, **kwargs)
self._reload_interval = timedelta(minutes=2)

self.last_beat_multiplier = CLOUD_BEAT_MULTIPLIER_DEFAULT

self._reload_interval = timedelta(
seconds=DynamicTenantScheduler.RELOAD_INTERVAL
)
self._last_reload = self.app.now() - self._reload_interval

# Let the parent class handle store initialization
self.setup_schedule()
self._try_updating_schedule()
logger.info(f"Set reload interval to {self._reload_interval}")
task_logger.info(
f"DynamicTenantScheduler initialized: reload_interval={self._reload_interval}"
)

def setup_schedule(self) -> None:
logger.info("Setting up initial schedule")
super().setup_schedule()
logger.info("Initial schedule setup complete")

def tick(self) -> float:
retval = super().tick()
Expand All @@ -44,36 +59,35 @@ def tick(self) -> float:
self._last_reload is None
or (now - self._last_reload) > self._reload_interval
):
logger.info("Reload interval reached, initiating task update")
task_logger.debug("Reload interval reached, initiating task update")
try:
self._try_updating_schedule()
except (AttributeError, KeyError) as e:
logger.exception(f"Failed to process task configuration: {str(e)}")
except Exception as e:
logger.exception(f"Unexpected error updating tasks: {str(e)}")
except (AttributeError, KeyError):
task_logger.exception("Failed to process task configuration")
except Exception:
task_logger.exception("Unexpected error updating tasks")

self._last_reload = now
logger.info("Task update completed, reset reload timer")

return retval

def _generate_schedule(
self, tenant_ids: list[str] | list[None]
self, tenant_ids: list[str] | list[None], beat_multiplier: float
) -> dict[str, dict[str, Any]]:
"""Given a list of tenant id's, generates a new beat schedule for celery."""
logger.info("Fetching tasks to schedule")

new_schedule: dict[str, dict[str, Any]] = {}

if MULTI_TENANT:
# cloud tasks only need the single task beat across all tenants
# cloud tasks are system wide and thus only need to be on the beat schedule
# once for all tenants
get_cloud_tasks_to_schedule = fetch_versioned_implementation(
"onyx.background.celery.tasks.beat_schedule",
"get_cloud_tasks_to_schedule",
)

cloud_tasks_to_schedule: list[
dict[str, Any]
] = get_cloud_tasks_to_schedule()
cloud_tasks_to_schedule: list[dict[str, Any]] = get_cloud_tasks_to_schedule(
beat_multiplier
)
for task in cloud_tasks_to_schedule:
task_name = task["name"]
cloud_task = {
Expand All @@ -82,11 +96,14 @@ def _generate_schedule(
"kwargs": task.get("kwargs", {}),
}
if options := task.get("options"):
logger.debug(f"Adding options to task {task_name}: {options}")
task_logger.debug(f"Adding options to task {task_name}: {options}")
cloud_task["options"] = options
new_schedule[task_name] = cloud_task

# regular task beats are multiplied across all tenants
# note that currently this just schedules for a single tenant in self hosted
# and doesn't do anything in the cloud because it's much more scalable
# to schedule a single cloud beat task to dispatch per tenant tasks.
get_tasks_to_schedule = fetch_versioned_implementation(
"onyx.background.celery.tasks.beat_schedule", "get_tasks_to_schedule"
)
Expand All @@ -95,7 +112,7 @@ def _generate_schedule(

for tenant_id in tenant_ids:
if IGNORED_SYNCING_TENANT_LIST and tenant_id in IGNORED_SYNCING_TENANT_LIST:
logger.info(
task_logger.debug(
f"Skipping tenant {tenant_id} as it is in the ignored syncing list"
)
continue
Expand All @@ -104,14 +121,14 @@ def _generate_schedule(
task_name = task["name"]
tenant_task_name = f"{task['name']}-{tenant_id}"

logger.debug(f"Creating task configuration for {tenant_task_name}")
task_logger.debug(f"Creating task configuration for {tenant_task_name}")
tenant_task = {
"task": task["task"],
"schedule": task["schedule"],
"kwargs": {"tenant_id": tenant_id},
}
if options := task.get("options"):
logger.debug(
task_logger.debug(
f"Adding options to task {tenant_task_name}: {options}"
)
tenant_task["options"] = options
Expand All @@ -121,44 +138,57 @@ def _generate_schedule(

def _try_updating_schedule(self) -> None:
"""Only updates the actual beat schedule on the celery app when it changes"""
do_update = False

logger.info("_try_updating_schedule starting")
r = get_redis_replica_client(tenant_id=ONYX_CLOUD_TENANT_ID)

task_logger.debug("_try_updating_schedule starting")

tenant_ids = get_all_tenant_ids()
logger.info(f"Found {len(tenant_ids)} IDs")
task_logger.debug(f"Found {len(tenant_ids)} IDs")

# get current schedule and extract current tenants
current_schedule = self.schedule.items()

# there are no more per tenant beat tasks, so comment this out
# NOTE: we may not actualy need this scheduler any more and should
# test reverting to a regular beat schedule implementation
# get potential new state
beat_multiplier = CLOUD_BEAT_MULTIPLIER_DEFAULT
beat_multiplier_raw = r.get(f"{ONYX_CLOUD_REDIS_RUNTIME}:beat_multiplier")
if beat_multiplier_raw is not None:
try:
beat_multiplier_bytes = cast(bytes, beat_multiplier_raw)
beat_multiplier = float(beat_multiplier_bytes.decode())
except ValueError:
task_logger.error(
f"Invalid beat_multiplier value: {beat_multiplier_raw}"
)

# current_tenants = set()
# for task_name, _ in current_schedule:
# task_name = cast(str, task_name)
# if task_name.startswith(ONYX_CLOUD_CELERY_TASK_PREFIX):
# continue
new_schedule = self._generate_schedule(tenant_ids, beat_multiplier)

# if "_" in task_name:
# # example: "check-for-condition-tenant_12345678-abcd-efgh-ijkl-12345678"
# # -> "12345678-abcd-efgh-ijkl-12345678"
# current_tenants.add(task_name.split("_")[-1])
# logger.info(f"Found {len(current_tenants)} existing items in schedule")
# if the schedule or beat multiplier has changed, update
while True:
if beat_multiplier != self.last_beat_multiplier:
do_update = True
break

# for tenant_id in tenant_ids:
# if tenant_id not in current_tenants:
# logger.info(f"Processing new tenant: {tenant_id}")
if not DynamicTenantScheduler._compare_schedules(
current_schedule, new_schedule
):
do_update = True
break

new_schedule = self._generate_schedule(tenant_ids)
break

if DynamicTenantScheduler._compare_schedules(current_schedule, new_schedule):
logger.info(
"_try_updating_schedule: Current schedule is up to date, no changes needed"
if not do_update:
# exit early if nothing changed
task_logger.info(
f"_try_updating_schedule - Schedule unchanged: "
f"tasks={len(new_schedule)} "
f"beat_multiplier={beat_multiplier}"
)
return

logger.info(
# schedule needs updating
task_logger.debug(
"Schedule update required",
extra={
"new_tasks": len(new_schedule),
Expand All @@ -185,11 +215,19 @@ def _try_updating_schedule(self) -> None:
# Ensure changes are persisted
self.sync()

logger.info("_try_updating_schedule: Schedule updated successfully")
task_logger.info(
f"_try_updating_schedule - Schedule updated: "
f"prev_num_tasks={len(current_schedule)} "
f"prev_beat_multiplier={self.last_beat_multiplier} "
f"tasks={len(new_schedule)} "
f"beat_multiplier={beat_multiplier}"
)

self.last_beat_multiplier = beat_multiplier

@staticmethod
def _compare_schedules(schedule1: dict, schedule2: dict) -> bool:
"""Compare schedules to determine if an update is needed.
"""Compare schedules by task name only to determine if an update is needed.
True if equivalent, False if not."""
current_tasks = set(name for name, _ in schedule1)
new_tasks = set(schedule2.keys())
Expand All @@ -201,7 +239,7 @@ def _compare_schedules(schedule1: dict, schedule2: dict) -> bool:

@beat_init.connect
def on_beat_init(sender: Any, **kwargs: Any) -> None:
logger.info("beat_init signal received.")
task_logger.info("beat_init signal received.")

# Celery beat shouldn't touch the db at all. But just setting a low minimum here.
SqlEngine.set_app_name(POSTGRES_CELERY_BEAT_APP_NAME)
Expand Down
Loading
Loading