diff --git a/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/domain/NotifyServiceImpl.java b/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/domain/NotifyServiceImpl.java index ca4f7d31..1351e7bb 100644 --- a/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/domain/NotifyServiceImpl.java +++ b/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/domain/NotifyServiceImpl.java @@ -49,8 +49,9 @@ public NotifyServiceImpl( this.cache = Caffeine.newBuilder() .expireAfterAccess(Duration.ofMinutes(30)) .maximumSize(1000) - .evictionListener((key, value, cause) -> { + .removalListener((key, value, cause) -> { if (value != null) { + log.atTrace().addArgument(key).log("Remove {} from the cache"); value.sink().tryEmitComplete(); Subscription subscription = value.subscription().getAndSet(null); if (subscription != null) { @@ -68,9 +69,12 @@ public Flux subscribe() { } return authFacade.getConnectedUser().flatMapMany(u -> Objects.requireNonNull(cache.get(u.id, id -> { - Sinks.Many sink = Sinks.many().multicast().directBestEffort(); + Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); AtomicReference subscription = new AtomicReference<>(); - Flux multicastFlux = this.multicast.asFlux().doOnSubscribe(subscription::set); + Flux multicastFlux = this.multicast.asFlux() + .doOnSubscribe(subscription::set); + log.atDebug().addArgument(u.id) + .log("Subscribe notification for {}"); Flux eventPublisher = Flux.merge( notificationPersistence.consume(u.id), sink.asFlux(), @@ -78,7 +82,7 @@ public Flux subscribe() { ) .takeWhile(e -> cache.asMap().containsKey(id)) .map(e -> { - log.debug("Event: {}", e); + log.atDebug().addArgument(u.id).addArgument(e).log("{} receive Event: {}"); return e; }).cache(0); return new ByUserEventPublisherCacheEntry(subscription, sink, eventPublisher); @@ -90,7 +94,7 @@ public Mono unsubscribe() { return authFacade.getConnectedUser() .filter(u -> cache.asMap().containsKey(u.id)) .map(u -> { - log.debug("Dispose SSE Subscription for {}", u.self.login); + log.atDebug().addArgument(u.id).log("Dispose SSE Subscription for {}"); cache.invalidate(u.id); return true; }); @@ -101,6 +105,8 @@ public void close() { this.multicast.tryEmitComplete(); this.cache.invalidateAll(); this.cache.cleanUp(); + log.atWarn().addArgument(this.multicast.currentSubscriberCount()) + .log("Close multicast notifications channel ({} indisposed subscription(s))!"); } @Override @@ -153,15 +159,15 @@ public long countCacheEntries() { private void emit(@Nullable Sinks.Many sink, ServerEvent event) { if (sink == null) { - log.debug("No subscriber listening the SSE entry point."); + log.atDebug().log("No subscriber listening the SSE entry point."); return; } EmitResult result = sink.tryEmitNext(event); if (result.isFailure()) { if (result == EmitResult.FAIL_ZERO_SUBSCRIBER) { - log.debug("No subscriber listening the SSE entry point."); + log.atDebug().log("No subscriber listening the SSE entry point."); } else { - log.warn("{} on emit notification", result); + log.atWarn().addArgument(result).log("{} on emit notification"); } } } diff --git a/sandside/src/test/java/fr/ght1pc9kc/baywatch/notify/domain/NotifyServiceImplTest.java b/sandside/src/test/java/fr/ght1pc9kc/baywatch/notify/domain/NotifyServiceImplTest.java index 0ba7b66a..e4037d1b 100644 --- a/sandside/src/test/java/fr/ght1pc9kc/baywatch/notify/domain/NotifyServiceImplTest.java +++ b/sandside/src/test/java/fr/ght1pc9kc/baywatch/notify/domain/NotifyServiceImplTest.java @@ -1,13 +1,15 @@ package fr.ght1pc9kc.baywatch.notify.domain; +import com.github.f4b6a3.ulid.Ulid; +import fr.ght1pc9kc.baywatch.notify.api.model.BasicEvent; import fr.ght1pc9kc.baywatch.notify.api.model.EventType; import fr.ght1pc9kc.baywatch.notify.api.model.ServerEvent; import fr.ght1pc9kc.baywatch.notify.domain.ports.NotificationPersistencePort; import fr.ght1pc9kc.baywatch.security.api.AuthenticationFacade; import fr.ght1pc9kc.baywatch.tests.samples.UserSamples; -import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -17,29 +19,36 @@ import java.time.ZoneOffset; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; +import static fr.ght1pc9kc.baywatch.tests.samples.UserSamples.LUKE; +import static fr.ght1pc9kc.baywatch.tests.samples.UserSamples.OBIWAN; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; class NotifyServiceImplTest { private NotifyServiceImpl tested; + private final AuthenticationFacade authFacadeMock = mock(AuthenticationFacade.class); + private final NotificationPersistencePort notificationPersistenceMock = mock(NotificationPersistencePort.class); @BeforeEach @SuppressWarnings("unchecked") void setUp() { - AuthenticationFacade authFacadeMock = mock(AuthenticationFacade.class); when(authFacadeMock.getConnectedUser()).thenReturn( - Mono.just(UserSamples.OBIWAN), - Mono.just(UserSamples.LUKE), - Mono.just(UserSamples.OBIWAN), - Mono.just(UserSamples.LUKE), - Mono.just(UserSamples.OBIWAN), - Mono.just(UserSamples.LUKE)); - NotificationPersistencePort notificationPersistenceMock = mock(NotificationPersistencePort.class); + Mono.just(OBIWAN), + Mono.just(LUKE), + Mono.just(OBIWAN), + Mono.just(LUKE), + Mono.just(OBIWAN), + Mono.just(LUKE)); + when(notificationPersistenceMock.consume(anyString())).thenReturn(Flux.empty()); + when(notificationPersistenceMock.persist(any())).thenReturn(Mono.empty()); tested = new NotifyServiceImpl(authFacadeMock, notificationPersistenceMock, Clock.fixed(Instant.parse("2023-12-12T10:10:10Z"), ZoneOffset.UTC)); } @@ -49,39 +58,94 @@ void should_broadcast_notification() { List actualLuke = new CopyOnWriteArrayList<>(); List errorsObiwan = new CopyOnWriteArrayList<>(); List errorsLuke = new CopyOnWriteArrayList<>(); - AtomicBoolean isObiwanComplete = new AtomicBoolean(false); - AtomicBoolean isLukeComplete = new AtomicBoolean(false); - - tested.subscribe().doOnTerminate(() -> isObiwanComplete.set(true)).subscribe( - actualObiwan::add, errorsObiwan::add, - () -> isObiwanComplete.set(true) - ); - tested.subscribe().doOnCancel(() -> isLukeComplete.set(true)).subscribe( - actualLuke::add, errorsLuke::add, - () -> isLukeComplete.set(true) - ); + + Disposable disposableObiwan = tested.subscribe().subscribe( + actualObiwan::add, errorsObiwan::add); + Disposable disposableLuke = tested.subscribe().subscribe( + actualLuke::add, errorsLuke::add); + ServerEvent eventBroadcast = tested.broadcast(EventType.NEWS_UPDATE, 42); - ServerEvent eventUser = tested.send(UserSamples.LUKE.id, EventType.USER_NOTIFICATION, "I'm your father"); - tested.unsubscribe().block(); - tested.unsubscribe().block(); + ServerEvent eventUser = tested.send(LUKE.id, EventType.USER_NOTIFICATION, "I'm your father"); + + assertThat(disposableObiwan.isDisposed()).isFalse(); + assertThat(disposableLuke.isDisposed()).isFalse(); - tested.broadcast(EventType.NEWS_UPDATE, Mono.just(66)); + StepVerifier.create(tested.unsubscribe()).expectNext(true).verifyComplete(); + StepVerifier.create(tested.unsubscribe()).expectNext(true).verifyComplete(); + + tested.broadcast(EventType.NEWS_UPDATE, 66); tested.close(); - Assertions.assertThat(actualObiwan).containsExactly(eventBroadcast); - Assertions.assertThat(actualLuke).containsExactly(eventBroadcast, eventUser); + assertThat(actualObiwan).containsExactly(eventBroadcast); + assertThat(actualLuke).containsExactly(eventBroadcast, eventUser); - Assertions.assertThat(errorsObiwan).isEmpty(); - Assertions.assertThat(errorsLuke).isEmpty(); + assertThat(errorsObiwan).isEmpty(); + assertThat(errorsLuke).isEmpty(); - Assertions.assertThat(isObiwanComplete).isTrue(); - Assertions.assertThat(isLukeComplete).isTrue(); + disposableObiwan.dispose(); + disposableLuke.dispose(); + assertThat(disposableObiwan.isDisposed()).isTrue(); + assertThat(disposableLuke.isDisposed()).isTrue(); } @Test - void should_send_notification_without_subscriber() { + void should_broadcast_notification_without_subscriber() { tested.broadcast(EventType.NEWS_UPDATE, Mono.just(42)); tested.close(); StepVerifier.create(tested.subscribe()).verifyError(); } + + @Test + void should_send_notification_to_user() { + when(authFacadeMock.getConnectedUser()).thenReturn(Mono.just(OBIWAN)); + + List actualObiwan = new CopyOnWriteArrayList<>(); + List errorsObiwan = new CopyOnWriteArrayList<>(); + Disposable disposable = tested.subscribe().subscribe( + actualObiwan::add, errorsObiwan::add); + + ServerEvent eventUser = tested.send(UserSamples.OBIWAN.id, EventType.USER_NOTIFICATION, "I'm your father"); + + tested.close(); + + assertThat(actualObiwan).containsExactly(eventUser); + assertThat(errorsObiwan).isEmpty(); + assertThat(disposable.isDisposed()).isTrue(); + } + + @Test + void should_get_pending_notifications_on_subscribe() { + when(authFacadeMock.getConnectedUser()).thenReturn(Mono.just(OBIWAN)); + BasicEvent event = new BasicEvent<>(Ulid.fast().toString(), EventType.USER_NOTIFICATION, "I'm your father"); + when(notificationPersistenceMock.consume(anyString())) + .thenReturn(Flux.just(event)); + + List actualObiwan = new CopyOnWriteArrayList<>(); + List errorsObiwan = new CopyOnWriteArrayList<>(); + Disposable disposable = tested.subscribe().subscribe( + actualObiwan::add, errorsObiwan::add); + + tested.close(); + + assertThat(actualObiwan).containsExactly(event); + assertThat(errorsObiwan).isEmpty(); + assertThat(disposable.isDisposed()).isTrue(); + } + + @Test + void should_store_notification_when_user_absent() { + BasicEvent event = new BasicEvent<>(Ulid.fast().toString(), EventType.USER_NOTIFICATION, "I'm your father"); + when(authFacadeMock.getConnectedUser()).thenReturn(Mono.just(OBIWAN)); + + tested.send(UserSamples.OBIWAN.id, EventType.USER_NOTIFICATION, "I'm your father"); + + verify(notificationPersistenceMock).persist(any()); + + reset(notificationPersistenceMock); + when(notificationPersistenceMock.persist(any())).thenReturn(Mono.just(event)); + + tested.send(UserSamples.OBIWAN.id, EventType.USER_NOTIFICATION, Mono.just("I'm your father")); + + verify(notificationPersistenceMock).persist(any()); + } } \ No newline at end of file