Skip to content

Commit

Permalink
Merge branch 'feature/159-avoid-plus-on-search-feeds-result-for-mine'…
Browse files Browse the repository at this point in the history
… into develop
  • Loading branch information
Marthym committed Oct 22, 2023
2 parents 6f44ec6 + 30dc287 commit 61f29ae
Show file tree
Hide file tree
Showing 53 changed files with 512 additions and 459 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package fr.ght1pc9kc.baywatch.common.infra.mappers;

import fr.ght1pc9kc.baywatch.common.api.model.Entity;
import fr.ght1pc9kc.baywatch.common.domain.DateUtils;
import fr.ght1pc9kc.baywatch.common.domain.Hasher;
import fr.ght1pc9kc.baywatch.dsl.tables.records.FeedsRecord;
import fr.ght1pc9kc.baywatch.dsl.tables.records.FeedsUsersRecord;
import fr.ght1pc9kc.baywatch.dsl.tables.records.NewsRecord;
import fr.ght1pc9kc.baywatch.techwatch.api.model.Feed;
import fr.ght1pc9kc.baywatch.techwatch.api.model.News;
import fr.ght1pc9kc.baywatch.techwatch.api.model.RawFeed;
import fr.ght1pc9kc.baywatch.techwatch.api.model.RawNews;
import fr.ght1pc9kc.baywatch.techwatch.api.model.State;
import fr.ght1pc9kc.baywatch.techwatch.api.model.WebFeed;
import org.jooq.Record;
import org.jooq.tools.StringUtils;
import org.mapstruct.Mapper;
Expand Down Expand Up @@ -89,44 +89,40 @@ default News recordToNews(Record r) {
.build();
}

default RawFeed recordToRawFeed(Record r) {
return RawFeed.builder()
.id(r.get(FEEDS.FEED_ID))
.url(URI.create(r.get(FEEDS.FEED_URL)))
.name(r.get(FEEDS.FEED_NAME))
.description(r.get(FEEDS.FEED_DESCRIPTION))
.lastWatch(DateUtils.toInstant(r.get(FEEDS.FEED_LAST_WATCH)))
.build();
}

default Feed recordToFeed(Record r) {
RawFeed raw = recordToRawFeed(r);
default Entity<WebFeed> recordToFeed(Record r) {
Set<String> tags = (r.indexOf(FEEDS_USERS.FEUS_TAGS) < 0)
? Set.of()
: Optional.ofNullable(r.get(FEEDS_USERS.FEUS_TAGS))
.map(t -> Set.of(t.split(",")))
.orElse(Set.of());
String name = (r.indexOf(FEEDS_USERS.FEUS_FEED_NAME) >= 0 && r.get(FEEDS_USERS.FEUS_FEED_NAME) != null)
? r.get(FEEDS_USERS.FEUS_FEED_NAME) : raw.name();
return Feed.builder()
.raw(raw)
? r.get(FEEDS_USERS.FEUS_FEED_NAME) : r.get(FEEDS.FEED_NAME);

String owner = (r.indexOf(FEEDS_USERS.FEUS_USER_ID) >= 0 && r.get(FEEDS_USERS.FEUS_USER_ID) != null)
? r.get(FEEDS_USERS.FEUS_USER_ID) : Entity.NO_ONE;

WebFeed webFeed = WebFeed.builder()
.reference(r.get(FEEDS.FEED_ID))
.location(URI.create(r.get(FEEDS.FEED_URL)))
.name(name)
.description(r.get(FEEDS.FEED_DESCRIPTION))
.tags(tags)
.build();

return Entity.identify(webFeed.reference(), owner, webFeed);
}

@Mapping(target = "feedId", source = "raw.id")
@Mapping(target = "feedUrl", source = "raw.url")
@Mapping(target = "feedName", source = "raw.name")
@Mapping(target = "feedDescription", source = "raw.description")
@Mapping(target = "feedLastWatch", source = "raw.lastWatch")
FeedsRecord feedToFeedsRecord(Feed feed);
@Mapping(target = "feedId", source = "reference")
@Mapping(target = "feedUrl", source = "location")
@Mapping(target = "feedName", source = "name")
@Mapping(target = "feedDescription", source = "description")
FeedsRecord feedToFeedsRecord(WebFeed feed);

@Mapping(target = "feusFeedId", source = "raw.id")
@Mapping(target = "feusFeedId", source = "reference")
@Mapping(target = "feusFeedName", source = "name")
@Mapping(target = "feusTags",
expression = "java( (feed.getTags() != null && !feed.getTags().isEmpty())?String.join(\",\", feed.getTags()):null )")
FeedsUsersRecord feedToFeedsUsersRecord(Feed feed);
expression = "java( (feed.tags() != null && !feed.tags().isEmpty())?String.join(\",\", feed.tags()):null )")
FeedsUsersRecord feedToFeedsUsersRecord(WebFeed feed);

