Skip to content
This repository was archived by the owner on Dec 6, 2018. It is now read-only.

fix prefix in streams by using a streams2 transform. #43

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
"level-hooks": ">=4.4.0 <5",
"string-range": "~1.2.1",
"level-fix-range": "2.0",
"xtend": "~2.0.4"
"xtend": "~2.0.4",
"readable-stream": "~1.1.9"
},
"devDependencies": {
"rimraf": "~2.1.4",
Expand All @@ -23,7 +24,8 @@
"stream-to-pull-stream": "~1.2.0",
"tape": "~1.0.4",
"through": "~2.3.4",
"level": "~0.15.0"
"level": "~0.15.0",
"stream-to": "~0.3.1"
},
"scripts": {
"test": "set -e; for t in test/*.js; do node $t; done"
Expand Down
48 changes: 15 additions & 33 deletions sub.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ var inherits = require('util').inherits
var ranges = require('string-range')
var fixRange = require('level-fix-range')
var xtend = require('xtend')
var Transform = require('readable-stream').Transform

inherits(SubDB, EventEmitter)

Expand Down Expand Up @@ -156,44 +157,25 @@ SDB.createReadStream = function (opts) {
selectivelyMerge(_opts, xtend(opts, this._options))

var s = r.createReadStream(_opts)
if (_opts.keys === false) return s

var tr = Transform({ objectMode: true })
s.on('error', tr.emit.bind(tr, 'error'))
s.pipe(tr)

if(_opts.values === false) {
var read = s.read
if (read) {
s.read = function (size) {
var val = read.call(this, size)
if (val) val = val.substring(p.length)
return val
}
} else {
var emit = s.emit
s.emit = function (event, val) {
if(event === 'data') {
emit.call(this, 'data', val.substring(p.length))
} else
emit.call(this, event, val)
}
tr._transform = function (key, enc, done) {
key = key.substring(p.length)
done(null, key)
}
return s
} else if(_opts.keys === false)
return s
else {
var read = s.read
if (read) {
s.read = function (size) {
var d = read.call(this, size)
if (d) d.key = d.key.substring(p.length)
return d
}
} else {
s.on('data', function (d) {
//mutate the prefix!
//this doesn't work for createKeyStream admittedly.
d.key = d.key.substring(p.length)
})
} else {
tr._transform = function (data, enc, done) {
data.key = data.key.substring(p.length)
done(null, data)
}
return s
}

return tr
}


Expand Down
34 changes: 16 additions & 18 deletions test/key-value-stream.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

var pl = require('pull-level')
var pull = require('pull-stream')
var toPull = require('stream-to-pull-stream')
var pl = require('pull-level')
var pull = require('pull-stream')
var streamTo = require('stream-to');

var level = require('level-test')()
var sublevel = require('../')
Expand All @@ -21,25 +21,23 @@ tape('keys', function (t) {
throw err
}

toPull(db.createKeyStream())
.pipe(pull.collect(function (err, ary) {
streamTo.array(db.createKeyStream(), function (err, ary) {
console.log(ary)
ary.forEach(function (e) {
t.equal(typeof e, 'string')
t.ok(/^key_/.test(e))
})
streamTo.array(db.createValueStream(), function (err, ary) {
console.log(ary)
ary.forEach(function (e) {
t.equal(typeof e, 'string')
t.ok(/^key_/.test(e))
t.ok(/^value_/.test(e))
console.log(e)
})
toPull(db.createValueStream())
.pipe(pull.collect(function (err, ary) {
console.log(ary)
ary.forEach(function (e) {
t.equal(typeof e, 'string')
t.ok(/^value_/.test(e))
console.log(e)
})

t.end()
}))
}))

t.end()
})
})
}))
})