Skip to content

Commit

Permalink
Merge pull request #294 from bcgov/feature/GRAD2-2423
Browse files Browse the repository at this point in the history
GRAD2-2423: task is complete.
  • Loading branch information
kamal-mohammed authored Dec 8, 2023
2 parents d66aede + bd8eb96 commit 6f05d06
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 128 deletions.
1 change: 1 addition & 0 deletions api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
src/main/java/ca/bc/gov/educ/api/trax/scheduler/**,
src/main/java/ca/bc/gov/educ/api/trax/exception/**,
src/main/java/ca/bc/gov/educ/api/trax/model/**,
src/main/java/ca/bc/gov/educ/api/trax/service/EventHandlerDelegatorService,
src/main/java/ca/bc/gov/educ/api/trax/util/**,
src/main/java/ca/bc/gov/educ/api/trax/repository/**
</sonar.exclusions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -25,19 +26,20 @@
@Component
@Slf4j
public class ChoreographEventHandler {
private final Executor singleTaskExecutor = new EnhancedQueueExecutor.Builder()
.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("task-executor-%d").build())
.setCorePoolSize(1).setMaximumPoolSize(1).build();
private final Executor eventExecutor;
private final Map<String, EventService> eventServiceMap;

public ChoreographEventHandler(final List<EventService> eventServices) {
this.eventServiceMap = new HashMap<>();
this.eventExecutor = new EnhancedQueueExecutor.Builder()
.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("event-executor-%d").build())
.setCorePoolSize(10).setMaximumPoolSize(20).setKeepAliveTime(Duration.ofSeconds(60)).build();
eventServices.forEach(eventService -> this.eventServiceMap.put(eventService.getEventType(), eventService));
}

public void handleEvent(@NonNull final Event event) {
//only one thread will process all the request. since RDB won't handle concurrent requests.
this.singleTaskExecutor.execute(() -> {
this.eventExecutor.execute(() -> {
try {
switch (event.getEventType()) {
case "GRAD_STUDENT_GRADUATED":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@
@Slf4j
public class Subscriber {

private final Executor subscriberExecutor = new EnhancedQueueExecutor.Builder()
.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("jet-stream-subscriber-%d").build())
.setCorePoolSize(2).setMaximumPoolSize(2).setKeepAliveTime(Duration.ofMillis(1000)).build();
private final Executor subscriberExecutor;
private final EventHandlerDelegatorService eventHandlerDelegatorService;
private final Map<String, List<String>> streamTopicsMap = new HashMap<>(); // one stream can have multiple topics.
private final Connection natsConnection;
Expand All @@ -57,6 +55,9 @@ public Subscriber(final Connection natsConnection, final EventHandlerDelegatorSe
this.eventHandlerDelegatorService = eventHandlerDelegatorService;
this.natsConnection = natsConnection;
this.constants = constants;
this.subscriberExecutor = new EnhancedQueueExecutor.Builder()
.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("jet-stream-subscriber-%d").build())
.setCorePoolSize(10).setMaximumPoolSize(10).setKeepAliveTime(Duration.ofSeconds(60)).build();
this.initializeStreamTopicMap();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

import static ca.bc.gov.educ.api.trax.constant.EventStatus.DB_COMMITTED;

@Component
Expand Down Expand Up @@ -54,12 +56,13 @@ public JetStreamEventScheduler(final EventRepository eventRepository,
@Scheduled(cron = "${cron.scheduled.process.events.grad-to-trax.run}")
@SchedulerLock(name = "PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM", lockAtLeastFor = "${cron.scheduled.process.events.grad-to-trax.lockAtLeastFor}", lockAtMostFor = "${cron.scheduled.process.events.grad-to-trax.lockAtMostFor}")
public void findAndProcessEvents() {
log.debug("PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM: started - cron {}, lockAtMostFor {}", constants.getGradToTraxCronRun(), constants.getGradToTraxLockAtMostFor());
LockAssert.assertLocked();
log.debug("PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM: started - cron {}, lockAtMostFor {}", constants.getGradToTraxCronRun(), constants.getGradToTraxLockAtMostFor());
final var results = this.eventRepository.findAllByEventStatusOrderByCreateDate(DB_COMMITTED.toString());
if (!results.isEmpty()) {
var filteredList = results.stream().filter(el -> el.getUpdateDate().isBefore(LocalDateTime.now().minusMinutes(5))).toList();
int cnt = 0;
for (Event e : results) {
for (Event e : filteredList) {
if (cnt++ >= constants.getGradToTraxProcessingThreshold()) {
log.info(" ==> Reached the processing threshold of {}", constants.getGradToTraxProcessingThreshold());
break;
Expand All @@ -77,12 +80,13 @@ public void findAndProcessEvents() {
@Scheduled(cron = "${cron.scheduled.process.events.trax-to-grad.run}")
@SchedulerLock(name = "PUBLISH_TRAX_UPDATED_EVENTS_TO_JET_STREAM", lockAtLeastFor = "${cron.scheduled.process.events.trax-to-grad.lockAtLeastFor}", lockAtMostFor = "${cron.scheduled.process.events.trax-to-grad.lockAtMostFor}")
public void findAndPublishGradStatusEventsToJetStream() {
log.debug("PUBLISH_TRAX_UPDATED_EVENTS_TO_JET_STREAM: started - cron {}, lockAtMostFor {}", constants.getTraxToGradCronRun(), constants.getTraxToGradLockAtMostFor());
LockAssert.assertLocked();
log.debug("PUBLISH_TRAX_UPDATED_EVENTS_TO_JET_STREAM: started - cron {}, lockAtMostFor {}", constants.getTraxToGradCronRun(), constants.getTraxToGradLockAtMostFor());
final var results = this.traxUpdatedPubEventRepository.findByEventStatusOrderByCreateDate(DB_COMMITTED.toString());
if (!results.isEmpty()) {
var filteredList = results.stream().filter(el -> el.getUpdateDate().isBefore(LocalDateTime.now().minusMinutes(5))).toList();
int cnt = 0;
for (TraxUpdatedPubEvent el : results) {
for (TraxUpdatedPubEvent el : filteredList) {
if (cnt++ >= constants.getTraxToGradProcessingThreshold()) {
log.info(" ==> Reached the processing threshold of {}", constants.getTraxToGradProcessingThreshold());
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public TraxUpdateTriggeredRecordScheduler(final TraxUpdateService traxUpdateServ
@Scheduled(cron = "${cron.scheduled.process.trigger-jobs.read-trax-update.run}") // every 5 minute
@SchedulerLock(name = "PROCESS_TRAX_UPDATE_IN_GRAD_RECORDS", lockAtLeastFor = "${cron.scheduled.process.trigger-jobs.read-trax-update.lockAtLeastFor}", lockAtMostFor = "${cron.scheduled.process.trigger-jobs.read-trax-update.lockAtMostFor}")
public void scheduledRunForTraxUpdates() {
log.debug("PROCESS_TRAX_UPDATE_IN_GRAD_RECORDS: started - cron {}, lockAtMostFor {}", constants.getTraxTriggersCronRun(), constants.getTraxTriggersLockAtMostFor());
LockAssert.assertLocked();
log.debug("PROCESS_TRAX_UPDATE_IN_GRAD_RECORDS: started - cron {}, lockAtMostFor {}", constants.getTraxTriggersCronRun(), constants.getTraxTriggersLockAtMostFor());
final var results = traxUpdateService.getOutstandingList();
if (!results.isEmpty()) {
int cnt = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public Event persistEventToDB(final ChoreographedEvent choreographedEvent) throw
.createDate(LocalDateTime.now())
.updateDate(LocalDateTime.now())
.build();
return eventRepository.save(event);
return this.eventRepository.save(event);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,12 @@ public void handleChoreographyEvent(@NonNull final ChoreographedEvent choreograp
try {
if (message.getSubject().equalsIgnoreCase(TRAX_UPDATE_EVENT_TOPIC.toString())) {
this.choreographedEventPersistenceService.updateEventStatus(choreographedEvent);
if (message.isJetStream()) {
message.ack();
log.warn("acknowledged to Jet Stream for TRAX UPDATE EVENT sent...");
}
message.ack();
log.warn("acknowledged to Jet Stream for TRAX UPDATE EVENT sent...");
} else {
final var persistedEvent = this.choreographedEventPersistenceService.persistEventToDB(choreographedEvent);
if (message.isJetStream()) {
message.ack(); // acknowledge to Jet Stream that api got the message and it is now in DB.
log.warn("acknowledged to Jet Stream for GRAD STATUS EVENT received...");
}
message.ack(); // acknowledge to Jet Stream that api got the message and it is now in DB.
log.warn("acknowledged to Jet Stream for GRAD STATUS EVENT received...");
this.choreographer.handleEvent(persistedEvent);
}
} catch (final BusinessException businessException) {
Expand Down

This file was deleted.

0 comments on commit 6f05d06

Please sign in to comment.