Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add granularity to secondary sync from primary metric recording #3854

Merged
merged 5 commits
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,20 @@ export const METRIC_LABELS = Object.freeze({
result: [
'success',
'success_clocks_already_match',
'success_force_wipe',
'abort_user_does_not_exist_on_node',
'abort_multiple_users_returned_from_export',
'abort_missing_user_export_key_fields',
'abort_mismatched_export_wallet',
'abort_current_node_is_not_user_primary',
'abort_current_node_is_not_user_secondary',
'abort_force_wipe_disabled',
'failure_force_resync_check',
'failure_fetching_user_gateway',
'failure_delete_db_data',
'failure_sync_secondary_from_primary',
'failure_malformed_export',
'failure_db_transaction',
'failure_sync_in_progress',
'abort_sync_in_progress',
'failure_export_wallet',
'failure_skip_threshold_not_reached',
'failure_import_not_consistent',
Expand Down
121 changes: 69 additions & 52 deletions creator-node/src/services/sync/secondarySyncFromPrimary.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ const handleSyncFromPrimary = async ({

const logger = secondarySyncFromPrimaryLogger

let returnValue = {}
try {
try {
await redis.WalletWriteLock.acquire(
Expand All @@ -42,10 +41,8 @@ const handleSyncFromPrimary = async ({
} catch (e) {
tracing.recordException(e)
return {
error: new Error(
`Cannot change state of wallet ${wallet}. Node sync currently in progress.`
),
result: 'failure_sync_in_progress'
abort: `Cannot change state of wallet ${wallet}. Node sync currently in progress`,
result: 'abort_sync_in_progress'
}
}

Expand All @@ -58,9 +55,10 @@ const handleSyncFromPrimary = async ({
ensurePrimary: false
})
if (userReplicaSet.primary !== creatorNodeEndpoint) {
throw new Error(
`Node being synced from is not primary. Node being synced from: ${creatorNodeEndpoint} Primary: ${userReplicaSet.primary}`
)
return {
abort: `Node being synced from is not primary. Node being synced from: ${creatorNodeEndpoint} Primary: ${userReplicaSet.primary}`,
result: 'abort_current_node_is_not_user_primary'
}
}

/**
Expand All @@ -86,18 +84,19 @@ const handleSyncFromPrimary = async ({

// Ensure we never wipe the data of a primary
if (thisContentNodeEndpoint === userReplicaSet.primary) {
throw new Error(
`Tried to wipe data of a primary. User replica set: ${JSON.stringify(
return {
abort: `Tried to wipe data of a primary. User replica set: ${JSON.stringify(
userReplicaSet
)}`
)
)}`,
result: 'abort_current_node_is_not_user_primary'
}
}

// Short circuit if wiping is disabled via env var
if (!config.get('syncForceWipeEnabled')) {
logger.warn('Stopping sync early because syncForceWipeEnabled=false')
return {
result: 'success'
abort: 'Stopping sync early because syncForceWipeEnabled=false',
result: 'abort_force_wipe_disabled'
}
}

Expand All @@ -117,16 +116,15 @@ const handleSyncFromPrimary = async ({
}

if (deleteError) {
returnValue = {
return {
error: deleteError,
result: 'failure_delete_db_data'
}
throw returnValue.error
}

if (forceWipe) {
return {
result: 'success'
result: 'success_force_wipe'
}
}
} else {
Expand All @@ -143,9 +141,10 @@ const handleSyncFromPrimary = async ({
thisContentNodeEndpoint !== userReplicaSet.secondary1 &&
thisContentNodeEndpoint !== userReplicaSet.secondary2
) {
throw new Error(
`This node is not one of the user's secondaries. This node: ${thisContentNodeEndpoint} Secondaries: [${userReplicaSet.secondary1},${userReplicaSet.secondary2}]`
)
return {
abort: `This node is not one of the user's secondaries. This node: ${thisContentNodeEndpoint} Secondaries: [${userReplicaSet.secondary1},${userReplicaSet.secondary2}]`,
result: 'abort_current_node_is_not_user_secondary'
}
}

/**
Expand All @@ -154,19 +153,25 @@ const handleSyncFromPrimary = async ({
* Secondary requests export of new data by passing its current max clock value in the request.
* Primary builds an export object of all data beginning from the next clock value.
*/
const { fetchedCNodeUser, error } = await fetchExportFromNode({
const { fetchedCNodeUser, error, abort } = await fetchExportFromNode({
nodeEndpointToFetchFrom: creatorNodeEndpoint,
wallet,
clockRangeMin: localMaxClockVal + 1,
selfEndpoint: thisContentNodeEndpoint,
logger
})
if (!_.isEmpty(error)) {
returnValue = {
return {
error: new Error(error.message),
result: error.code
}
throw returnValue.error
}

if (abort) {
return {
abort: abort.message,
result: abort.code
}
}

const {
Expand Down Expand Up @@ -205,8 +210,15 @@ const handleSyncFromPrimary = async ({
} catch (e) {
tracing.recordException(e)
logger.error(
`Couldn't filter out own endpoint from user's replica set to use use as cnode gateways in saveFileForMultihashToFS - ${e.message}`
`Couldn't filter out own endpoint from user's replica set to use as cnode gateways in saveFileForMultihashToFS - ${e.message}`
)

return {
error: new Error(
`Couldn't filter out own endpoint from user's replica set to use as cnode gateways in saveFileForMultihashToFS - ${e.message}`
),
result: 'failure_fetching_user_gateway'
}
}

/**
Expand All @@ -220,36 +232,37 @@ const handleSyncFromPrimary = async ({

// Error if returned data is not within requested range
if (fetchedLatestClockVal < localMaxClockVal) {
returnValue = {
return {
error: new Error(
`Cannot sync for localMaxClockVal ${localMaxClockVal} - imported data has max clock val ${fetchedLatestClockVal}`
),
result: 'failure_inconsistent_clock'
}
throw returnValue.error
} else if (
}

if (
localMaxClockVal !== -1 &&
fetchedClockRecords[0] &&
fetchedClockRecords[0].clock !== localMaxClockVal + 1
) {
returnValue = {
return {
error: new Error(
`Cannot sync - imported data is not contiguous. Local max clock val = ${localMaxClockVal} and imported min clock val ${fetchedClockRecords[0].clock}`
),
result: 'failure_import_not_contiguous'
}
throw returnValue.error
} else if (
}

if (
!_.isEmpty(fetchedClockRecords) &&
maxClockRecordId !== fetchedLatestClockVal
) {
returnValue = {
return {
error: new Error(
`Cannot sync - imported data is not consistent. Imported max clock val = ${fetchedLatestClockVal} and imported max ClockRecord val ${maxClockRecordId}`
),
result: 'failure_import_not_consistent'
}
throw returnValue.error
}

// All DB updates must happen in single atomic tx - partial state updates will lead to data loss
Expand All @@ -260,7 +273,7 @@ const handleSyncFromPrimary = async ({
*/
try {
logger.info(
`beginning add ops for cnodeUser wallet ${fetchedWalletPublicKey}`
`Beginning db updates for cnodeUser wallet ${fetchedWalletPublicKey}`
)

/**
Expand Down Expand Up @@ -309,14 +322,14 @@ const handleSyncFromPrimary = async ({

// Error if update failed
if (numRowsUpdated !== 1 || respObj.length !== 1) {
returnValue = {
return {
error: new Error(
`Failed to update cnodeUser row for cnodeUser wallet ${fetchedWalletPublicKey}`
),
result: 'failure_db_transaction'
}
throw returnValue.error
}

cnodeUser = respObj[0]
} else {
logger.info(
Expand Down Expand Up @@ -531,9 +544,9 @@ const handleSyncFromPrimary = async ({
e
)

await transaction.rollback()

try {
await transaction.rollback()

const numRowsUpdated = await DBManager.fixInconsistentUser(
fetchedCNodeUser.cnodeUserUUID
)
Expand All @@ -542,29 +555,25 @@ const handleSyncFromPrimary = async ({
)
} catch (e) {
logger.error(
`fixInconsistentUser() error for ${fetchedCNodeUser.cnodeUserUUID} - ${e.message}`
`rollback or fixInconsistentUser() error for ${fetchedCNodeUser.cnodeUserUUID} - ${e.message}`
)
}

return _.isEmpty(returnValue)
? {
error: new Error(e),
result: 'failure_db_transaction'
}
: returnValue
return {
error: e,
result: 'failure_db_transaction'
}
}
} catch (e) {
tracing.recordException(e)
await SyncHistoryAggregator.recordSyncFail(wallet)

// for final log check the _secondarySyncFromPrimary function

return _.isEmpty(returnValue)
? {
error: new Error(e),
result: 'failure_sync_secondary_from_primary'
}
: returnValue
return {
error: e,
result: 'failure_sync_secondary_from_primary'
}
} finally {
try {
await redis.WalletWriteLock.release(wallet)
Expand Down Expand Up @@ -621,7 +630,7 @@ async function _secondarySyncFromPrimary({

secondarySyncFromPrimaryLogger.info('begin nodesync', 'time', start)

const { error, result } = await handleSyncFromPrimary({
const { error, result, abort } = await handleSyncFromPrimary({
serviceRegistry,
wallet,
creatorNodeEndpoint,
Expand All @@ -643,7 +652,15 @@ async function _secondarySyncFromPrimary({
Date.now() - start
}. From endpoint ${creatorNodeEndpoint}. Prometheus result: ${result}`
)
throw new Error(error)
throw error
}

if (abort) {
secondarySyncFromPrimaryLogger.warn(
`Sync complete for wallet: ${wallet}. Status: Abort. Duration sync: ${
Date.now() - start
}. From endpoint ${creatorNodeEndpoint}. Prometheus result: ${result}`
)
} else {
secondarySyncFromPrimaryLogger.info(
`Sync complete for wallet: ${wallet}. Status: Success. Duration sync: ${
Expand Down
27 changes: 18 additions & 9 deletions creator-node/src/services/sync/syncUtil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,15 @@ type FetchExportOutput = {
fetchedCNodeUser?: any
error?: {
message: string
code: 'failure_export_wallet' | 'failure_malformed_export'
code: 'failure_export_wallet'
}
abort?: {
message: string
code:
| 'abort_user_does_not_exist_on_node'
| 'abort_multiple_users_returned_from_export'
| 'abort_missing_user_export_key_fields'
| 'abort_mismatched_export_wallet'
}
}

Expand Down Expand Up @@ -80,18 +88,18 @@ async function fetchExportFromNode({
const { data: body } = exportResp
if (!_.has(body, 'data.cnodeUsers') || _.isEmpty(body.data.cnodeUsers)) {
return {
error: {
abort: {
message: '"cnodeUsers" array is empty or missing from response body',
code: 'failure_malformed_export'
code: 'abort_user_does_not_exist_on_node'
}
}
}
const { cnodeUsers } = body.data
if (Object.keys(cnodeUsers).length > 1) {
return {
error: {
abort: {
message: 'Multiple cnodeUsers returned from export',
code: 'failure_malformed_export'
code: 'abort_multiple_users_returned_from_export'
}
}
}
Expand All @@ -105,25 +113,26 @@ async function fetchExportFromNode({
!_.has(fetchedCNodeUser, ['clockInfo', 'requestedClockRangeMax'])
) {
return {
error: {
abort: {
message:
'Required properties not found on CNodeUser in response object',
code: 'failure_malformed_export'
code: 'abort_missing_user_export_key_fields'
}
}
}

// Validate wallet from cnodeUsers array matches the wallet we requested in the /export request
if (wallet !== fetchedCNodeUser.walletPublicKey) {
return {
error: {
abort: {
message: `Returned data for walletPublicKey that was not requested: ${fetchedCNodeUser.walletPublicKey}`,
code: 'failure_malformed_export'
code: 'abort_mismatched_export_wallet'
}
}
}

logger.info('Export successful')

return {
fetchedCNodeUser
}
Expand Down
Loading