Skip to content

Commit

Permalink
Merge pull request lukasmartinelli#53 from evidnet/master
Browse files Browse the repository at this point in the history
Add null-delimiter for CSV file's parse options.
  • Loading branch information
lukasmartinelli authored Oct 20, 2018
2 parents 1af4d3c + cddfc8a commit 41bafc5
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 29 deletions.
35 changes: 19 additions & 16 deletions csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"strings"
"unicode/utf8"

"github.com/cheggaaa/pb"
csv "github.com/JensRantil/go-csv"
"github.com/cheggaaa/pb"
)

func containsDelimiter(col string) bool {
Expand Down Expand Up @@ -57,7 +57,8 @@ func parseColumns(reader *csv.Reader, skipHeader bool, fields string) ([]string,

for _, col := range columns {
if containsDelimiter(col) {
return columns, errors.New("Please specify the correct delimiter with -d.\nHeader column contains a delimiter character: " + col)
return columns, errors.New("Please specify the correct delimiter with -d.\n" +
"Header column contains a delimiter character: " + col)
}
}

Expand All @@ -68,7 +69,8 @@ func parseColumns(reader *csv.Reader, skipHeader bool, fields string) ([]string,
return columns, nil
}

func copyCSVRows(i *Import, reader *csv.Reader, ignoreErrors bool, delimiter string, columns []string) (error, int, int) {
func copyCSVRows(i *Import, reader *csv.Reader, ignoreErrors bool,
delimiter string, columns []string, nullDelimiter string) (error, int, int) {
success := 0
failed := 0

Expand All @@ -88,7 +90,7 @@ func copyCSVRows(i *Import, reader *csv.Reader, ignoreErrors bool, delimiter str
os.Stderr.WriteString(string(line))
continue
} else {
err = errors.New(fmt.Sprintf("%s: %s", err, line))
err = fmt.Errorf("%s: %s", err, line)
return err, success, failed
}
}
Expand All @@ -101,7 +103,7 @@ func copyCSVRows(i *Import, reader *csv.Reader, ignoreErrors bool, delimiter str
// cols[i] = col
}

err = i.AddRow(cols...)
err = i.AddRow(nullDelimiter, cols...)

if err != nil {
line := strings.Join(record, delimiter)
Expand All @@ -111,7 +113,7 @@ func copyCSVRows(i *Import, reader *csv.Reader, ignoreErrors bool, delimiter str
os.Stderr.WriteString(string(line))
continue
} else {
err = errors.New(fmt.Sprintf("%s: %s", err, line))
err = fmt.Errorf("%s: %s", err, line)
return err, success, failed
}
}
Expand All @@ -122,7 +124,8 @@ func copyCSVRows(i *Import, reader *csv.Reader, ignoreErrors bool, delimiter str
return nil, success, failed
}

func importCSV(filename string, connStr string, schema string, tableName string, ignoreErrors bool, skipHeader bool, fields string, delimiter string, excel bool) error {
func importCSV(filename string, connStr string, schema string, tableName string, ignoreErrors bool,
skipHeader bool, fields string, delimiter string, excel bool, nullDelimiter string) error {

db, err := connect(connStr, schema)
if err != nil {
Expand Down Expand Up @@ -169,25 +172,25 @@ func importCSV(filename string, connStr string, schema string, tableName string,
var success, failed int
if filename != "" {
bar.Start()
err, success, failed = copyCSVRows(i, reader, ignoreErrors, delimiter, columns)
err, success, failed = copyCSVRows(i, reader, ignoreErrors, delimiter, columns, nullDelimiter)
bar.Finish()
} else {
err, success, failed = copyCSVRows(i, reader, ignoreErrors, delimiter, columns)
err, success, failed = copyCSVRows(i, reader, ignoreErrors, delimiter, columns, nullDelimiter)
}

if err != nil {
lineNumber := success + failed
if !skipHeader {
lineNumber++
}
return errors.New(fmt.Sprintf("line %d: %s", lineNumber, err))
} else {
fmt.Println(fmt.Sprintf("%d rows imported into %s.%s", success, schema, tableName))
return fmt.Errorf("line %d: %s", lineNumber, err)
}

if ignoreErrors && failed > 0 {
fmt.Println(fmt.Sprintf("%d rows could not be imported into %s.%s and have been written to stderr.", failed, schema, tableName))
}
fmt.Println(fmt.Sprintf("%d rows imported into %s.%s", success, schema, tableName))

return i.Commit()
if ignoreErrors && failed > 0 {
fmt.Println(fmt.Sprintf("%d rows could not be imported into %s.%s and have been written to stderr.", failed, schema, tableName))
}

return i.Commit()
}
10 changes: 9 additions & 1 deletion import.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,15 @@ func newImport(db *sql.DB, schema string, tableName string, columns []string) (*
return &Import{txn, stmt}, nil
}

func (i *Import) AddRow(columns ...interface{}) error {
// AddRow executes the import statement for a single row.
//
// Every column whose value is a string equal to nullDelimiter is replaced
// with nil before the row is handed to i.stmt.Exec. The replacement is made
// in place on the columns slice, so a slice passed as cols... is modified.
//
// It returns whatever error Exec reports, or nil on success.
func (i *Import) AddRow(nullDelimiter string, columns ...interface{}) error {
	for pos, value := range columns {
		// Only a string can be equal to the delimiter; any other dynamic
		// type is left untouched.
		if text, isString := value.(string); isString && text == nullDelimiter {
			columns[pos] = nil
		}
	}

	_, execErr := i.stmt.Exec(columns...)
	return execErr
}
Expand Down
21 changes: 10 additions & 11 deletions json.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -32,7 +31,7 @@ func copyJSONRows(i *Import, reader *bufio.Reader, ignoreErrors bool) (error, in
}

if err != nil {
err = errors.New(fmt.Sprintf("%s: %s", err, line))
err = fmt.Errorf("%s: %s", err, line)
return err, success, failed
}

Expand All @@ -43,7 +42,7 @@ func copyJSONRows(i *Import, reader *bufio.Reader, ignoreErrors bool) (error, in
os.Stderr.WriteString(string(line))
continue
} else {
err = errors.New(fmt.Sprintf("%s: %s", err, line))
err = fmt.Errorf("%s: %s", err, line)
return err, success, failed
}
}
Expand All @@ -55,7 +54,7 @@ func copyJSONRows(i *Import, reader *bufio.Reader, ignoreErrors bool) (error, in
os.Stderr.WriteString(string(line))
continue
} else {
err = errors.New(fmt.Sprintf("%s: %s", err, line))
err = fmt.Errorf("%s: %s", err, line)
return err, success, failed
}
}
Expand Down Expand Up @@ -99,14 +98,14 @@ func importJSON(filename string, connStr string, schema string, tableName string

if err != nil {
lineNumber := success + failed
return errors.New(fmt.Sprintf("line %d: %s", lineNumber, err))
} else {
fmt.Println(fmt.Sprintf("%d rows imported into %s.%s", success, schema, tableName))
return fmt.Errorf("line %d: %s", lineNumber, err)
}

if ignoreErrors && failed > 0 {
fmt.Println(fmt.Sprintf("%d rows could not be imported into %s.%s and have been written to stderr.", failed, schema, tableName))
}
fmt.Println(fmt.Sprintf("%d rows imported into %s.%s", success, schema, tableName))

return i.Commit()
if ignoreErrors && failed > 0 {
fmt.Println(fmt.Sprintf("%d rows could not be imported into %s.%s and have been written to stderr.", failed, schema, tableName))
}

return i.Commit()
}
8 changes: 7 additions & 1 deletion pgfutter.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ func main() {
Value: ",",
Usage: "field delimiter",
},
cli.StringFlag{
Name: "null-delimiter, nd",
Value: "\\N",
Usage: "null delimiter",
},
cli.BoolFlag{
Name: "skip-parse-delimiter",
Usage: "skip parsing escape sequences in the given delimiter",
Expand All @@ -157,11 +162,12 @@ func main() {

skipHeader := c.Bool("skip-header")
fields := c.String("fields")
nullDelimiter := c.String("null-delimiter")
skipParseheader := c.Bool("skip-parse-delimiter")
excel := c.Bool("excel")
delimiter := parseDelimiter(c.String("delimiter"), skipParseheader)
connStr := parseConnStr(c)
err := importCSV(filename, connStr, schema, tableName, ignoreErrors, skipHeader, fields, delimiter, excel)
err := importCSV(filename, connStr, schema, tableName, ignoreErrors, skipHeader, fields, delimiter, excel, nullDelimiter)
return err
},
},
Expand Down

0 comments on commit 41bafc5

Please sign in to comment.