Skip to content

Commit

Permalink
implemented update subscription (#427)
Browse files Browse the repository at this point in the history
* implemented update subscription

* do not call addstream if it is existing, to prevent error propagation

* added comments

* added reconciler test

* added reconciler tests

* removed unused types

* added check for err
  • Loading branch information
astelmashenko authored Sep 13, 2023
1 parent 7bd5300 commit a7bb71a
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 23 deletions.
27 changes: 27 additions & 0 deletions pkg/channel/jetstream/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (d *Dispatcher) ReconcileConsumers(ctx context.Context, config ChannelConfi

toAddSubs := expectedSubs.Difference(currentSubs)
toRemoveSubs := currentSubs.Difference(expectedSubs)
toUpdateSubs := currentSubs.Intersection(expectedSubs)

nextSubs := sets.NewString()
var subErrs commonerr.SubscriberErrors
Expand All @@ -160,6 +161,11 @@ func (d *Dispatcher) ReconcileConsumers(ctx context.Context, config ChannelConfi
err error
)

if toUpdateSubs.Has(uid) {
subLogger.Debugw("updating existing subscription")
_ = d.updateSubscription(logging.WithLogger(ctx, subLogger), config, sub, isLeader)
}

if toAddSubs.Has(uid) {
subLogger.Debugw("subscription not configured for dispatcher, subscribing")
status, err = d.subscribe(logging.WithLogger(ctx, subLogger), config, sub, isLeader)
Expand All @@ -168,6 +174,8 @@ func (d *Dispatcher) ReconcileConsumers(ctx context.Context, config ChannelConfi
status = SubscriberStatusTypeUpToDate
}

logger.Debugw("Subscription status after add/update", zap.Any("SubStatus", status))

switch status {
case SubscriberStatusTypeCreated, SubscriberStatusTypeUpToDate:
nextSubs.Insert(uid)
Expand Down Expand Up @@ -202,6 +210,25 @@ func (d *Dispatcher) ReconcileConsumers(ctx context.Context, config ChannelConfi
return nil
}

func (d *Dispatcher) updateSubscription(ctx context.Context, config ChannelConfig, sub Subscription, isLeader bool) error {
logger := logging.FromContext(ctx)
d.consumers[sub.UID].sub = sub
consumerName := d.consumerNameFunc(string(sub.UID))

if isLeader {
deliverSubject := d.consumerSubjectFunc(config.Namespace, config.Name, string(sub.UID))
consumerConfig := buildConsumerConfig(consumerName, deliverSubject, config.ConsumerConfigTemplate)

_, err := d.js.UpdateConsumer(config.StreamName, consumerConfig)
if err != nil {
logger.Errorw("failed to update queue subscription for consumer", zap.Error(err))
return err
}
}

return nil
}

func (d *Dispatcher) subscribe(ctx context.Context, config ChannelConfig, sub Subscription, isLeader bool) (SubscriberStatusType, error) {
logger := logging.FromContext(ctx)

Expand Down
71 changes: 71 additions & 0 deletions pkg/channel/jetstream/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,22 @@ limitations under the License.
package dispatcher

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
dispatchertesting "knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/testing"
"knative.dev/eventing-natss/pkg/client/injection/client"
fakeclientset "knative.dev/eventing-natss/pkg/client/injection/client/fake"
reconcilertesting "knative.dev/eventing-natss/pkg/reconciler/testing"
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
logtesting "knative.dev/pkg/logging/testing"

"knative.dev/eventing-natss/pkg/apis/messaging/v1alpha1"
"knative.dev/eventing-natss/pkg/channel/jetstream/utils"
reconciletesting "knative.dev/eventing-natss/pkg/reconciler/testing"
Expand All @@ -36,6 +50,63 @@ func TestDispatcher_RegisterChannelHost(t *testing.T) {
}
}

func TestDispatcher_ReconcileConsumers(t *testing.T) {
ctx := logging.WithLogger(context.Background(), logtesting.TestLogger(t))

s := dispatchertesting.RunBasicJetstreamServer()
defer dispatchertesting.ShutdownJSServerAndRemoveStorage(t, s)
_, js := dispatchertesting.JsClient(t, s)

ls := reconcilertesting.NewListers([]runtime.Object{})

ctx, _ = fakekubeclient.With(ctx, ls.GetKubeObjects()...)
ctx, _ = fakeeventingclient.With(ctx, ls.GetEventingObjects()...)
ctx, _ = fakeclientset.With(ctx, ls.GetNatssObjects()...)

eventRecorder := record.NewFakeRecorder(10)
ctx = controller.WithEventRecorder(ctx, eventRecorder)

nc := reconciletesting.NewNatsJetStreamChannel(testNS, ncName, reconciletesting.WithNatsJetStreamChannelSubscribers(subscribers))
config := createChannelConfig(nc, Subscription{
UID: subscriber1UID,
})

d, err := NewDispatcher(ctx, NatsDispatcherArgs{
JetStream: js,
SubjectFunc: utils.PublishSubjectName,
ConsumerNameFunc: utils.ConsumerName,
ConsumerSubjectFunc: utils.ConsumerSubjectName,
PodName: "test",
ContainerName: "test",
})
require.NoError(t, err)

reconciler := &Reconciler{
clientSet: client.Get(ctx),
js: js,
dispatcher: d,
streamNameFunc: utils.StreamName,
consumerNameFunc: utils.ConsumerName,
}
_ = reconciler.reconcileStream(ctx, nc)

err = d.ReconcileConsumers(ctx, *config, true)
if err != nil {
t.Fatal(err)
}

configNew := createChannelConfig(nc, Subscription{
UID: subscriber1UID,
}, Subscription{
UID: subscriber2UID,
})

err = d.ReconcileConsumers(ctx, *configNew, true)
if err != nil {
t.Fatal(err)
}
}

func createChannelConfig(nc *v1alpha1.NatsJetStreamChannel, subs ...Subscription) *ChannelConfig {
if subs == nil {
subs = []Subscription{}
Expand Down
64 changes: 50 additions & 14 deletions pkg/channel/jetstream/dispatcher/natsjetstreamchannel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ import (
"fmt"
"testing"

"k8s.io/apimachinery/pkg/types"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/pkg/apis"

"github.com/nats-io/nats.go"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
clientgotesting "k8s.io/client-go/testing"
Expand All @@ -38,22 +41,12 @@ import (
reconciletesting "knative.dev/eventing-natss/pkg/reconciler/testing"
)

type failOnFatalAndErrorLogger struct {
*zap.Logger
t *testing.T
}

func (l *failOnFatalAndErrorLogger) Error(msg string, fields ...zap.Field) {
l.t.Fatalf("Error() called - msg: %s - fields: %v", msg, fields)
}

func (l *failOnFatalAndErrorLogger) Fatal(msg string, fields ...zap.Field) {
l.t.Fatalf("Fatal() called - msg: %s - fields: %v", msg, fields)
}

const (
testNS = "test-namespace"
ncName = "test-nc"

twoSubscriberPatch = `[{"op":"add","path":"/status/subscribers","value":[{"observedGeneration":1,"ready":"True","uid":"2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1"},{"observedGeneration":2,"ready":"True","uid":"34c5aec8-deb6-11e8-9f32-f2801f1b9fd1"}]}]`
channelServiceAddress = "test-nc-kn-jsm-channel.test-namespace.svc.cluster.local"
)

var (
Expand All @@ -62,6 +55,26 @@ var (
"FinalizerUpdate",
fmt.Sprintf(`Updated %q finalizers`, ncName),
)

subscriber1UID = types.UID("2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1")
subscriber2UID = types.UID("34c5aec8-deb6-11e8-9f32-f2801f1b9fd1")
subscriber1Generation = int64(1)
subscriber2Generation = int64(2)

subscriber1 = eventingduckv1.SubscriberSpec{
UID: subscriber1UID,
Generation: subscriber1Generation,
SubscriberURI: apis.HTTP("call1"),
ReplyURI: apis.HTTP("sink2"),
}

subscriber2 = eventingduckv1.SubscriberSpec{
UID: subscriber2UID,
Generation: subscriber2Generation,
SubscriberURI: apis.HTTP("call2"),
ReplyURI: apis.HTTP("sink2"),
}
subscribers = []eventingduckv1.SubscriberSpec{subscriber1, subscriber2}
)

func TestAllCases(t *testing.T) {
Expand Down Expand Up @@ -98,6 +111,29 @@ func TestAllCases(t *testing.T) {
makeFinalizerPatch(testNS, ncName),
},
},
{
Name: "reconcile ok: update consumer",
Key: ncKey,
Objects: []runtime.Object{
reconciletesting.NewNatsJetStreamChannel(ncName, testNS,
reconciletesting.WithNatsJetStreamInitChannelConditions,
reconciletesting.WithNatsJetStreamChannelAddress(channelServiceAddress),
reconciletesting.JetStreamAddressable(),
reconciletesting.WithNatsJetStreamChannelChannelServiceReady(),
reconciletesting.WithNatsJetStreamChannelServiceReady(),
reconciletesting.WithNatsJetStreamChannelEndpointsReady(),
reconciletesting.WithNatsJetStreamChannelDeploymentReady(),
reconciletesting.WithNatsJetStreamChannelStreamReady(),
reconciletesting.WithNatsJetStreamChannelSubscribers(subscribers)),
},
WantEvents: []string{
finalizerUpdatedEvent,
},
WantPatches: []clientgotesting.PatchActionImpl{
makeFinalizerPatch(testNS, ncName),
makePatch(testNS, ncName, twoSubscriberPatch),
},
},
}

table.Test(t, reconciletesting.MakeFactory(func(ctx context.Context, l *reconciletesting.Listers) controller.Reconciler {
Expand Down
19 changes: 10 additions & 9 deletions pkg/channel/jetstream/dispatcher/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,17 @@ func (r *Reconciler) reconcileStream(ctx context.Context, nc *v1alpha1.NatsJetSt
streamConfig := buildStreamConfig(streamName, primarySubject, nc.Spec.Stream.Config)
isCreating := existing == nil

// AddStream is idempotent if the config is identical to that on the server
info, err := r.js.AddStream(streamConfig)
if err != nil {
logger.Errorw("failed to add stream")
controller.GetEventRecorder(ctx).Event(nc, corev1.EventTypeWarning, ReasonJetstreamStreamFailed, err.Error())
nc.Status.MarkStreamFailed("DispatcherCreateStreamFailed", "Failed to create JetStream stream")
return err
}

// if stream is existing then it may return error and prevent update of subscriptions
if isCreating {
// AddStream is idempotent if the config is identical to that on the server
info, err := r.js.AddStream(streamConfig)
if err != nil {
logger.Errorw("failed to add stream")
controller.GetEventRecorder(ctx).Event(nc, corev1.EventTypeWarning, ReasonJetstreamStreamFailed, err.Error())
nc.Status.MarkStreamFailed("DispatcherCreateStreamFailed", "Failed to create JetStream stream")
return err
}

logger.Infow("jetstream stream created", zap.String("stream_name", info.Config.Name))
controller.GetEventRecorder(ctx).Event(nc, corev1.EventTypeNormal, ReasonJetstreamStreamCreated, "JetStream stream created")
}
Expand Down

0 comments on commit a7bb71a

Please sign in to comment.