Skip to content

Commit

Permalink
Merge pull request #289 from bcgov/feature/GRAD2-2334
Browse files Browse the repository at this point in the history
GRAD2-2334: task is completed.
  • Loading branch information
kamal-mohammed authored Sep 19, 2023
2 parents 6970057 + e08b416 commit 563d9af
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@

import ca.bc.gov.educ.api.trax.choreographer.ChoreographEventHandler;
import ca.bc.gov.educ.api.trax.messaging.jetstream.Publisher;
import ca.bc.gov.educ.api.trax.model.entity.Event;
import ca.bc.gov.educ.api.trax.model.entity.TraxUpdatedPubEvent;
import ca.bc.gov.educ.api.trax.repository.TraxUpdatedPubEventRepository;
import ca.bc.gov.educ.api.trax.repository.EventRepository;
import ca.bc.gov.educ.api.trax.util.EducGradTraxApiConstants;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.core.LockAssert;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

import static ca.bc.gov.educ.api.trax.constant.EventStatus.DB_COMMITTED;

@Component
Expand All @@ -31,6 +32,8 @@ public class JetStreamEventScheduler {

private final Publisher publisher;

private final EducGradTraxApiConstants constants;

/**
* Instantiates a new Stan event scheduler.
*
Expand All @@ -39,46 +42,58 @@ public class JetStreamEventScheduler {
*/
public JetStreamEventScheduler(final EventRepository eventRepository,
final TraxUpdatedPubEventRepository traxUpdatedPubEventRepository,
final ChoreographEventHandler choreographer, Publisher publisher) {
final ChoreographEventHandler choreographer, Publisher publisher,
final EducGradTraxApiConstants constants) {
this.eventRepository = eventRepository;
this.traxUpdatedPubEventRepository = traxUpdatedPubEventRepository;
this.choreographer = choreographer;
this.publisher = publisher;
this.constants = constants;
}

@Scheduled(cron = "${cron.scheduled.process.events.grad-to-trax.run}") // minimum = every 5 minutes
@Scheduled(cron = "${cron.scheduled.process.events.grad-to-trax.run}")
@SchedulerLock(name = "PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM", lockAtLeastFor = "${cron.scheduled.process.events.grad-to-trax.lockAtLeastFor}", lockAtMostFor = "${cron.scheduled.process.events.grad-to-trax.lockAtMostFor}")
public void findAndProcessEvents() {
log.debug("PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM: started - cron {}, lockAtMostFor {}", constants.getGradToTraxCronRun(), constants.getGradToTraxLockAtMostFor());
LockAssert.assertLocked();
final var results = this.eventRepository.findAllByEventStatusOrderByCreateDate(DB_COMMITTED.toString());
if (!results.isEmpty()) {
results.stream()
.filter(el -> el.getUpdateDate().isBefore(LocalDateTime.now().minusMinutes(5)))
.forEach(el -> {
try {
choreographer.handleEvent(el);
} catch (final Exception ex) {
log.error("Exception while trying to handle GRAD updated message", ex);
}
});
int cnt = 0;
for (Event e : results) {
if (cnt++ >= constants.getGradToTraxProcessingThreshold()) {
log.info(" ==> Reached the processing threshold of {}", constants.getGradToTraxProcessingThreshold());
break;
}
try {
choreographer.handleEvent(e);
} catch (final Exception ex) {
log.error("Exception while trying to handle GRAD updated message", ex);
}
}
log.debug("PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM: processing is completed");
}
}

@Scheduled(cron = "${cron.scheduled.process.events.trax-to-grad.run}") // minimum = every 5 minutes
@Scheduled(cron = "${cron.scheduled.process.events.trax-to-grad.run}")
@SchedulerLock(name = "PUBLISH_TRAX_UPDATED_EVENTS_TO_JET_STREAM", lockAtLeastFor = "${cron.scheduled.process.events.trax-to-grad.lockAtLeastFor}", lockAtMostFor = "${cron.scheduled.process.events.trax-to-grad.lockAtMostFor}")
public void findAndPublishGradStatusEventsToJetStream() {
log.debug("PUBLISH_TRAX_UPDATED_EVENTS_TO_JET_STREAM: started - cron {}, lockAtMostFor {}", constants.getTraxToGradCronRun(), constants.getTraxToGradLockAtMostFor());
LockAssert.assertLocked();
final var results = this.traxUpdatedPubEventRepository.findByEventStatusOrderByCreateDate(DB_COMMITTED.toString());
if (!results.isEmpty()) {
results.stream()
.filter(el -> el.getUpdateDate().isBefore(LocalDateTime.now().minusMinutes(5)))
.forEach(el -> {
try {
publisher.dispatchChoreographyEvent(el);
} catch (final Exception ex) {
log.error("Exception while trying to handle TRAX updated message", ex);
}
});
int cnt = 0;
for (TraxUpdatedPubEvent el : results) {
if (cnt++ >= constants.getTraxToGradProcessingThreshold()) {
log.info(" ==> Reached the processing threshold of {}", constants.getTraxToGradProcessingThreshold());
break;
}
try {
publisher.dispatchChoreographyEvent(el);
} catch (final Exception ex) {
log.error("Exception while trying to handle TRAX updated message", ex);
}
}
log.debug("PUBLISH_TRAX_UPDATED_EVENTS_TO_JET_STREAM: processing is completed");
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package ca.bc.gov.educ.api.trax.scheduler;

import ca.bc.gov.educ.api.trax.model.entity.TraxUpdateInGradEntity;
import ca.bc.gov.educ.api.trax.service.TraxUpdateService;
import ca.bc.gov.educ.api.trax.util.EducGradTraxApiConstants;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.core.LockAssert;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
Expand All @@ -12,25 +14,35 @@
public class TraxUpdateTriggeredRecordScheduler {
private final TraxUpdateService traxUpdateService;

public TraxUpdateTriggeredRecordScheduler(final TraxUpdateService traxUpdateService) {
private final EducGradTraxApiConstants constants;

public TraxUpdateTriggeredRecordScheduler(final TraxUpdateService traxUpdateService,
final EducGradTraxApiConstants constants) {
this.traxUpdateService = traxUpdateService;
this.constants = constants;
}

@Scheduled(cron = "${cron.scheduled.process.trigger-jobs.read-trax-update.run}") // every 5 minute
@SchedulerLock(name = "PROCESS_TRAX_UPDATE_IN_GRAD_RECORDS", lockAtLeastFor = "${cron.scheduled.process.trigger-jobs.read-trax-update.lockAtLeastFor}", lockAtMostFor = "${cron.scheduled.process.trigger-jobs.read-trax-update.lockAtMostFor}")
public void scheduledRunForTraxUpdates() {
log.debug("PROCESS_TRAX_UPDATE_IN_GRAD_RECORDS: started - cron {}, lockAtMostFor {}", constants.getTraxTriggersCronRun(), constants.getTraxTriggersLockAtMostFor());
LockAssert.assertLocked();
final var results = traxUpdateService.getOutstandingList();
if (!results.isEmpty()) {
results.stream()
.forEach(ts -> {
try {
traxUpdateService.publishTraxUpdatedEvent(ts);
traxUpdateService.updateStatus(ts);
} catch (final Exception ex) {
log.error("Exception while trying to handle update_in_grad records", ex);
}
});
int cnt = 0;
for (TraxUpdateInGradEntity ts : results) {
if (cnt++ >= constants.getTraxTriggersProcessingThreshold()) {
log.info(" ==> Reached the processing threshold of {}", constants.getTraxTriggersProcessingThreshold());
break;
}
try {
traxUpdateService.publishTraxUpdatedEvent(ts);
traxUpdateService.updateStatus(ts);
} catch (final Exception ex) {
log.error("Exception while trying to handle update_in_grad records", ex);
}
}
log.debug("PROCESS_TRAX_UPDATE_IN_GRAD_RECORDS: processing is completed");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public class EducGradTraxApiConstants {
public static final String GRAD_TRAX_COMMON_URL_MAPPING = GRAD_TRAX_API_ROOT_MAPPING + "/common";
public static final String GRAD_TSW_URL_MAPPING = GRAD_TRAX_API_ROOT_MAPPING + "/tsw";


// Service Methods Mappings
public static final String GET_ALL_COUNTRY_MAPPING = "/country";
public static final String GET_ALL_COUNTRY_BY_CODE_MAPPING = "/country/{countryCode}";
Expand Down Expand Up @@ -124,4 +123,44 @@ public class EducGradTraxApiConstants {

@Value("${endpoint.educ-school-api.get-all-schools.url}")
private String allSchoolSchoolApiUrl;

// Scheduler: ongoing updates from TRAX to GRAD
@Value("${cron.scheduled.process.events.trax-to-grad.run}")
private String traxToGradCronRun;

@Value("${cron.scheduled.process.events.trax-to-grad.lockAtLeastFor}")
private String traxToGradLockAtLeastFor;

@Value("${cron.scheduled.process.events.trax-to-grad.lockAtMostFor}")
private String traxToGradLockAtMostFor;

@Value("${cron.scheduled.process.events.trax-to-grad.threshold}")
private int traxToGradProcessingThreshold;

// Scheduler: ongoing updates from GRAD to TRAX
@Value("${cron.scheduled.process.events.grad-to-trax.run}")
private String gradToTraxCronRun;

@Value("${cron.scheduled.process.events.grad-to-trax.lockAtLeastFor}")
private String gradToTraxLockAtLeastFor;

@Value("${cron.scheduled.process.events.grad-to-trax.lockAtMostFor}")
private String gradToTraxLockAtMostFor;

@Value("${cron.scheduled.process.events.grad-to-trax.threshold}")
private int gradToTraxProcessingThreshold;

// Scheduler: TRAX triggers
@Value("${cron.scheduled.process.trigger-jobs.read-trax-update.run}")
private String traxTriggersCronRun;

@Value("${cron.scheduled.process.trigger-jobs.read-trax-update.lockAtLeastFor}")
private String traxTriggersLockAtLeastFor;

@Value("${cron.scheduled.process.trigger-jobs.read-trax-update.lockAtMostFor}")
private String traxTriggersLockAtMostFor;

@Value("${cron.scheduled.process.trigger-jobs.read-trax-update.threshold}")
private int traxTriggersProcessingThreshold;

}
3 changes: 3 additions & 0 deletions api/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,18 @@ cron:
run: ${CRON_SCHEDULED_GRAD_TO_TRAX_EVENTS}
lockAtLeastFor: ${CRON_SCHEDULED_GRAD_TO_TRAX_EVENTS_LOCK_AT_LEAST_FOR}
lockAtMostFor: ${CRON_SCHEDULED_GRAD_TO_TRAX_EVENTS_LOCK_AT_MOST_FOR}
threshold: ${CRON_SCHEDULED_GRAD_TO_TRAX_EVENTS_THRESHOLD}
trax-to-grad:
run: ${CRON_SCHEDULED_TRAX_TO_GRAD_EVENTS}
lockAtLeastFor: ${CRON_SCHEDULED_TRAX_TO_GRAD_EVENTS_LOCK_AT_LEAST_FOR}
lockAtMostFor: ${CRON_SCHEDULED_TRAX_TO_GRAD_EVENTS_LOCK_AT_MOST_FOR}
threshold: ${CRON_SCHEDULED_TRAX_TO_GRAD_EVENTS_THRESHOLD}
trigger-jobs:
read-trax-update:
run: ${CRON_SCHEDULED_TRIGGER_TRAX_UPDATES}
lockAtLeastFor: ${CRON_SCHEDULED_TRIGGER_TRAX_UPDATES_LOCK_AT_LEAST_FOR}
lockAtMostFor: ${CRON_SCHEDULED_TRIGGER_TRAX_UPDATES_LOCK_AT_MOST_FOR}
threshold: ${CRON_SCHEDULED_TRIGGER_TRAX_UPDATES_THRESHOLD}

#Incremental Trax Update
trax:
Expand Down
3 changes: 3 additions & 0 deletions api/src/test/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,18 @@ cron:
run: 0 0/10 * * * *
lockAtLeastFor: PT1M
lockAtMostFor: PT4M
threshold: 100
trax-to-grad:
run: 0 0/10 * * * *
lockAtLeastFor: PT1M
lockAtMostFor: PT4M
threshold: 100
trigger-jobs:
read-trax-update:
run: 0 0/2 * * * *
lockAtLeastFor: 800ms
lockAtMostFor: 900ms
threshold: 100

#Incremental Trax Update
trax:
Expand Down

0 comments on commit 563d9af

Please sign in to comment.