Skip to content

Commit

Permalink
chore: finalise 3.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Yalz authored Jul 3, 2024
2 parents bb061ae + cb66044 commit 7b3acd3
Show file tree
Hide file tree
Showing 141 changed files with 1,365 additions and 1,499 deletions.
23 changes: 9 additions & 14 deletions docker-compose/config.env
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
#
# MONGODB
# POSTGRES
#
SPRING_DATASOURCE_URL=
SPRING_DATASOURCE_USERNAME=
SPRING_DATASOURCE_PASSWORD=

# EMBEDDED MONGODB
SPRING_DATA_MONGODB_HOST=
SPRING_DATA_MONGODB_PORT=

# EXTERNAL MONGODB
SPRING_DATA_MONGODB_URI=
SPRING_DATA_MONGODB_DATABASE=
#
# SPRING BATCHING - FOR FRAGMENTATION PURPOSES
#
SPRING_BATCH_JDBC_INITIALIZESCHEMA=always

#
# SWAGGER
Expand All @@ -31,9 +31,4 @@ MANAGEMENT_ZIPKIN_TRACING_ENDPOINT="http://host.docker.internal:14250"
# HEALTH & INFO & METRICS ACTUATORS
#

MANAGEMENT_ENDPOINTS_WEB_EXPOSURE_INCLUDE=

#
# Mongock
#
MONGOCK_MIGRATIONSCANPACKAGE=
MANAGEMENT_ENDPOINTS_WEB_EXPOSURE_INCLUDE=
23 changes: 11 additions & 12 deletions docker-compose/config.local.env
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
#
# MONGODB
# POSTGRES
#
SPRING_DATA_MONGODB_HOST=ldes.mongodb
SPRING_DATA_MONGODB_PORT=27017
SPRING_DATA_MONGODB_AUTOINDEXCREATION="True"
SPRING_DATASOURCE_URL=jdbc:postgresql://ldes-postgres:5432/test
SPRING_DATASOURCE_USERNAME=admin
SPRING_DATASOURCE_PASSWORD=admin

#
# SPRING BATCHING - FOR FRAGMENTATION PURPOSES
#
SPRING_BATCH_JDBC_INITIALIZESCHEMA=always

#
# SWAGGER
Expand All @@ -28,7 +33,7 @@ MANAGEMENT_ZIPKIN_TRACING_ENDPOINT="http://host.docker.internal:14250"

MANAGEMENT_ENDPOINTS_WEB_EXPOSURE_INCLUDE="prometheus, info, health"
MANAGEMENT_HEALTH_DEFAULTS_ENABLED=false
MANAGEMENT_HEALTH_MONGO_ENABLED=true
MANAGEMENT_HEALTH_DB_ENABLED=true
MANAGEMENT_ENDPOINT_HEALTH_SHOWDETAILS="always"

#
Expand All @@ -38,10 +43,4 @@ MANAGEMENT_ENDPOINT_HEALTH_SHOWDETAILS="always"
SIS_DATA=/tmp

