Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update findCIDInNetwork #3825

Merged
merged 15 commits into from
Sep 6, 2022
9 changes: 3 additions & 6 deletions creator-node/src/fileManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ const MAX_MEMORY_FILE_SIZE = parseInt(config.get('maxMemoryFileSizeBytes')) // D
const ALLOWED_UPLOAD_FILE_EXTENSIONS = config.get('allowedUploadFileExtensions') // default set in config.json
const AUDIO_MIME_TYPE_REGEX = /audio\/(.*)/

const EMPTY_FILE_CID = 'QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH' // deterministic CID for a 0 byte, completely empty file

/**
* Saves file to disk under /multihash name
*/
Expand All @@ -45,7 +43,7 @@ async function saveFileFromBufferToDisk(req, buffer, numRetries = 5) {
const fileSize = (await fs.stat(dstPath)).size
const fileIsEmpty = fileSize === 0
// there is one case where an empty file could be valid, check for that CID explicitly
if (fileIsEmpty && cid !== EMPTY_FILE_CID) {
if (fileIsEmpty && cid !== Utils.EMPTY_FILE_CID) {
throw new Error(`File has no content, content length is 0: ${cid}`)
}

Expand Down Expand Up @@ -342,7 +340,7 @@ async function saveFileForMultihashToFS(

const fileIsEmpty = fileSize === 0
// there is one case where an empty file could be valid, check for that CID explicitly
if (fileIsEmpty && multihash !== EMPTY_FILE_CID) {
if (fileIsEmpty && multihash !== Utils.EMPTY_FILE_CID) {
throw new Error(
`File has no content, content length is 0: ${multihash}`
)
Expand Down Expand Up @@ -690,6 +688,5 @@ module.exports = {
checkFileMiddleware,
getTmpTrackUploadArtifactsPathWithInputUUID,
getTmpSegmentsPath,
copyMultihashToFs,
EMPTY_FILE_CID
copyMultihashToFs
}
4 changes: 3 additions & 1 deletion creator-node/src/routes/files.js
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,9 @@ const getDirCID = async (req, res) => {
// CID is the file CID, parse it from the storagePath
const CID = storagePath.split('/').slice(-1).join('')
const libs = req.app.get('audiusLibs')
await findCIDInNetwork(storagePath, CID, req.logger, libs)
const found = await findCIDInNetwork(storagePath, CID, req.logger, libs)
if (!found) throw new Error(`CID=${CID} not found in network`)

return await streamFromFileSystem(req, res, storagePath)
} catch (e) {
req.logger.error(
Expand Down
8 changes: 6 additions & 2 deletions creator-node/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import {
createDirForFile,
writeStreamToFileSystem,
_streamFileToDiskHelper,
runShellCommand
runShellCommand,
verifyCIDMatchesExpected,
EMPTY_FILE_CID
} from './legacyUtils'
import {
validateMetadata,
Expand Down Expand Up @@ -53,5 +55,7 @@ module.exports = {
runShellCommand,
validateAssociatedWallets,
validateMetadata,
strToReplicaSet
strToReplicaSet,
verifyCIDMatchesExpected,
EMPTY_FILE_CID
}
177 changes: 129 additions & 48 deletions creator-node/src/utils/legacyUtils.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,22 @@ const axios = require('axios')
const spawn = require('child_process').spawn
const stream = require('stream')
const { promisify } = require('util')
const pipeline = promisify(stream.pipeline)
const { logger: genericLogger } = require('../logging.js')

const { logger: genericLogger } = require('../logging')
const asyncRetry = require('./asyncRetry')
const models = require('../models')
const redis = require('../redis')
const config = require('../config')
const { generateTimestampAndSignature } = require('../apiSigning')
const { libs } = require('@audius/sdk')

const pipeline = promisify(stream.pipeline)
const LibsUtils = libs.Utils

const THIRTY_MINUTES_IN_SECONDS = 60 * 30

export const EMPTY_FILE_CID = 'QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH' // deterministic CID for a 0 byte, completely empty file

export function verifySignature(data, sig) {
return recoverPersonalSignature({ data, sig })
}
Expand Down Expand Up @@ -103,14 +107,18 @@ export async function validateStateForImageDirCIDAndReturnFileUUID(
}

/**
* Fetches a CID from the Content Node network
* Fetches a CID from the Content Node network, verifies content, and writes to disk up to numRetries times.
* If the fetch request is unauthorized or bad, or if the target content is delisted or not found, do not retry on
* the particular Content Node.
* Also do not retry if after content verifications that recently written content is not what is expected.
*
* @param {String} filePath location of the file on disk
* @param {String} cid content hash of the file
* @param {Object} logger logger object
* @param {Object} libs libs instance
* @param {Integer?} trackId optional trackId that corresponds to the cid, see file_lookup route for more info
* @param {Array?} excludeList optional array of content nodes to exclude in network wide search
* @param {number?} [numRetries=5] the number of retries to attempt to fetch cid, write to disk, and verify
* @returns {Boolean} returns true if the file was found in the network
*/
export async function findCIDInNetwork(
Expand All @@ -119,74 +127,115 @@ export async function findCIDInNetwork(
logger,
libs,
trackId = null,
excludeList = []
excludeList = [],
numRetries = 5
) {
if (!config.get('findCIDInNetworkEnabled')) return
let found = false
if (!config.get('findCIDInNetworkEnabled')) return false

const attemptedStateFix = await getIfAttemptedStateFix(filePath)
if (attemptedStateFix) return
if (attemptedStateFix) return false

// get list of creator nodes
// Get all registered Content Nodes
const creatorNodes = await getAllRegisteredCNodes(libs)
if (!creatorNodes.length) return
if (!creatorNodes.length) return false

// Remove excluded nodes from list of creator nodes, no-op if empty list or nothing passed in
// Remove excluded nodes from list of creator nodes or self, no-op if empty list or nothing passed in
const creatorNodesFiltered = creatorNodes.filter(
(c) => !excludeList.includes(c.endpoint)
(c) =>
!excludeList.includes(c.endpoint) ||
config.get('creatorNodeEndpoint') !== c.endpoint
)

// generate signature
// Generate signature to auth fetching files
const delegateWallet = config.get('delegateOwnerWallet').toLowerCase()
const { signature, timestamp } = generateTimestampAndSignature(
{ filePath, delegateWallet },
config.get('delegatePrivateKey')
)
let node

for (let index = 0; index < creatorNodesFiltered.length; index++) {
node = creatorNodesFiltered[index]
let found = false
for (const { endpoint } of creatorNodesFiltered) {
try {
const resp = await axios({
method: 'get',
url: `${node.endpoint}/file_lookup`,
params: {
filePath,
timestamp,
delegateWallet,
signature,
trackId
},
responseType: 'stream',
timeout: 1000
})
if (resp.data) {
await writeStreamToFileSystem(resp.data, filePath, /* createDir */ true)
found = await asyncRetry({
asyncFn: async (bail) => {
let response
try {
response = await axios({
method: 'get',
url: `${endpoint}/file_lookup`,
params: {
filePath,
timestamp,
delegateWallet,
signature,
trackId
},
responseType: 'stream',
timeout: 1000
})
} catch (e) {
if (
e.response?.status === 403 || // delist
e.response?.status === 401 || // unauth
e.response?.status === 400 || // bad req
e.response?.status === 404 // not found
) {
bail(
new Error(
`Content multihash=${cid} not available on ${endpoint} with statusCode=${e.response?.status}`
)
)
return
}

throw new Error(
`Failed to fetch content multihash=${cid} with statusCode=${e.response?.status} from endpoint=${endpoint}. Retrying..`
)
}

if (!response || !response.data) {
throw new Error('Received empty response from file lookup')
}

await writeStreamToFileSystem(
response.data,
filePath,
/* createDir */ true
)

// Verify that the file written matches the hash expected
const expectedCID = await LibsUtils.fileHasher.generateNonImageCid(
filePath
)
const isCIDProper = await verifyCIDMatchesExpected({
cid,
path: filePath,
logger
})

if (!isCIDProper) {
try {
await fs.unlink(filePath)
} catch (e) {
logger.error(`Could not remove file at path=${path}`)
}

bail(new Error(`CID=${cid} from endpoint=${endpoint} is improper`))
return
}

if (cid !== expectedCID) {
await fs.unlink(filePath)
logger.error(
`findCIDInNetwork - File contents from ${node.endpoint} and hash don't match. CID: ${cid} expectedCID: ${expectedCID}`
)
} else {
found = true
logger.info(
`findCIDInNetwork - successfully fetched file ${filePath} from node ${node.endpoint}`
`Successfully fetched CID=${cid} file=${filePath} from node ${endpoint}`
)
break
},
logger,
logLabel: 'findCIDInNetwork',
options: {
retries: numRetries,
minTimeout: 3000
}
}
})

found = true
} catch (e) {
// Do not error and stop the flow of execution for functions that call it
logger.error(
`findCIDInNetwork fetch error from ${node.endpoint} - ${e.toString()}`
)
continue
logger.error(`findCIDInNetwork error from ${endpoint} - ${e.message}`)
}
}

Expand Down Expand Up @@ -355,6 +404,36 @@ export function currentNodeShouldHandleTranscode({
return currentNodeShouldHandleTranscode
}

/**
* Verify that the file written matches the hash expected
* @param {Object} param
* @param {string} param.cid target cid
* @param {string} param.path the path at which the cid exists
* @param {Object} param.logger
* @returns boolean if the cid is proper or not
*/
export async function verifyCIDMatchesExpected({ cid, path, logger }) {
const fileSize = (await fs.stat(path)).size
const fileIsEmpty = fileSize === 0

// there is one case where an empty file could be valid, check for that CID explicitly
if (fileIsEmpty && cid !== EMPTY_FILE_CID) {
logger.error(`File has no content, content length is 0: ${cid}`)
return false
}

const expectedCID = await LibsUtils.fileHasher.generateNonImageCid(path)

const isCIDProper = cid === expectedCID
if (!isCIDProper) {
logger.error(
`File contents and hash don't match. CID: ${cid} expectedCID: ${expectedCID}`
)
}

return isCIDProper
}

module.exports.timeout = timeout
module.exports.verifySignature = verifySignature
module.exports.getRandomInt = getRandomInt
Expand All @@ -366,3 +445,5 @@ module.exports.findCIDInNetwork = findCIDInNetwork
module.exports.runShellCommand = runShellCommand
module.exports.currentNodeShouldHandleTranscode =
currentNodeShouldHandleTranscode
module.exports.verifyCIDMatchesExpected = verifyCIDMatchesExpected
module.exports.EMPTY_FILE_CID = EMPTY_FILE_CID
Loading