Skip to content

Commit

Permalink
Add error printing stubs (#80)
Browse files Browse the repository at this point in the history
* Add error printing stubs

* Add logging for other components
  • Loading branch information
zxia-wish authored Jul 30, 2021
1 parent b38719b commit c8ae3d2
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 2 deletions.
4 changes: 4 additions & 0 deletions cmd/eventmaster/eventmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ import (
)

func main() {
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
})
var config em.Flags
parser := flags.NewParser(&config, flags.Default)
a, err := parser.Parse()
if err != nil {
log.Error(err)
os.Exit(1)
}

Expand Down
4 changes: 3 additions & 1 deletion jh/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package jh

import (
"encoding/json"
"log"
"net/http"

log "github.com/sirupsen/logrus"

"github.com/julienschmidt/httprouter"
)

Expand All @@ -27,6 +28,7 @@ func Adapter(jh JSONHandler) httprouter.Handle {

r, err := jh(w, req, ps)
if err != nil {
log.Error(err)
status := http.StatusInternalServerError
switch err := err.(type) {
case Error:
Expand Down
35 changes: 34 additions & 1 deletion postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ func getPasswordFromVault(c VaultConfig) (*string, error) {
}
client, err := vaultApi.NewClient(config)
if err != nil {
log.Errorf(err.Error())
return nil, err
}

client.SetToken(c.Token)
secret, err := client.Logical().Read(c.Path)
if err != nil {
log.Errorf(err.Error())
return nil, err
}

Expand All @@ -71,6 +73,7 @@ func getPassword(c PostgresConfig) (*string, error) {
} else {
password, err = getPasswordFromVault(c.Vault)
if err != nil {
log.Errorf(err.Error())
return nil, err
}
}
Expand All @@ -88,17 +91,20 @@ func NewPostgresStore(c PostgresConfig) (*PostgresStore, error) {
log.Infof("Connecting to postgres: %v", host)
password, err := getPassword(c)
if err != nil {
log.Errorf(err.Error())
return nil, err
}
psqlInfo := fmt.Sprintf("host=%s port=%d user=%s "+
"password=%s dbname=%s sslmode=disable",
host, c.Port, c.Username, *password, c.Database)
db, err := sql.Open("postgres", psqlInfo)
if err != nil {
log.Errorf(err.Error())
return nil, errors.Wrap(err, "Error creating postgres session")
}
err = db.Ping()
if err != nil {
log.Errorf(err.Error())
return nil, errors.Wrap(err, "Error creating postgres session")
}
log.Infof("Successfully connected to postgres %s", host)
Expand All @@ -114,13 +120,15 @@ func (p *PostgresStore) AddEvent(e *Event) error {
if e.Data != nil {
dataBytes, err := json.Marshal(e.Data)
if err != nil {
log.Errorf(err.Error())
return err
}
data = string(dataBytes)
}

tx, err := p.db.Begin()
if err != nil {
log.Errorf(err.Error())
return err
}

Expand All @@ -131,6 +139,7 @@ func (p *PostgresStore) AddEvent(e *Event) error {
e.EventID, e.ParentEventID, e.DCID, e.TopicID, e.Host, pq.Array(e.TargetHosts), e.User, e.EventTime/1000, pq.Array(e.Tags), e.ReceivedTime/1000)

if err != nil {
log.Errorf(err.Error())
rollBackErr := tx.Rollback()
if rollBackErr != nil {
return errors.Wrap(err, "Rollback failure")
Expand All @@ -140,6 +149,7 @@ func (p *PostgresStore) AddEvent(e *Event) error {

_, err = tx.Exec("INSERT INTO event_metadata (event_id, data_json) VALUES ($1, $2)", e.EventID, data)
if err != nil {
log.Errorf(err.Error())
rollBackErr := tx.Rollback()
if rollBackErr != nil {
return errors.Wrap(err, "Rollback failure")
Expand Down Expand Up @@ -260,12 +270,14 @@ func (p *PostgresStore) Find(q *eventmaster.Query, topicIDs []string, dcIDs []st

rows, err := query.RunWith(p.db).Query()
if err != nil {
log.Errorf(err.Error())
return nil, err
}

for rows.Next() {
err = rows.Scan(fields...)
if err != nil {
log.Errorf(err.Error())
return nil, err
}
event := &Event{
Expand All @@ -282,6 +294,7 @@ func (p *PostgresStore) Find(q *eventmaster.Query, topicIDs []string, dcIDs []st
}
err = json.Unmarshal([]byte(data), &event.Data)
if err != nil {
log.Errorf(err.Error())
return nil, err
}
events = append(events, event)
Expand All @@ -297,6 +310,7 @@ func (p *PostgresStore) FindByID(id string, inclData bool) (*Event, error) {
"FROM event WHERE event_id=$1 LIMIT 1", id)

if err != nil {
log.Errorf(err.Error())
return nil, err
}

Expand All @@ -307,6 +321,7 @@ func (p *PostgresStore) FindByID(id string, inclData bool) (*Event, error) {
&event.TopicID, &event.Host, pq.Array(&event.TargetHosts), &event.User,
&eventTime, pq.Array(&event.Tags), &receivedTime)
if err != nil {
log.Errorf(err.Error())
return nil, err
}
event.ReceivedTime = receivedTime.Unix()
Expand All @@ -316,22 +331,26 @@ func (p *PostgresStore) FindByID(id string, inclData bool) (*Event, error) {
}

if err != nil {
log.Errorf(err.Error())
return nil, err
}

if inclData {
rows, err = p.db.Query("SELECT data_json FROM event_metadata WHERE event_id=$1 LIMIT 1", id)
if err != nil {
log.Errorf(err.Error())
return nil, err
}
var data []byte
if rows.Next() {
err = rows.Scan(&data)
if err != nil {
log.Errorf(err.Error())
return nil, err
}
err = json.Unmarshal(data, &event.Data)
if err != nil {
log.Errorf(err.Error())
return nil, err
}
} else {
Expand All @@ -356,17 +375,20 @@ func (p *PostgresStore) FindIDs(query *eventmaster.TimeQuery, handle HandleEvent
" AND event_time <= to_timestamp($2) AT TIME ZONE 'UTC' ORDER BY event_time "+
order+" LIMIT $3 ", query.StartEventTime, query.EndEventTime, query.Limit)
if err != nil {
log.Errorf(err.Error())
return err
}

for rows.Next() {
var id string
err = rows.Scan(&id)
if err != nil {
log.Errorf(err.Error())
return err
}
err = handle(id)
if err != nil {
log.Errorf(err.Error())
rows.Close()
return err
}
Expand All @@ -378,6 +400,7 @@ func (p *PostgresStore) FindIDs(query *eventmaster.TimeQuery, handle HandleEvent
func (p *PostgresStore) GetTopics() ([]Topic, error) {
rows, err := p.db.Query("SELECT topic_id, topic_name, data_schema FROM event_topic")
if err != nil {
log.Errorf(err.Error())
return nil, err
}
defer rows.Close()
Expand All @@ -388,11 +411,13 @@ func (p *PostgresStore) GetTopics() ([]Topic, error) {
for rows.Next() {
err = rows.Scan(&id, &name, &schema)
if err != nil {
log.Errorf(err.Error())
return nil, err
}
var s map[string]interface{}
err := json.Unmarshal([]byte(schema), &s)
if err != nil {
log.Errorf(err.Error())
return nil, err
}
topics = append(topics, Topic{
Expand Down Expand Up @@ -427,6 +452,7 @@ func (p *PostgresStore) DeleteTopic(id string) error {
func (p *PostgresStore) GetDCs() ([]DC, error) {
rows, err := p.db.Query("SELECT dc_id, dc from event_dc")
if err != nil {
log.Errorf(err.Error())
return nil, err
}
defer rows.Close()
Expand All @@ -436,6 +462,7 @@ func (p *PostgresStore) GetDCs() ([]DC, error) {
for rows.Next() {
err = rows.Scan(&dc_id, &dc)
if err != nil {
log.Errorf(err.Error())
return nil, err
}
dcs = append(dcs, DC{
Expand All @@ -450,10 +477,12 @@ func (p *PostgresStore) GetDCs() ([]DC, error) {
func (p *PostgresStore) AddDC(dc DC) error {
stmt, err := p.db.Prepare("INSERT INTO event_dc (dc_id, dc) VALUES ($1, $2)")
if err != nil {
log.Errorf(err.Error())
return errors.Wrap(err, "Failed preparing insert DC statement")
}
_, err = stmt.Exec(dc.ID, dc.Name)
if err != nil {
log.Errorf(err.Error())
return err
}

Expand All @@ -463,7 +492,11 @@ func (p *PostgresStore) AddDC(dc DC) error {
// Update datacenter with given ID
func (p *PostgresStore) UpdateDC(id string, newName string) error {
_, err := p.db.Exec("UPDATE event_dc SET dc=$1 WHERE dc_id=$2", newName, id)
return err
if err != nil {
log.Errorf(err.Error())
return err
}
return nil
}

// Close the database session
Expand Down
1 change: 1 addition & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func registerRoutes(srv *Server) http.Handler {

func latency(prefix string, h httprouter.Handle) httprouter.Handle {
return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
log.Infof("request received for %s", prefix)
start := time.Now()
defer func() {
metrics.HTTPLatency(prefix, start)
Expand Down

0 comments on commit c8ae3d2

Please sign in to comment.