Skip to content

Commit

Permalink
De-flake TestJetStreamAckExplicitMsgRemoval
Browse files Browse the repository at this point in the history
This test was ignoring errors from some calls that may possibly fail.
As a result the occasional failures could not be traced back exactly to an issue.

Refresh this tests:
 - Check for all errors
 - Increase the number of messages
 - Verify uniqueness of messages delivered
 - Comments
  • Loading branch information
mprimi committed Nov 15, 2024
1 parent baa0f80 commit 602ef34
Showing 1 changed file with 137 additions and 92 deletions.
229 changes: 137 additions & 92 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9589,156 +9589,201 @@ func TestJetStreamPubSubPerf(t *testing.T) {
}

func TestJetStreamAckExplicitMsgRemoval(t *testing.T) {
cases := []struct {
name string
mconfig *StreamConfig
}{
{"MemoryStore", &StreamConfig{
Name: "MY_STREAM",
Storage: MemoryStorage,
Subjects: []string{"foo.*"},
Retention: InterestPolicy,
}},
{"FileStore", &StreamConfig{
Name: "MY_STREAM",
Storage: FileStorage,
Subjects: []string{"foo.*"},
Retention: InterestPolicy,
}},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
const burstSize = 20

for _, storageType := range []StorageType{
FileStorage,
MemoryStorage,
} {
t.Run(storageType.String(), func(t *testing.T) {
// Start a server
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

mset, err := s.GlobalAccount().addStream(c.mconfig)
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
// Create a stream
cfg := &StreamConfig{
Name: "MY_STREAM",
Storage: storageType,
Subjects: []string{"foo.*"},
Retention: InterestPolicy,
}
mset, err := s.GlobalAccount().addStream(cfg)
require_NoError(t, err)
defer mset.delete()

// Create 2 connections
nc1 := clientConnectToServer(t, s)
defer nc1.Close()

nc2 := clientConnectToServer(t, s)
defer nc2.Close()

// Create two durable consumers on the same subject
sub1, _ := nc1.SubscribeSync(nats.NewInbox())
// Create 2 subscription inboxes (for durable consumers DeliverSubject forwarding)
sub1, err := nc1.SubscribeSync(nats.NewInbox())
require_NoError(t, err)
defer sub1.Unsubscribe()
nc1.Flush()
err = nc1.Flush()
require_NoError(t, err)

o1, err := mset.addConsumer(&ConsumerConfig{
sub2, err := nc2.SubscribeSync(nats.NewInbox())
require_NoError(t, err)
defer sub2.Unsubscribe()
err = nc2.Flush()
require_NoError(t, err)

// Create 2 durable consumers (that forward messages to the inboxes above)
dc1, err := mset.addConsumer(&ConsumerConfig{
Durable: "dur1",
DeliverSubject: sub1.Subject,
FilterSubject: "foo.bar",
AckPolicy: AckExplicit,
})
if err != nil {
t.Fatalf("Unexpected error adding consumer: %v", err)
}
defer o1.delete()

sub2, _ := nc2.SubscribeSync(nats.NewInbox())
defer sub2.Unsubscribe()
nc2.Flush()
require_NoError(t, err)
defer dc1.delete()

o2, err := mset.addConsumer(&ConsumerConfig{
dc2, err := mset.addConsumer(&ConsumerConfig{
Durable: "dur2",
DeliverSubject: sub2.Subject,
FilterSubject: "foo.bar",
AckPolicy: AckExplicit,
AckWait: 100 * time.Millisecond,
})
if err != nil {
t.Fatalf("Unexpected error adding consumer: %v", err)
}
defer o2.delete()
require_NoError(t, err)
defer dc2.delete()

// Send 2 messages
toSend := 2
for i := 0; i < toSend; i++ {
sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg%v", i+1))
// Publish N messages (with subject matching the stream created above)
for i := 1; i <= burstSize; i++ {
pubAck := sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg_%d", i))
require_Equal(t, int(pubAck.Sequence), i)
}

// Check that the stream contains 2 messages
state := mset.state()
if state.Msgs != uint64(toSend) {
t.Fatalf("Expected %v messages, got %d", toSend, state.Msgs)
if state.Msgs != uint64(burstSize) {
t.Fatalf("Expected %d messages in stream, found %d", burstSize, state.Msgs)
}

// Receive the messages and ack them.
subs := []*nats.Subscription{sub1, sub2}
for _, sub := range subs {
for i := 0; i < toSend; i++ {
// Receive and ack both messages from the 2 subscriptions
for subIndex, sub := range []*nats.Subscription{sub1, sub2} {
sequences := make(map[uint64]bool, burstSize)

for i := 1; i <= burstSize; i++ {
m, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error acking message: %v", err)
require_NoError(t, err)
err = m.Respond(nil)
require_NoError(t, err)
metadata, err := m.Metadata()
require_NoError(t, err)
sequences[metadata.Sequence.Stream] = true
}

// Verify all known sequence numbers are delivered (as opposed to duplicates)
for i := uint64(1); i <= burstSize; i++ {
_, ok := sequences[i]
if !ok {
t.Fatalf("Subscriber %d/2 did not deliver message sequence %d", subIndex+1, i)
}
m.Respond(nil)
}
}
// To make sure acks are processed for checking state after sending new ones.
checkFor(t, time.Second, 25*time.Millisecond, func() error {

// Flush both connections which may have pending ACKs
err = nc1.Flush()
require_NoError(t, err)
err = nc2.Flush()
require_NoError(t, err)

// Verify all messages are eventually dropped from the stream (since all known consumers ack'd them)
checkFor(t, 3*time.Second, 100*time.Millisecond, func() error {
if state = mset.state(); state.Msgs != 0 {
return fmt.Errorf("Stream still has messages")
return fmt.Errorf("expected empty stream, but found %d messages", state.Msgs)
}
return nil
})

// Now close the 2nd subscription...
sub2.Unsubscribe()
nc2.Flush()
// Shut down one of the subscriptions
err = sub2.Unsubscribe()
require_NoError(t, err)
err = nc2.Flush()
require_NoError(t, err)

// Send 2 more new messages
for i := 0; i < toSend; i++ {
sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg%v", 2+i+1))
// Publish N additional messages
for i := burstSize + 1; i <= burstSize+burstSize; i++ {
pubAck := sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg_%d", i))
require_Equal(t, i, int(pubAck.Sequence))
}

// Check that the stream contains N messages
state = mset.state()
if state.Msgs != uint64(toSend) {
t.Fatalf("Expected %v messages, got %d", toSend, state.Msgs)
if state.Msgs != uint64(burstSize) {
t.Fatalf("Expected %d messages in stream, found %d", burstSize, state.Msgs)
}

// first subscription should get it and will ack it.
for i := 0; i < toSend; i++ {
// Receive and ack the new messages using the active subscription/consumer
sequences := make(map[uint64]bool, burstSize)
for i := 1; i <= burstSize; i++ {
m, err := sub1.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error getting message to ack: %v", err)
require_NoError(t, err)
err = m.Respond(nil)
require_NoError(t, err)
metadata, err := m.Metadata()
require_NoError(t, err)
sequences[metadata.Sequence.Stream] = true
}
// Verify all known sequence numbers are delivered (as opposed to duplicates)
for i := uint64(burstSize) + 1; i <= burstSize+burstSize; i++ {
_, ok := sequences[i]
if !ok {
t.Fatalf("Subscriber 1/2 did not deliver message sequence %d", i)
}
m.Respond(nil)
}
// For acks from m.Respond above
nc1.Flush()

// Now recreate the subscription for the 2nd JS consumer
sub2, _ = nc2.SubscribeSync(nats.NewInbox())
defer sub2.Unsubscribe()
// Create a different subscription inbox
newSub2, err := nc2.SubscribeSync(nats.NewInbox())
require_NoError(t, err)
defer newSub2.Unsubscribe()

o2, err = mset.addConsumer(&ConsumerConfig{
// Update the second durable consumer with the new inbox subscription
dc2, err = mset.addConsumer(&ConsumerConfig{
Durable: "dur2",
DeliverSubject: sub2.Subject,
DeliverSubject: newSub2.Subject,
FilterSubject: "foo.bar",
AckPolicy: AckExplicit,
AckWait: 100 * time.Millisecond,
})
if err != nil {
t.Fatalf("Unexpected error adding consumer: %v", err)
}
defer o2.delete()
require_NoError(t, err)
defer dc2.delete()

// Those messages should be redelivered to the 2nd consumer
for i := 1; i <= toSend; i++ {
m, err := sub2.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving message %d: %v", i, err)
// Receive and ack the new messages using the recreated/updated subscription/consumer
sequences = make(map[uint64]bool, burstSize)
for i := 1; i <= burstSize; i++ {
m, err := newSub2.NextMsg(time.Second)
require_NoError(t, err)
err = m.Respond(nil)
require_NoError(t, err)
metadata, err := m.Metadata()
require_NoError(t, err)
sequences[metadata.Sequence.Stream] = true
}
// Verify all known sequence numbers are delivered (as opposed to duplicates)
for i := uint64(burstSize) + 1; i <= burstSize+burstSize; i++ {
_, ok := sequences[i]
if !ok {
t.Fatalf("Subscriber 2/2 did not deliver message sequence %d", i)
}
m.Respond(nil)
}

// Flush both connections which may have pending ACKs
err = nc1.Flush()
require_NoError(t, err)
err = nc2.Flush()
require_NoError(t, err)

sseq := o2.streamSeqFromReply(m.Reply)
// Depending on timing from above we could receive stream sequences out of order but
// we know we want 3 & 4.
if sseq != 3 && sseq != 4 {
t.Fatalf("Expected stream sequence of 3 or 4 but got %d", sseq)
// Verify all messages are eventually dropped from the stream, since all known consumers ack'd them
checkFor(t, 3*time.Second, 100*time.Millisecond, func() error {
if state = mset.state(); state.Msgs != 0 {
return fmt.Errorf("expected empty stream, but found %d messages", state.Msgs)
}
}
return nil
})
})
}
}
Expand Down

0 comments on commit 602ef34

Please sign in to comment.