Skip to content

Commit

Permalink
fix(sandside): #197 improve tests and fix NotificationServiceImple
Browse files Browse the repository at this point in the history
close #197
  • Loading branch information
Marthym committed Dec 31, 2023
1 parent 2d75fbe commit d97288b
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ public NotifyServiceImpl(
this.cache = Caffeine.newBuilder()
.expireAfterAccess(Duration.ofMinutes(30))
.maximumSize(1000)
.<String, ByUserEventPublisherCacheEntry>evictionListener((key, value, cause) -> {
.<String, ByUserEventPublisherCacheEntry>removalListener((key, value, cause) -> {
if (value != null) {
log.atTrace().addArgument(key).log("Remove {} from the cache");
value.sink().tryEmitComplete();
Subscription subscription = value.subscription().getAndSet(null);
if (subscription != null) {
Expand All @@ -68,17 +69,20 @@ public Flux<ServerEvent> subscribe() {
}
return authFacade.getConnectedUser().flatMapMany(u ->
Objects.requireNonNull(cache.get(u.id, id -> {
Sinks.Many<ServerEvent> sink = Sinks.many().multicast().directBestEffort();
Sinks.Many<ServerEvent> sink = Sinks.many().unicast().onBackpressureBuffer();
AtomicReference<Subscription> subscription = new AtomicReference<>();
Flux<ServerEvent> multicastFlux = this.multicast.asFlux().doOnSubscribe(subscription::set);
Flux<ServerEvent> multicastFlux = this.multicast.asFlux()
.doOnSubscribe(subscription::set);
log.atDebug().addArgument(u.id)
.log("Subscribe notification for {}");
Flux<ServerEvent> eventPublisher = Flux.merge(
notificationPersistence.consume(u.id),
sink.asFlux(),
multicastFlux
)
.takeWhile(e -> cache.asMap().containsKey(id))
.map(e -> {
log.debug("Event: {}", e);
log.atDebug().addArgument(u.id).addArgument(e).log("{} receive Event: {}");
return e;
}).cache(0);
return new ByUserEventPublisherCacheEntry(subscription, sink, eventPublisher);
Expand All @@ -90,7 +94,7 @@ public Mono<Boolean> unsubscribe() {
return authFacade.getConnectedUser()
.filter(u -> cache.asMap().containsKey(u.id))
.map(u -> {
log.debug("Dispose SSE Subscription for {}", u.self.login);
log.atDebug().addArgument(u.id).log("Dispose SSE Subscription for {}");
cache.invalidate(u.id);
return true;
});
Expand All @@ -101,6 +105,8 @@ public void close() {
this.multicast.tryEmitComplete();
this.cache.invalidateAll();
this.cache.cleanUp();
log.atWarn().addArgument(this.multicast.currentSubscriberCount())
.log("Close multicast notifications channel ({} indisposed subscription(s))!");
}

@Override
Expand Down Expand Up @@ -153,15 +159,15 @@ public long countCacheEntries() {

private void emit(@Nullable Sinks.Many<ServerEvent> sink, ServerEvent event) {
if (sink == null) {
log.debug("No subscriber listening the SSE entry point.");
log.atDebug().log("No subscriber listening the SSE entry point.");
return;
}
EmitResult result = sink.tryEmitNext(event);
if (result.isFailure()) {
if (result == EmitResult.FAIL_ZERO_SUBSCRIBER) {
log.debug("No subscriber listening the SSE entry point.");
log.atDebug().log("No subscriber listening the SSE entry point.");
} else {
log.warn("{} on emit notification", result);
log.atWarn().addArgument(result).log("{} on emit notification");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package fr.ght1pc9kc.baywatch.notify.domain;

import com.github.f4b6a3.ulid.Ulid;
import fr.ght1pc9kc.baywatch.notify.api.model.BasicEvent;
import fr.ght1pc9kc.baywatch.notify.api.model.EventType;
import fr.ght1pc9kc.baywatch.notify.api.model.ServerEvent;
import fr.ght1pc9kc.baywatch.notify.domain.ports.NotificationPersistencePort;
import fr.ght1pc9kc.baywatch.security.api.AuthenticationFacade;
import fr.ght1pc9kc.baywatch.tests.samples.UserSamples;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
Expand All @@ -17,29 +19,36 @@
import java.time.ZoneOffset;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

import static fr.ght1pc9kc.baywatch.tests.samples.UserSamples.LUKE;
import static fr.ght1pc9kc.baywatch.tests.samples.UserSamples.OBIWAN;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

class NotifyServiceImplTest {

private NotifyServiceImpl tested;
private final AuthenticationFacade authFacadeMock = mock(AuthenticationFacade.class);
private final NotificationPersistencePort notificationPersistenceMock = mock(NotificationPersistencePort.class);

@BeforeEach
@SuppressWarnings("unchecked")
void setUp() {
AuthenticationFacade authFacadeMock = mock(AuthenticationFacade.class);
when(authFacadeMock.getConnectedUser()).thenReturn(
Mono.just(UserSamples.OBIWAN),
Mono.just(UserSamples.LUKE),
Mono.just(UserSamples.OBIWAN),
Mono.just(UserSamples.LUKE),
Mono.just(UserSamples.OBIWAN),
Mono.just(UserSamples.LUKE));
NotificationPersistencePort notificationPersistenceMock = mock(NotificationPersistencePort.class);
Mono.just(OBIWAN),
Mono.just(LUKE),
Mono.just(OBIWAN),
Mono.just(LUKE),
Mono.just(OBIWAN),
Mono.just(LUKE));

when(notificationPersistenceMock.consume(anyString())).thenReturn(Flux.empty());
when(notificationPersistenceMock.persist(any())).thenReturn(Mono.empty());
tested = new NotifyServiceImpl(authFacadeMock, notificationPersistenceMock, Clock.fixed(Instant.parse("2023-12-12T10:10:10Z"), ZoneOffset.UTC));
}

Expand All @@ -49,39 +58,94 @@ void should_broadcast_notification() {
List<ServerEvent> actualLuke = new CopyOnWriteArrayList<>();
List<Throwable> errorsObiwan = new CopyOnWriteArrayList<>();
List<Throwable> errorsLuke = new CopyOnWriteArrayList<>();
AtomicBoolean isObiwanComplete = new AtomicBoolean(false);
AtomicBoolean isLukeComplete = new AtomicBoolean(false);

tested.subscribe().doOnTerminate(() -> isObiwanComplete.set(true)).subscribe(
actualObiwan::add, errorsObiwan::add,
() -> isObiwanComplete.set(true)
);
tested.subscribe().doOnCancel(() -> isLukeComplete.set(true)).subscribe(
actualLuke::add, errorsLuke::add,
() -> isLukeComplete.set(true)
);

Disposable disposableObiwan = tested.subscribe().subscribe(
actualObiwan::add, errorsObiwan::add);
Disposable disposableLuke = tested.subscribe().subscribe(
actualLuke::add, errorsLuke::add);

ServerEvent eventBroadcast = tested.broadcast(EventType.NEWS_UPDATE, 42);
ServerEvent eventUser = tested.send(UserSamples.LUKE.id, EventType.USER_NOTIFICATION, "I'm your father");
tested.unsubscribe().block();
tested.unsubscribe().block();
ServerEvent eventUser = tested.send(LUKE.id, EventType.USER_NOTIFICATION, "I'm your father");

assertThat(disposableObiwan.isDisposed()).isFalse();
assertThat(disposableLuke.isDisposed()).isFalse();

tested.broadcast(EventType.NEWS_UPDATE, Mono.just(66));
StepVerifier.create(tested.unsubscribe()).expectNext(true).verifyComplete();
StepVerifier.create(tested.unsubscribe()).expectNext(true).verifyComplete();

tested.broadcast(EventType.NEWS_UPDATE, 66);
tested.close();

Assertions.assertThat(actualObiwan).containsExactly(eventBroadcast);
Assertions.assertThat(actualLuke).containsExactly(eventBroadcast, eventUser);
assertThat(actualObiwan).containsExactly(eventBroadcast);
assertThat(actualLuke).containsExactly(eventBroadcast, eventUser);

Assertions.assertThat(errorsObiwan).isEmpty();
Assertions.assertThat(errorsLuke).isEmpty();
assertThat(errorsObiwan).isEmpty();
assertThat(errorsLuke).isEmpty();

Assertions.assertThat(isObiwanComplete).isTrue();
Assertions.assertThat(isLukeComplete).isTrue();
disposableObiwan.dispose();
disposableLuke.dispose();
assertThat(disposableObiwan.isDisposed()).isTrue();
assertThat(disposableLuke.isDisposed()).isTrue();
}

@Test
void should_send_notification_without_subscriber() {
void should_broadcast_notification_without_subscriber() {
tested.broadcast(EventType.NEWS_UPDATE, Mono.just(42));
tested.close();
StepVerifier.create(tested.subscribe()).verifyError();
}

@Test
void should_send_notification_to_user() {
when(authFacadeMock.getConnectedUser()).thenReturn(Mono.just(OBIWAN));

List<ServerEvent> actualObiwan = new CopyOnWriteArrayList<>();
List<Throwable> errorsObiwan = new CopyOnWriteArrayList<>();
Disposable disposable = tested.subscribe().subscribe(
actualObiwan::add, errorsObiwan::add);

ServerEvent eventUser = tested.send(UserSamples.OBIWAN.id, EventType.USER_NOTIFICATION, "I'm your father");

tested.close();

assertThat(actualObiwan).containsExactly(eventUser);
assertThat(errorsObiwan).isEmpty();
assertThat(disposable.isDisposed()).isTrue();
}

@Test
void should_get_pending_notifications_on_subscribe() {
when(authFacadeMock.getConnectedUser()).thenReturn(Mono.just(OBIWAN));
BasicEvent<String> event = new BasicEvent<>(Ulid.fast().toString(), EventType.USER_NOTIFICATION, "I'm your father");
when(notificationPersistenceMock.consume(anyString()))
.thenReturn(Flux.just(event));

List<ServerEvent> actualObiwan = new CopyOnWriteArrayList<>();
List<Throwable> errorsObiwan = new CopyOnWriteArrayList<>();
Disposable disposable = tested.subscribe().subscribe(
actualObiwan::add, errorsObiwan::add);

tested.close();

assertThat(actualObiwan).containsExactly(event);
assertThat(errorsObiwan).isEmpty();
assertThat(disposable.isDisposed()).isTrue();
}

@Test
void should_store_notification_when_user_absent() {
BasicEvent<String> event = new BasicEvent<>(Ulid.fast().toString(), EventType.USER_NOTIFICATION, "I'm your father");
when(authFacadeMock.getConnectedUser()).thenReturn(Mono.just(OBIWAN));

tested.send(UserSamples.OBIWAN.id, EventType.USER_NOTIFICATION, "I'm your father");

verify(notificationPersistenceMock).persist(any());

reset(notificationPersistenceMock);
when(notificationPersistenceMock.persist(any())).thenReturn(Mono.just(event));

tested.send(UserSamples.OBIWAN.id, EventType.USER_NOTIFICATION, Mono.just("I'm your father"));

verify(notificationPersistenceMock).persist(any());
}
}

0 comments on commit d97288b

Please sign in to comment.