Skip to content
This repository was archived by the owner on Aug 2, 2023. It is now read-only.

Commit

Permalink
refs #33: Improve agent termination discovery and handling
Browse files Browse the repository at this point in the history
 * No longer repeated "agent-lost" logs (only once!)
 * Moved agent-specific logic in gateway/events.py to gateway/kernel.py
 * Synchronize correctly when updating/reading the "last seen" dictionary shared by all worker processes
  • Loading branch information
achimnol committed Aug 29, 2017
1 parent b0bc60c commit f7e51a3
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 43 deletions.
21 changes: 2 additions & 19 deletions sorna/gateway/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,21 @@ def __init__(self, app, loop=None):
self.app = app
self.loop = loop if loop else asyncio.get_event_loop()
self.handlers = defaultdict(list)
self.heartbeat_timeout = app.config.heartbeat_timeout
# TODO: optimize?
self.last_seen = app['shared_states']['agent_last_seen']

def add_handler(self, event_name, callback):
assert callable(callback)
self.handlers[event_name].append(callback)

def dispatch(self, event_name, agent_id, args=tuple()):
log.debug(f"DISPATCH({event_name}/{agent_id}, {str(args[0]) if args else ''})")
self.last_seen[agent_id] = time.monotonic()
first_arg = f', {args[0]}' if args else ''
log.debug(f"DISPATCH({event_name}/{agent_id}{first_arg})")
for handler in self.handlers[event_name]:
if asyncio.iscoroutine(handler) or asyncio.iscoroutinefunction(handler):
self.loop.create_task(handler(self.app, agent_id, *args))
else:
cb = functools.partial(handler, self.app, agent_id, *args)
self.loop.call_soon(cb)

@catch_unexpected(log)
async def check_lost(self, interval):
try:
now = time.monotonic()
for agent_id, prev in self.last_seen.copy().items():
if now - prev >= self.heartbeat_timeout:
del self.last_seen[agent_id]
self.dispatch('instance_terminated', agent_id, ('agent-lost', ))
except (BrokenPipeError, asyncio.CancelledError):
pass


