Skip to content

Commit

Permalink
persist LC sync progress across restarts (#4371)
Browse files Browse the repository at this point in the history
Persist the latest finalized header and sync committee across restarts
of `nimbus_light_client` to avoid redoing time-consuming bootstrap step.
  • Loading branch information
etan-status authored Nov 30, 2022
1 parent c46dc3d commit 2e09011
Show file tree
Hide file tree
Showing 7 changed files with 269 additions and 24 deletions.
21 changes: 8 additions & 13 deletions beacon_chain/beacon_chain_db_light_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import
eth/db/kvstore_sqlite3,
# Beacon chain internals
spec/datatypes/altair,
spec/[eth2_ssz_serialization, helpers]
spec/[eth2_ssz_serialization, helpers],
./db_limits

logScope: topics = "lcdata"

Expand Down Expand Up @@ -83,12 +84,6 @@ type
## Tracks the finalized sync committee periods for which complete data
## has been imported (from `dag.tail.slot`).

# No `uint64` support in Sqlite
template isSupportedBySQLite(slot: Slot): bool =
slot <= int64.high.Slot
template isSupportedBySQLite(period: SyncCommitteePeriod): bool =
period <= int64.high.SyncCommitteePeriod

proc initCurrentBranchesStore(
backend: SqStoreRef,
name: string): KvResult[CurrentSyncCommitteeBranchStore] =
Expand Down Expand Up @@ -152,9 +147,9 @@ proc getCurrentSyncCommitteeBranch*(
res.expect("SQL query OK")
try:
return SSZ.decode(branch, altair.CurrentSyncCommitteeBranch)
except MalformedSszError, SszSizeMismatchError:
error "LC store corrupted", store = "currentBranches", slot,
exc = getCurrentException().name, err = getCurrentExceptionMsg()
except SszError as exc:
error "LC data store corrupted", store = "currentBranches",
slot, exc = exc.msg
return default(altair.CurrentSyncCommitteeBranch)

func putCurrentSyncCommitteeBranch*(
Expand Down Expand Up @@ -222,9 +217,9 @@ proc getBestUpdate*(
res.expect("SQL query OK")
try:
return SSZ.decode(update, altair.LightClientUpdate)
except MalformedSszError, SszSizeMismatchError:
error "LC store corrupted", store = "bestUpdates", period,
exc = getCurrentException().name, err = getCurrentExceptionMsg()
except SszError as exc:
error "LC data store corrupted", store = "bestUpdates",
period, exc = exc.msg
return default(altair.LightClientUpdate)

func putBestUpdate*(
Expand Down
7 changes: 5 additions & 2 deletions beacon_chain/conf.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1169,8 +1169,11 @@ func outWalletFile*(config: BeaconNodeConf): Option[OutFile] =
else:
fail()

func databaseDir*(config: AnyConf): string =
config.dataDir / "db"
func databaseDir*(dataDir: OutDir): string =
dataDir / "db"

template databaseDir*(config: AnyConf): string =
config.dataDir.databaseDir

func runAsService*(config: BeaconNodeConf): bool =
config.cmd == noCommand and config.runAsServiceFlag
Expand Down
3 changes: 3 additions & 0 deletions beacon_chain/conf_light_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ type LightClientConf* = object
defaultValue: 0
name: "stop-at-epoch" .}: uint64

template databaseDir*(config: LightClientConf): string =
config.dataDir.databaseDir

template loadJwtSecret*(
rng: var HmacDrbgContext,
config: LightClientConf,
Expand Down
19 changes: 19 additions & 0 deletions beacon_chain/db_limits.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# beacon_chain
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import spec/datatypes/constants

# No `uint64` support in Sqlite
template isSupportedBySQLite*(slot: Slot): bool =
slot <= int64.high.Slot
template isSupportedBySQLite*(period: SyncCommitteePeriod): bool =
period <= int64.high.SyncCommitteePeriod
7 changes: 7 additions & 0 deletions beacon_chain/light_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ func optimisticHeader*(lightClient: LightClient): Opt[BeaconBlockHeader] =
else:
err()

func finalizedSyncCommittee*(
lightClient: LightClient): Opt[altair.SyncCommittee] =
if lightClient.store[].isSome:
ok lightClient.store[].get.current_sync_committee
else:
err()

proc createLightClient(
network: Eth2Node,
rng: ref HmacDrbgContext,
Expand Down
187 changes: 187 additions & 0 deletions beacon_chain/light_client_db.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# beacon_chain
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import
# Status libraries
chronicles,
eth/db/kvstore_sqlite3,
# Beacon chain internals
spec/datatypes/altair,
spec/[eth2_ssz_serialization, helpers],
./db_limits

logScope: topics = "lcdb"

# `altair_lc_headers` holds the latest `LightClientStore.finalized_header`.
#
# `altair_sync_committees` holds finalized `SyncCommittee` by period, needed to
# continue an interrupted sync process without having to obtain bootstrap info.

type
LightClientHeaderKind {.pure.} = enum
Finalized = 1

LightClientHeadersStore = object
getStmt: SqliteStmt[int64, seq[byte]]
putStmt: SqliteStmt[(int64, seq[byte]), void]

SyncCommitteeStore = object
getStmt: SqliteStmt[int64, seq[byte]]
putStmt: SqliteStmt[(int64, seq[byte]), void]
keepFromStmt: SqliteStmt[int64, void]

LightClientDB* = ref object
backend: SqStoreRef
## SQLite backend

headers: LightClientHeadersStore
## LightClientHeaderKind -> BeaconBlockHeader
## Stores the latest light client headers.

syncCommittees: SyncCommitteeStore
## SyncCommitteePeriod -> altair.SyncCommittee
## Stores finalized `SyncCommittee` by sync committee period.

func initLightClientHeadersStore(
backend: SqStoreRef,
name: string): KvResult[LightClientHeadersStore] =
? backend.exec("""
CREATE TABLE IF NOT EXISTS `""" & name & """` (
`kind` INTEGER PRIMARY KEY, -- `LightClientHeaderKind`
`header` BLOB -- `BeaconBlockHeader` (SSZ)
);
""")

let
getStmt = backend.prepareStmt("""
SELECT `header`
FROM `""" & name & """`
WHERE `kind` = ?;
""", int64, seq[byte], managed = false).expect("SQL query OK")
putStmt = backend.prepareStmt("""
REPLACE INTO `""" & name & """` (
`kind`, `header`
) VALUES (?, ?);
""", (int64, seq[byte]), void, managed = false).expect("SQL query OK")

ok LightClientHeadersStore(
getStmt: getStmt,
putStmt: putStmt)

func close(store: LightClientHeadersStore) =
store.getStmt.dispose()
store.putStmt.dispose()

proc getLatestFinalizedHeader*(db: LightClientDB): Opt[BeaconBlockHeader] =
var header: seq[byte]
for res in db.headers.getStmt.exec(
LightClientHeaderKind.Finalized.int64, header):
res.expect("SQL query OK")
try:
return ok SSZ.decode(header, BeaconBlockHeader)
except SszError as exc:
error "LC store corrupted", store = "headers",
kind = "Finalized", exc = exc.msg
return err()

func putLatestFinalizedHeader*(
db: LightClientDB, header: BeaconBlockHeader) =
block:
let res = db.headers.putStmt.exec(
(LightClientHeaderKind.Finalized.int64, SSZ.encode(header)))
res.expect("SQL query OK")
block:
let period = header.slot.sync_committee_period
doAssert period.isSupportedBySQLite
let res = db.syncCommittees.keepFromStmt.exec(period.int64)
res.expect("SQL query OK")

func initSyncCommitteesStore(
backend: SqStoreRef,
name: string): KvResult[SyncCommitteeStore] =
? backend.exec("""
CREATE TABLE IF NOT EXISTS `""" & name & """` (
`period` INTEGER PRIMARY KEY, -- `SyncCommitteePeriod`
`sync_committee` BLOB -- `altair.SyncCommittee` (SSZ)
);
""")

let
getStmt = backend.prepareStmt("""
SELECT `sync_committee`
FROM `""" & name & """`
WHERE `period` = ?;
""", int64, seq[byte], managed = false).expect("SQL query OK")
putStmt = backend.prepareStmt("""
REPLACE INTO `""" & name & """` (
`period`, `sync_committee`
) VALUES (?, ?);
""", (int64, seq[byte]), void, managed = false).expect("SQL query OK")
keepFromStmt = backend.prepareStmt("""
DELETE FROM `""" & name & """`
WHERE `period` < ?;
""", int64, void, managed = false).expect("SQL query OK")

ok SyncCommitteeStore(
getStmt: getStmt,
putStmt: putStmt,
keepFromStmt: keepFromStmt)

func close(store: SyncCommitteeStore) =
store.getStmt.dispose()
store.putStmt.dispose()
store.keepFromStmt.dispose()

proc getSyncCommittee*(
db: LightClientDB, period: SyncCommitteePeriod): Opt[altair.SyncCommittee] =
doAssert period.isSupportedBySQLite
var syncCommittee: seq[byte]
for res in db.syncCommittees.getStmt.exec(period.int64, syncCommittee):
res.expect("SQL query OK")
try:
return ok SSZ.decode(syncCommittee, altair.SyncCommittee)
except SszError as exc:
error "LC store corrupted", store = "syncCommittees",
period, exc = exc.msg
return err()

func putSyncCommittee*(
db: LightClientDB, period: SyncCommitteePeriod,
syncCommittee: altair.SyncCommittee) =
doAssert period.isSupportedBySQLite
let res = db.syncCommittees.putStmt.exec(
(period.int64, SSZ.encode(syncCommittee)))
res.expect("SQL query OK")

type LightClientDBNames* = object
altairHeaders*: string
altairSyncCommittees*: string

func initLightClientDB*(
backend: SqStoreRef,
names: LightClientDBNames): KvResult[LightClientDB] =
let
headers =
? backend.initLightClientHeadersStore(names.altairHeaders)
syncCommittees =
? backend.initSyncCommitteesStore(names.altairSyncCommittees)

ok LightClientDB(
backend: backend,
headers: headers,
syncCommittees: syncCommittees)

func close*(db: LightClientDB) =
if db.backend != nil:
db.headers.close()
db.syncCommittees.close()
db[].reset()
49 changes: 40 additions & 9 deletions beacon_chain/nimbus_light_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@

import
std/os,
chronicles, chronicles/chronos_tools, chronos,
eth/keys,
chronicles, chronicles/chronos_tools, chronos, stew/io2,
eth/db/kvstore_sqlite3, eth/keys,
./eth1/eth1_monitor,
./gossip_processing/optimistic_processor,
./networking/topic_params,
./spec/beaconstate,
./spec/datatypes/[phase0, altair, bellatrix],
"."/[light_client, nimbus_binary_common, version]
"."/[filepath, light_client, light_client_db, nimbus_binary_common, version]

from ./consensus_object_pools/consensus_manager import runForkchoiceUpdated
from ./gossip_processing/block_processor import newExecutionPayload
Expand Down Expand Up @@ -45,6 +45,18 @@ programMain:
notice "Launching light client",
version = fullVersionStr, cmdParams = commandLineParams(), config

let dbDir = config.databaseDir
if (let res = secureCreatePath(dbDir); res.isErr):
fatal "Failed to create create database directory",
path = dbDir, err = ioErrorMsg(res.error)
quit 1
let backend = SqStoreRef.init(dbDir, "nlc").expect("Database OK")
defer: backend.close()
let db = backend.initLightClientDB(LightClientDBNames(
altairHeaders: "altair_lc_headers",
altairSyncCommittees: "altair_sync_committees")).expect("Database OK")
defer: db.close()

let metadata = loadEth2Network(config.eth2Network)
for node in metadata.bootstrapNodes:
config.bootstrapNodes.add node
Expand Down Expand Up @@ -145,6 +157,12 @@ programMain:
info "New LC finalized header",
finalized_header = shortLog(finalizedHeader)

let
period = finalizedHeader.slot.sync_committee_period
syncCommittee = lightClient.finalizedSyncCommittee.expect("Bootstrap OK")
db.putSyncCommittee(period, syncCommittee)
db.putLatestFinalizedHeader(finalizedHeader)

proc onOptimisticHeader(
lightClient: LightClient, optimisticHeader: BeaconBlockHeader) =
info "New LC optimistic header",
Expand All @@ -155,6 +173,16 @@ programMain:
lightClient.onOptimisticHeader = onOptimisticHeader
lightClient.trustedBlockRoot = some config.trustedBlockRoot

let latestHeader = db.getLatestFinalizedHeader()
if latestHeader.isOk:
let
period = latestHeader.get.slot.sync_committee_period
syncCommittee = db.getSyncCommittee(period)
if syncCommittee.isErr:
error "LC store lacks sync committee", finalized_header = latestHeader.get
else:
lightClient.resetToFinalizedHeader(latestHeader.get, syncCommittee.get)

# Full blocks gossip is required to portably drive an EL client:
# - EL clients may not sync when only driven with `forkChoiceUpdated`,
# e.g., Geth: "Forkchoice requested unknown head"
Expand All @@ -166,11 +194,7 @@ programMain:
# Therefore, this current mechanism is to be seen as temporary; it is not
# optimized for reducing code duplication, e.g., with `nimbus_beacon_node`.

func shouldSyncOptimistically(wallSlot: Slot): bool =
# Check whether an EL is connected
if eth1Monitor == nil:
return false

func isSynced(wallSlot: Slot): bool =
# Check whether light client is used
let optimisticHeader = lightClient.optimisticHeader.valueOr:
return false
Expand All @@ -182,6 +206,13 @@ programMain:

true

func shouldSyncOptimistically(wallSlot: Slot): bool =
# Check whether an EL is connected
if eth1Monitor == nil:
return false

isSynced(wallSlot)

var blocksGossipState: GossipState = {}
proc updateBlocksGossipStatus(slot: Slot) =
let
Expand Down Expand Up @@ -243,7 +274,7 @@ programMain:
syncStatus =
if optimisticHeader.isNone:
"bootstrapping(" & $config.trustedBlockRoot & ")"
elif not shouldSyncOptimistically(wallSlot):
elif not isSynced(wallSlot):
"syncing"
else:
"synced"
Expand Down

0 comments on commit 2e09011

Please sign in to comment.