Skip to content
This repository was archived by the owner on Jul 31, 2020. It is now read-only.

Commit 04fd995

Browse files
authored
Merge pull request #122 from brave/fix/limit-put-concurrency
Add requestUtil.bufferedPut to limit concurrency of put
2 parents 59eccd7 + eae9b0d commit 04fd995

9 files changed

+143
-18
lines changed

client/constants/messages.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ const messages = {
8282
* browser sends this to the webview with the data that needs to be synced
8383
* to the sync server.
8484
*/
85-
SEND_SYNC_RECORDS: _, /* @param {string} categoryName, @param {Array.<Object>} records */
85+
SEND_SYNC_RECORDS: _, /* @param {string=} categoryName, @param {Array.<Object>} records */
8686
/**
8787
* browser -> webview
8888
* browser sends this to delete the current user.

client/constants/proto.js

+7
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,10 @@ module.exports.actions = {
1111
UPDATE: 1,
1212
DELETE: 2
1313
}
14+
15+
module.exports.categoryMap = {
16+
bookmark: 'BOOKMARKS',
17+
historySite: 'HISTORY_SITES',
18+
siteSetting: 'PREFERENCES',
19+
device: 'PREFERENCES'
20+
}

client/recordUtil.js

+18
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ const proto = require('./constants/proto')
55
const serializer = require('../lib/serializer')
66
const valueEquals = require('../lib/valueEquals')
77

8+
// ['0', '1', '2']
9+
module.exports.CATEGORY_IDS = Object.values(proto.categories)
10+
811
/**
912
* @param {string} type e.g. 'historySite'
1013
* @param {Function} isValidRecord checks if the update record has enough props to make a create record
@@ -290,3 +293,18 @@ module.exports.syncRecordAsJS = (record) => {
290293
}
291294
return object
292295
}
296+
297+
/**
298+
* Derive category ID number from a JS Sync record.
299+
* @param {Object} record e.g. {"action":0, "bookmark": {"isFolder": false,"site": {...}, ...}
300+
* @returns {string} e.g. '0' for bookmark
301+
*/
302+
module.exports.getRecordCategory = (record) => {
303+
for (let type in proto.categoryMap) {
304+
if (record[type]) {
305+
const categoryName = proto.categoryMap[type]
306+
if (!categoryName) { return undefined }
307+
return proto.categories[categoryName]
308+
}
309+
}
310+
}

client/requestUtil.js

+17-4
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@
22

33
const awsSdk = require('aws-sdk')
44
const cryptoUtil = require('./cryptoUtil')
5+
const recordUtil = require('./recordUtil')
56
const proto = require('./constants/proto')
7+
const {limitConcurrency} = require('../lib/promiseHelper')
68
const s3Helper = require('../lib/s3Helper')
79
const serializer = require('../lib/serializer')
810

911
const CONFIG = require('./config')
12+
const PUT_CONCURRENCY = 100
1013
const S3_MAX_RETRIES = 1
1114
const EXPIRED_CREDENTIAL_ERRORS = [
1215
/The provided token has expired\./,
@@ -55,6 +58,10 @@ const RequestUtil = function (opts = {}) {
5558
this.encrypt = cryptoUtil.Encrypt(this.serializer, opts.keys.secretboxKey, CONFIG.nonceCounter)
5659
this.decrypt = cryptoUtil.Decrypt(this.serializer, opts.keys.secretboxKey)
5760
this.sign = cryptoUtil.Sign(opts.keys.secretKey)
61+
this.putConcurrency = opts.putConcurrency || PUT_CONCURRENCY
62+
// Like put() but with limited concurrency to avoid out of memory/connection
63+
// errors (net::ERR_INSUFFICIENT_RESOURCES)
64+
this.bufferedPut = limitConcurrency(RequestUtil.prototype.put, this.putConcurrency)
5865
if (opts.credentialsBytes) {
5966
const credentials = this.parseAWSResponse(opts.credentialsBytes)
6067
this.saveAWSCredentials(credentials)
@@ -217,12 +224,18 @@ RequestUtil.prototype.currentRecordPrefix = function (category) {
217224

218225
/**
219226
* Puts a single record, splitting it into multiple objects if needed.
220-
* @param {string} category - the category ID
221-
* @param {Uint8Array} record - the object content, serialized and encrypted
227+
* See also bufferedPut() assigned in the constructor.
228+
* @param {string=} category - the category ID
229+
* @param {object} record - the object content
222230
*/
223231
RequestUtil.prototype.put = function (category, record) {
224-
const s3Prefix = this.currentRecordPrefix(category)
225-
const s3Keys = s3Helper.encodeDataToS3KeyArray(s3Prefix, record)
232+
const thisCategory = category || recordUtil.getRecordCategory(record)
233+
if (!recordUtil.CATEGORY_IDS.includes(thisCategory)) {
234+
throw new Error(`Unsupported sync category: ${category}`)
235+
}
236+
const encryptedRecord = this.encrypt(record)
237+
const s3Prefix = this.currentRecordPrefix(thisCategory)
238+
const s3Keys = s3Helper.encodeDataToS3KeyArray(s3Prefix, encryptedRecord)
226239
return this.withRetry(() => {
227240
const fetchPromises = s3Keys.map((key, _i) => {
228241
const params = {

client/sync.js

+3-4
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,8 @@ const startSync = (requester) => {
128128
ipc.send(messages.RESOLVED_SYNC_RECORDS, category, resolvedRecords)
129129
})
130130
ipc.on(messages.SEND_SYNC_RECORDS, (e, category, records) => {
131-
if (!proto.categories[category]) {
132-
throw new Error(`Unsupported sync category: ${category}`)
133-
}
131+
logSync(`Sending ${records.length} records`)
132+
const categoryId = proto.categories[category]
134133
records.forEach((record) => {
135134
// Workaround #17
136135
record.deviceId = new Uint8Array(record.deviceId)
@@ -139,7 +138,7 @@ const startSync = (requester) => {
139138
record.bookmark.parentFolderObjectId = new Uint8Array(record.bookmark.parentFolderObjectId)
140139
}
141140
logSync(`sending record: ${JSON.stringify(record)}`)
142-
requester.put(proto.categories[category], requester.encrypt(record))
141+
requester.bufferedPut(categoryId, record)
143142
})
144143
})
145144
ipc.on(messages.DELETE_SYNC_USER, (e) => {

lib/promiseHelper.js

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
'use strict'
2+
3+
/**
4+
* Wrap a Promise-returning function so calls to it fill a queue which has
5+
* a concurrency limit.
6+
* e.g. there is an API rate limited to 10 concurrent connections.
7+
* const getApi = (arg) => window.fetch(arg)
8+
* const throttledGetApi = limitConcurrency(getApi, 10)
9+
* for (let i; i < 1000; i++) { throttledGetApi(i) }
10+
* @param fn {function} Function which returns a Promise
11+
* @param concurrency {number} Maximum pending/concurrent fn calls
12+
* @returns {function}
13+
*/
14+
module.exports.limitConcurrency = function (fn, concurrency) {
15+
var queue = null
16+
var active = []
17+
const enqueueFnFactory = function (_this, args) {
18+
return function () {
19+
const enqueued = fn.apply(_this, args)
20+
enqueued.then(function () {
21+
active.splice(active.indexOf(enqueued), 1)
22+
})
23+
active.push(enqueued)
24+
return {
25+
enqueued,
26+
newQueue: Promise.race(active)
27+
}
28+
}
29+
}
30+
return function () {
31+
var enqueueFn = enqueueFnFactory(this, arguments)
32+
if (active.length < concurrency) {
33+
const promises = enqueueFn()
34+
queue = promises.newQueue
35+
return promises.enqueued
36+
} else {
37+
const advanceQueue = queue.then(enqueueFn)
38+
queue = advanceQueue.then(promises => promises.newQueue)
39+
return advanceQueue.then(promises => promises.enqueued)
40+
}
41+
}
42+
}

test/client/recordUtil.js

+14
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ const updateSiteProps = {customTitle: 'a ball pit filled with plush coconuts'}
5252
const recordBookmark = Record({objectData: 'bookmark', bookmark: props.bookmark})
5353
const recordHistorySite = Record({objectData: 'historySite', historySite: siteProps})
5454
const recordSiteSetting = Record({objectData: 'siteSetting', siteSetting: props.siteSetting})
55+
const recordDevice = Record({objectData: 'device', device: {name: 'test pyramid'}})
5556
const baseRecords = [recordBookmark, recordHistorySite, recordSiteSetting]
5657

5758
const updateBookmark = UpdateRecord({
@@ -509,3 +510,16 @@ test('recordUtil.syncRecordAsJS()', (t) => {
509510
conversionEquals({ objectData: 'device', device })
510511
})
511512
})
513+
514+
test('recordUtil.getRecordCategory()', (t) => {
515+
t.plan(8)
516+
const brokenRecord = Record({})
517+
t.equals(recordUtil.getRecordCategory(recordBookmark), '0')
518+
t.equals(recordUtil.getRecordCategory(updateBookmark), '0')
519+
t.equals(recordUtil.getRecordCategory(recordHistorySite), '1')
520+
t.equals(recordUtil.getRecordCategory(updateHistorySite), '1')
521+
t.equals(recordUtil.getRecordCategory(recordSiteSetting), '2')
522+
t.equals(recordUtil.getRecordCategory(updateSiteSetting), '2')
523+
t.equals(recordUtil.getRecordCategory(recordDevice), '2')
524+
t.equals(recordUtil.getRecordCategory(brokenRecord), undefined)
525+
})

test/client/requestUtil.js

+7-9
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ const test = require('tape')
22
const testHelper = require('../testHelper')
33
const timekeeper = require('timekeeper')
44
const clientTestHelper = require('./testHelper')
5-
const cryptoUtil = require('../../client/cryptoUtil')
65
const Serializer = require('../../lib/serializer')
76
const RequestUtil = require('../../client/requestUtil')
87
const proto = require('../../client/constants/proto')
@@ -45,7 +44,6 @@ test('client RequestUtil', (t) => {
4544
.catch((error) => { console.log(`Cleanup failed: ${error}`) })
4645
})
4746
const serializer = requestUtil.serializer
48-
const encrypt = cryptoUtil.Encrypt(serializer, keys.secretboxKey, 0)
4947

5048
t.plan(2)
5149
t.test('#put preference: device', (t) => {
@@ -60,7 +58,7 @@ test('client RequestUtil', (t) => {
6058
device: {name}
6159
}
6260
timekeeper.freeze(1480000000 * 1000)
63-
requestUtil.put(proto.categories.PREFERENCES, encrypt(record))
61+
requestUtil.put(proto.categories.PREFERENCES, record)
6462
.then((response) => {
6563
timekeeper.reset()
6664
t.pass(`${t.name} resolves`)
@@ -102,7 +100,7 @@ test('client RequestUtil', (t) => {
102100
objectId,
103101
device: {name}
104102
}
105-
const putRequest = requestUtil.put(proto.categories.PREFERENCES, encrypt(record))
103+
const putRequest = requestUtil.put(proto.categories.PREFERENCES, record)
106104
timekeeper.reset()
107105
return putRequest
108106
}
@@ -156,7 +154,7 @@ test('client RequestUtil', (t) => {
156154
t.test('#put history site: large URL (multipart)', (t) => {
157155
t.plan(2)
158156
timekeeper.freeze(1480000000 * 1000)
159-
requestUtil.put(proto.categories.HISTORY_SITES, encrypt(record))
157+
requestUtil.put(proto.categories.HISTORY_SITES, record)
160158
.then((response) => {
161159
timekeeper.reset()
162160
t.pass(`${t.name} resolves`)
@@ -236,8 +234,8 @@ test('client RequestUtil', (t) => {
236234
}
237235

238236
Promise.all([
239-
requestUtil.put(proto.categories.PREFERENCES, encrypt(deviceRecord)),
240-
requestUtil.put(proto.categories.PREFERENCES, encrypt(siteSettingRecord))
237+
requestUtil.put(proto.categories.PREFERENCES, deviceRecord),
238+
requestUtil.put(proto.categories.PREFERENCES, siteSettingRecord)
241239
])
242240
.then(() => {
243241
requestUtil.deleteSiteSettings()
@@ -268,7 +266,7 @@ test('client RequestUtil', (t) => {
268266
siteSetting: {hostPattern: 'https://google.com', shieldsUp: true}
269267
}
270268

271-
requestUtil.put(proto.categories.PREFERENCES, encrypt(siteSettingRecord))
269+
requestUtil.put(proto.categories.PREFERENCES, siteSettingRecord)
272270
.then(() => {
273271
requestUtil.list(proto.categories.PREFERENCES, 0)
274272
.then((s3Objects) => {
@@ -365,7 +363,7 @@ test('client RequestUtil', (t) => {
365363
device: {name: 'sweet'}
366364
}
367365
const requestUtil = new RequestUtil(expiredArgs)
368-
requestUtil.put(proto.categories.PREFERENCES, encrypt(record))
366+
requestUtil.put(proto.categories.PREFERENCES, record)
369367
.then((response) => {
370368
t.pass(t.name)
371369
testCredentialRefreshDelete(t)

test/promiseHelper.js

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
const test = require('tape')
2+
const promiseHelper = require('../lib/promiseHelper')
3+
4+
test('promiseHelper', (t) => {
5+
t.plan(1)
6+
7+
t.test('limitConcurrency', (t) => {
8+
t.plan(1)
9+
10+
t.test('calls the original function the same number of times with correct args', (t) => {
11+
t.plan(2)
12+
const EXPECTED_CALL_COUNT = 100
13+
let callCount = 0
14+
const asyncFun = (i) => new Promise((resolve, reject) => {
15+
setTimeout(() => {
16+
callCount += 1
17+
resolve(i)
18+
}, Math.round(10 * Math.random()))
19+
})
20+
const throttedAsyncFun = promiseHelper.limitConcurrency(asyncFun, 3)
21+
const promises = []
22+
let expectedSum = 0
23+
for (let i = 0; i < EXPECTED_CALL_COUNT; i++) {
24+
promises.push(throttedAsyncFun(i))
25+
expectedSum += i
26+
}
27+
Promise.all(promises).then((results) => {
28+
const sum = results.reduce((a, b) => a + b)
29+
t.equal(callCount, EXPECTED_CALL_COUNT)
30+
t.equal(sum, expectedSum)
31+
})
32+
})
33+
})
34+
})

0 commit comments

Comments
 (0)