Skip to content

Commit

Permalink
Merge branch 'feature/190-improve-notification-subscription' into dev…
Browse files Browse the repository at this point in the history
…elop
  • Loading branch information
Marthym committed Dec 12, 2023
2 parents e2cd79f + e99e34c commit 8373b03
Show file tree
Hide file tree
Showing 11 changed files with 288 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

import lombok.Getter;

@Getter
public enum EventType {
NEWS_UPDATE("newsUpdate"),
USER_NOTIFICATION("userNotification"),
NEWS_ADD("newsAdd");

@Getter
private final String name;

EventType(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,23 @@

import java.time.Duration;

/**
* Notification to the user. Can be anything targeted by a User.
* Must by a {@link ServerEvent} of type {@link EventType#USER_NOTIFICATION}
*
* @param code The notification code
* @param severity The severity, on the front this define the color of notification
* @param title The Title of the notification
* @param message The message
* @param delay The delay the notification stay displayed before automatic acknowledge, in millis, default 5 000.
* @param actions Possible actions to be done on the target entity. For news :
* <ul>
* <li><strong>V</strong>: View</li>
* <li><strong>S</strong>: Share</li>
* <li><strong>C</strong>: Clip</li>
* </ul>
* @param target The id of the targeted entity of actions
*/
@Builder(toBuilder = true)
public record UserNotification(
@NonNull String code,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import fr.ght1pc9kc.baywatch.notify.api.model.ReactiveEvent;
import fr.ght1pc9kc.baywatch.notify.api.model.ServerEvent;
import fr.ght1pc9kc.baywatch.notify.domain.model.ByUserEventPublisherCacheEntry;
import fr.ght1pc9kc.baywatch.notify.domain.ports.NotificationPersistencePort;
import fr.ght1pc9kc.baywatch.security.api.AuthenticationFacade;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;
Expand All @@ -27,12 +28,17 @@

@Slf4j
public class NotifyServiceImpl implements NotifyService, NotifyManager {
private static final String PREFIX = "EV";

private final AuthenticationFacade authFacade;
private final NotificationPersistencePort notificationPersistence;

private final Sinks.Many<ServerEvent> multicast;
private final Cache<String, ByUserEventPublisherCacheEntry> cache;

public NotifyServiceImpl(AuthenticationFacade authenticationFacade) {
public NotifyServiceImpl(
AuthenticationFacade authenticationFacade, NotificationPersistencePort notificationPersistence) {
this.notificationPersistence = notificationPersistence;
this.authFacade = authenticationFacade;
this.multicast = Sinks.many().multicast().directBestEffort();
this.cache = Caffeine.newBuilder()
Expand Down Expand Up @@ -60,7 +66,11 @@ public Flux<ServerEvent> subscribe() {
Sinks.Many<ServerEvent> sink = Sinks.many().multicast().directBestEffort();
AtomicReference<Subscription> subscription = new AtomicReference<>();
Flux<ServerEvent> multicastFlux = this.multicast.asFlux().doOnSubscribe(subscription::set);
Flux<ServerEvent> eventPublisher = Flux.merge(sink.asFlux(), multicastFlux)
Flux<ServerEvent> eventPublisher = Flux.merge(
notificationPersistence.consume(u.id),
sink.asFlux(),
multicastFlux
)
.takeWhile(e -> cache.asMap().containsKey(id))
.map(e -> {
log.debug("Event: {}", e);
Expand Down Expand Up @@ -90,7 +100,7 @@ public void close() {

@Override
public <T> BasicEvent<T> send(String userId, EventType type, T data) {
BasicEvent<T> event = new BasicEvent<>(UlidCreator.getMonotonicUlid().toString(), type, data);
BasicEvent<T> event = new BasicEvent<>(PREFIX + UlidCreator.getMonotonicUlid().toString(), type, data);
Optional.ofNullable(cache.getIfPresent(userId))
.map(ByUserEventPublisherCacheEntry::sink)
.ifPresent(sk -> emit(sk, event));
Expand All @@ -99,7 +109,7 @@ public <T> BasicEvent<T> send(String userId, EventType type, T data) {

@Override
public <T> ReactiveEvent<T> send(String userId, EventType type, Mono<T> data) {
ReactiveEvent<T> event = new ReactiveEvent<>(UlidCreator.getMonotonicUlid().toString(), type, data);
ReactiveEvent<T> event = new ReactiveEvent<>(PREFIX + UlidCreator.getMonotonicUlid().toString(), type, data);
Optional.ofNullable(cache.getIfPresent(userId))
.map(ByUserEventPublisherCacheEntry::sink)
.ifPresent(sk -> emit(sk, event));
Expand All @@ -108,14 +118,14 @@ public <T> ReactiveEvent<T> send(String userId, EventType type, Mono<T> data) {

@Override
public <T> BasicEvent<T> broadcast(EventType type, T data) {
BasicEvent<T> event = new BasicEvent<>(UlidCreator.getMonotonicUlid().toString(), type, data);
BasicEvent<T> event = new BasicEvent<>(PREFIX + UlidCreator.getMonotonicUlid().toString(), type, data);
emit(this.multicast, event);
return event;
}

@Override
public <T> ReactiveEvent<T> broadcast(EventType type, Mono<T> data) {
ReactiveEvent<T> event = new ReactiveEvent<>(UlidCreator.getMonotonicUlid().toString(), type, data);
ReactiveEvent<T> event = new ReactiveEvent<>(PREFIX + UlidCreator.getMonotonicUlid().toString(), type, data);
emit(this.multicast, event);
return event;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package fr.ght1pc9kc.baywatch.notify.domain.ports;

import fr.ght1pc9kc.baywatch.notify.api.model.ServerEvent;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public interface NotificationPersistencePort {
Mono<ServerEvent> persist(ServerEvent event);

Flux<ServerEvent> consume(String userId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package fr.ght1pc9kc.baywatch.notify.infra.adapters;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.MapLikeType;
import fr.ght1pc9kc.baywatch.common.infra.DatabaseQualifier;
import fr.ght1pc9kc.baywatch.dsl.tables.records.NotificationsRecord;
import fr.ght1pc9kc.baywatch.notify.api.model.BasicEvent;
import fr.ght1pc9kc.baywatch.notify.api.model.EventType;
import fr.ght1pc9kc.baywatch.notify.api.model.ServerEvent;
import fr.ght1pc9kc.baywatch.notify.domain.ports.NotificationPersistencePort;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jooq.DSLContext;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

import java.util.Map;
import java.util.Optional;

import static fr.ght1pc9kc.baywatch.dsl.tables.Notifications.NOTIFICATIONS;

@Slf4j
@Repository
@RequiredArgsConstructor
public class NotificationPersistenceAdapter implements NotificationPersistencePort {
private final @DatabaseQualifier Scheduler databaseScheduler;
private final DSLContext dsl;
private final ObjectMapper jsonMapper;

@Override
public Mono<ServerEvent> persist(ServerEvent event) {
return null;
}

@Override
@SuppressWarnings({"BlockingMethodInNonBlockingContext", "resource"})
public Flux<ServerEvent> consume(String userId) {
var select = dsl.selectFrom(NOTIFICATIONS).where(NOTIFICATIONS.NOTI_USER_ID.eq(userId));

return Flux.<NotificationsRecord>create(sink -> {
var cursor = select.fetchLazy();
sink.onRequest(n -> {
int count = (int) n;
var rs = cursor.fetchNext(count);
rs.forEach(sink::next);
if (rs.size() < count) {
sink.complete();
}
}).onDispose(cursor::close);
}).limitRate(Integer.MAX_VALUE - 1).subscribeOn(databaseScheduler)
.flatMap(r -> Mono.fromCallable(() ->
dsl.deleteFrom(NOTIFICATIONS)
.where(NOTIFICATIONS.NOTI_ID.eq(r.getNotiId()))
.execute())
.then(Mono.just(r)))
.map(this::buildServerEvent);
}

private ServerEvent buildServerEvent(NotificationsRecord noti) {
MapLikeType dataType = jsonMapper.getTypeFactory().constructMapLikeType(
Map.class, String.class, Object.class);
Map<String, String> message = Optional.ofNullable(noti.getNotiData()).map(data -> {
try {
return jsonMapper.readValue(noti.getNotiData(), dataType);
} catch (Exception e) {
return Map.of("message", data);
}
}).orElse(Map.of());
return new BasicEvent<>(
noti.getNotiId(),
EventType.valueOf(noti.getNotiEventType()),
message);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package fr.ght1pc9kc.baywatch.notify.infra;
package fr.ght1pc9kc.baywatch.notify.infra.adapters;

import fr.ght1pc9kc.baywatch.admin.api.model.Counter;
import fr.ght1pc9kc.baywatch.admin.api.model.CounterGroup;
Expand All @@ -7,6 +7,7 @@
import fr.ght1pc9kc.baywatch.notify.api.NotifyManager;
import fr.ght1pc9kc.baywatch.notify.api.NotifyService;
import fr.ght1pc9kc.baywatch.notify.domain.NotifyServiceImpl;
import fr.ght1pc9kc.baywatch.notify.domain.ports.NotificationPersistencePort;
import fr.ght1pc9kc.baywatch.security.api.AuthenticationFacade;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
Expand All @@ -22,8 +23,9 @@ public class NotifyServiceAdapter implements NotifyService, NotifyManager, Count
@Delegate
private final NotifyServiceImpl delegate;

public NotifyServiceAdapter(AuthenticationFacade authFacade, MeterRegistry registry) {
this.delegate = new NotifyServiceImpl(authFacade);
public NotifyServiceAdapter(
AuthenticationFacade authFacade, NotificationPersistencePort notificationPersistence, MeterRegistry registry) {
this.delegate = new NotifyServiceImpl(authFacade, notificationPersistence);

Gauge.builder("bw.session_count", delegate::countCacheEntries)
.baseUnit("gauge")
Expand Down
1 change: 1 addition & 0 deletions sandside/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ spring:
password: hasselhoff
hikari:
data-source-properties.foreign_keys=true: true

# maximum-pool-size: 1
flyway:
baseline-on-migrate: true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
create table NOTIFICATIONS
(
NOTI_ID VARCHAR(64) NOT NULL PRIMARY KEY,
NOTI_USER_ID VARCHAR(64) NOT NULL,
NOTI_EVENT_TYPE VARCHAR(20) NOT NULL,
NOTI_DATA TEXT,
NOTI_CREATED_AT DATETIME,

constraint FK_NOTI_USER_ID
foreign key (NOTI_USER_ID) references USERS (USER_ID)
);
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@

import fr.ght1pc9kc.baywatch.notify.api.model.EventType;
import fr.ght1pc9kc.baywatch.notify.api.model.ServerEvent;
import fr.ght1pc9kc.baywatch.notify.domain.ports.NotificationPersistencePort;
import fr.ght1pc9kc.baywatch.security.api.AuthenticationFacade;
import fr.ght1pc9kc.baywatch.tests.samples.UserSamples;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -32,7 +35,9 @@ void setUp() {
Mono.just(UserSamples.LUKE),
Mono.just(UserSamples.OBIWAN),
Mono.just(UserSamples.LUKE));
tested = new NotifyServiceImpl(authFacadeMock);
NotificationPersistencePort notificationPersistenceMock = mock(NotificationPersistencePort.class);
when(notificationPersistenceMock.consume(anyString())).thenReturn(Flux.empty());
tested = new NotifyServiceImpl(authFacadeMock, notificationPersistenceMock);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package fr.ght1pc9kc.baywatch.notify.infra.adapters;

import com.fasterxml.jackson.databind.ObjectMapper;
import fr.ght1pc9kc.baywatch.dsl.tables.Notifications;
import fr.ght1pc9kc.baywatch.notify.api.model.EventType;
import fr.ght1pc9kc.baywatch.notify.api.model.ServerEvent;
import fr.ght1pc9kc.baywatch.notify.domain.ports.NotificationPersistencePort;
import fr.ght1pc9kc.baywatch.notify.infra.samples.NotificationsRecordSamples;
import fr.ght1pc9kc.baywatch.tests.samples.UserSamples;
import fr.ght1pc9kc.testy.core.extensions.ChainedExtension;
import fr.ght1pc9kc.testy.jooq.WithDslContext;
import fr.ght1pc9kc.testy.jooq.WithInMemoryDatasource;
import fr.ght1pc9kc.testy.jooq.WithSampleDataLoaded;
import org.jooq.DSLContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import static org.assertj.core.api.Assertions.assertThat;

class NotificationPersistenceAdapterTest {
private static final WithInMemoryDatasource wDs = WithInMemoryDatasource.builder().build();
private static final WithDslContext wDslContext = WithDslContext.builder()
.setDatasourceExtension(wDs).build();
private static final WithSampleDataLoaded wSamples = WithSampleDataLoaded.builder(wDslContext)
.createTablesIfNotExists()
.addDataset(NotificationsRecordSamples.SAMPLE)
.build();

@RegisterExtension
@SuppressWarnings("unused")
static ChainedExtension chain = ChainedExtension.outer(wDs)
.append(wDslContext)
.append(wSamples)
.register();

private NotificationPersistencePort tested;

@BeforeEach
void setUp(DSLContext dsl) {
tested = new NotificationPersistenceAdapter(Schedulers.immediate(), dsl, new ObjectMapper());
}

@Test
void should_consume_notifications(DSLContext dsl) {
assertThat(dsl.fetchCount(Notifications.NOTIFICATIONS)).isNotZero();

Flux<ServerEvent> actuals = tested.consume(UserSamples.OBIWAN.id);

StepVerifier.create(actuals)
.assertNext(actual -> Assertions.assertAll(
() -> assertThat(actual.id()).isEqualTo("EV01HHA6PFESHHFK4YHT1T2HKHSR"),
() -> assertThat(actual.type()).isEqualTo(EventType.USER_NOTIFICATION)
)).verifyComplete();

assertThat(dsl.fetchCount(Notifications.NOTIFICATIONS)).isOne();
}

@Test
void should_consume_dummy_notifications(DSLContext dsl) {
assertThat(dsl.fetchCount(Notifications.NOTIFICATIONS)).isNotZero();

Flux<ServerEvent> actuals = tested.consume(UserSamples.MWINDU.id);

StepVerifier.create(actuals)
.assertNext(actual -> Assertions.assertAll(
() -> assertThat(actual.id()).isEqualTo("EV01HHA6PFESHHFK4YHT1T2HKHSQ"),
() -> assertThat(actual.type()).isEqualTo(EventType.USER_NOTIFICATION)
)).verifyComplete();

assertThat(dsl.fetchCount(Notifications.NOTIFICATIONS)).isOne();
}

@Test
void should_not_consume_notifications(DSLContext dsl) {
assertThat(dsl.fetchCount(Notifications.NOTIFICATIONS)).isNotZero();

Flux<ServerEvent> actuals = tested.consume(UserSamples.DSIDIOUS.id);

StepVerifier.create(actuals).verifyComplete();

assertThat(dsl.fetchCount(Notifications.NOTIFICATIONS)).isNotZero();
}
}
Loading

0 comments on commit 8373b03

Please sign in to comment.