From 602ef341e670a092dced36ff7c07e6e0905c3b54 Mon Sep 17 00:00:00 2001 From: Marco Primi Date: Thu, 14 Nov 2024 16:25:50 -0800 Subject: [PATCH] De-flake TestJetStreamAckExplicitMsgRemoval This test was ignoring errors from some calls that may possibly fail. As a result the occasional failures could not be traced back exactly to an issue. Refresh this tests: - Check for all errors - Increase the number of messages - Verify uniqueness of messages delivered - Comments --- server/jetstream_test.go | 229 +++++++++++++++++++++++---------------- 1 file changed, 137 insertions(+), 92 deletions(-) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 89b3d29b9c..42c4f00f38 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -9589,156 +9589,201 @@ func TestJetStreamPubSubPerf(t *testing.T) { } func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { - cases := []struct { - name string - mconfig *StreamConfig - }{ - {"MemoryStore", &StreamConfig{ - Name: "MY_STREAM", - Storage: MemoryStorage, - Subjects: []string{"foo.*"}, - Retention: InterestPolicy, - }}, - {"FileStore", &StreamConfig{ - Name: "MY_STREAM", - Storage: FileStorage, - Subjects: []string{"foo.*"}, - Retention: InterestPolicy, - }}, - } - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { + const burstSize = 20 + + for _, storageType := range []StorageType{ + FileStorage, + MemoryStorage, + } { + t.Run(storageType.String(), func(t *testing.T) { + // Start a server s := RunBasicJetStreamServer(t) defer s.Shutdown() - mset, err := s.GlobalAccount().addStream(c.mconfig) - if err != nil { - t.Fatalf("Unexpected error adding stream: %v", err) + // Create a stream + cfg := &StreamConfig{ + Name: "MY_STREAM", + Storage: storageType, + Subjects: []string{"foo.*"}, + Retention: InterestPolicy, } + mset, err := s.GlobalAccount().addStream(cfg) + require_NoError(t, err) defer mset.delete() + // Create 2 connections nc1 := clientConnectToServer(t, s) defer nc1.Close() - nc2 := clientConnectToServer(t, s) defer nc2.Close() - // Create two durable consumers on the same subject - sub1, _ := nc1.SubscribeSync(nats.NewInbox()) + // Create 2 subscription inboxes (for durable consumers DeliverSubject forwarding) + sub1, err := nc1.SubscribeSync(nats.NewInbox()) + require_NoError(t, err) defer sub1.Unsubscribe() - nc1.Flush() + err = nc1.Flush() + require_NoError(t, err) - o1, err := mset.addConsumer(&ConsumerConfig{ + sub2, err := nc2.SubscribeSync(nats.NewInbox()) + require_NoError(t, err) + defer sub2.Unsubscribe() + err = nc2.Flush() + require_NoError(t, err) + + // Create 2 durable consumers (that forward messages to the inboxes above) + dc1, err := mset.addConsumer(&ConsumerConfig{ Durable: "dur1", DeliverSubject: sub1.Subject, FilterSubject: "foo.bar", AckPolicy: AckExplicit, }) - if err != nil { - t.Fatalf("Unexpected error adding consumer: %v", err) - } - defer o1.delete() - - sub2, _ := nc2.SubscribeSync(nats.NewInbox()) - defer sub2.Unsubscribe() - nc2.Flush() + require_NoError(t, err) + defer dc1.delete() - o2, err := mset.addConsumer(&ConsumerConfig{ + dc2, err := mset.addConsumer(&ConsumerConfig{ Durable: "dur2", DeliverSubject: sub2.Subject, FilterSubject: "foo.bar", AckPolicy: AckExplicit, AckWait: 100 * time.Millisecond, }) - if err != nil { - t.Fatalf("Unexpected error adding consumer: %v", err) - } - defer o2.delete() + require_NoError(t, err) + defer dc2.delete() - // Send 2 messages - toSend := 2 - for i := 0; i < toSend; i++ { - sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg%v", i+1)) + // Publish N messages (with subject matching the stream created above) + for i := 1; i <= burstSize; i++ { + pubAck := sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg_%d", i)) + require_Equal(t, int(pubAck.Sequence), i) } + + // Check that the stream contains 2 messages state := mset.state() - if state.Msgs != uint64(toSend) { - t.Fatalf("Expected %v messages, got %d", toSend, state.Msgs) + if state.Msgs != uint64(burstSize) { + t.Fatalf("Expected %d messages in stream, found %d", burstSize, state.Msgs) } - // Receive the messages and ack them. - subs := []*nats.Subscription{sub1, sub2} - for _, sub := range subs { - for i := 0; i < toSend; i++ { + // Receive and ack both messages from the 2 subscriptions + for subIndex, sub := range []*nats.Subscription{sub1, sub2} { + sequences := make(map[uint64]bool, burstSize) + + for i := 1; i <= burstSize; i++ { m, err := sub.NextMsg(time.Second) - if err != nil { - t.Fatalf("Error acking message: %v", err) + require_NoError(t, err) + err = m.Respond(nil) + require_NoError(t, err) + metadata, err := m.Metadata() + require_NoError(t, err) + sequences[metadata.Sequence.Stream] = true + } + + // Verify all known sequence numbers are delivered (as opposed to duplicates) + for i := uint64(1); i <= burstSize; i++ { + _, ok := sequences[i] + if !ok { + t.Fatalf("Subscriber %d/2 did not deliver message sequence %d", subIndex+1, i) } - m.Respond(nil) } } - // To make sure acks are processed for checking state after sending new ones. - checkFor(t, time.Second, 25*time.Millisecond, func() error { + + // Flush both connections which may have pending ACKs + err = nc1.Flush() + require_NoError(t, err) + err = nc2.Flush() + require_NoError(t, err) + + // Verify all messages are eventually dropped from the stream (since all known consumers ack'd them) + checkFor(t, 3*time.Second, 100*time.Millisecond, func() error { if state = mset.state(); state.Msgs != 0 { - return fmt.Errorf("Stream still has messages") + return fmt.Errorf("expected empty stream, but found %d messages", state.Msgs) } return nil }) - // Now close the 2nd subscription... - sub2.Unsubscribe() - nc2.Flush() + // Shut down one of the subscriptions + err = sub2.Unsubscribe() + require_NoError(t, err) + err = nc2.Flush() + require_NoError(t, err) - // Send 2 more new messages - for i := 0; i < toSend; i++ { - sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg%v", 2+i+1)) + // Publish N additional messages + for i := burstSize + 1; i <= burstSize+burstSize; i++ { + pubAck := sendStreamMsg(t, nc1, "foo.bar", fmt.Sprintf("msg_%d", i)) + require_Equal(t, i, int(pubAck.Sequence)) } + + // Check that the stream contains N messages state = mset.state() - if state.Msgs != uint64(toSend) { - t.Fatalf("Expected %v messages, got %d", toSend, state.Msgs) + if state.Msgs != uint64(burstSize) { + t.Fatalf("Expected %d messages in stream, found %d", burstSize, state.Msgs) } - // first subscription should get it and will ack it. - for i := 0; i < toSend; i++ { + // Receive and ack the new messages using the active subscription/consumer + sequences := make(map[uint64]bool, burstSize) + for i := 1; i <= burstSize; i++ { m, err := sub1.NextMsg(time.Second) - if err != nil { - t.Fatalf("Error getting message to ack: %v", err) + require_NoError(t, err) + err = m.Respond(nil) + require_NoError(t, err) + metadata, err := m.Metadata() + require_NoError(t, err) + sequences[metadata.Sequence.Stream] = true + } + // Verify all known sequence numbers are delivered (as opposed to duplicates) + for i := uint64(burstSize) + 1; i <= burstSize+burstSize; i++ { + _, ok := sequences[i] + if !ok { + t.Fatalf("Subscriber 1/2 did not deliver message sequence %d", i) } - m.Respond(nil) } - // For acks from m.Respond above - nc1.Flush() - // Now recreate the subscription for the 2nd JS consumer - sub2, _ = nc2.SubscribeSync(nats.NewInbox()) - defer sub2.Unsubscribe() + // Create a different subscription inbox + newSub2, err := nc2.SubscribeSync(nats.NewInbox()) + require_NoError(t, err) + defer newSub2.Unsubscribe() - o2, err = mset.addConsumer(&ConsumerConfig{ + // Update the second durable consumer with the new inbox subscription + dc2, err = mset.addConsumer(&ConsumerConfig{ Durable: "dur2", - DeliverSubject: sub2.Subject, + DeliverSubject: newSub2.Subject, FilterSubject: "foo.bar", AckPolicy: AckExplicit, AckWait: 100 * time.Millisecond, }) - if err != nil { - t.Fatalf("Unexpected error adding consumer: %v", err) - } - defer o2.delete() + require_NoError(t, err) + defer dc2.delete() - // Those messages should be redelivered to the 2nd consumer - for i := 1; i <= toSend; i++ { - m, err := sub2.NextMsg(time.Second) - if err != nil { - t.Fatalf("Error receiving message %d: %v", i, err) + // Receive and ack the new messages using the recreated/updated subscription/consumer + sequences = make(map[uint64]bool, burstSize) + for i := 1; i <= burstSize; i++ { + m, err := newSub2.NextMsg(time.Second) + require_NoError(t, err) + err = m.Respond(nil) + require_NoError(t, err) + metadata, err := m.Metadata() + require_NoError(t, err) + sequences[metadata.Sequence.Stream] = true + } + // Verify all known sequence numbers are delivered (as opposed to duplicates) + for i := uint64(burstSize) + 1; i <= burstSize+burstSize; i++ { + _, ok := sequences[i] + if !ok { + t.Fatalf("Subscriber 2/2 did not deliver message sequence %d", i) } - m.Respond(nil) + } + + // Flush both connections which may have pending ACKs + err = nc1.Flush() + require_NoError(t, err) + err = nc2.Flush() + require_NoError(t, err) - sseq := o2.streamSeqFromReply(m.Reply) - // Depending on timing from above we could receive stream sequences out of order but - // we know we want 3 & 4. - if sseq != 3 && sseq != 4 { - t.Fatalf("Expected stream sequence of 3 or 4 but got %d", sseq) + // Verify all messages are eventually dropped from the stream, since all known consumers ack'd them + checkFor(t, 3*time.Second, 100*time.Millisecond, func() error { + if state = mset.state(); state.Msgs != 0 { + return fmt.Errorf("expected empty stream, but found %d messages", state.Msgs) } - } + return nil + }) }) } }