json import now uses new import type
lukasmartinelli committed Aug 19, 2015
1 parent 16badd5 commit 62909e1
Showing 2 changed files with 33 additions and 40 deletions.
import.go (1 addition, 1 deletion)
@@ -57,7 +57,7 @@ func newImport(db *sql.DB, schema string, tableName string, columns []string) (*
 }
 
 func (i *Import) AddRow(columns ...interface{}) error {
-	_, err := i.stmt.Exec(columns)
+	_, err := i.stmt.Exec(columns...)
 	return err
 }
 
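This one-character fix addresses a classic Go variadic pitfall: Stmt.Exec takes ...interface{}, so passing the slice without the trailing "..." hands the driver a single []interface{} argument instead of one argument per column. A minimal standalone sketch of the difference (hypothetical example, not code from this repository):

	package main

	import "fmt"

	func exec(args ...interface{}) {
		fmt.Printf("%d args: %v\n", len(args), args)
	}

	func main() {
		columns := []interface{}{"a", "b", "c"}
		exec(columns)    // 1 args: [[a b c]] -- the bug: slice passed as one value
		exec(columns...) // 3 args: [a b c]  -- the fix: slice expanded into arguments
	}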
json.go (32 additions, 39 deletions)
@@ -11,7 +11,6 @@ import (
 
 	"github.com/cheggaaa/pb"
 	"github.com/codegangsta/cli"
-	"github.com/lib/pq"
 )
 
 func isValidJSON(b []byte) bool {
@@ -20,6 +19,21 @@ func isValidJSON(b []byte) bool {
 	return err == nil
 }
 
+// NewProgressBar initializes new progress bar based on size of file
+func NewProgressBar(file *os.File) *pb.ProgressBar {
+	fi, err := file.Stat()
+
+	total := int64(0)
+	if err == nil {
+		total = fi.Size()
+	}
+
+	bar := pb.New64(total)
+	bar.SetUnits(pb.U_BYTES)
+	bar.Start()
+	return bar
+}
+
 func importJSON(c *cli.Context) {
 	cli.CommandHelpTemplate = strings.Replace(cli.CommandHelpTemplate, "[arguments...]", "<json-file>", -1)
 
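NewProgressBar only sizes and starts the bar; the actual progress reporting comes from the io.TeeReader call kept in the next hunk, which copies every byte read from the file into the bar (pb.ProgressBar implements io.Writer). A usage sketch under that assumption (hypothetical file name, not code from this repository):

	package main

	import (
		"bufio"
		"io"
		"os"

		"github.com/cheggaaa/pb"
	)

	func main() {
		file, err := os.Open("rows.json") // hypothetical input file
		if err != nil {
			panic(err)
		}
		defer file.Close()

		fi, _ := file.Stat()
		bar := pb.New64(fi.Size())
		bar.SetUnits(pb.U_BYTES)
		bar.Start()

		// Every read from `reader` also advances the bar, so progress
		// tracks the file position without counting rows explicitly.
		reader := bufio.NewReader(io.TeeReader(file, bar))
		_, _ = io.Copy(io.Discard, reader)
		bar.Finish()
	}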
@@ -32,49 +46,37 @@ func importJSON(c *cli.Context) {
 	schema := c.GlobalString("schema")
 	tableName := parseTableName(c, filename)
 
-	db, err := connect(parseConnStr(c), schema)
-	failOnError(err, "Could not connect to db")
-	defer db.Close()
-
-	columns := []string{"data"}
-	table, err := createJSONTable(db, schema, tableName, columns[0])
-	failOnError(err, "Could not create table statement")
-
-	_, err = table.Exec()
-	failOnError(err, "Could not create table")
-
-	txn, err := db.Begin()
-	failOnError(err, "Could not start transaction")
-
-	stmt, err := txn.Prepare(pq.CopyInSchema(schema, tableName, columns...))
-	failOnError(err, "Could not prepare copy in statement")
-
 	file, err := os.Open(filename)
 	failOnError(err, "Cannot open file")
 	defer file.Close()
 
-	fi, err := file.Stat()
-	failOnError(err, "Could not find out file size of file")
-	total := fi.Size()
-	bar := pb.New64(total)
-	bar.SetUnits(pb.U_BYTES)
-	bar.Start()
+	db, err := connect(parseConnStr(c), schema)
+	failOnError(err, "Could not connect to db")
+	defer db.Close()
 
-	successCount := 0
-	failCount := 0
+	success := 0
+	failed := 0
+	bar := NewProgressBar(file)
+
+	i, err := NewJSONImport(db, schema, tableName, "data")
 
 	reader := bufio.NewReader(io.TeeReader(file, bar))
 	for {
 		// We use ReadBytes because it can deal with very long lines
 		// which happens often with big JSON objects
 		line, err := reader.ReadBytes('\n')
 
 		if err == io.EOF {
 			err = nil
 			break
 		}
+
+		//todo: Better error handling so that db can close
+		failOnError(err, "Could not read line")
+
+		//todo: not so happy with this part
 		handleError := func() {
-			failCount++
+			failed++
 			if c.GlobalBool("ignore-errors") {
 				os.Stderr.WriteString(string(line))
 			} else {
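The retained comment about ReadBytes is worth unpacking: bufio.Scanner errors out once a line exceeds its maximum token size (64 KB by default), while bufio.Reader.ReadBytes grows its buffer as needed, which matters for large single-line JSON objects. A self-contained sketch contrasting the two (illustration only, not code from this repository):

	package main

	import (
		"bufio"
		"fmt"
		"strings"
	)

	func main() {
		huge := strings.Repeat("x", 100*1024) + "\n" // one 100 KB line

		// bufio.Scanner: fails once a line exceeds its max token size.
		sc := bufio.NewScanner(strings.NewReader(huge))
		for sc.Scan() {
		}
		fmt.Println("scanner err:", sc.Err()) // bufio.Scanner: token too long

		// bufio.Reader.ReadBytes: grows its buffer, so long JSON lines survive.
		r := bufio.NewReader(strings.NewReader(huge))
		line, err := r.ReadBytes('\n')
		fmt.Println("readbytes len:", len(line), "err:", err) // 102401 <nil>
	}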
@@ -88,24 +90,15 @@ func importJSON(c *cli.Context) {
 			handleError()
 		}
 
-		_, err = stmt.Exec(string(line))
+		err = i.AddRow(string(line))
 		if err != nil {
 			handleError()
 		} else {
-			successCount++
+			success++
 		}
 
-		failOnError(err, "Could add bulk insert")
 	}
 
-	_, err = stmt.Exec()
-	failOnError(err, "Could not exec the bulk copy")
-
-	err = stmt.Close()
-	failOnError(err, "Could not close")
-
-	err = txn.Commit()
-	failOnError(err, "Could not commit transaction")
-
+	i.Commit()
 	bar.Finish()
 }
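The boilerplate removed here (prepared COPY statement, transaction, final empty Exec) is presumably what the new Import type encapsulates; only its AddRow method is visible in this commit. A hedged sketch of what NewJSONImport and Commit plausibly wrap, inferred from the removed lines rather than taken from the repository:

	package importer // hypothetical package name; inferred shape, not verbatim repo code

	import (
		"database/sql"

		"github.com/lib/pq"
	)

	type Import struct {
		txn  *sql.Tx
		stmt *sql.Stmt
	}

	// Assumption: begins a transaction and prepares a COPY FROM STDIN
	// statement via lib/pq, mirroring the code this commit removed.
	func NewJSONImport(db *sql.DB, schema string, tableName string, column string) (*Import, error) {
		txn, err := db.Begin()
		if err != nil {
			return nil, err
		}
		stmt, err := txn.Prepare(pq.CopyInSchema(schema, tableName, column))
		if err != nil {
			return nil, err
		}
		return &Import{txn: txn, stmt: stmt}, nil
	}

	func (i *Import) Commit() error {
		// An Exec with no arguments flushes and ends the COPY.
		if _, err := i.stmt.Exec(); err != nil {
			return err
		}
		if err := i.stmt.Close(); err != nil {
			return err
		}
		return i.txn.Commit()
	}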