SPRING_MVC_STATIC_PATH_PATTERN=/**
SPRING_WEB_RESOURCES_STATIC_LOCATIONS=file:/opt/content

#
# Mongock
#

MONGOCK_MIGRATIONSCANPACKAGE=VSDS
SPRING_WEB_RESOURCES_STATIC_LOCATIONS=file:/opt/content
2 changes: 0 additions & 2 deletions docker-compose/mongo/Dockerfile

This file was deleted.

1 change: 0 additions & 1 deletion docker-compose/mongo/init-replicaset.js

This file was deleted.

4 changes: 0 additions & 4 deletions docker-compose/server.config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ spring:
url: jdbc:postgresql://ldes-postgres:5432/test
username: admin
password: admin
jpa:
database: postgresql
liquibase:
change-log: classpath:/db/changelog/master.xml

#server:
# servlet:
Expand Down
14 changes: 11 additions & 3 deletions docs/how-to-run.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ spring:
url: jdbc:postgresql://localhost:5432/test
username: admin
password: admin
jpa:
hibernate:
ddl-auto: update
batch:
jdbc:
initialize-schema: always

rest:
max-age: 120
max-age-immutable: 604800
Expand Down Expand Up @@ -117,6 +118,13 @@ Here is an explanation provided for all the possibilities on how to tweak and co
<td></td>
<td></td>
</tr>
<tr><td colspan="4"><b>Bucketisation & pagination batching</b></td></tr>
<tr>
<td>spring.batch.jdbc.initialize-schema</td>
<td>Indicates whether the database should be initialized with the tables required for the batching process. This can be `always`, `embedded` (for embedded databases only), or `never`.</td>
<td>Yes (to let the bucketisation and pagination process run)</td>
<td></td>
</tr>
<tr><td colspan="4"><b><a href="./features/compaction">Fragment Compaction</a></b></td></tr>
<tr>
<td>ldes-server.compaction-cron</td>
Expand Down
2 changes: 1 addition & 1 deletion ldes-fragmentisers/ldes-fragmentisers-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>ldes-fragmentisers</artifactId>
<groupId>be.vlaanderen.informatievlaanderen.vsds</groupId>
<version>3.1.0-SNAPSHOT</version>
<version>3.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package be.vlaanderen.informatievlaanderen.ldes.server.fragmentation;

import be.vlaanderen.informatievlaanderen.ldes.server.domain.events.admin.EventStreamClosedEvent;
import be.vlaanderen.informatievlaanderen.ldes.server.domain.events.admin.ViewAddedEvent;
import be.vlaanderen.informatievlaanderen.ldes.server.domain.events.admin.ViewInitializationEvent;
import be.vlaanderen.informatievlaanderen.ldes.server.domain.events.admin.ViewSupplier;
import be.vlaanderen.informatievlaanderen.ldes.server.domain.events.fragmentation.MembersBucketisedEvent;
import be.vlaanderen.informatievlaanderen.ldes.server.domain.events.fragmentation.NewViewBucketisedEvent;
import be.vlaanderen.informatievlaanderen.ldes.server.domain.events.fragmentation.ViewNeedsRebucketisationEvent;
Expand All @@ -11,18 +14,17 @@
import be.vlaanderen.informatievlaanderen.ldes.server.ingest.entities.IngestedMember;
import org.springframework.batch.core.*;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
Expand All @@ -32,25 +34,20 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static be.vlaanderen.informatievlaanderen.ldes.server.domain.constants.ServerConfig.FRAGMENTATION_CRON;
import static be.vlaanderen.informatievlaanderen.ldes.server.fragmentation.batch.BucketJobDefinitions.*;

@Service
@EnableScheduling
public class FragmentationService {
public static final int POLLING_RATE = 1500;
public static final String LDES_SERVER_CREATE_FRAGMENTS_COUNT = "ldes_server_create_fragments_count";
private static final String BUCKETISATION_JOB = "bucketisation";
private static final String REBUCKETISATION_JOB = "rebucketisation";
private final JobLauncher jobLauncher;
private final JobExplorer jobExplorer;
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final ItemReader<IngestedMember> newMemberReader;
private final ItemReader<IngestedMember> rebucketiseMemberReader;
private final BucketProcessor processor;
private final ItemWriter<List<BucketisedMember>> bucketWriter;
private final ApplicationEventPublisher eventPublisher;
private AtomicBoolean shouldTriggerBucketisation = new AtomicBoolean(false);

private final AtomicBoolean shouldTriggerBucketisation = new AtomicBoolean(false);
private final FragmentRepository fragmentRepository;
private final Job bucketisationJob;
private final Job rebucketiseJob;

public FragmentationService(JobLauncher jobLauncher, JobExplorer jobExplorer, JobRepository jobRepository,
PlatformTransactionManager transactionManager,
Expand All @@ -61,14 +58,11 @@ public FragmentationService(JobLauncher jobLauncher, JobExplorer jobExplorer, Jo
FragmentRepository fragmentRepository) {
this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer;
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
this.newMemberReader = newMemberReader;
this.rebucketiseMemberReader = rebucketiseMemberReader;
this.processor = processor;
this.bucketWriter = bucketWriter;
this.eventPublisher = eventPublisher;
this.fragmentRepository = fragmentRepository;

this.bucketisationJob = bucketiseJob(jobRepository, transactionManager, newMemberReader, processor, bucketWriter);
this.rebucketiseJob = rebucketiseJob(jobRepository, transactionManager, rebucketiseMemberReader, processor, bucketWriter);
}

@EventListener(MembersIngestedEvent.class)
Expand All @@ -78,17 +72,17 @@ public void executeFragmentation() {

@EventListener
public void handleViewInitializationEvent(ViewNeedsRebucketisationEvent event) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
launchJob(rebucketiseJob(), new JobParametersBuilder()
launchJob(rebucketiseJob, new JobParametersBuilder()
.addString("viewName", event.viewName().asString())
.addLocalDateTime("triggered", LocalDateTime.now())
.toJobParameters());
}

@Scheduled(fixedRate = POLLING_RATE)
@Scheduled(cron = FRAGMENTATION_CRON)
public void scheduledJobLauncher() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
if (shouldTriggerBucketisation.get() && !isJobRunning(BUCKETISATION_JOB) && !isJobRunning(REBUCKETISATION_JOB)) {
shouldTriggerBucketisation.set(false);
launchJob(bucketiseJob(), new JobParameters());
launchJob(bucketisationJob, new JobParameters());
}
}

Expand All @@ -97,50 +91,26 @@ public void markFragmentsImmutableInCollection(EventStreamClosedEvent event) {
fragmentRepository.markFragmentsImmutableInCollection(event.collectionName());
}

@EventListener({ViewAddedEvent.class, ViewInitializationEvent.class})
@Order
public void handleViewAddedEvent(ViewSupplier event) {
eventPublisher.publishEvent(new ViewNeedsRebucketisationEvent(event.viewSpecification().getName()));
}

private void launchJob(Job job, JobParameters jobParameters) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
jobLauncher.run(job, jobParameters);
if (job.getName().equals(BUCKETISATION_JOB)) {
eventPublisher.publishEvent(new MembersBucketisedEvent());
} else if (job.getName().equals(REBUCKETISATION_JOB)) {
eventPublisher.publishEvent(new NewViewBucketisedEvent(jobParameters.getString("viewName")));
JobExecution jobExecution = jobLauncher.run(job, jobParameters);

if (jobExecution.getStepExecutions().stream().toList().getFirst().getWriteCount() != 0) {
if (job.getName().equals(BUCKETISATION_JOB)) {
eventPublisher.publishEvent(new MembersBucketisedEvent());
} else if (job.getName().equals(REBUCKETISATION_JOB)) {
eventPublisher.publishEvent(new NewViewBucketisedEvent(jobParameters.getString("viewName")));
}
}

}

private boolean isJobRunning(String jobName) {
return !jobExplorer.findRunningJobExecutions(jobName).isEmpty();
}

private Job bucketiseJob() {
return new JobBuilder(BUCKETISATION_JOB, jobRepository)
.start(bucketiseMembers())
.build();
}

private Job rebucketiseJob() {
return new JobBuilder(REBUCKETISATION_JOB, jobRepository)
.start(rebucketiseMembers())
.build();
}

private Step bucketiseMembers() {
return new StepBuilder("bucketiseMembers", jobRepository)
.<IngestedMember, List<BucketisedMember>>chunk(150, transactionManager)
.reader(newMemberReader)
.processor(processor)
.writer(bucketWriter)
.allowStartIfComplete(true)
.build();
}

private Step rebucketiseMembers() {
return new StepBuilder("rebucketiseMembers", jobRepository)
.<IngestedMember, List<BucketisedMember>>chunk(150, transactionManager)
.reader(rebucketiseMemberReader)
.processor(processor)
.writer(bucketWriter)
.allowStartIfComplete(true)
.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,31 @@
import be.vlaanderen.informatievlaanderen.ldes.server.fragmentation.factory.FragmentationStrategyCreator;
import be.vlaanderen.informatievlaanderen.ldes.server.fragmentation.repository.BucketisedMemberRepository;
import be.vlaanderen.informatievlaanderen.ldes.server.fragmentation.repository.FragmentRepository;
import be.vlaanderen.informatievlaanderen.ldes.server.fragmentation.services.ViewBucketisationService;
import io.micrometer.observation.ObservationRegistry;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.function.Predicate;

import static be.vlaanderen.informatievlaanderen.ldes.server.fragmentation.services.ViewBucketisationService.ServiceType.FRAGMENTATION;

@Component
public class FragmentationStrategyBatchCollection implements FragmentationStrategyCollection {

private final FragmentRepository fragmentRepository;
private final Set<FragmentationStrategyBatchExecutor> fragmentationStrategySet;
private final BucketisedMemberRepository bucketisedMemberRepository;
private final FragmentationStrategyCreator fragmentationStrategyCreator;
private final ViewBucketisationService viewBucketisationService;
private final ObservationRegistry observationRegistry;

public FragmentationStrategyBatchCollection(
FragmentRepository fragmentRepository,
BucketisedMemberRepository bucketisedMemberRepository,
FragmentationStrategyCreator fragmentationStrategyCreator, ViewBucketisationService viewBucketisationService,
FragmentationStrategyCreator fragmentationStrategyCreator,
ObservationRegistry observationRegistry) {
this.fragmentRepository = fragmentRepository;
this.bucketisedMemberRepository = bucketisedMemberRepository;
this.fragmentationStrategyCreator = fragmentationStrategyCreator;
this.viewBucketisationService = viewBucketisationService;
this.observationRegistry = observationRegistry;
this.fragmentationStrategySet = new HashSet<>();
}
Expand All @@ -58,10 +54,10 @@ public Optional<FragmentationStrategyBatchExecutor> getFragmentationStrategyExec
}

@EventListener({ViewAddedEvent.class, ViewInitializationEvent.class})
@Order(1)
public void handleViewAddedEvent(ViewSupplier event) {
final var fragmentationStrategyExecutor = createExecutor(event.viewSpecification().getName(), event.viewSpecification());
fragmentationStrategySet.add(fragmentationStrategyExecutor);
viewBucketisationService.setHasView(event.viewSpecification().getName(), FRAGMENTATION);
}

@EventListener
Expand All @@ -70,15 +66,13 @@ public void handleEventStreamDeletedEvent(EventStreamDeletedEvent event) {
executor -> Objects.equals(executor.getViewName().getCollectionName(), event.collectionName()));
fragmentRepository.deleteTreeNodesByCollection(event.collectionName());
bucketisedMemberRepository.deleteByCollection(event.collectionName());
viewBucketisationService.setDeletedCollection(event.collectionName(), FRAGMENTATION);
}

@EventListener
public void handleViewDeletedEvent(ViewDeletedEvent event) {
removeFromStrategySet(executor -> Objects.equals(executor.getViewName(), event.getViewName()));
fragmentRepository.removeLdesFragmentsOfView(event.getViewName().asString());
bucketisedMemberRepository.deleteByViewName(event.getViewName());
viewBucketisationService.setDeletedView(event.getViewName(), FRAGMENTATION);
}

private void removeFromStrategySet(Predicate<FragmentationStrategyBatchExecutor> filterPredicate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ public class FragmentationStrategyImpl implements FragmentationStrategy {
@Override
public List<BucketisedMember> addMemberToFragment(Fragment fragment, FragmentationMember member,
Observation parentObservation) {
return List.of(new BucketisedMember(member.id(), fragment.getViewName(), fragment.getFragmentIdString(), member.sequenceNr()));
return List.of(new BucketisedMember(member.id(), fragment.getViewName(), fragment.getFragmentIdString()));
}
}
Loading

0 comments on commit 7b3acd3

Please sign in to comment.