diff --git a/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/api/model/EventType.java b/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/api/model/EventType.java index f1f55271..5a68bfdc 100644 --- a/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/api/model/EventType.java +++ b/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/api/model/EventType.java @@ -2,12 +2,12 @@ import lombok.Getter; +@Getter public enum EventType { NEWS_UPDATE("newsUpdate"), USER_NOTIFICATION("userNotification"), NEWS_ADD("newsAdd"); - @Getter private final String name; EventType(String name) { diff --git a/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/api/model/UserNotification.java b/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/api/model/UserNotification.java index 61671d75..542c2431 100644 --- a/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/api/model/UserNotification.java +++ b/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/api/model/UserNotification.java @@ -6,6 +6,23 @@ import java.time.Duration; +/** + * Notification to the user. Can be anything targeted by a User. + * Must by a {@link ServerEvent} of type {@link EventType#USER_NOTIFICATION} + * + * @param code The notification code + * @param severity The severity, on the front this define the color of notification + * @param title The Title of the notification + * @param message The message + * @param delay The delay the notification stay displayed before automatic acknowledge, in millis, default 5 000. + * @param actions Possible actions to be done on the target entity. For news : + * + * @param target The id of the targeted entity of actions + */ @Builder(toBuilder = true) public record UserNotification( @NonNull String code, diff --git a/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/domain/NotifyServiceImpl.java b/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/domain/NotifyServiceImpl.java index 3f02c020..a7b75464 100644 --- a/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/domain/NotifyServiceImpl.java +++ b/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/domain/NotifyServiceImpl.java @@ -10,6 +10,7 @@ import fr.ght1pc9kc.baywatch.notify.api.model.ReactiveEvent; import fr.ght1pc9kc.baywatch.notify.api.model.ServerEvent; import fr.ght1pc9kc.baywatch.notify.domain.model.ByUserEventPublisherCacheEntry; +import fr.ght1pc9kc.baywatch.notify.domain.ports.NotificationPersistencePort; import fr.ght1pc9kc.baywatch.security.api.AuthenticationFacade; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.Nullable; @@ -27,12 +28,17 @@ @Slf4j public class NotifyServiceImpl implements NotifyService, NotifyManager { + private static final String PREFIX = "EV"; + private final AuthenticationFacade authFacade; + private final NotificationPersistencePort notificationPersistence; private final Sinks.Many multicast; private final Cache cache; - public NotifyServiceImpl(AuthenticationFacade authenticationFacade) { + public NotifyServiceImpl( + AuthenticationFacade authenticationFacade, NotificationPersistencePort notificationPersistence) { + this.notificationPersistence = notificationPersistence; this.authFacade = authenticationFacade; this.multicast = Sinks.many().multicast().directBestEffort(); this.cache = Caffeine.newBuilder() @@ -60,7 +66,11 @@ public Flux subscribe() { Sinks.Many sink = Sinks.many().multicast().directBestEffort(); AtomicReference subscription = new AtomicReference<>(); Flux multicastFlux = this.multicast.asFlux().doOnSubscribe(subscription::set); - Flux eventPublisher = Flux.merge(sink.asFlux(), multicastFlux) + Flux eventPublisher = Flux.merge( + notificationPersistence.consume(u.id), + sink.asFlux(), + multicastFlux + ) .takeWhile(e -> cache.asMap().containsKey(id)) .map(e -> { log.debug("Event: {}", e); @@ -90,7 +100,7 @@ public void close() { @Override public BasicEvent send(String userId, EventType type, T data) { - BasicEvent event = new BasicEvent<>(UlidCreator.getMonotonicUlid().toString(), type, data); + BasicEvent event = new BasicEvent<>(PREFIX + UlidCreator.getMonotonicUlid().toString(), type, data); Optional.ofNullable(cache.getIfPresent(userId)) .map(ByUserEventPublisherCacheEntry::sink) .ifPresent(sk -> emit(sk, event)); @@ -99,7 +109,7 @@ public BasicEvent send(String userId, EventType type, T data) { @Override public ReactiveEvent send(String userId, EventType type, Mono data) { - ReactiveEvent event = new ReactiveEvent<>(UlidCreator.getMonotonicUlid().toString(), type, data); + ReactiveEvent event = new ReactiveEvent<>(PREFIX + UlidCreator.getMonotonicUlid().toString(), type, data); Optional.ofNullable(cache.getIfPresent(userId)) .map(ByUserEventPublisherCacheEntry::sink) .ifPresent(sk -> emit(sk, event)); @@ -108,14 +118,14 @@ public ReactiveEvent send(String userId, EventType type, Mono data) { @Override public BasicEvent broadcast(EventType type, T data) { - BasicEvent event = new BasicEvent<>(UlidCreator.getMonotonicUlid().toString(), type, data); + BasicEvent event = new BasicEvent<>(PREFIX + UlidCreator.getMonotonicUlid().toString(), type, data); emit(this.multicast, event); return event; } @Override public ReactiveEvent broadcast(EventType type, Mono data) { - ReactiveEvent event = new ReactiveEvent<>(UlidCreator.getMonotonicUlid().toString(), type, data); + ReactiveEvent event = new ReactiveEvent<>(PREFIX + UlidCreator.getMonotonicUlid().toString(), type, data); emit(this.multicast, event); return event; } diff --git a/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/domain/ports/NotificationPersistencePort.java b/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/domain/ports/NotificationPersistencePort.java new file mode 100644 index 00000000..a78cfb4f --- /dev/null +++ b/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/domain/ports/NotificationPersistencePort.java @@ -0,0 +1,11 @@ +package fr.ght1pc9kc.baywatch.notify.domain.ports; + +import fr.ght1pc9kc.baywatch.notify.api.model.ServerEvent; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface NotificationPersistencePort { + Mono persist(ServerEvent event); + + Flux consume(String userId); +} diff --git a/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/infra/adapters/NotificationPersistenceAdapter.java b/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/infra/adapters/NotificationPersistenceAdapter.java new file mode 100644 index 00000000..77d4eca1 --- /dev/null +++ b/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/infra/adapters/NotificationPersistenceAdapter.java @@ -0,0 +1,76 @@ +package fr.ght1pc9kc.baywatch.notify.infra.adapters; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.MapLikeType; +import fr.ght1pc9kc.baywatch.common.infra.DatabaseQualifier; +import fr.ght1pc9kc.baywatch.dsl.tables.records.NotificationsRecord; +import fr.ght1pc9kc.baywatch.notify.api.model.BasicEvent; +import fr.ght1pc9kc.baywatch.notify.api.model.EventType; +import fr.ght1pc9kc.baywatch.notify.api.model.ServerEvent; +import fr.ght1pc9kc.baywatch.notify.domain.ports.NotificationPersistencePort; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.jooq.DSLContext; +import org.springframework.stereotype.Repository; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; + +import java.util.Map; +import java.util.Optional; + +import static fr.ght1pc9kc.baywatch.dsl.tables.Notifications.NOTIFICATIONS; + +@Slf4j +@Repository +@RequiredArgsConstructor +public class NotificationPersistenceAdapter implements NotificationPersistencePort { + private final @DatabaseQualifier Scheduler databaseScheduler; + private final DSLContext dsl; + private final ObjectMapper jsonMapper; + + @Override + public Mono persist(ServerEvent event) { + return null; + } + + @Override + @SuppressWarnings({"BlockingMethodInNonBlockingContext", "resource"}) + public Flux consume(String userId) { + var select = dsl.selectFrom(NOTIFICATIONS).where(NOTIFICATIONS.NOTI_USER_ID.eq(userId)); + + return Flux.create(sink -> { + var cursor = select.fetchLazy(); + sink.onRequest(n -> { + int count = (int) n; + var rs = cursor.fetchNext(count); + rs.forEach(sink::next); + if (rs.size() < count) { + sink.complete(); + } + }).onDispose(cursor::close); + }).limitRate(Integer.MAX_VALUE - 1).subscribeOn(databaseScheduler) + .flatMap(r -> Mono.fromCallable(() -> + dsl.deleteFrom(NOTIFICATIONS) + .where(NOTIFICATIONS.NOTI_ID.eq(r.getNotiId())) + .execute()) + .then(Mono.just(r))) + .map(this::buildServerEvent); + } + + private ServerEvent buildServerEvent(NotificationsRecord noti) { + MapLikeType dataType = jsonMapper.getTypeFactory().constructMapLikeType( + Map.class, String.class, Object.class); + Map message = Optional.ofNullable(noti.getNotiData()).map(data -> { + try { + return jsonMapper.readValue(noti.getNotiData(), dataType); + } catch (Exception e) { + return Map.of("message", data); + } + }).orElse(Map.of()); + return new BasicEvent<>( + noti.getNotiId(), + EventType.valueOf(noti.getNotiEventType()), + message); + } +} diff --git a/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/infra/NotifyServiceAdapter.java b/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/infra/adapters/NotifyServiceAdapter.java similarity index 81% rename from sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/infra/NotifyServiceAdapter.java rename to sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/infra/adapters/NotifyServiceAdapter.java index 1edf7065..72607a25 100644 --- a/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/infra/NotifyServiceAdapter.java +++ b/sandside/src/main/java/fr/ght1pc9kc/baywatch/notify/infra/adapters/NotifyServiceAdapter.java @@ -1,4 +1,4 @@ -package fr.ght1pc9kc.baywatch.notify.infra; +package fr.ght1pc9kc.baywatch.notify.infra.adapters; import fr.ght1pc9kc.baywatch.admin.api.model.Counter; import fr.ght1pc9kc.baywatch.admin.api.model.CounterGroup; @@ -7,6 +7,7 @@ import fr.ght1pc9kc.baywatch.notify.api.NotifyManager; import fr.ght1pc9kc.baywatch.notify.api.NotifyService; import fr.ght1pc9kc.baywatch.notify.domain.NotifyServiceImpl; +import fr.ght1pc9kc.baywatch.notify.domain.ports.NotificationPersistencePort; import fr.ght1pc9kc.baywatch.security.api.AuthenticationFacade; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; @@ -22,8 +23,9 @@ public class NotifyServiceAdapter implements NotifyService, NotifyManager, Count @Delegate private final NotifyServiceImpl delegate; - public NotifyServiceAdapter(AuthenticationFacade authFacade, MeterRegistry registry) { - this.delegate = new NotifyServiceImpl(authFacade); + public NotifyServiceAdapter( + AuthenticationFacade authFacade, NotificationPersistencePort notificationPersistence, MeterRegistry registry) { + this.delegate = new NotifyServiceImpl(authFacade, notificationPersistence); Gauge.builder("bw.session_count", delegate::countCacheEntries) .baseUnit("gauge") diff --git a/sandside/src/main/resources/application.yaml b/sandside/src/main/resources/application.yaml index c045512f..e5aa8f35 100644 --- a/sandside/src/main/resources/application.yaml +++ b/sandside/src/main/resources/application.yaml @@ -9,6 +9,7 @@ spring: password: hasselhoff hikari: data-source-properties.foreign_keys=true: true + # maximum-pool-size: 1 flyway: baseline-on-migrate: true diff --git a/sandside/src/main/resources/db/migration/V2_1_202312091459__add_notifications.sql b/sandside/src/main/resources/db/migration/V2_1_202312091459__add_notifications.sql new file mode 100644 index 00000000..a76e8e88 --- /dev/null +++ b/sandside/src/main/resources/db/migration/V2_1_202312091459__add_notifications.sql @@ -0,0 +1,11 @@ +create table NOTIFICATIONS +( + NOTI_ID VARCHAR(64) NOT NULL PRIMARY KEY, + NOTI_USER_ID VARCHAR(64) NOT NULL, + NOTI_EVENT_TYPE VARCHAR(20) NOT NULL, + NOTI_DATA TEXT, + NOTI_CREATED_AT DATETIME, + + constraint FK_NOTI_USER_ID + foreign key (NOTI_USER_ID) references USERS (USER_ID) +); \ No newline at end of file diff --git a/sandside/src/test/java/fr/ght1pc9kc/baywatch/notify/domain/NotifyServiceImplTest.java b/sandside/src/test/java/fr/ght1pc9kc/baywatch/notify/domain/NotifyServiceImplTest.java index 88e5931c..9629cd8d 100644 --- a/sandside/src/test/java/fr/ght1pc9kc/baywatch/notify/domain/NotifyServiceImplTest.java +++ b/sandside/src/test/java/fr/ght1pc9kc/baywatch/notify/domain/NotifyServiceImplTest.java @@ -2,11 +2,13 @@ import fr.ght1pc9kc.baywatch.notify.api.model.EventType; import fr.ght1pc9kc.baywatch.notify.api.model.ServerEvent; +import fr.ght1pc9kc.baywatch.notify.domain.ports.NotificationPersistencePort; import fr.ght1pc9kc.baywatch.security.api.AuthenticationFacade; import fr.ght1pc9kc.baywatch.tests.samples.UserSamples; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -14,6 +16,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -32,7 +35,9 @@ void setUp() { Mono.just(UserSamples.LUKE), Mono.just(UserSamples.OBIWAN), Mono.just(UserSamples.LUKE)); - tested = new NotifyServiceImpl(authFacadeMock); + NotificationPersistencePort notificationPersistenceMock = mock(NotificationPersistencePort.class); + when(notificationPersistenceMock.consume(anyString())).thenReturn(Flux.empty()); + tested = new NotifyServiceImpl(authFacadeMock, notificationPersistenceMock); } @Test diff --git a/sandside/src/test/java/fr/ght1pc9kc/baywatch/notify/infra/adapters/NotificationPersistenceAdapterTest.java b/sandside/src/test/java/fr/ght1pc9kc/baywatch/notify/infra/adapters/NotificationPersistenceAdapterTest.java new file mode 100644 index 00000000..96b7d73e --- /dev/null +++ b/sandside/src/test/java/fr/ght1pc9kc/baywatch/notify/infra/adapters/NotificationPersistenceAdapterTest.java @@ -0,0 +1,88 @@ +package fr.ght1pc9kc.baywatch.notify.infra.adapters; + +import com.fasterxml.jackson.databind.ObjectMapper; +import fr.ght1pc9kc.baywatch.dsl.tables.Notifications; +import fr.ght1pc9kc.baywatch.notify.api.model.EventType; +import fr.ght1pc9kc.baywatch.notify.api.model.ServerEvent; +import fr.ght1pc9kc.baywatch.notify.domain.ports.NotificationPersistencePort; +import fr.ght1pc9kc.baywatch.notify.infra.samples.NotificationsRecordSamples; +import fr.ght1pc9kc.baywatch.tests.samples.UserSamples; +import fr.ght1pc9kc.testy.core.extensions.ChainedExtension; +import fr.ght1pc9kc.testy.jooq.WithDslContext; +import fr.ght1pc9kc.testy.jooq.WithInMemoryDatasource; +import fr.ght1pc9kc.testy.jooq.WithSampleDataLoaded; +import org.jooq.DSLContext; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; +import reactor.test.StepVerifier; + +import static org.assertj.core.api.Assertions.assertThat; + +class NotificationPersistenceAdapterTest { + private static final WithInMemoryDatasource wDs = WithInMemoryDatasource.builder().build(); + private static final WithDslContext wDslContext = WithDslContext.builder() + .setDatasourceExtension(wDs).build(); + private static final WithSampleDataLoaded wSamples = WithSampleDataLoaded.builder(wDslContext) + .createTablesIfNotExists() + .addDataset(NotificationsRecordSamples.SAMPLE) + .build(); + + @RegisterExtension + @SuppressWarnings("unused") + static ChainedExtension chain = ChainedExtension.outer(wDs) + .append(wDslContext) + .append(wSamples) + .register(); + + private NotificationPersistencePort tested; + + @BeforeEach + void setUp(DSLContext dsl) { + tested = new NotificationPersistenceAdapter(Schedulers.immediate(), dsl, new ObjectMapper()); + } + + @Test + void should_consume_notifications(DSLContext dsl) { + assertThat(dsl.fetchCount(Notifications.NOTIFICATIONS)).isNotZero(); + + Flux actuals = tested.consume(UserSamples.OBIWAN.id); + + StepVerifier.create(actuals) + .assertNext(actual -> Assertions.assertAll( + () -> assertThat(actual.id()).isEqualTo("EV01HHA6PFESHHFK4YHT1T2HKHSR"), + () -> assertThat(actual.type()).isEqualTo(EventType.USER_NOTIFICATION) + )).verifyComplete(); + + assertThat(dsl.fetchCount(Notifications.NOTIFICATIONS)).isOne(); + } + + @Test + void should_consume_dummy_notifications(DSLContext dsl) { + assertThat(dsl.fetchCount(Notifications.NOTIFICATIONS)).isNotZero(); + + Flux actuals = tested.consume(UserSamples.MWINDU.id); + + StepVerifier.create(actuals) + .assertNext(actual -> Assertions.assertAll( + () -> assertThat(actual.id()).isEqualTo("EV01HHA6PFESHHFK4YHT1T2HKHSQ"), + () -> assertThat(actual.type()).isEqualTo(EventType.USER_NOTIFICATION) + )).verifyComplete(); + + assertThat(dsl.fetchCount(Notifications.NOTIFICATIONS)).isOne(); + } + + @Test + void should_not_consume_notifications(DSLContext dsl) { + assertThat(dsl.fetchCount(Notifications.NOTIFICATIONS)).isNotZero(); + + Flux actuals = tested.consume(UserSamples.DSIDIOUS.id); + + StepVerifier.create(actuals).verifyComplete(); + + assertThat(dsl.fetchCount(Notifications.NOTIFICATIONS)).isNotZero(); + } +} \ No newline at end of file diff --git a/sandside/src/test/java/fr/ght1pc9kc/baywatch/notify/infra/samples/NotificationsRecordSamples.java b/sandside/src/test/java/fr/ght1pc9kc/baywatch/notify/infra/samples/NotificationsRecordSamples.java new file mode 100644 index 00000000..0d4acbe2 --- /dev/null +++ b/sandside/src/test/java/fr/ght1pc9kc/baywatch/notify/infra/samples/NotificationsRecordSamples.java @@ -0,0 +1,56 @@ +package fr.ght1pc9kc.baywatch.notify.infra.samples; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import fr.ght1pc9kc.baywatch.dsl.tables.records.NotificationsRecord; +import fr.ght1pc9kc.baywatch.notify.api.model.EventType; +import fr.ght1pc9kc.baywatch.notify.api.model.Severity; +import fr.ght1pc9kc.baywatch.notify.api.model.UserNotification; +import fr.ght1pc9kc.baywatch.tests.samples.FeedSamples; +import fr.ght1pc9kc.baywatch.tests.samples.UserSamples; +import fr.ght1pc9kc.testy.jooq.model.RelationalDataSet; + +import java.time.LocalDateTime; +import java.util.List; + +import static fr.ght1pc9kc.baywatch.dsl.tables.Notifications.NOTIFICATIONS; + +public class NotificationsRecordSamples implements RelationalDataSet { + public static final NotificationsRecordSamples SAMPLE = new NotificationsRecordSamples(); + + public static final UserNotification USER_NOTIFICATION = UserNotification.builder() + .code(UserNotification.CODE_NEWS_ADD) + .severity(Severity.info) + .title("The new light saber was arrived") + .message("Hello your new light saber was arrived, you can go to get it !") + .actions("VSC") + .target(FeedSamples.JEDI.id) + .build(); + + public static final NotificationsRecord DUMMY_NOTIFICATION_RECORD = NOTIFICATIONS.newRecord() + .setNotiId("EV01HHA6PFESHHFK4YHT1T2HKHSQ") + .setNotiUserId(UserSamples.MWINDU.id) + .setNotiEventType(EventType.USER_NOTIFICATION.name()) + .setNotiData("String") + .setNotiCreatedAt(LocalDateTime.now()); + public static final NotificationsRecord USER_NOTIFICATIONS_RECORD; + + static { + ObjectMapper json = new ObjectMapper(); + try { + USER_NOTIFICATIONS_RECORD = NOTIFICATIONS.newRecord() + .setNotiId("EV01HHA6PFESHHFK4YHT1T2HKHSR") + .setNotiUserId(UserSamples.OBIWAN.id) + .setNotiEventType(EventType.USER_NOTIFICATION.name()) + .setNotiData(json.writeValueAsString(USER_NOTIFICATION)) + .setNotiCreatedAt(LocalDateTime.now()); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + @Override + public List records() { + return List.of(USER_NOTIFICATIONS_RECORD, DUMMY_NOTIFICATION_RECORD); + } +}