From 2ceff7e12376b95053212d5184aca238bdf7abfb Mon Sep 17 00:00:00 2001 From: Yadvendar Champawat Date: Tue, 14 Feb 2017 13:41:12 +0530 Subject: [PATCH] passing tests --- config.go | 3 --- job.go | 2 +- pool.go | 6 +++--- pool_test.go | 6 +++--- redis_keys.go | 12 +++++++++--- scripts_test.go | 17 +++++++++-------- test_utils.go | 8 ++++---- 7 files changed, 29 insertions(+), 25 deletions(-) diff --git a/config.go b/config.go index 1b90e3a..8331ba3 100644 --- a/config.go +++ b/config.go @@ -45,9 +45,6 @@ func (c *configType) SetEnvPrefix(p string) { return } c.envPrefix = p - Keys.JobsTimeIndex = c.GetKeyPrefix() + Keys.JobsTimeIndex - Keys.JobsTemp = c.GetKeyPrefix() + Keys.JobsTemp - Keys.ActivePools = c.GetKeyPrefix() + Keys.ActivePools } func (c *configType) GetKeyPrefix() string { var hardCodedPrefix = "jobs:" diff --git a/job.go b/job.go index 5737fe8..1bcb08b 100644 --- a/job.go +++ b/job.go @@ -177,7 +177,7 @@ func (t *transaction) saveJob(job *Job) { // add the job id to the time index with a score equal to the job's time field. // If the job has been destroyed, addJobToTimeIndex will have no effect. func (t *transaction) addJobToTimeIndex(job *Job) { - t.addJobToSet(job, Keys.JobsTimeIndex, float64(job.time)) + t.addJobToSet(job, Keys.JobsTimeIndex.Key(), float64(job.time)) } // Refresh mutates the job by setting its fields to the most recent data diff --git a/pool.go b/pool.go index f99cdb8..9149b55 100644 --- a/pool.go +++ b/pool.go @@ -136,7 +136,7 @@ func (p *Pool) addToPoolSet() error { p.RLock() thisId := p.id p.RUnlock() - if _, err := conn.Do("SADD", Keys.ActivePools, thisId); err != nil { + if _, err := conn.Do("SADD", Keys.ActivePools.Key(), thisId); err != nil { return err } return nil @@ -150,7 +150,7 @@ func (p *Pool) removeFromPoolSet() error { p.RLock() thisId := p.id p.RUnlock() - if _, err := conn.Do("SREM", Keys.ActivePools, thisId); err != nil { + if _, err := conn.Do("SREM", Keys.ActivePools.Key(), thisId); err != nil { return err } return nil @@ -210,7 +210,7 @@ func (p *Pool) pongKey() string { func (p *Pool) purgeStalePools() error { conn := redisPool.Get() defer conn.Close() - poolIds, err := redis.Strings(conn.Do("SMEMBERS", Keys.ActivePools)) + poolIds, err := redis.Strings(conn.Do("SMEMBERS", Keys.ActivePools.Key())) if err != nil { return err } diff --git a/pool_test.go b/pool_test.go index c48b1a7..4da67bd 100644 --- a/pool_test.go +++ b/pool_test.go @@ -30,12 +30,12 @@ func TestPoolIdSet(t *testing.T) { if err := pool.Start(); err != nil { t.Errorf("Unexpected error in pool.Start(): %s", err.Error()) } - expectSetContains(t, Keys.ActivePools, pool.id) + expectSetContains(t, Keys.ActivePools.Key(), pool.id) pool.Close() if err := pool.Wait(); err != nil { t.Errorf("Unexpected error in pool.Wait(): %s", err.Error()) } - expectSetDoesNotContain(t, Keys.ActivePools, pool.id) + expectSetDoesNotContain(t, Keys.ActivePools.Key(), pool.id) } // TestGetNextJobs tests the getNextJobs function, which queries the database to find @@ -940,7 +940,7 @@ func TestStalePoolsArePurged(t *testing.T) { } // At this point, the stale pool should have been fully purged. - expectSetDoesNotContain(t, Keys.ActivePools, oldId) + expectSetDoesNotContain(t, Keys.ActivePools.Key(), oldId) expectJobFieldEquals(t, job, "poolId", newPool.id, stringConverter) } diff --git a/redis_keys.go b/redis_keys.go index 796ec2e..6e54aeb 100644 --- a/redis_keys.go +++ b/redis_keys.go @@ -4,20 +4,26 @@ package jobs +type redisKey string + // keys stores any constant redis keys. By storing them all here, // we avoid using string literals which are prone to typos. var Keys = struct { // jobsTimeIndex is the key for a sorted set which keeps all outstanding // jobs sorted by their time field. - JobsTimeIndex string + JobsTimeIndex redisKey // jobsTemp is the key for a temporary set which is created and then destroyed // during the process of getting the next jobs in the queue. - JobsTemp string + JobsTemp redisKey // activePools is the key for a set which holds the pool ids for all active // pools. - ActivePools string + ActivePools redisKey }{ JobsTimeIndex: "time", JobsTemp: "temp", ActivePools: "pools:active", } + +func (rk *redisKey) Key() string { + return Config.GetKeyPrefix() + string(*rk) +} diff --git a/scripts_test.go b/scripts_test.go index e188317..1f957f3 100644 --- a/scripts_test.go +++ b/scripts_test.go @@ -5,10 +5,11 @@ package jobs import ( - "github.com/garyburd/redigo/redis" "reflect" "testing" "time" + + "github.com/garyburd/redigo/redis" ) func TestPopNextJobsScript(t *testing.T) { @@ -21,7 +22,7 @@ func TestPopNextJobsScript(t *testing.T) { // Set up the database tx0 := newTransaction() // One set will mimic the ready and sorted jobs - tx0.command("ZADD", redis.Args{Keys.JobsTimeIndex, pastTime, "two", pastTime, "four"}, nil) + tx0.command("ZADD", redis.Args{Keys.JobsTimeIndex.Key(), pastTime, "two", pastTime, "four"}, nil) // One set will mimic the queued set tx0.command("ZADD", redis.Args{StatusQueued.Key(), 1, "one", 2, "two", 3, "three", 4, "four"}, nil) // One set will mimic the executing set @@ -67,7 +68,7 @@ func TestPopNextJobsScript(t *testing.T) { if !reflect.DeepEqual(expectedQueued, gotQueued) { t.Errorf("Ids in the queued set were incorrect.\n\tExpected: %v\n\tBut got: %v", expectedQueued, gotQueued) } - expectKeyNotExists(t, Keys.JobsTemp) + expectKeyNotExists(t, Keys.JobsTemp.Key()) } func TestRetryOrFailJobScript(t *testing.T) { @@ -218,7 +219,7 @@ func TestPurgeStalePoolScript(t *testing.T) { // Add both pools to the set of active pools conn := redisPool.Get() defer conn.Close() - if _, err := conn.Do("SADD", Keys.ActivePools, stalePoolId, activePoolId); err != nil { + if _, err := conn.Do("SADD", Keys.ActivePools.Key(), stalePoolId, activePoolId); err != nil { t.Errorf("Unexpected error adding pools to set: %s", err) } @@ -231,8 +232,8 @@ func TestPurgeStalePoolScript(t *testing.T) { // Check the result // The active pools set should contain only the activePoolId - expectSetDoesNotContain(t, Keys.ActivePools, stalePoolId) - expectSetContains(t, Keys.ActivePools, activePoolId) + expectSetDoesNotContain(t, Keys.ActivePools.Key(), stalePoolId) + expectSetContains(t, Keys.ActivePools.Key(), activePoolId) // All the active jobs should still be executing for _, job := range activeJobs { if err := job.Refresh(); err != nil { @@ -338,7 +339,7 @@ func TestAddJobToSetScript(t *testing.T) { // Add the job to the time index with a score of 7 days ago tx := newTransaction() expectedScore := float64(time.Now().Add(-7 * 24 * time.Hour).UTC().UnixNano()) - tx.addJobToSet(job, Keys.JobsTimeIndex, expectedScore) + tx.addJobToSet(job, Keys.JobsTimeIndex.Key(), expectedScore) if err := tx.exec(); err != nil { t.Errorf("Unexpected err in tx.exec(): %s", err.Error()) } @@ -346,7 +347,7 @@ func TestAddJobToSetScript(t *testing.T) { // Make sure the job was added to the set properly conn := redisPool.Get() defer conn.Close() - score, err := redis.Float64(conn.Do("ZSCORE", Keys.JobsTimeIndex, job.id)) + score, err := redis.Float64(conn.Do("ZSCORE", Keys.JobsTimeIndex.Key(), job.id)) if err != nil { t.Errorf("Unexpected error in ZSCORE: %s", err.Error()) } diff --git a/test_utils.go b/test_utils.go index 1e87344..7018e8b 100644 --- a/test_utils.go +++ b/test_utils.go @@ -226,7 +226,7 @@ func expectJobInStatusSet(t *testing.T, j *Job, status Status) { func expectJobInTimeIndex(t *testing.T, j *Job) { conn := redisPool.Get() defer conn.Close() - gotIds, err := redis.Strings(conn.Do("ZRANGEBYSCORE", Keys.JobsTimeIndex, j.time, j.time)) + gotIds, err := redis.Strings(conn.Do("ZRANGEBYSCORE", Keys.JobsTimeIndex.Key(), j.time, j.time)) if err != nil { t.Errorf("Unexpected error: %s", err.Error()) } @@ -237,7 +237,7 @@ func expectJobInTimeIndex(t *testing.T, j *Job) { } } // If we reached here, we did not find the job we were looking for - t.Errorf("job:%s was not found in set %s", j.id, Keys.JobsTimeIndex) + t.Errorf("job:%s was not found in set %s", j.id, Keys.JobsTimeIndex.Key()) } // expectJobNotInStatusSet sets an error via t.Errorf if job is in the status set @@ -262,14 +262,14 @@ func expectJobNotInStatusSet(t *testing.T, j *Job, status Status) { func expectJobNotInTimeIndex(t *testing.T, j *Job) { conn := redisPool.Get() defer conn.Close() - gotIds, err := redis.Strings(conn.Do("ZRANGEBYSCORE", Keys.JobsTimeIndex, j.time, j.time)) + gotIds, err := redis.Strings(conn.Do("ZRANGEBYSCORE", Keys.JobsTimeIndex.Key(), j.time, j.time)) if err != nil { t.Errorf("Unexpected error: %s", err.Error()) } for _, id := range gotIds { if id == j.id { // We found the job, but it wasn't supposed to be here! - t.Errorf("job:%s was found in set %s but expected it to be removed", j.id, Keys.JobsTimeIndex) + t.Errorf("job:%s was found in set %s but expected it to be removed", j.id, Keys.JobsTimeIndex.Key()) } } }