Skip to content

Commit

Permalink
feat(sandside): #224 refactor OPML loader, improve architecture
Browse files Browse the repository at this point in the history
avoid use Spring DataBuffer in domain
fix PipedOutputstream close
and unit tests
  • Loading branch information
Marthym committed Oct 6, 2024
1 parent 428373a commit ff781ee
Show file tree
Hide file tree
Showing 9 changed files with 360 additions and 27 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<nimbus-jose-jwt.version>9.40</nimbus-jose-jwt.version>
<noexception.version>1.9.1</noexception.version>
<scraphead.version>1.4.1</scraphead.version>
<testy-box.version>1.7.1</testy-box.version>
<testy-box.version>1.7.2</testy-box.version>
<ulid-creator.version>5.2.3</ulid-creator.version>
<unbescape.version>1.1.6.RELEASE</unbescape.version>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package fr.ght1pc9kc.baywatch.opml.api;

import org.springframework.core.io.buffer.DataBuffer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.InputStream;
import java.util.function.Supplier;

public interface OpmlService {
Mono<InputStream> opmlExport();

Mono<Void> opmlImport(Flux<DataBuffer> data);
Mono<Void> opmlImport(Supplier<InputStream> inputSupplier);
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
package fr.ght1pc9kc.baywatch.opml.domain;

import com.machinezoo.noexception.Exceptions;
import fr.ght1pc9kc.baywatch.common.domain.QueryContext;
import fr.ght1pc9kc.baywatch.opml.api.OpmlService;
import fr.ght1pc9kc.baywatch.security.api.AuthenticationFacade;
import fr.ght1pc9kc.baywatch.security.api.model.User;
import fr.ght1pc9kc.baywatch.security.domain.exceptions.UnauthenticatedUser;
import fr.ght1pc9kc.baywatch.techwatch.api.model.WebFeed;
import fr.ght1pc9kc.baywatch.common.domain.QueryContext;
import fr.ght1pc9kc.baywatch.techwatch.infra.persistence.FeedRepository;
import fr.ght1pc9kc.entity.api.Entity;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.function.Supplier;

@Slf4j
@RequiredArgsConstructor
Expand Down Expand Up @@ -47,25 +46,17 @@ public Mono<InputStream> opmlExport() {
}

@Override
public Mono<Void> opmlImport(Flux<DataBuffer> data) {
log.debug("Start importing...");
public Mono<Void> opmlImport(Supplier<InputStream> inputSupplier) {
return authFacade.getConnectedUser()
.switchIfEmpty(Mono.error(new UnauthenticatedUser("Authentication not found !")))
.flatMapMany(Exceptions.wrap().function(owner -> {
PipedOutputStream pos = new PipedOutputStream();
PipedInputStream pis = new PipedInputStream(pos);
Flux<Entity<WebFeed>> feeds = readOpml(pis);
Mono<Entity<WebFeed>> db = DataBufferUtils.write(data, pos)
.map(DataBufferUtils::release)
.doOnTerminate(Exceptions.wrap().runnable(() -> {
pos.flush();
pos.close();
}))
.then(Mono.empty());
return Flux.merge(db, feeds)
InputStream is = inputSupplier.get();
return readOpml(is)
.buffer(100)
.flatMap(f -> feedRepository.persist(f).collectList())
.flatMap(f -> feedRepository.persistUserRelation(owner.id(), f));
.flatMap(f -> feedRepository.persistUserRelation(owner.id(), f))
.doOnTerminate(Exceptions.wrap().runnable(is::close));

})).then();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package fr.ght1pc9kc.baywatch.opml.infra;

import com.machinezoo.noexception.Exceptions;
import fr.ght1pc9kc.baywatch.opml.api.OpmlService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.InputStreamResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
Expand All @@ -20,9 +22,13 @@
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;

Expand All @@ -34,6 +40,7 @@
public class OpmlController {

private final OpmlService opmlService;
private final Scheduler uploadReader = Schedulers.boundedElastic();

@ResponseBody
@GetMapping("/export/baywatch.opml")
Expand All @@ -51,8 +58,19 @@ public Mono<ResponseEntity<Resource>> exportOpml() {
}

@PostMapping("/import")
@SuppressWarnings("CallingSubscribeInNonBlockingScope")
public Mono<Void> importOpml(@RequestPart("opml") Mono<FilePart> opmlFilePart) {
Flux<DataBuffer> data = opmlFilePart.flatMapMany(Part::content);
return opmlService.opmlImport(data);

PipedOutputStream pos = new PipedOutputStream();
DataBufferUtils.write(data, pos)
.doOnTerminate(Exceptions.wrap().runnable(pos::close))
.subscribe(
DataBufferUtils.releaseConsumer(),
t -> log.atError().log("STACKTRACE", t)
);

return opmlService.opmlImport(Exceptions.wrap().supplier(() -> new PipedInputStream(pos)))
.subscribeOn(uploadReader);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ public Flux<Entity<Map<FeedProperties, String>>> getFeedProperties(
}
if (nonNull(properties)) {
if (properties.size() == 1) {
FEEDS_USERS_PROPERTIES.FUPR_PROPERTY_NAME.eq(properties.iterator().next().name());
query.addConditions(FEEDS_USERS_PROPERTIES.FUPR_PROPERTY_NAME.eq(properties.iterator().next().name()));
} else if (!properties.isEmpty()) {
FEEDS_USERS_PROPERTIES.FUPR_PROPERTY_NAME.in(properties.stream().map(FeedProperties::name).toList());
query.addConditions(FEEDS_USERS_PROPERTIES.FUPR_PROPERTY_NAME.in(properties.stream().map(FeedProperties::name).toList()));
}
}
query.addOrderBy(FEEDS_USERS_PROPERTIES.FUPR_USER_ID, FEEDS_USERS_PROPERTIES.FUPR_FEED_ID);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package fr.ght1pc9kc.baywatch.opml.domain;

import fr.ght1pc9kc.baywatch.common.domain.QueryContext;
import fr.ght1pc9kc.baywatch.opml.api.OpmlService;
import fr.ght1pc9kc.baywatch.security.api.AuthenticationFacade;
import fr.ght1pc9kc.baywatch.security.infra.adapters.SpringAuthenticationContext;
import fr.ght1pc9kc.baywatch.techwatch.api.model.WebFeed;
import fr.ght1pc9kc.baywatch.techwatch.infra.persistence.FeedRepository;
import fr.ght1pc9kc.baywatch.tests.samples.FeedSamples;
import fr.ght1pc9kc.baywatch.tests.samples.UserSamples;
import fr.ght1pc9kc.entity.api.Entity;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.nio.charset.StandardCharsets;
import java.util.Collection;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

class OpmlServiceImplTest {

private OpmlService tested;

private AuthenticationFacade authenticationFacadeMock;
private FeedRepository feedRepositoryMock;

@BeforeEach
void setUp() {
feedRepositoryMock = mock(FeedRepository.class);
doReturn(Flux.just(FeedSamples.JEDI)).when(feedRepositoryMock).list(any(QueryContext.class));
doAnswer(answer -> Flux.fromIterable(answer.getArgument(0))).when(feedRepositoryMock).persist(any());
doAnswer(answer -> Flux.fromIterable(answer.getArgument(1))).when(feedRepositoryMock).persistUserRelation(anyString(), any());

authenticationFacadeMock = spy(new SpringAuthenticationContext());
tested = new OpmlServiceImpl(feedRepositoryMock, authenticationFacadeMock, OpmlWriter::new, OpmlReader::new);
}

@Test
void should_export_opml() {
doReturn(Mono.just(UserSamples.OBIWAN)).when(authenticationFacadeMock).getConnectedUser();

StepVerifier.create(tested.opmlExport())
.assertNext(actual -> Assertions.assertThat(actual).asString(StandardCharsets.UTF_8)
.contains("<ownerName>Obiwan Kenobi</ownerName>")
.contains("<title>Baywatch OPML export</title>")
.containsIgnoringNewLines("""
<outline text="Jedi Feed" type="rss" xmlUrl="https://www.jedi.com/" title="Jedi Feed" category=""/>
"""))
.verifyComplete();
}

@Test
@SuppressWarnings("unchecked")
void should_import_opml() {
doReturn(Mono.just(UserSamples.OBIWAN)).when(authenticationFacadeMock).getConnectedUser();

StepVerifier.create(tested.opmlImport(() -> OpmlServiceImplTest.class.getResourceAsStream("okenobi.xml")))
.verifyComplete();

ArgumentCaptor<Collection<Entity<WebFeed>>> captor = ArgumentCaptor.forClass(Collection.class);
verify(feedRepositoryMock).persist(captor.capture());
Assertions.assertThat(captor.getValue()).hasSize(30);
}
}
Loading

0 comments on commit ff781ee

Please sign in to comment.