async def event_subscriber(dispatcher):
event_sock = await aiozmq.create_zmq_stream(
Expand All @@ -93,11 +79,8 @@ async def init(app):
dispatcher = EventDispatcher(app)
app['event_dispatcher'] = dispatcher
app['event_subscriber'] = loop.create_task(event_subscriber(dispatcher))
app['agent_lost_checker'] = aiotools.create_timer(dispatcher.check_lost, 1.0)


async def shutdown(app):
app['event_subscriber'].cancel()
await app['event_subscriber']
app['agent_lost_checker'].cancel()
await app['agent_lost_checker']
47 changes: 31 additions & 16 deletions sorna/gateway/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import aiohttp
from aiohttp import web
import aiotools
import aiozmq
from aiozmq import create_zmq_stream as aiozmq_sock
from async_timeout import timeout as _timeout
Expand Down Expand Up @@ -167,20 +168,13 @@ async def instance_started(app, agent_id):

@catch_unexpected(log)
async def instance_terminated(app, agent_id, reason):
with app['shared_states'].lock:
try:
del app['shared_states'].agent_last_seen[agent_id]
except KeyError:
pass
if reason == 'agent-lost':
log.warning(f'agent {agent_id} heartbeat timeout detected.')
await app['registry'].update_instance(agent_id, {
'status': AgentStatus.LOST,
'lost_at': datetime.now(tzutc()),
})
await update_instance_usage(app, agent_id)
# TODO: interpret kern_id to sess_id
#for kern_id in (await app['registry'].get_kernels_in_instance(agent_id)):
# for handler in app['stream_pty_handlers'][kern_id].copy():
# handler.cancel()
# await handler
# TODO: define behavior when agent reuse running instances upon revive
#await app['registry'].forget_all_kernels_in_instance(agent_id)
await app['registry'].mark_agent_terminated(agent_id, AgentStatus.LOST)
elif reason == 'agent-restart':
log.info(f'agent@{agent_id} restarting for maintenance.')
await app['registry'].update_instance(agent_id, {
Expand All @@ -189,16 +183,31 @@ async def instance_terminated(app, agent_id, reason):
else:
# On normal instance termination, kernel_terminated events were already
# triggered by the agent.
await app['registry'].update_instance(agent_id, {
'status': AgentStatus.TERMINATED,
})
await app['registry'].mark_agent_terminated(agent_id, AgentStatus.TERMINATED)


@catch_unexpected(log)
async def instance_heartbeat(app, agent_id, agent_info):
with app['shared_states'].lock:
app['shared_states'].agent_last_seen[agent_id] = time.monotonic()
await app['registry'].handle_heartbeat(agent_id, agent_info)


@catch_unexpected(log)
async def check_agent_lost(app, interval):
try:
now = time.monotonic()
with app['shared_states'].lock:
copied = app['shared_states'].agent_last_seen.copy()
for agent_id, prev in copied.items():
if now - prev >= app.config.heartbeat_timeout:
# TODO: change this to "send_event" (actual zeromq push) for non-duplicate events
app['event_dispatcher'].dispatch('instance_terminated',
agent_id, ('agent-lost', ))
except asyncio.CancelledError:
pass


# NOTE: This event is ignored during the grace period.
@catch_unexpected(log)
async def instance_stats(app, agent_id, kern_stats):
Expand Down Expand Up @@ -559,12 +568,18 @@ async def init(app):
app['stream_pty_handlers'] = defaultdict(set)
app['stream_stdin_socks'] = defaultdict(set)

app['agent_lost_checker'] = aiotools.create_timer(
functools.partial(check_agent_lost, app), 1.0)

app['registry'] = InstanceRegistry(app['dbpool'])
await app['registry'].init()
app['status'] = GatewayStatus.RUNNING


async def shutdown(app):
app['agent_lost_checker'].cancel()
await app['agent_lost_checker']

checked_tasks = ('kernel_agent_event_collector', 'kernel_ddtimer')
for tname in checked_tasks:
t = app.get(tname, None)
Expand Down
20 changes: 13 additions & 7 deletions sorna/gateway/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from ipaddress import ip_address
import logging
import multiprocessing as mp
from multiprocessing.managers import SyncManager
import signal
import ssl
import sys
Expand Down Expand Up @@ -262,14 +263,19 @@ def main():

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

manager = mp.Manager()
shared_states = manager.dict()
shared_states['agent_last_seen'] = manager.dict()
manager = SyncManager()
manager.start(lambda: signal.signal(signal.SIGINT, signal.SIG_IGN))
shared_states = manager.Namespace()
shared_states.lock = manager.Lock()
shared_states.agent_last_seen = manager.dict()

aiotools.start_server(server_main, num_workers=2,
extra_procs=[event_router],
args=(config, shared_states))
log.info('terminated.')
try:
aiotools.start_server(server_main, num_workers=2,
extra_procs=[event_router],
args=(config, shared_states))
finally:
manager.shutdown()
log.info('terminated.')


if __name__ == '__main__':
Expand Down
32 changes: 31 additions & 1 deletion sorna/manager/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ async def handle_heartbeat(self, agent_id, agent_info):
result = await conn.execute(query)
assert result.rowcount == 1
elif prev_status == AgentStatus.ALIVE:
log.debug(f'agent {agent_id} still alive')
pass
elif prev_status in (AgentStatus.LOST, AgentStatus.TERMINATED):
log.warning(f'agent {agent_id} revived!')
query = (sa.update(agents)
Expand All @@ -556,6 +556,36 @@ async def handle_heartbeat(self, agent_id, agent_info):
else:
log.error(f'should not reach here! {type(prev_status)}')

async def mark_agent_terminated(self, agent_id, status, conn=None):
# TODO: interpret kern_id to sess_id
#for kern_id in (await app['registry'].get_kernels_in_instance(agent_id)):
# for handler in app['stream_pty_handlers'][kern_id].copy():
# handler.cancel()
# await handler
# TODO: define behavior when agent reuse running instances upon revive
#await app['registry'].forget_all_kernels_in_instance(agent_id)
async with reenter_txn(self.dbpool, conn) as conn:

query = (sa.select([agents.c.status])
.select_from(agents)
.where(agents.c.id == agent_id))
result = await conn.execute(query)
prev_status = await result.scalar()
if prev_status in (None, AgentStatus.LOST, AgentStatus.TERMINATED):
return

if status == AgentStatus.LOST:
log.warning(f'agent {agent_id} heartbeat timeout detected.')
elif status == AgentStatus.TERMINATED:
log.warning(f'agent {agent_id} has terminated.')
query = (sa.update(agents)
.values({
'status': status,
'lost_at': datetime.now(tzutc()),
})
.where(agents.c.id == agent_id))
await conn.execute(query)

async def mark_kernel_terminated(self, kernel_id, conn=None):
async with reenter_txn(self.dbpool, conn) as conn:
# check if already terminated
Expand Down

0 comments on commit f7e51a3

Please sign in to comment.