diff --git a/sandside/src/main/java/fr/ght1pc9kc/baywatch/opml/api/OpmlService.java b/sandside/src/main/java/fr/ght1pc9kc/baywatch/opml/api/OpmlService.java index 7c37e9ce..51667ac1 100644 --- a/sandside/src/main/java/fr/ght1pc9kc/baywatch/opml/api/OpmlService.java +++ b/sandside/src/main/java/fr/ght1pc9kc/baywatch/opml/api/OpmlService.java @@ -1,13 +1,12 @@ package fr.ght1pc9kc.baywatch.opml.api; -import org.springframework.core.io.buffer.DataBuffer; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.io.InputStream; +import java.util.function.Supplier; public interface OpmlService { Mono opmlExport(); - Mono opmlImport(Flux data); + Mono opmlImport(Supplier inputSupplier); } diff --git a/sandside/src/main/java/fr/ght1pc9kc/baywatch/opml/domain/OpmlServiceImpl.java b/sandside/src/main/java/fr/ght1pc9kc/baywatch/opml/domain/OpmlServiceImpl.java index 3d4a958c..0d190219 100644 --- a/sandside/src/main/java/fr/ght1pc9kc/baywatch/opml/domain/OpmlServiceImpl.java +++ b/sandside/src/main/java/fr/ght1pc9kc/baywatch/opml/domain/OpmlServiceImpl.java @@ -1,18 +1,16 @@ package fr.ght1pc9kc.baywatch.opml.domain; import com.machinezoo.noexception.Exceptions; +import fr.ght1pc9kc.baywatch.common.domain.QueryContext; import fr.ght1pc9kc.baywatch.opml.api.OpmlService; import fr.ght1pc9kc.baywatch.security.api.AuthenticationFacade; import fr.ght1pc9kc.baywatch.security.api.model.User; import fr.ght1pc9kc.baywatch.security.domain.exceptions.UnauthenticatedUser; import fr.ght1pc9kc.baywatch.techwatch.api.model.WebFeed; -import fr.ght1pc9kc.baywatch.common.domain.QueryContext; import fr.ght1pc9kc.baywatch.techwatch.infra.persistence.FeedRepository; import fr.ght1pc9kc.entity.api.Entity; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -20,6 +18,7 @@ import java.io.OutputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; +import java.util.function.Supplier; @Slf4j @RequiredArgsConstructor @@ -47,25 +46,17 @@ public Mono opmlExport() { } @Override - public Mono opmlImport(Flux data) { - log.debug("Start importing..."); + public Mono opmlImport(Supplier inputSupplier) { return authFacade.getConnectedUser() .switchIfEmpty(Mono.error(new UnauthenticatedUser("Authentication not found !"))) .flatMapMany(Exceptions.wrap().function(owner -> { - PipedOutputStream pos = new PipedOutputStream(); - PipedInputStream pis = new PipedInputStream(pos); - Flux> feeds = readOpml(pis); - Mono> db = DataBufferUtils.write(data, pos) - .map(DataBufferUtils::release) - .doOnTerminate(Exceptions.wrap().runnable(() -> { - pos.flush(); - pos.close(); - })) - .then(Mono.empty()); - return Flux.merge(db, feeds) + InputStream is = inputSupplier.get(); + return readOpml(is) .buffer(100) .flatMap(f -> feedRepository.persist(f).collectList()) - .flatMap(f -> feedRepository.persistUserRelation(owner.id(), f)); + .flatMap(f -> feedRepository.persistUserRelation(owner.id(), f)) + .doOnTerminate(Exceptions.wrap().runnable(is::close)); + })).then(); } diff --git a/sandside/src/main/java/fr/ght1pc9kc/baywatch/opml/infra/OpmlController.java b/sandside/src/main/java/fr/ght1pc9kc/baywatch/opml/infra/OpmlController.java index d0b0f6f5..ec72b67e 100644 --- a/sandside/src/main/java/fr/ght1pc9kc/baywatch/opml/infra/OpmlController.java +++ b/sandside/src/main/java/fr/ght1pc9kc/baywatch/opml/infra/OpmlController.java @@ -1,11 +1,13 @@ package fr.ght1pc9kc.baywatch.opml.infra; +import com.machinezoo.noexception.Exceptions; import fr.ght1pc9kc.baywatch.opml.api.OpmlService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.core.io.InputStreamResource; import org.springframework.core.io.Resource; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; @@ -20,9 +22,13 @@ import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import java.io.ByteArrayInputStream; import java.io.InputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; @@ -34,6 +40,7 @@ public class OpmlController { private final OpmlService opmlService; + private final Scheduler uploadReader = Schedulers.boundedElastic(); @ResponseBody @GetMapping("/export/baywatch.opml") @@ -51,8 +58,19 @@ public Mono> exportOpml() { } @PostMapping("/import") + @SuppressWarnings("CallingSubscribeInNonBlockingScope") public Mono importOpml(@RequestPart("opml") Mono opmlFilePart) { Flux data = opmlFilePart.flatMapMany(Part::content); - return opmlService.opmlImport(data); + + PipedOutputStream pos = new PipedOutputStream(); + DataBufferUtils.write(data, pos) + .doOnTerminate(Exceptions.wrap().runnable(pos::close)) + .subscribe( + DataBufferUtils.releaseConsumer(), + t -> log.atError().log("STACKTRACE", t) + ); + + return opmlService.opmlImport(Exceptions.wrap().supplier(() -> new PipedInputStream(pos))) + .subscribeOn(uploadReader); } -} +} \ No newline at end of file diff --git a/sandside/src/test/java/fr/ght1pc9kc/baywatch/opml/domain/OpmlServiceImplTest.java b/sandside/src/test/java/fr/ght1pc9kc/baywatch/opml/domain/OpmlServiceImplTest.java new file mode 100644 index 00000000..5703680e --- /dev/null +++ b/sandside/src/test/java/fr/ght1pc9kc/baywatch/opml/domain/OpmlServiceImplTest.java @@ -0,0 +1,75 @@ +package fr.ght1pc9kc.baywatch.opml.domain; + +import fr.ght1pc9kc.baywatch.common.domain.QueryContext; +import fr.ght1pc9kc.baywatch.opml.api.OpmlService; +import fr.ght1pc9kc.baywatch.security.api.AuthenticationFacade; +import fr.ght1pc9kc.baywatch.security.infra.adapters.SpringAuthenticationContext; +import fr.ght1pc9kc.baywatch.techwatch.api.model.WebFeed; +import fr.ght1pc9kc.baywatch.techwatch.infra.persistence.FeedRepository; +import fr.ght1pc9kc.baywatch.tests.samples.FeedSamples; +import fr.ght1pc9kc.baywatch.tests.samples.UserSamples; +import fr.ght1pc9kc.entity.api.Entity; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.nio.charset.StandardCharsets; +import java.util.Collection; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +class OpmlServiceImplTest { + + private OpmlService tested; + + private AuthenticationFacade authenticationFacadeMock; + private FeedRepository feedRepositoryMock; + + @BeforeEach + void setUp() { + feedRepositoryMock = mock(FeedRepository.class); + doReturn(Flux.just(FeedSamples.JEDI)).when(feedRepositoryMock).list(any(QueryContext.class)); + doAnswer(answer -> Flux.fromIterable(answer.getArgument(0))).when(feedRepositoryMock).persist(any()); + doAnswer(answer -> Flux.fromIterable(answer.getArgument(1))).when(feedRepositoryMock).persistUserRelation(anyString(), any()); + + authenticationFacadeMock = spy(new SpringAuthenticationContext()); + tested = new OpmlServiceImpl(feedRepositoryMock, authenticationFacadeMock, OpmlWriter::new, OpmlReader::new); + } + + @Test + void should_export_opml() { + doReturn(Mono.just(UserSamples.OBIWAN)).when(authenticationFacadeMock).getConnectedUser(); + + StepVerifier.create(tested.opmlExport()) + .assertNext(actual -> Assertions.assertThat(actual).asString(StandardCharsets.UTF_8) + .contains("Obiwan Kenobi") + .contains("Baywatch OPML export") + .containsIgnoringNewLines(""" + + """)) + .verifyComplete(); + } + + @Test + @SuppressWarnings("unchecked") + void should_import_opml() { + doReturn(Mono.just(UserSamples.OBIWAN)).when(authenticationFacadeMock).getConnectedUser(); + + StepVerifier.create(tested.opmlImport(() -> OpmlServiceImplTest.class.getResourceAsStream("okenobi.xml"))) + .verifyComplete(); + + ArgumentCaptor>> captor = ArgumentCaptor.forClass(Collection.class); + verify(feedRepositoryMock).persist(captor.capture()); + Assertions.assertThat(captor.getValue().size()).isEqualTo(30); + } +} \ No newline at end of file diff --git a/sandside/src/test/resources/fr/ght1pc9kc/baywatch/opml/domain/okenobi.xml b/sandside/src/test/resources/fr/ght1pc9kc/baywatch/opml/domain/okenobi.xml new file mode 100644 index 00000000..531402e9 --- /dev/null +++ b/sandside/src/test/resources/fr/ght1pc9kc/baywatch/opml/domain/okenobi.xml @@ -0,0 +1,72 @@ + + + + Baywatch OPML export + Sun, 28 Apr 2024 17:47:47 GMT + Fred + marthym@gmail.com + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file