From 72608afadc34356811da2981aa48a618c2691d79 Mon Sep 17 00:00:00 2001
From: Lukas Martinelli
Date: Wed, 19 Aug 2015 14:00:37 +0200
Subject: [PATCH] Now using shared import code

---
 csv.go  | 76 +++++++++++++++++++++++++++------------------------------
 json.go |  7 ++++--
 2 files changed, 41 insertions(+), 42 deletions(-)

diff --git a/csv.go b/csv.go
index 569120d..e40be94 100644
--- a/csv.go
+++ b/csv.go
@@ -4,12 +4,11 @@ import (
     "encoding/csv"
     "fmt"
     "io"
+    "log"
     "os"
     "strings"

-    "github.com/cheggaaa/pb"
     "github.com/codegangsta/cli"
-    "github.com/lib/pq"
 )

 func parseColumns(c *cli.Context, reader *csv.Reader) []string {
@@ -41,20 +40,17 @@ func importCsv(c *cli.Context) {
     schema := c.GlobalString("schema")
     tableName := parseTableName(c, filename)

-    db, err := connect(parseConnStr(c), schema)
-    failOnError(err, "Could not connect to db")
-    defer db.Close()
-
     file, err := os.Open(filename)
     failOnError(err, "Cannot open file")
     defer file.Close()

-    fi, err := file.Stat()
-    failOnError(err, "Could not find out file size of file")
-    total := fi.Size()
-    bar := pb.New64(total)
-    bar.SetUnits(pb.U_BYTES)
-    bar.Start()
+    db, err := connect(parseConnStr(c), schema)
+    failOnError(err, "Could not connect to db")
+    defer db.Close()
+
+    success := 0
+    failed := 0
+    bar := NewProgressBar(file)

     reader := csv.NewReader(io.TeeReader(file, bar))
     reader.Comma = rune(c.String("delimiter")[0])
@@ -64,24 +60,15 @@
     columns := parseColumns(c, reader)
     reader.FieldsPerRecord = len(columns)

-    table, err := createTable(db, schema, tableName, columns)
-    failOnError(err, "Could not create table statement")
-
-    _, err = table.Exec()
-    failOnError(err, "Could not create table")
-
-    txn, err := db.Begin()
-    failOnError(err, "Could not start transaction")
-
-    stmt, err := txn.Prepare(pq.CopyInSchema(schema, tableName, columns...))
-    failOnError(err, "Could not prepare copy in statement")
-
-    successCount := 0
-    failCount := 0
+    i, err := NewCSVImport(db, schema, tableName, columns)
+    failOnError(err, "Could not prepare import")

     for {
         cols := make([]interface{}, len(columns))
         record, err := reader.Read()
+
+        //Loop ensures we don't insert too many values and that
+        //values are properly converted into empty interfaces
         for i, col := range record {
             cols[i] = col
         }
@@ -89,21 +76,32 @@
         if err == io.EOF {
             break
         }
-        failOnError(err, "Could not read csv")

-        _, err = stmt.Exec(cols...)
-        failOnError(err, "Could add bulk insert")
-        successCount++
-    }
-    _, err = stmt.Exec()
-    failOnError(err, "Could not exec the bulk copy")
+        //Todo: better error handling
+        failOnError(err, "Could not read csv")

-    err = stmt.Close()
-    failOnError(err, "Could not close")
+        err = i.AddRow(cols...)
+        if err != nil {
+            failed++
+            line := strings.Join(record, c.GlobalString("delimiter"))
+
+            if c.GlobalBool("ignore-errors") {
+                os.Stderr.WriteString(line)
+            } else {
+                msg := fmt.Sprintf("Could not import %s: %s", err, line)
+                log.Fatalln(msg)
+                panic(msg)
+            }
+        } else {
+            success++
+        }
+    }

-    err = txn.Commit()
-    failOnError(err, "Could not commit transaction")
+    err = i.Commit()
+    failOnError(err, "Could not commit")
+    bar.Finish()
+
+    //refactore whole reporting stuff
     //print report
     fmt.Println(fmt.Sprintf("Successfully copied %d rows into %s"))
@@ -113,9 +111,7 @@
         fmt.Println(fmt.Sprintf("%d rows could not be imported into %s."))
     }

-    if failCount > 0 && c.GlobalBool("ignore-errors") {
+    if failed > 0 && c.GlobalBool("ignore-errors") {
         fmt.Println("You can specify the --ignore-errors flag to write errors to stderr, instead of aborting the transcation.")
     }
-
-    bar.Finish()
 }
diff --git a/json.go b/json.go
index 009bf9b..aea0881 100644
--- a/json.go
+++ b/json.go
@@ -59,8 +59,10 @@ func importJSON(c *cli.Context) {
     bar := NewProgressBar(file)

     i, err := NewJSONImport(db, schema, tableName, "data")
+    failOnError(err, "Could not prepare import")

     reader := bufio.NewReader(io.TeeReader(file, bar))
+
     for {
         // We use ReadBytes because it can deal with very long lines
         // which happens often with big JSON objects
@@ -74,7 +76,6 @@
         //todo: Better error handling so that db can close
         failOnError(err, "Could not read line")

-        //todo: not so happy with this part
         handleError := func() {
             failed++
             if c.GlobalBool("ignore-errors") {
@@ -99,6 +100,8 @@
     }

-    i.Commit()
+    // handle error
+    err = i.Commit()
+    failOnError(err, "Could not commit")

     bar.Finish()
 }
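
Note (not part of the patch itself): csv.go and json.go now drive a shared importer, but the patch only shows its call sites -- NewCSVImport / NewJSONImport, AddRow and Commit -- not the helper they refer to. The Go code below is a minimal sketch of what that helper could look like, reconstructed from the transaction and pq.CopyInSchema plumbing this commit removes from importCsv. The Import struct, its fields, the rollback on a failed Prepare, and the omission of table creation (the removed createTable call) are assumptions, not the repository's actual implementation.

// import.go (sketch): shared bulk-import helper assumed by the call sites
// above; it would sit in the same main package as csv.go and json.go.
package main

import (
    "database/sql"

    "github.com/lib/pq"
)

// Import bundles the transaction and prepared COPY statement that both
// importers feed through AddRow and finish with Commit.
type Import struct {
    txn  *sql.Tx
    stmt *sql.Stmt
}

// NewCSVImport prepares a bulk COPY into schema.tableName for the given
// columns. Creating the target table (the createTable call removed from
// csv.go) is left out to keep the sketch self-contained.
func NewCSVImport(db *sql.DB, schema string, tableName string, columns []string) (*Import, error) {
    txn, err := db.Begin()
    if err != nil {
        return nil, err
    }

    stmt, err := txn.Prepare(pq.CopyInSchema(schema, tableName, columns...))
    if err != nil {
        txn.Rollback()
        return nil, err
    }

    return &Import{txn: txn, stmt: stmt}, nil
}

// AddRow queues one row of values for the bulk copy.
func (i *Import) AddRow(values ...interface{}) error {
    _, err := i.stmt.Exec(values...)
    return err
}

// Commit flushes the COPY buffer, closes the statement and commits the
// transaction, mirroring the stmt.Exec()/stmt.Close()/txn.Commit() sequence
// removed from csv.go.
func (i *Import) Commit() error {
    if _, err := i.stmt.Exec(); err != nil {
        return err
    }
    if err := i.stmt.Close(); err != nil {
        return err
    }
    return i.txn.Commit()
}

The final stmt.Exec() with no arguments in Commit mirrors the old inline csv.go code: with lib/pq that empty Exec is what flushes the buffered COPY data before the statement is closed and the transaction committed.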