@SuppressWarnings("unused")
default Instant fromLocalDateTime(LocalDateTime date) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class IndexableDataAdapter implements IndexableDataPort {
@Override
public Flux<IndexableFeed> listFeed() {
return systemMaintenanceService.feedList()
.map(mapper::getIndexableFromFeed);
.map(f -> mapper.getIndexableFromFeed(f.self));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import fr.ght1pc9kc.baywatch.common.domain.Hasher;
import fr.ght1pc9kc.baywatch.indexer.domain.model.IndexableFeed;
import fr.ght1pc9kc.baywatch.indexer.domain.model.IndexableFeedEntry;
import fr.ght1pc9kc.baywatch.techwatch.api.model.RawFeed;
import fr.ght1pc9kc.baywatch.techwatch.api.model.RawNews;
import fr.ght1pc9kc.baywatch.techwatch.api.model.WebFeed;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.ReportingPolicy;
Expand All @@ -16,10 +16,10 @@
imports = {URI.class, Set.class, Hasher.class},
unmappedTargetPolicy = ReportingPolicy.IGNORE)
public interface IndexerMapper {
@Mapping(source = "reference", target = "id")
@Mapping(source = "name", target = "title")
@Mapping(source = "description", target = "description")
@Mapping(target = "link", expression = "java(rf.url().toString())")
IndexableFeed getIndexableFromFeed(RawFeed rf);
@Mapping(target = "link", expression = "java(rf.location().toString())")
IndexableFeed getIndexableFromFeed(WebFeed rf);

IndexableFeedEntry getIndexableFromEntry(RawNews rn);
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package fr.ght1pc9kc.baywatch.opml.domain;

import fr.ght1pc9kc.baywatch.techwatch.api.model.Feed;
import fr.ght1pc9kc.baywatch.techwatch.api.model.RawFeed;
import fr.ght1pc9kc.baywatch.common.domain.Hasher;
import fr.ght1pc9kc.baywatch.techwatch.api.model.WebFeed;
import lombok.extern.slf4j.Slf4j;

import javax.xml.namespace.QName;
Expand All @@ -20,11 +19,11 @@

@Slf4j
public class OpmlReader {
private final Consumer<Feed> onOutline;
private final Consumer<WebFeed> onOutline;
private final Runnable onComplete;
private final Consumer<Throwable> onError;

public OpmlReader(Consumer<Feed> onOutline, Runnable onComplete, Consumer<Throwable> onError) {
public OpmlReader(Consumer<WebFeed> onOutline, Runnable onComplete, Consumer<Throwable> onError) {
this.onOutline = onOutline;
this.onComplete = onComplete;
this.onError = onError;
Expand Down Expand Up @@ -56,12 +55,10 @@ public void read(InputStream is) {
.map(t -> Set.of(t.split(",")))
.orElseGet(Set::of);

Feed feed = Feed.builder()
.raw(RawFeed.builder()
.id(Hasher.identify(uri))
.url(uri)
.name(Optional.ofNullable(text).orElseGet(uri::getHost))
.build())
WebFeed feed = WebFeed.builder()
.reference(Hasher.identify(uri))
.location(uri)
.name(Optional.ofNullable(text).orElseGet(uri::getHost))
.tags(tags)
.build();
this.onOutline.accept(feed);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package fr.ght1pc9kc.baywatch.opml.domain;

import fr.ght1pc9kc.baywatch.techwatch.api.model.Feed;
import fr.ght1pc9kc.baywatch.techwatch.api.model.WebFeed;

import java.util.function.Consumer;

@FunctionalInterface
public interface OpmlReaderFactory {
OpmlReader create(Consumer<Feed> onOutline, Runnable onComplete, Consumer<Throwable> onError);
OpmlReader create(Consumer<WebFeed> onOutline, Runnable onComplete, Consumer<Throwable> onError);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import fr.ght1pc9kc.baywatch.security.api.AuthenticationFacade;
import fr.ght1pc9kc.baywatch.security.api.model.User;
import fr.ght1pc9kc.baywatch.security.domain.exceptions.UnauthenticatedUser;
import fr.ght1pc9kc.baywatch.techwatch.api.model.Feed;
import fr.ght1pc9kc.baywatch.techwatch.api.model.WebFeed;
import fr.ght1pc9kc.baywatch.techwatch.domain.model.QueryContext;
import fr.ght1pc9kc.baywatch.techwatch.infra.persistence.FeedRepository;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -54,8 +54,8 @@ public Mono<Void> opmlImport(Flux<DataBuffer> data) {
.flatMapMany(Exceptions.wrap().function(owner -> {
PipedOutputStream pos = new PipedOutputStream();
PipedInputStream pis = new PipedInputStream(pos);
Flux<Feed> feeds = readOpml(pis);
Mono<Feed> db = DataBufferUtils.write(data, pos)
Flux<WebFeed> feeds = readOpml(pis);
Mono<WebFeed> db = DataBufferUtils.write(data, pos)
.map(DataBufferUtils::release)
.doOnTerminate(Exceptions.wrap().runnable(() -> {
pos.flush();
Expand All @@ -64,7 +64,7 @@ public Mono<Void> opmlImport(Flux<DataBuffer> data) {
.then(Mono.empty());
return Flux.merge(db, feeds)
.buffer(100)
.flatMap(f -> feedRepository.persist(f).collectList())
.flatMap(f -> feedRepository.persist(f).map(e -> e.self).collectList())
.flatMap(f -> feedRepository.persistUserRelation(f, owner.id));
})).then();
}
Expand All @@ -74,9 +74,9 @@ private Mono<Void> writeOpml(OutputStream out, Entity<User> owner) {
return feedRepository.list(QueryContext.empty().withUserId(owner.id))
.doFirst(() -> opmlWriter.startOpmlDocument(owner.self))
.doOnEach(signal -> {
Feed feed = signal.get();
Entity<WebFeed> feed = signal.get();
if (feed != null) {
opmlWriter.writeOutline(feed);
opmlWriter.writeOutline(feed.self);
}
})
.doOnComplete(opmlWriter::endOmplDocument)
Expand All @@ -88,7 +88,7 @@ private Mono<Void> writeOpml(OutputStream out, Entity<User> owner) {
}).then();
}

private Flux<Feed> readOpml(InputStream is) {
private Flux<WebFeed> readOpml(InputStream is) {
return Flux.create(sink ->
readerFactory.create(sink::next, sink::complete, sink::error).read(is));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package fr.ght1pc9kc.baywatch.opml.domain;

import fr.ght1pc9kc.baywatch.techwatch.api.model.Feed;
import fr.ght1pc9kc.baywatch.security.api.model.User;
import fr.ght1pc9kc.baywatch.techwatch.api.model.WebFeed;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.VisibleForTesting;

Expand Down Expand Up @@ -82,14 +82,14 @@ private void writeHead(XMLStreamWriter xmlWriter, User owner) {
}
}

public void writeOutline(Feed feed) {
public void writeOutline(WebFeed feed) {
try {
xmlWriter.writeStartElement(OPMLTags.OUTLINE);
xmlWriter.writeAttribute(OPMLTags.ATTRIBUTE_TEXT, feed.getName());
xmlWriter.writeAttribute(OPMLTags.ATTRIBUTE_TEXT, feed.name());
xmlWriter.writeAttribute(OPMLTags.ATTRIBUTE_TYPE, OPMLTags.VALUE_TYPE_RSS);
xmlWriter.writeAttribute(OPMLTags.ATTRIBUTE_XML_URL, feed.getUrl().toString());
xmlWriter.writeAttribute(OPMLTags.TITLE, feed.getName());
xmlWriter.writeAttribute(OPMLTags.ATTRIBUTE_CATEGORY, String.join(",", feed.getTags()));
xmlWriter.writeAttribute(OPMLTags.ATTRIBUTE_XML_URL, feed.location().toString());
xmlWriter.writeAttribute(OPMLTags.TITLE, feed.name());
xmlWriter.writeAttribute(OPMLTags.ATTRIBUTE_CATEGORY, String.join(",", feed.tags()));
xmlWriter.writeEndElement(); // outline
xmlWriter.flush();
} catch (XMLStreamException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public Mono<ScrapResult> scrap(Period maxRetention) {

.reduce(Integer::sum)
.onErrorMap(t -> new ScrapingException("Fatal error when scraping !", t))
.doOnTerminate(errors::tryEmitComplete)

.switchIfEmpty(Mono.just(0))
.flatMap(count -> errors.asFlux().collectList().map(
Expand All @@ -132,7 +133,9 @@ private Flux<News> wgetFeedNews(ScrapedFeed feed, Period conservation, Sinks.Man
URI feedUrl = (hostPlugin != null) ? hostPlugin.uriModifier(feed.link()) : feed.link();

if (!SUPPORTED_SCHEMES.contains(feedUrl.getScheme())) {
log.warn("Unsupported scheme for {} !", feedUrl);
errors.tryEmitNext(new FeedScrapingException(AtomFeed.of(feed.id(), feed.link()),
new IllegalArgumentException("Unsupported scheme for " + feedUrl + " !")));
log.warn("Unsupported scheme for {} !", feedUrl);
return Flux.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import fr.ght1pc9kc.baywatch.common.api.ScrapingEventHandler;
import fr.ght1pc9kc.baywatch.techwatch.api.SystemMaintenanceService;
import fr.ght1pc9kc.baywatch.techwatch.api.model.Feed;
import fr.ght1pc9kc.baywatch.techwatch.api.model.News;
import fr.ght1pc9kc.juery.api.Criteria;
import fr.ght1pc9kc.juery.api.PageRequest;
Expand All @@ -17,7 +16,7 @@
import static fr.ght1pc9kc.baywatch.common.api.model.EntitiesProperties.FEED_ID;

/**
* Delete the orphan {@link Feed}. Orphan was the Feeds followed by nobody.
* Delete the orphan {@link fr.ght1pc9kc.baywatch.techwatch.api.model.WebFeed}. Orphan was the Feeds followed by nobody.
* <p>
* If the Feed to delete have {@link News} the News are deleted
*/
Expand Down Expand Up @@ -50,9 +49,9 @@ public Mono<Void> before() {
.then();

return maintenanceService.feedList(PageRequest.all(Criteria.property(COUNT).eq(0)))
.doOnNext(feed -> feeds.tryEmitNext(feed.id()))
.doOnNext(feed -> feeds.tryEmitNext(feed.id))
.doOnComplete(feeds::tryEmitComplete)
.flatMap(feed -> maintenanceService.newsList(PageRequest.all(Criteria.property(FEED_ID).eq(feed.id()))))
.flatMap(feed -> maintenanceService.newsList(PageRequest.all(Criteria.property(FEED_ID).eq(feed.id))))
.doOnComplete(newsSink::tryEmitComplete)
.doOnNext(news -> newsSink.tryEmitNext(news.id()))
.then(deleted)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public Flux<String> listAllNewsId() {
@Override
public Flux<ScrapedFeed> feedList() {
return systemMaintenanceService.feedList(PageRequest.all())
.map(f -> new ScrapedFeed(f.id(), f.url()));
.map(f -> new ScrapedFeed(f.id, f.self.location()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public interface TeamsService {
Mono<Entity<Team>> create(String name, String topic);

/**
* List all teh {@link Team} depending on the {@link PageRequest}
* List all the {@link Team} depending on the {@link PageRequest}
* pass in parameter.
*
* @param pageRequest The filter and pagination information for the request
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fr.ght1pc9kc.baywatch.techwatch.api;

import fr.ght1pc9kc.baywatch.techwatch.api.model.Feed;
import fr.ght1pc9kc.baywatch.common.api.model.Entity;
import fr.ght1pc9kc.baywatch.techwatch.api.model.WebFeed;
import fr.ght1pc9kc.juery.api.PageRequest;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -9,19 +10,19 @@

public interface FeedService {
/**
* Get a single {@link Feed}
* Get a single {@link WebFeed}
*
* @param id the ID of the requested Feed
* @return The Feed
*/
Mono<Feed> get(String id);
Mono<Entity<WebFeed>> get(String id);

/**
* List all the {@link Feed} of the connected user
* List all the {@link WebFeed} of the connected user
*
* @return a Flux of Feed
*/
Flux<Feed> list();
Flux<Entity<WebFeed>> list();

/**
* Depending on the filters this return the list of news, scoped by the current user.
Expand All @@ -30,7 +31,7 @@ public interface FeedService {
* @param pageRequest The page request
* @return The list of feeds
*/
Flux<Feed> list(PageRequest pageRequest);
Flux<Entity<WebFeed>> list(PageRequest pageRequest);

/**
* Count the total number of elements returned by the {@link PageRequest}
Expand All @@ -43,30 +44,30 @@ public interface FeedService {
Mono<Integer> count(PageRequest pageRequest);

/**
* Update the subscription to a {@link Feed}
* Update the subscription to a {@link WebFeed}
*
* @param toPersist the Feed of the subscription to update
* @return The new Feed of the subscription
*/
Mono<Feed> update(Feed toPersist);
Mono<Entity<WebFeed>> update(WebFeed toPersist);

/**
* Add a {@link Feed} to the available Feed list in database
* Add a {@link WebFeed} to the available Feed list in database
*
* @param toAdd The list of {@link Feed} to add
* @return The list of {@link Feed} added
* @param toAdd The list of {@link WebFeed} to add
* @return The list of {@link WebFeed} added
*/
Flux<Feed> add(Collection<Feed> toAdd);
Flux<Entity<WebFeed>> add(Collection<WebFeed> toAdd);

/**
* Subscribe to a {@link Feed} present in database
* Subscribe to a {@link WebFeed} present in database
*
* @param feeds The list of feed IDs the current user want to subscribe
* @return The {@link Feed} the user have effectively subscribed
* @return The {@link WebFeed} the user have effectively subscribed
*/
Flux<Feed> subscribe(Collection<Feed> feeds);
Flux<Entity<WebFeed>> subscribe(Collection<WebFeed> feeds);

Flux<Feed> addAndSubscribe(Collection<Feed> feeds);
Flux<Entity<WebFeed>> addAndSubscribe(Collection<WebFeed> feeds);

Mono<Integer> delete(Collection<String> toDelete);
}
Loading

0 comments on commit 61f29ae

Please sign in to comment.