Skip to content

Commit

Permalink
Merge pull request #15 from AxonIQ/generate-data
Browse files Browse the repository at this point in the history
Initial work to have data automatically generated.  To make this happ…
  • Loading branch information
benrunchey authored Apr 24, 2024
2 parents dc88d2b + 0926557 commit 29fe4f2
Show file tree
Hide file tree
Showing 15 changed files with 374 additions and 121 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.axoniq.demo.bikerental.coreapi.payment;

public class PaymentStatusNamedQueries {
public static final String GET_PAYMENT_ID = "getPaymentId";
public static final String GET_ALL_PAYMENTS = "getAllPayments";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.axoniq.demo.bikerental.coreapi.rental;

public class BikeStatusNamedQueries {
public static final String FIND_ALL = "findAll";
public static final String FIND_ONE = "findOne";
public static final String FIND_AVAILABLE = "findAvailable";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package io.axoniq.demo.bikerental.coreapi.rental;

public record CountOfBikesByTypeQuery(String bikeType) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,25 @@ Our next task in defining our projection is to implement the support for handlin

We need to add a `@QueryHandler` method for each query we want to support. Since we already have the bike statuses persisted in the way we need to return the information, we only need to query the database and return that.

Before jumping into creating the methds to handle the queries, we need to consider how we are going to identify the different queries.

=== Using named queries.

Axon Framework allows different ways to identify a query message and link that query to the right method for handling it. In this tutorial we are going to start using the most simple way to identify a query: by assigning each query a name.

We are going to use `String` constants to make sure we always refer to the same query name, both in the modules that send the query messages and in the components that are handling them.

So, as the first step we will create a class to define and share those query names in different components. Define the following class in the `core-api` module:

[source,java]
./core-api/src/main/java/io/axoniq/demo/bikerental/coreapi/rental/BikeStatusNamedQueries.java
----
include::example$core-api/src/main/java/io/axoniq/demo/bikerental/coreapi/rental/BikeStatusNamedQueries.java[]
----

Now that we have the names of the queries defined, let's define the methods that will handle and respond them.


=== Implement a query to return all the bikes.

Let's start by implementing a method to return all the bikes (with their status) defined in our system. Add the following method to your `BikeStatusProjection` class:
Expand All @@ -130,7 +149,8 @@ include::example$rental/src/main/java/io/axoniq/demo/bikerental/rental/query/Bik
<.> Since our query has no parameters (we want to retrieve the information for *all the bikes* in our system), our query handler method does not receive any parameters. It only needs to return the list of items we find in our DB.
<.> As we have the information already prepared and aligned with the response format (thanks to the `EventHandler`s), we only need to retrieve the information from the repository and return it.

In short, we have defined a query handler method that Axon Framework will call upon the reception of a query message to `findAll` the bikes in our system. And the method will simply retrieve the up-to-date information from the DB and return the `BikeStatus` for all the bikes.
In short, we have defined a query handler method that Axon Framework will call upon the reception of a query message to `FIND_ALL` the bikes in our system. And the method will simply retrieve the up-to-date information from the DB and return the `BikeStatus` for all the bikes.


=== Implementing support for other queries in our projection.

Expand All @@ -141,12 +161,12 @@ For example, if we want to support queries to return all the available bikes, fi
[source,java]
.rental/src/main/java/io/axoniq/demo/bikerental/rental/query/BikeStatusProjection.java
----
include::example$rental/src/main/java/io/axoniq/demo/bikerental/rental/query/BikeStatusProjection.java[tags=ClassDefinition;QueryHandlers;!*]
include::example$rental/src/main/java/io/axoniq/demo/bikerental/rental/query/BikeStatusProjection.java[tags=ClassDefinition;findAvailableQueryHandler;findOneQueryHandler;!*]
----
<.> We define a new `QueryHandler` method for the `findAvailable` query.
<.> The query will filter by the type of the bike, so we need to add the `bikeType` argument to the method.
<.> We need to add a specific method to our `BikeStatusRepository` that implements the query to the DB. We will do that right after this. Since we are using Spring Data, the name of the method should follow a specific pattern. (More on this in a few lines)
<.> We define another `QueryHandler` method for the `findOne` query.
<.> We define another `QueryHandler` method for the `FIND_ONE` query.
<.> In this query, we only need to return one bike, and we need the `bikeId` as an argument to the method. In this case, we will return a single `BikeStatus` because we are returning a single element and not a collection.
<.> The default `findById` method provided by the Spring Data `JpaRepository` returns an `Optional<T>` when we look up an item based on its `id`. This is because the `id` we are looking for may not exist in our DB. So we add a fallback to return `null` in case there is no bike with the given `bikeId` in the DB.

Expand Down
4 changes: 2 additions & 2 deletions frontend/src/pages/IndexPage.vue
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ import URL from '../settings'
const newBikeParams = ref({
count: 10,
type: "mountainbike"
type: "mountain bike"
})
const rentalsParams = ref({
bikeType : "mountainbike",
bikeType : "mountain bike",
loops : "64",
concurrency : "8",
abandonPaymentFactor: 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class UserInterfaceApplication {

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
inventory.size=100
inventory.bikeType=mountain bike

rentalSimulation.bikeType=mountain bike
rentalSimulation.loops=64
rentalSimulation.concurrency=8
rentalSimulation.abandonPaymentFactor=0
rentalSimulation.delayBetweenLoops=1000
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

@EntityScan(basePackageClasses = {BikeStatus.class, SagaEntry.class, TokenEntry.class})
@SpringBootApplication
@EnableScheduling
public class RentalApplication {

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
package io.axoniq.demo.bikerental.rental.query;

import io.axoniq.demo.bikerental.coreapi.rental.BikeInUseEvent;
import io.axoniq.demo.bikerental.coreapi.rental.BikeRegisteredEvent;
import io.axoniq.demo.bikerental.coreapi.rental.BikeRequestedEvent;
import io.axoniq.demo.bikerental.coreapi.rental.BikeReturnedEvent;
import io.axoniq.demo.bikerental.coreapi.rental.BikeStatus;
import io.axoniq.demo.bikerental.coreapi.rental.RentalStatus;
import io.axoniq.demo.bikerental.coreapi.rental.RequestRejectedEvent;
import io.axoniq.demo.bikerental.coreapi.rental.*;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler;
import org.axonframework.queryhandling.QueryUpdateEmitter;
Expand Down Expand Up @@ -79,7 +73,6 @@ public void on(BikeReturnedEvent event) {
updateEmitter.emit(q -> "findAll".equals(q.getQueryName()), bs);
updateEmitter.emit(String.class, event.bikeId()::equals, bs);
});

}

@EventHandler
Expand All @@ -98,21 +91,30 @@ public void on(RequestRejectedEvent event) {
//end::EventHandlers[]
//tag::QueryHandlers[]
//tag::findAllQueryHandler[]
@QueryHandler(queryName = "findAll") //<.>
@QueryHandler(queryName = BikeStatusNamedQueries.FIND_ALL) //<.>
public Iterable<BikeStatus> findAll() { // <.>
return bikeStatusRepository.findAll(); //<.>
}

//end::findAllQueryHandler[]
@QueryHandler(queryName = "findAvailable") //<.>
//tag::findAvailableQueryHandler[]
@QueryHandler(queryName = BikeStatusNamedQueries.FIND_AVAILABLE) //<.>
public Iterable<BikeStatus> findAvailable(String bikeType) { //<.>
return bikeStatusRepository.findAllByBikeTypeAndStatus(bikeType, RentalStatus.AVAILABLE);
}

@QueryHandler(queryName = "findOne") // <.>
//end::findAvailableQueryHandler[]
//tag::findOneQueryHandler[]
@QueryHandler(queryName = BikeStatusNamedQueries.FIND_ONE) // <.>
public BikeStatus findOne(String bikeId) { //<.>
return bikeStatusRepository.findById(bikeId).orElse(null); //<.>
}

//end::findOneQueryHandler[]
@QueryHandler
public long countOfBikesByType(CountOfBikesByTypeQuery query) {
return bikeStatusRepository.countBikeStatusesByBikeType(query.bikeType());
}
//end::QueryHandlers[]
}
//end::ClassDefinition[]
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public interface BikeStatusRepository

//tag::QueryMethods[]
List<BikeStatus> findAllByBikeTypeAndStatus(String bikeType, RentalStatus status);
long countBikeStatusesByBikeType(String bikeType);
//end::QueryMethods[]

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package io.axoniq.demo.bikerental.rental.ui;

import io.axoniq.demo.bikerental.coreapi.payment.ConfirmPaymentCommand;
import io.axoniq.demo.bikerental.coreapi.rental.*;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.messaging.responsetypes.ResponseTypes;
import org.axonframework.queryhandling.QueryGateway;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestParam;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

@Component
public class BikeRentalDataGenerator {
private static final List<String> RENTERS = Arrays.asList("Allard", "Steven", "Josh", "David", "Marc", "Sara", "Milan", "Jeroen", "Marina", "Jeannot");
private static final List<String> LOCATIONS = Arrays.asList("Amsterdam", "Paris", "Vilnius", "Barcelona", "London", "New York", "Toronto", "Berlin", "Milan", "Rome", "Belgrade");


private final CommandGateway commandGateway;
private final QueryGateway queryGateway;

Logger logger = LoggerFactory.getLogger(BikeRentalDataGenerator.class);

public BikeRentalDataGenerator(CommandGateway commandGateway, QueryGateway queryGateway) {
this.commandGateway = commandGateway;
this.queryGateway = queryGateway;
}


public String randomRenter() {
return RENTERS.get(ThreadLocalRandom.current().nextInt(RENTERS.size()));
}

public String randomLocation() {
return LOCATIONS.get(ThreadLocalRandom.current().nextInt(LOCATIONS.size()));
}

public CompletableFuture<Void> generateBikes(int bikeCount, String bikeType) {
CompletableFuture<Void> all = CompletableFuture.completedFuture(null);
for (int i = 0; i < bikeCount; i++) {
all = CompletableFuture.allOf(all,
commandGateway.send(new RegisterBikeCommand(UUID.randomUUID().toString(), bikeType, randomLocation())));
}
return all;
}


public Flux<String> generateRentals(String bikeType,
int loops,
int concurrency,
int abandonPaymentFactor,
int delay) {

return this.internalGenerateRentals(bikeType, loops, concurrency, abandonPaymentFactor, delay);

}

private Flux<String> internalGenerateRentals(String bikeType,
int loops,
int concurrency,
int abandonPaymentFactor,
int delay) {

return Flux.range(0, loops)
.flatMap(j -> executeRentalCycle(bikeType, randomRenter(), abandonPaymentFactor, delay)
.map(r -> "OK - Rented, Payed and Returned\n")
.onErrorResume(e -> Mono.just("Not ok: " + e.getMessage() + "\n")),
concurrency);

}

private Mono<String> executeRentalCycle(String bikeType, String renter, int abandonPaymentFactor, int delay) {
CompletableFuture<String> result = selectRandomAvailableBike(bikeType)
.thenCompose(bikeId -> commandGateway.send(new RequestBikeCommand(bikeId, renter))
.thenComposeAsync(paymentRef -> executePayment(bikeId,
(String) paymentRef,
abandonPaymentFactor),
CompletableFuture.delayedExecutor(randomDelay(
delay), TimeUnit.MILLISECONDS))
.thenCompose(r -> whenBikeUnlocked(bikeId))
.thenComposeAsync(r -> commandGateway.send(new ReturnBikeCommand(
bikeId,
randomLocation())),
CompletableFuture.delayedExecutor(randomDelay(
delay), TimeUnit.MILLISECONDS))
.thenApply(r -> bikeId));
return Mono.fromFuture(result);
}

private CompletableFuture<String> selectRandomAvailableBike(String bikeType) {
return queryGateway.query("findAvailable", bikeType, ResponseTypes.multipleInstancesOf(BikeStatus.class))
.thenApply(this::pickRandom)
.thenApply(BikeStatus::getBikeId);
}

private CompletableFuture<String> executePayment(String bikeId, String paymentRef, int abandonPaymentFactor) {
if (abandonPaymentFactor > 0 && ThreadLocalRandom.current().nextInt(abandonPaymentFactor) == 0) {
return CompletableFuture.failedFuture(new IllegalStateException("Customer refused to pay"));
}
SubscriptionQueryResult<String, String> queryResult = queryGateway.subscriptionQuery("getPaymentId",
paymentRef,
String.class,
String.class);
return queryResult.initialResult().concatWith(queryResult.updates())
.filter(Objects::nonNull)
.doOnNext(n -> queryResult.close())
.next()
.flatMap(paymentId -> Mono.fromFuture(commandGateway.send(new ConfirmPaymentCommand(paymentId))))
.map(o -> bikeId)
.toFuture();
}
private CompletableFuture<String> whenBikeUnlocked(String bikeId) {
SubscriptionQueryResult<BikeStatus, BikeStatus> queryResult = queryGateway.subscriptionQuery(BikeStatusNamedQueries.FIND_ONE, bikeId, BikeStatus.class, BikeStatus.class);
return queryResult.initialResult().concatWith(queryResult.updates())
.any(status -> status.getStatus() == RentalStatus.RENTED)
.map(s -> bikeId)
.doOnNext(n -> queryResult.close())
.toFuture();
}
private int randomDelay(int delay) {
if (delay <= 0) {
return 0;
}
return ThreadLocalRandom.current().nextInt(delay - (delay >> 2), delay + delay + (delay >> 2));
}

private <T> T pickRandom(List<T> source) {
return source.get(ThreadLocalRandom.current().nextInt(source.size()));
}
}
Loading

0 comments on commit 29fe4f2

Please sign in to comment.