Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

a game can run on multiple engines #107 #139

Merged
merged 1 commit into from
Dec 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions buildscripts/engine.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ oasis {
maxConsumerThreadPoolSize: 4

gameEventsConsumer = {
# groupId = ""

# If not specified, this will take engine id
# If not specified, this will be engine id
# instanceId = ""

# These props will directly feed to Kafka property configs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,29 +162,21 @@ private void gameSpecificCommand(GameCommand gameCommand) {
int gameId = gameCommand.getGameId();
ExternalPartyImpl eventSource = ExternalParty.EXTERNAL_PARTY.get(getContext().getSystem());
if (status == GameCommand.GameLifecycle.CREATE || status == GameCommand.GameLifecycle.START) {
if (acquireGameLock(gameId, engineContext.id())) {
createGameRuleRefNx(gameId);

if (gamesRunning.add(gameId)) {
contextMap.put(gameId, loadGameContext(gameId));
publishGameState(gameId, GameState.STARTED);
} else {
mainLog.info("Game {} is already running in this engine. So skipping re-registration.", gameId);
}
eventSource.ackGameStateChanged(gameCommand);
mainLog.info("Successfully acquired the game {} to be run on this engine having id {}. Game Event: {}",
gameId, engineContext.id(), status);
createGameRuleRefNx(gameId);

if (gamesRunning.add(gameId)) {
contextMap.put(gameId, loadGameContext(gameId));
publishGameState(gameId, GameState.STARTED);
} else {
mainLog.warning("Cannot acquire game {} for this engine, because it is already owned by another engine!", gameId);
eventSource.nackGameStateChanged(gameCommand);
mainLog.info("Game {} is already running in this engine. So skipping re-registration.", gameId);
}
eventSource.ackGameStateChanged(gameCommand);
mainLog.info("Game engine '{}' is ready to run the game id: {}. Ack game state: {}",
engineContext.id(), gameId, status);
} else if (status == GameCommand.GameLifecycle.REMOVE) {
if (releaseGameLock(gameId, engineContext.id())) {
gamesRunning.remove(gameId);
contextMap.remove(gameId);
} else {
mainLog.info("The game {} is not running in this engine. Skipping remove message.", gameId);
}
gamesRunning.remove(gameId);
contextMap.remove(gameId);

publishGameState(gameId, GameState.STOPPED);
eventSource.ackGameStateChanged(gameCommand);
} else if (status == GameCommand.GameLifecycle.UPDATE) {
Expand All @@ -200,40 +192,6 @@ private void gameSpecificCommand(GameCommand gameCommand) {
}
}

private boolean releaseGameLock(int gameId, String myId) {
String gameIdStr = String.valueOf(gameId);
try (DbContext context = engineContext.getDb().createContext()) {
String owningEngine = context.getValueFromMap(ID.GAME_ENGINES, gameIdStr);
if (myId.equals(owningEngine)) {
mainLog.info("Removing game lock... (gameId: {})", gameId);
context.removeKeyFromMap(ID.GAME_ENGINES, gameIdStr);
publishGameState(gameId, GameState.STOPPED, context);
mainLog.info("Removed game lock! (gameId: {})", gameId);
return true;
}
} catch (IOException e) {
mainLog.error("Cannot acquire game lock! Unexpected error!", e);
}
return false;
}

private boolean acquireGameLock(int gameId, String myId) {
String gameIdStr = String.valueOf(gameId);
try (DbContext context = engineContext.getDb().createContext()) {
boolean locked = context.setIfNotExistsInMap(ID.GAME_ENGINES, gameIdStr, myId);
if (!locked) {
String currentEngineRunning = context.getValueFromMap(ID.GAME_ENGINES, gameIdStr);
mainLog.info("Game {} is currently run by the engine having id {}. My engine id = {}",
gameId, currentEngineRunning, myId);
return myId.equals(currentEngineRunning);
}
return true;
} catch (IOException e) {
mainLog.error("Cannot acquire game lock! Unexpected error!", e);
}
return false;
}

private GameContext loadGameContext(int gameId) {
return new GameContext(gameId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testMultipleEngines() throws OasisException, IOException, ExecutionE

boolean gameRunningE1 = engine1.isGameRunning(1);
boolean gameRunningE2 = engine2.isGameRunning(1);
Assertions.assertTrue(gameRunningE1 ^ gameRunningE2);
Assertions.assertTrue(gameRunningE1 && gameRunningE2);

engine1.stopGame(1);
engine2.stopGame(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
*/
final class KafkaConstants {

static final String GAME_EVENT_CONSUMER_GROUP = "oasis-game-event-consumer-%d";

static final String TOPIC_GAME_EVENTS = "oasis.game.events";
static final String TOPIC_GAME_ANNOUNCEMENTS = "oasis.game.announcements";
static final String TOPIC_ENGINE_RELATED_EVENTS = "oasis.engine.events";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void init(RuntimeContextSupport context, MessageReceiver sinkRef) throws
consumerPool = Executors.newCachedThreadPool();

LOG.debug("Initializing kafka broadcast topic consumer...");
Properties broadcastingProps = KafkaUtils.getBroadcastConsumerProps(kafkaConfigs, engineId);
Properties broadcastingProps = KafkaUtils.getBroadcastConsumerProps(kafkaConfigs);
String instanceId = broadcastingProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
String uniqueId = engineId + "::" + instanceId;
gameBroadcastConsumer = new KafkaBroadcastConsumerRunner(sinkRef, KafkaConstants.TOPIC_GAME_ANNOUNCEMENTS, uniqueId);
Expand Down Expand Up @@ -116,7 +116,7 @@ public void handleGameCommand(GameCommand gameCommand) {
gameEventsConsumers.put(gameId, gameReader);
GameEventHandler handler = new GameEventHandler(sinkRef);

Properties thisConsumerProps = KafkaUtils.createGameEventConsumerProps(providedKafkaConfigs, engineId);
Properties thisConsumerProps = KafkaUtils.createGameEventConsumerProps(providedKafkaConfigs, gameId, engineId);
LOG.info("Subscribing to game {} event topic with configs {}...", gameId, thisConsumerProps);
gameReader.init(thisConsumerProps, handler);
consumerPool.submit(gameReader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ static Properties getFeedConsumerProps(KafkaConfigs kafkaConfigs) {
return props;
}

static Properties getBroadcastConsumerProps(KafkaConfigs kafkaConfigs, String engineId) {
static Properties getBroadcastConsumerProps(KafkaConfigs kafkaConfigs) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Expand Down Expand Up @@ -148,23 +148,18 @@ static Properties getBroadcastConsumerProps(KafkaConfigs kafkaConfigs, String en
return props;
}

static Properties createGameEventConsumerProps(KafkaConfigs kafkaConfigs, String engineId) {
static Properties createGameEventConsumerProps(KafkaConfigs kafkaConfigs, int gameId, String engineId) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigs.getBrokerUrls());
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// if user has specified custom kafka configs for game event topics...
String consumerGroupId = null;
String consumerGroupId = String.format(KafkaConstants.GAME_EVENT_CONSUMER_GROUP, gameId);
if (kafkaConfigs.getGameEventsConsumer() != null) {
if (Utils.isNotEmpty(kafkaConfigs.getGameEventsConsumer().getProps())) {
props.putAll(kafkaConfigs.getGameEventsConsumer().getProps());
}
consumerGroupId = kafkaConfigs.getGameEventsConsumer().getGroupId();
}

if (Texts.isEmpty(consumerGroupId)) {
consumerGroupId = UUID.randomUUID().toString();
}
String consumerGroupInstanceId = Texts.isEmpty(engineId) ? UUID.randomUUID().toString() : engineId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ void getFeedConsumerProps_withoutGroupIdAndProps() {
@Test
void getBroadcastConsumerProps_withGroupId() {
var configs = readConfigs("broadcastConsumer-withGroupId");
var props = KafkaUtils.getBroadcastConsumerProps(configs, "engine-id");
var props = KafkaUtils.getBroadcastConsumerProps(configs);

assertProps(props,
Map.of(BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092",
Expand All @@ -166,7 +166,7 @@ void getBroadcastConsumerProps_withGroupId() {
@Test
void getBroadcastConsumerProps_withoutGroupIdAndProps() {
var configs = readConfigs("broadcastConsumer-withoutGroupIdAndProps");
var props = KafkaUtils.getBroadcastConsumerProps(configs, "engine-id");
var props = KafkaUtils.getBroadcastConsumerProps(configs);

Assertions.assertEquals(7, props.size());
Assertions.assertNotNull(props.getProperty(GROUP_ID_CONFIG));
Expand All @@ -182,31 +182,31 @@ void getBroadcastConsumerProps_withoutGroupIdAndProps() {
}

@Test
void createGameEventConsumerProps_withGroupId() {
void createGameEventConsumerProps_withEngineId() {
var configs = readConfigs("gameEventsConsumer-withGroupId");
var props = KafkaUtils.createGameEventConsumerProps(configs, "engine-id");
var props = KafkaUtils.createGameEventConsumerProps(configs, 1,"engine-id");

assertProps(props,
Map.of(BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092",
KEY_DESERIALIZER_CLASS_CONFIG, DEF_STR_DESERIALIZER,
VALUE_DESERIALIZER_CLASS_CONFIG, DEF_STR_DESERIALIZER,
GROUP_ID_CONFIG, "test-consumer-group-01",
GROUP_ID_CONFIG, "oasis-game-event-consumer-1",
GROUP_INSTANCE_ID_CONFIG, "engine-id",
AUTO_OFFSET_RESET_CONFIG, KafkaConstants.EARLIEST));
}

@Test
void createGameEventConsumerProps_withoutGroupIdAndProps() {
void createGameEventConsumerProps_withoutEngineIdAndProps() {
var configs = readConfigs("gameEventsConsumer-withoutGroupIdAndProps");
var props = KafkaUtils.createGameEventConsumerProps(configs, "engine-id");
var props = KafkaUtils.createGameEventConsumerProps(configs, 2,null);

Assertions.assertEquals(8, props.size());
Assertions.assertNotNull(props.getProperty(GROUP_ID_CONFIG));
Assertions.assertEquals(props.getProperty(GROUP_ID_CONFIG).length(), UUID.randomUUID().toString().length());
Assertions.assertNotNull(props.getProperty(GROUP_INSTANCE_ID_CONFIG));
Assertions.assertEquals(props.getProperty(GROUP_INSTANCE_ID_CONFIG).length(), UUID.randomUUID().toString().length());
Map.of(BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092",
KEY_DESERIALIZER_CLASS_CONFIG, DEF_STR_DESERIALIZER,
VALUE_DESERIALIZER_CLASS_CONFIG, DEF_STR_DESERIALIZER,
GROUP_INSTANCE_ID_CONFIG, "engine-id",
GROUP_ID_CONFIG, "oasis-game-event-consumer-2",
AUTO_OFFSET_RESET_CONFIG, KafkaConstants.EARLIEST,
MAX_POLL_RECORDS_CONFIG, "500",
ENABLE_AUTO_COMMIT_CONFIG, "true").forEach((k, v) -> {
Expand Down