Skip to content

Commit

Permalink
importinto/lightning: check max row size when parsing csv to avoid OOM
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 12, 2025
1 parent cc73097 commit 9c5c1f7
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 15 deletions.
8 changes: 7 additions & 1 deletion br/pkg/lightning/mydump/csv_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (parser *CSVParser) readUntil(chars *byteSet) ([]byte, byte, error) {
var buf []byte
for {
buf = append(buf, parser.buf...)
if len(buf) > LargestEntryLimit {
if parser.checkRowLen && parser.pos-parser.rowStartPos+int64(len(buf)) > int64(LargestEntryLimit) {
return buf, 0, errors.New("size of row cannot exceed the max value of txn-entry-size-limit")
}
parser.buf = nil
Expand Down Expand Up @@ -513,6 +513,8 @@ func (parser *CSVParser) replaceEOF(err error, replaced error) error {

// ReadRow reads a row from the datafile.
func (parser *CSVParser) ReadRow() error {
parser.beginRowLenCheck()
defer parser.endRowLenCheck()
row := &parser.lastRow
row.Length = 0
row.RowID++
Expand Down Expand Up @@ -563,6 +565,8 @@ func (parser *CSVParser) ReadRow() error {

// ReadColumns reads the columns of this CSV file.
func (parser *CSVParser) ReadColumns() error {
parser.beginRowLenCheck()
defer parser.endRowLenCheck()
columns, err := parser.readRecord(nil)
if err != nil {
return errors.Trace(err)
Expand All @@ -582,6 +586,8 @@ func (parser *CSVParser) ReadColumns() error {
// returns the file offset beyond the terminator.
// This function is used in strict-format dividing a CSV file.
func (parser *CSVParser) ReadUntilTerminator() (int64, error) {
parser.beginRowLenCheck()
defer parser.endRowLenCheck()
for {
_, firstByte, err := parser.readUntil(&parser.newLineByteSet)
if err != nil {
Expand Down
57 changes: 43 additions & 14 deletions br/pkg/lightning/mydump/csv_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,20 +688,49 @@ func TestTooLargeRow(t *testing.T) {
Delimiter: `"`,
},
}
var testCase bytes.Buffer
testCase.WriteString("a,b,c,d")
// WARN: will take up 10KB memory here.
mydump.LargestEntryLimit = 10 * 1024
for i := 0; i < mydump.LargestEntryLimit; i++ {
testCase.WriteByte('d')
}
charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
require.NoError(t, err)
parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(testCase.String()), int64(config.ReadBlockSize), ioWorkers, false, charsetConvertor)
require.NoError(t, err)
e := parser.ReadRow()
require.Error(t, e)
require.Contains(t, e.Error(), "size of row cannot exceed the max value of txn-entry-size-limit")
bak := mydump.LargestEntryLimit
t.Cleanup(func() {
mydump.LargestEntryLimit = bak
})
mydump.LargestEntryLimit = 1024
t.Run("too long field", func(t *testing.T) {
var dataBuf bytes.Buffer
dataBuf.WriteString("a,b,c,d")
for i := 0; i < mydump.LargestEntryLimit; i++ {
dataBuf.WriteByte('d')
}
require.Greater(t, dataBuf.Len(), mydump.LargestEntryLimit)
charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
require.NoError(t, err)
parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(dataBuf.String()), int64(config.ReadBlockSize), ioWorkers, false, charsetConvertor)
require.NoError(t, err)
e := parser.ReadRow()
require.Error(t, e)
require.Contains(t, e.Error(), "size of row cannot exceed the max value of txn-entry-size-limit")
})

t.Run("field is short, but whole row too long", func(t *testing.T) {
var dataBuf bytes.Buffer
for i := 0; i < 16; i++ {
if i > 0 {
dataBuf.WriteByte(',')
}
for j := 0; j < mydump.LargestEntryLimit/16; j++ {
dataBuf.WriteByte('d')
}
}
for i := 0; i < mydump.LargestEntryLimit-dataBuf.Len()+16; i++ {
dataBuf.WriteByte('d')
}
require.Greater(t, dataBuf.Len(), mydump.LargestEntryLimit)
charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
require.NoError(t, err)
parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(dataBuf.String()), int64(config.ReadBlockSize), ioWorkers, false, charsetConvertor)
require.NoError(t, err)
e := parser.ReadRow()
require.Error(t, e)
require.Contains(t, e.Error(), "size of row cannot exceed the max value of txn-entry-size-limit")
})
}

func TestSpecialChars(t *testing.T) {
Expand Down
15 changes: 15 additions & 0 deletions br/pkg/lightning/mydump/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,17 @@ type blockParser struct {

// cache
remainBuf *bytes.Buffer
// holds the cached parsable data after last readBlock. all data inside is
// unparsed at the moment of the return of last readBlock, for current unparsed
// data, use buf.
appendBuf *bytes.Buffer

// the Logger associated with this parser for reporting failure
Logger log.Logger
metrics *metric.Metrics

checkRowLen bool
rowStartPos int64
}

func makeBlockParser(
Expand Down Expand Up @@ -164,6 +170,15 @@ func NewChunkParser(
}
}

// beginRowLenCheck starts per-row length accounting: it records the file
// offset at which the current row begins so that readUntil can compare the
// accumulated row size against LargestEntryLimit and abort early instead of
// buffering an oversized row in memory (OOM protection). Callers pair it
// with a deferred endRowLenCheck.
func (parser *blockParser) beginRowLenCheck() {
parser.checkRowLen = true
parser.rowStartPos = parser.pos
}

// endRowLenCheck disables the per-row length accounting enabled by
// beginRowLenCheck. It is invoked via defer once a row (or header) has been
// fully read, so reads outside a row boundary are not length-checked.
func (parser *blockParser) endRowLenCheck() {
parser.checkRowLen = false
}

// SetPos changes the reported position and row ID.
func (parser *blockParser) SetPos(pos int64, rowID int64) error {
p, err := parser.reader.Seek(pos, io.SeekStart)
Expand Down

0 comments on commit 9c5c1f7

Please sign in to comment.