Skip to content

Commit

Permalink
Merge pull request #2 from andriyko/fix-client-conn
Browse files Browse the repository at this point in the history
Fix client conn
  • Loading branch information
andriyko authored Apr 1, 2018
2 parents bb8f7b6 + 388e641 commit d9a6e46
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 64 deletions.
25 changes: 2 additions & 23 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,6 @@ func newConn(conf connConfig, muted bool) (*conn, error) {
if err != nil {
return c, err
}
// When using UDP do a quick check to see if something is listening on the
// given port to return an error as soon as possible.
if c.network[:3] == "udp" {
for i := 0; i < 2; i++ {
_, err = c.w.Write(nil)
if err != nil {
_ = c.w.Close()
c.w = nil
return c, err
}
}
}

// To prevent a buffer overflow add some capacity to the buffer to allow for
// an additional metric.
Expand Down Expand Up @@ -166,30 +154,21 @@ func isNegative(v interface{}) bool {
switch n := v.(type) {
case int:
return n < 0
case uint:
return n < 0
case int64:
return n < 0
case uint64:
return n < 0
case int32:
return n < 0
case uint32:
return n < 0
case int16:
return n < 0
case uint16:
return n < 0
case int8:
return n < 0
case uint8:
return n < 0
case float64:
return n < 0
case float32:
return n < 0
default:
return false
}
return false
}

func (c *conn) appendBucket(prefix, bucket string, tags string) {
Expand Down
2 changes: 1 addition & 1 deletion examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"runtime"
"time"

"gopkg.in/alexcesaro/statsd.v2"
"github.com/andriyko/statsd"
)

var (
Expand Down
61 changes: 33 additions & 28 deletions statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (c *Client) Count(bucket string, n interface{}) {
}

func (c *Client) skip() bool {
return c.muted || (c.rate != 1 && randFloat() > c.rate)
return nil == c || c.muted || (c.rate != 1 && randFloat() > c.rate)
}

// Increment increment the given bucket. It is equivalent to Count(bucket, 1).
Expand All @@ -108,7 +108,7 @@ func (c *Client) Timing(bucket string, value interface{}) {
c.conn.metric(c.prefix, bucket, value, "ms", c.rate, c.tags)
}

// TimingWithTags ...
// TimingWithTags sends a timing value with tags to a bucket.
func (c *Client) TimingWithTags(bucket string, value interface{}, tags map[string]string) {
if c.skip() {
return
Expand All @@ -125,32 +125,6 @@ func (c *Client) Histogram(bucket string, value interface{}) {
c.conn.metric(c.prefix, bucket, value, "h", c.rate, c.tags)
}

// A Timing is an helper object that eases sending timing values.
type Timing struct {
start time.Time
c *Client
}

// NewTiming creates a new Timing.
func (c *Client) NewTiming() Timing {
return Timing{start: now(), c: c}
}

// Send sends the time elapsed since the creation of the Timing.
func (t Timing) Send(bucket string) {
t.c.Timing(bucket, int(t.Duration()/time.Millisecond))
}

// SendWithTags sends the time elapsed since the creation of the Timing
func (t Timing) SendWithTags(bucket string, tags map[string]string) {
t.c.TimingWithTags(bucket, float64(time.Since(t.start))/float64(time.Millisecond), tags)
}

// Duration returns the time elapsed since the creation of the Timing.
func (t Timing) Duration() time.Duration {
return now().Sub(t.start)
}

// Unique sends the given value to a set bucket.
func (c *Client) Unique(bucket string, value string) {
if c.skip() {
Expand All @@ -159,6 +133,11 @@ func (c *Client) Unique(bucket string, value string) {
c.conn.unique(c.prefix, bucket, value, c.tags)
}

// Muted tells whether the Clent is muted
func (c *Client) Muted() bool {
return c.muted
}

// Flush flushes the Client's buffer.
func (c *Client) Flush() {
if c.muted {
Expand All @@ -181,3 +160,29 @@ func (c *Client) Close() {
c.conn.closed = true
c.conn.mu.Unlock()
}

// A Timing is an helper object that eases sending timing values.
type Timing struct {
start time.Time
c *Client
}

// NewTiming creates a new Timing.
func (c *Client) NewTiming() Timing {
return Timing{start: now(), c: c}
}

// Send sends the time elapsed since the creation of the Timing.
func (t Timing) Send(bucket string) {
t.c.Timing(bucket, int(t.Duration()/time.Millisecond))
}

// SendWithTags sends the time elapsed since the creation of the Timing
func (t Timing) SendWithTags(bucket string, tags map[string]string) {
t.c.TimingWithTags(bucket, float64(t.Duration())/float64(time.Millisecond), tags)
}

// Duration returns the time elapsed since the creation of the Timing.
func (t Timing) Duration() time.Duration {
return time.Since(t.start)
}
23 changes: 11 additions & 12 deletions statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"io/ioutil"
"net"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -261,15 +262,15 @@ func TestMaxPacketSize(t *testing.T) {
}

c.Increment(testKey)
got = conn.buf.String()
got = strings.TrimSuffix(conn.buf.String(), "\n")
want := "test_key:1|c"
if got != want {
t.Errorf("Invalid output, got %q, want %q", got, want)
}
conn.buf.Reset()
c.Close()

got = conn.buf.String()
got = strings.TrimSuffix(conn.buf.String(), "\n")
if got != want {
t.Errorf("Invalid output, got %q, want %q", got, want)
}
Expand Down Expand Up @@ -342,10 +343,7 @@ func TestDialError(t *testing.T) {
}
defer func() { dialTimeout = net.DialTimeout }()

c, err := New()
if c == nil || !c.muted {
t.Error("New() did not return a muted client")
}
_, err := New()
if err == nil {
t.Error("New() did not return an error")
}
Expand All @@ -370,11 +368,11 @@ func TestUDPNotListening(t *testing.T) {
defer func() { dialTimeout = net.DialTimeout }()

c, err := New()
if c == nil || !c.muted {
t.Error("New() did not return a muted client")
if c.muted {
t.Error("client should not mute when attempting to connect to a non-listening port")
}
if err == nil {
t.Error("New should return an error")
if err != nil {
t.Error("attempting to connect to a non-listening port should not return an error")
}
}

Expand Down Expand Up @@ -461,7 +459,8 @@ func getOutput(c *Client) string {
if c.conn.w == nil {
return ""
}
return getBuffer(c).buf.String()
out := getBuffer(c).buf.String()
return strings.TrimSuffix(out, "\n")
}

func mockDial(string, string, time.Duration) (net.Conn, error) {
Expand All @@ -479,7 +478,7 @@ func TestTCP(t *testing.T) {
func testNetwork(t *testing.T, network string) {
received := make(chan bool)
server := newServer(t, network, testAddr, func(p []byte) {
s := string(p)
s := strings.TrimSuffix(string(p), "\n")
if s != "test_key:1|c" {
t.Errorf("invalid output: %q", s)
}
Expand Down

0 comments on commit d9a6e46

Please sign in to comment.