Skip to content

Commit

Permalink
Multiple fixes (#73)
Browse files Browse the repository at this point in the history
Fixes #71 
Fixes #68
Fixes #66
Fixes #64
  • Loading branch information
nrvnrvn authored Jun 5, 2017
1 parent 1ebe2cc commit c0f7063
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 27 deletions.
2 changes: 2 additions & 0 deletions beater/journalbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ func (jb *Journalbeat) Run(b *beat.Beat) error {
event["type"] = jb.config.DefaultType
event["input_type"] = jb.config.DefaultType
event["@timestamp"] = common.Time(time.Unix(0, int64(rawEvent.RealtimeTimestamp)*1000))
// add _REALTIME_TIMESTAMP until https://github.com/elastic/elasticsearch/issues/12829 is closed
event["@realtime_timestamp"] = int64(rawEvent.RealtimeTimestamp)

ref := &eventReference{rawEvent.Cursor, event}
if jb.client.PublishEvent(event, publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed) {
Expand Down
60 changes: 35 additions & 25 deletions beater/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package beater

import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"time"

Expand Down Expand Up @@ -70,18 +72,18 @@ func (jb *Journalbeat) managePendingQueueLoop() {

// flush saves the map[string]common.MapStr to the JSON file on disk
flush := func(source map[string]common.MapStr, dest string) error {
file, err := ioutil.TempFile("", "journalbeat-pending-queue")
tempFile, err := ioutil.TempFile(filepath.Dir(dest), fmt.Sprintf(".%s", filepath.Base(dest)))
if err != nil {
return err
}

if err = json.NewEncoder(file).Encode(source); err != nil {
_ = file.Close()
if err = json.NewEncoder(tempFile).Encode(source); err != nil {
_ = tempFile.Close()
return err
}

_ = file.Close()
return os.Rename(file.Name(), dest)
_ = tempFile.Close()
return os.Rename(tempFile.Name(), dest)
}

// load loads the map[string]common.MapStr from the JSON file on disk
Expand All @@ -99,12 +101,14 @@ func (jb *Journalbeat) managePendingQueueLoop() {
defer func() {
var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()
for evRef := range jb.pending {
pending[evRef.cursor] = evRef.body
}
}()

go func() {
defer wg.Done()
for evRef := range jb.completed {
Expand Down Expand Up @@ -134,10 +138,14 @@ func (jb *Journalbeat) managePendingQueueLoop() {
select {
case <-jb.done:
return
case p := <-jb.pending:
pending[p.cursor] = p.body
case c := <-jb.completed:
completed[c.cursor] = c.body
case p, ok := <-jb.pending:
if ok {
pending[p.cursor] = p.body
}
case c, ok := <-jb.completed:
if ok {
completed[c.cursor] = c.body
}
case <-tick:
result := diff(pending, completed)
if err := flush(result, jb.config.PendingQueue.File); err != nil {
Expand All @@ -156,23 +164,25 @@ func (jb *Journalbeat) writeCursorLoop() {

var cursor string
saveCursorState := func(cursor string) {
if cursor != "" {
file, err := ioutil.TempFile("", "journalbeat-cursor")
if err != nil {
logp.Err("Could not create cursor state file: %v", err)
return
}
if cursor == "" {
return
}

if _, err = file.WriteString(cursor); err != nil {
_ = file.Close()
logp.Err("Could not write to cursor state file: %v", err)
return
}
_ = file.Close()
if err := os.Rename(file.Name(), jb.config.CursorStateFile); err != nil {
logp.Err("Could not save cursor state file: %v", err)
return
}
tempFile, err := ioutil.TempFile(filepath.Dir(jb.config.CursorStateFile), fmt.Sprintf(".%s", filepath.Base(jb.config.CursorStateFile)))
if err != nil {
logp.Err("Could not create cursor state file: %v", err)
return
}

if _, err = tempFile.WriteString(cursor); err != nil {
_ = tempFile.Close()
logp.Err("Could not write to cursor state file: %v, cursor: %s", err, cursor)
return
}
_ = tempFile.Close()
if err := os.Rename(tempFile.Name(), jb.config.CursorStateFile); err != nil {
logp.Err("Could not save cursor to the state file: %v, cursor: %s", err, cursor)
return
}
}

Expand Down
11 changes: 11 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package config

import (
"fmt"
"path/filepath"
"regexp"
"time"
)
Expand Down Expand Up @@ -93,5 +94,15 @@ func (config *Config) Validate() error {
if _, ok := seekFallbackPositions[config.CursorSeekFallback]; !ok {
return fmt.Errorf("Invalid Cursor Seek Fallback Position: %v. Should be %s, %s or %s", config.SeekPosition, SeekPositionTail, SeekPositionHead, SeekPositionDefault)
}
if fp, err := filepath.Abs(config.PendingQueue.File); err != nil {
return fmt.Errorf("Invalid path %s: %v", config.PendingQueue.File, err)
} else {
config.PendingQueue.File = fp
}
if fp, err := filepath.Abs(config.CursorStateFile); err != nil {
return fmt.Errorf("Invalid path %s: %v", config.CursorStateFile, err)
} else {
config.CursorStateFile = fp
}
return nil
}
51 changes: 51 additions & 0 deletions etc/journalbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ output.elasticsearch:
# Optional HTTP Path
#path: "/elasticsearch"

# Custom HTTP headers to add to each request
#headers:
# X-My-Header: Contents of the header

# Proxy server url
#proxy_url: http://proxy:3128

Expand Down Expand Up @@ -211,6 +215,14 @@ output.elasticsearch:
# Path to the Elasticsearch 2.x version of the template file.
#template.versions.2x.path: "${path.config}/beatname.template-es2x.json"

# If set to true, beatname checks the Elasticsearch version at connect time, and if it
# is 6.x, it loads the file specified by the template.versions.6x.path setting. The
# default is true.
#template.versions.6x.enabled: true

# Path to the Elasticsearch 6.x version of the template file.
#template.versions.6x.path: "${path.config}/beatname.template-es6x.json"

# Use SSL settings for HTTPS. Default is true.
#ssl.enabled: true

Expand Down Expand Up @@ -598,6 +610,45 @@ output.elasticsearch:
# the default for the logs path is a logs subdirectory inside the home path.
#path.logs: ${path.home}/logs

#============================== Dashboards =====================================
# These settings control loading the sample dashboards to the Kibana index. Loading
# the dashboards is disabled by default and can be enabled either by setting the
# options here, or by using the `-setup` CLI flag.
#dashboards.enabled: false

# The URL from where to download the dashboards archive. By default this URL
# has a value which is computed based on the Beat name and version. For released
# versions, this URL points to the dashboard archive on the artifacts.elastic.co
# website.
#dashboards.url:

# The directory from where to read the dashboards. It is used instead of the URL
# when it has a value.
#dashboards.directory:

# The file archive (zip file) from where to read the dashboards. It is used instead
# of the URL when it has a value.
#dashboards.file:

# If this option is enabled, the snapshot URL is used instead of the default URL.
#dashboards.snapshot: false

# The URL from where to download the snapshot version of the dashboards. By default
# this has a value which is computed based on the Beat name and version.
#dashboards.snapshot_url

# In case the archive contains the dashboards from multiple Beats, this lets you
# select which one to load. You can load all the dashboards in the archive by
# setting this to the empty string.
#dashboards.beat: beatname

# The name of the Kibana index to use for setting the configuration. Default is ".kibana"
#dashboards.kibana_index: .kibana

# The Elasticsearch index name. This overwrites the index name defined in the
# dashboards and index pattern. Example: testbeat-*
#dashboards.index:

#================================ Logging ======================================
# There are three options for the log output: syslog, file, stderr.
# Under Windows systems, the log files are per default sent to the file output,
Expand Down
15 changes: 13 additions & 2 deletions journal/follow.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,22 @@ func Follow(journal *sdjournal.Journal, stop <-chan struct{}) <-chan *sdjournal.

process:
for {
select {
case <-stop:
return
default:
}

entry, err := readEntry(journal)
if err != nil && err != io.EOF {
logp.Err("Received unknown error when reading a new entry: %v", err)
return
if cursor, cerr := journal.GetCursor(); cerr != nil {
logp.Warn("Received unknown error when reading a new entry: %v, cursor read error: %v", err, cerr)
} else {
logp.Warn("Received unknown error when reading a new entry: %v, cursor: %s", err, cursor)
}
continue
}

if entry != nil {
if _, ok := entry.Fields[sdjournal.SD_JOURNAL_FIELD_MESSAGE_ID]; ok {
if catalogEntry, err := journal.GetCatalog(); err == nil {
Expand Down

0 comments on commit c0f7063

Please sign in to comment.