Skip to content

Commit

Permalink
hotfix: dropped member reader perfo
Browse files Browse the repository at this point in the history
  • Loading branch information
Yalz authored Oct 8, 2024
2 parents b4c0a16 + 5782cc7 commit 1e32b22
Show file tree
Hide file tree
Showing 43 changed files with 245 additions and 137 deletions.
2 changes: 1 addition & 1 deletion ldes-fragmentisers/ldes-fragmentisers-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>ldes-fragmentisers</artifactId>
<groupId>be.vlaanderen.informatievlaanderen.vsds</groupId>
<version>3.4.0</version>
<version>3.4.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package be.vlaanderen.informatievlaanderen.ldes.server.fragmentation;

import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.ViewName;
import be.vlaanderen.informatievlaanderen.ldes.server.domain.services.MemberMetricsRepository;
import be.vlaanderen.informatievlaanderen.ldes.server.fragmentation.entities.UnprocessedView;
import be.vlaanderen.informatievlaanderen.ldes.server.fragmentation.exceptions.FragmentationJobException;
import be.vlaanderen.informatievlaanderen.ldes.server.fragmentation.repository.UnprocessedViewRepository;
import org.springframework.batch.core.*;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.job.builder.JobBuilder;
Expand All @@ -29,34 +29,38 @@ public class FragmentationService {
public static final String FRAGMENTATION_JOB = "fragmentation";
public static final String COLLECTION_NAME = "collectionName";
public static final String VIEW_NAME = "viewName";
public static final String COLLECTION_ID = "collectionId";
public static final String VIEW_ID = "viewId";
public static final String LDES_SERVER_CREATE_FRAGMENTS_COUNT = "ldes_server_create_fragments_count";
private final JobLauncher jobLauncher;
private final JobExplorer jobExplorer;
private final JobRepository jobRepository;
private final Job bucketiseJob;
private final MemberMetricsRepository memberRepository;
private final UnprocessedViewRepository unprocessedViewRepository;

public FragmentationService(@Qualifier(ASYNC_JOB_LAUNCHER) JobLauncher jobLauncher, JobRepository jobRepository, JobExplorer jobExplorer,
@Qualifier(BUCKETISATION_STEP) Step bucketiseMembersStep, Step paginationStep,
MemberMetricsRepository memberRepository) {
UnprocessedViewRepository unprocessedViewRepository) {
this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer;
this.jobRepository = jobRepository;
this.memberRepository = memberRepository;
this.bucketiseJob = createJob(jobRepository, bucketiseMembersStep, paginationStep);
this.unprocessedViewRepository = unprocessedViewRepository;
this.bucketiseJob = createJob(bucketiseMembersStep, paginationStep);
this.cleanupOldJobs();
}

@Scheduled(cron = FRAGMENTATION_CRON)
public void scheduledJobLauncher() {
memberRepository.getUnprocessedViews()
unprocessedViewRepository.findAll()
.parallelStream()
.filter(this::noJobsRunning)
.forEach(viewName -> {
.forEach(unprocessedView -> {
try {
jobLauncher.run(bucketiseJob, new JobParametersBuilder()
.addString(VIEW_NAME, viewName.getViewName())
.addString(COLLECTION_NAME, viewName.getCollectionName())
.addLong(VIEW_ID, (long) unprocessedView.viewId())
.addLong(COLLECTION_ID, (long) unprocessedView.collectionId())
.addString(VIEW_NAME, unprocessedView.viewName())
.addString(COLLECTION_NAME, unprocessedView.collectionName())
.addLocalDateTime("triggered", LocalDateTime.now())
.toJobParameters());
} catch (JobInstanceAlreadyCompleteException | JobExecutionAlreadyRunningException |
Expand All @@ -66,18 +70,22 @@ public void scheduledJobLauncher() {
});
}

private boolean noJobsRunning(ViewName viewName) {
private boolean noJobsRunning(UnprocessedView unprocessedView) {
return jobExplorer.findRunningJobExecutions(FRAGMENTATION_JOB)
.stream()
.noneMatch(jobExecution -> {
var params = jobExecution.getJobParameters();
String view = Objects.requireNonNull(params.getString(VIEW_NAME));
String collection = Objects.requireNonNull(params.getString(COLLECTION_NAME));
return view.equals(viewName.getViewName()) && collection.equals(viewName.getCollectionName());
final UnprocessedView fromParams = new UnprocessedView(
Objects.requireNonNull(params.getLong(COLLECTION_ID)).intValue(),
params.getString(COLLECTION_NAME),
Objects.requireNonNull(params.getLong(VIEW_ID)).intValue(),
params.getString(VIEW_NAME)
);
return Objects.equals(fromParams, unprocessedView);
});
}

private Job createJob(JobRepository jobRepository, Step step, Step paginationStep) {
private Job createJob(Step step, Step paginationStep) {
return new JobBuilder(FRAGMENTATION_JOB, jobRepository)
.start(step)
.next(paginationStep)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package be.vlaanderen.informatievlaanderen.ldes.server.fragmentation.entities;

public record UnprocessedView(int collectionId, String collectionName, int viewId, String viewName) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package be.vlaanderen.informatievlaanderen.ldes.server.fragmentation.repository;

import be.vlaanderen.informatievlaanderen.ldes.server.fragmentation.entities.UnprocessedView;

import java.util.List;

public interface UnprocessedViewRepository {
List<UnprocessedView> findAll();
}
Original file line number Diff line number Diff line change
@@ -1,81 +1,77 @@
package be.vlaanderen.informatievlaanderen.ldes.server.fragmentation;

import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.ViewName;
import be.vlaanderen.informatievlaanderen.ldes.server.domain.services.MemberMetricsRepository;
import be.vlaanderen.informatievlaanderen.ldes.server.domain.services.ServerMetrics;
import be.vlaanderen.informatievlaanderen.ldes.server.fragmentation.entities.UnprocessedView;
import be.vlaanderen.informatievlaanderen.ldes.server.fragmentation.repository.UnprocessedViewRepository;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.test.context.SpringBatchTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.batch.core.repository.JobRepository;

import java.util.List;
import java.util.Objects;
import java.util.Set;

import static be.vlaanderen.informatievlaanderen.ldes.server.fragmentation.FragmentationService.*;
import static be.vlaanderen.informatievlaanderen.ldes.server.fragmentation.batch.BatchConfiguration.ASYNC_JOB_LAUNCHER;
import static be.vlaanderen.informatievlaanderen.ldes.server.fragmentation.batch.BucketJobDefinitions.BUCKETISATION_STEP;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.tuple;
import static org.mockito.Mockito.*;

@RunWith(SpringRunner.class)
@SpringBatchTest
@EnableAutoConfiguration
@ContextConfiguration(classes = {SpringBatchConfiguration.class, FragmentationService.class})
@TestPropertySource(properties = {"ldes-server.fragmentation-cron=*/1 * * * * *"})
@ExtendWith(MockitoExtension.class)
class FragmentationServiceTest {
@MockBean(name = BUCKETISATION_STEP)
Step bucketStep;
@MockBean(name = "paginationStep")
Step paginationStep;
@MockBean
ServerMetrics serverMetrics;
@MockBean
FragmentationStrategyCollection strategyCollection;
@MockBean
MemberMetricsRepository memberMetricsRepository;
@MockBean(name = ASYNC_JOB_LAUNCHER)
JobLauncher jobLauncher;
@MockBean
JobExplorer jobExplorer;
@Autowired
private static final String COLLECTION = "collection";
@Mock
private Step bucketStep;
@Mock
private Step paginationStep;
@Mock
private MemberMetricsRepository memberMetricsRepository;
@Mock
private JobLauncher jobLauncher;
@Mock
private JobExplorer jobExplorer;
@Mock
private JobRepository jobRepository;
@Mock
private UnprocessedViewRepository unprocessedViewRepository;
@Captor
private ArgumentCaptor<JobParameters> captor;
private FragmentationService fragmentationService;
private List<UnprocessedView> unprocessedViews;

@BeforeEach
void setUp() {
fragmentationService = new FragmentationService(jobLauncher, jobRepository, jobExplorer, bucketStep, paginationStep, unprocessedViewRepository);
unprocessedViews = List.of(
new UnprocessedView(1, COLLECTION, 1, "v1"),
new UnprocessedView(1, COLLECTION, 2, "v2")
);
}

@Test
void when_unprocessedViews_then_triggerJobsForEachView() throws Exception {
String collection = "collection";

when(memberMetricsRepository.getUnprocessedViews())
.thenReturn(List.of(new ViewName(collection, "v1"),
new ViewName(collection, "v2")));
when(unprocessedViewRepository.findAll()).thenReturn(unprocessedViews);

fragmentationService.scheduledJobLauncher();

ArgumentCaptor<JobParameters> captor = ArgumentCaptor.forClass(JobParameters.class);
verify(jobLauncher, times(2)).run(any(), captor.capture());
assertThat(captor.getAllValues())
.extracting(obj -> obj.getString(COLLECTION_NAME), obj -> obj.getString(VIEW_NAME))
.containsExactlyInAnyOrder(
tuple(collection, "v1"),
tuple(collection, "v2")
);
.map(FragmentationServiceTest::mapParamsToUnprocessedView)
.containsExactlyInAnyOrderElementsOf(unprocessedViews);
}

@Test
void when_noUnprocessedViews_then_triggerNone() {
when(memberMetricsRepository.getUnprocessedViews()).thenReturn(List.of());
when(unprocessedViewRepository.findAll()).thenReturn(List.of());

fragmentationService.scheduledJobLauncher();

Expand All @@ -84,29 +80,33 @@ void when_noUnprocessedViews_then_triggerNone() {

@Test
void when_unprocessedViews_then_triggerJobsForEachViewThatIsntRunningAlready() throws Exception {
String collection = "collection";

when(memberMetricsRepository.getUnprocessedViews())
.thenReturn(List.of(new ViewName(collection, "v1"), new ViewName(collection, "v2")));
when(unprocessedViewRepository.findAll()).thenReturn(unprocessedViews);

JobExecution jobExecution = mock(JobExecution.class);
JobParameters jobParameters = new JobParametersBuilder()
.addLong(VIEW_ID, 1L)
.addLong(COLLECTION_ID, 1L)
.addString(VIEW_NAME, "v1")
.addString(COLLECTION_NAME, collection)
.addString(COLLECTION_NAME, COLLECTION)
.toJobParameters();
when(jobExecution.getJobParameters()).thenReturn(jobParameters);
when(jobExplorer.findRunningJobExecutions(FRAGMENTATION_JOB)).thenReturn(Set.of(jobExecution));

fragmentationService.scheduledJobLauncher();

ArgumentCaptor<JobParameters> captor = ArgumentCaptor.forClass(JobParameters.class);
verify(jobLauncher, times(1)).run(any(), captor.capture());
assertThat(captor.getAllValues())
.extracting(obj -> obj.getString(COLLECTION_NAME), obj -> obj.getString(VIEW_NAME))
.containsExactlyInAnyOrder(
tuple(collection, "v2")
);
verify(jobLauncher).run(any(), captor.capture());
assertThat(captor.getValue())
.extracting(FragmentationServiceTest::mapParamsToUnprocessedView)
.isEqualTo(new UnprocessedView(1, COLLECTION, 2, "v2"));
}

private static UnprocessedView mapParamsToUnprocessedView(JobParameters params) {
return new UnprocessedView(
Objects.requireNonNull(params.getLong(COLLECTION_ID)).intValue(),
params.getString(COLLECTION_NAME),
Objects.requireNonNull(params.getLong(VIEW_ID)).intValue(),
params.getString(VIEW_NAME)
);
}

}
2 changes: 1 addition & 1 deletion ldes-fragmentisers/ldes-fragmentisers-geospatial/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>ldes-fragmentisers</artifactId>
<groupId>be.vlaanderen.informatievlaanderen.vsds</groupId>
<version>3.4.0</version>
<version>3.4.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion ldes-fragmentisers/ldes-fragmentisers-reference/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>ldes-fragmentisers</artifactId>
<groupId>be.vlaanderen.informatievlaanderen.vsds</groupId>
<version>3.4.0</version>
<version>3.4.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>be.vlaanderen.informatievlaanderen.vsds</groupId>
<artifactId>ldes-fragmentisers</artifactId>
<version>3.4.0</version>
<version>3.4.1-SNAPSHOT</version>
</parent>

<artifactId>ldes-fragmentisers-timebased-hierarchical</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion ldes-fragmentisers/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>be.vlaanderen.informatievlaanderen.vsds</groupId>
<artifactId>ldes-server</artifactId>
<version>3.4.0</version>
<version>3.4.1-SNAPSHOT</version>
</parent>

<artifactId>ldes-fragmentisers</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion ldes-server-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>be.vlaanderen.informatievlaanderen.vsds</groupId>
<artifactId>ldes-server</artifactId>
<version>3.4.0</version>
<version>3.4.1-SNAPSHOT</version>
</parent>

<dependencies>
Expand Down
2 changes: 1 addition & 1 deletion ldes-server-application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>be.vlaanderen.informatievlaanderen.vsds</groupId>
<artifactId>ldes-server</artifactId>
<version>3.4.0</version>
<version>3.4.1-SNAPSHOT</version>
</parent>

<artifactId>ldes-server-application</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion ldes-server-compaction/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>be.vlaanderen.informatievlaanderen.vsds</groupId>
<artifactId>ldes-server</artifactId>
<version>3.4.0</version>
<version>3.4.1-SNAPSHOT</version>
</parent>

<artifactId>ldes-server-compaction</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion ldes-server-domain/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>be.vlaanderen.informatievlaanderen.vsds</groupId>
<artifactId>ldes-server</artifactId>
<version>3.4.0</version>
<version>3.4.1-SNAPSHOT</version>
</parent>

<artifactId>ldes-server-domain</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
package be.vlaanderen.informatievlaanderen.ldes.server.domain.services;

import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.ViewName;

import java.util.List;

public interface MemberMetricsRepository {
int getTotalCount(String collectionName);
List<ViewName> getUnprocessedViews();
}
2 changes: 1 addition & 1 deletion ldes-server-infra-postgres/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>be.vlaanderen.informatievlaanderen.vsds</groupId>
<artifactId>ldes-server</artifactId>
<version>3.4.0</version>
<version>3.4.1-SNAPSHOT</version>
</parent>
<packaging>pom</packaging>
<modules>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>be.vlaanderen.informatievlaanderen.vsds</groupId>
<artifactId>ldes-server-infra-postgres</artifactId>
<version>3.4.0</version>
<version>3.4.1-SNAPSHOT</version>
</parent>

<artifactId>postgres-admin-repository</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>be.vlaanderen.informatievlaanderen.vsds</groupId>
<artifactId>ldes-server-infra-postgres</artifactId>
<version>3.4.0</version>
<version>3.4.1-SNAPSHOT</version>
</parent>

<artifactId>postgres-fragmentation-repository</artifactId>
Expand Down
Loading

0 comments on commit 1e32b22

Please sign in to comment.