Skip to content

Commit

Permalink
Fix event client deserialization and add publishEvent (#141)
Browse files Browse the repository at this point in the history
* Fix event client deserialization and add publishEvent for `conductor:` type queues

* remove publishEvent
  • Loading branch information
c4lm authored Aug 17, 2023
1 parent ff8393a commit b5156ae
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 14 deletions.
4 changes: 3 additions & 1 deletion src/main/java/io/orkes/conductor/client/EventClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@



import java.util.Map;

import io.orkes.conductor.client.model.event.QueueConfiguration;

public abstract class EventClient extends com.netflix.conductor.client.http.EventClient {
public abstract String getQueueConfig(QueueConfiguration queueConfiguration);
public abstract Map<String, Object> getQueueConfig(QueueConfiguration queueConfiguration);

public abstract void deleteQueueConfig(QueueConfiguration queueConfiguration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package io.orkes.conductor.client.http;

import java.util.List;
import java.util.Map;

import com.netflix.conductor.common.metadata.events.EventHandler;

Expand Down Expand Up @@ -72,8 +73,8 @@ public void unregisterEventHandler(String name) {
}

@Override
public String getQueueConfig(QueueConfiguration queueConfiguration) {
return eventResourceApi.getQueueConfig(queueConfiguration.getQueueType(), queueConfiguration.getQueueName()).toString();
public Map<String, Object> getQueueConfig(QueueConfiguration queueConfiguration) {
return eventResourceApi.getQueueConfig(queueConfiguration.getQueueType(), queueConfiguration.getQueueName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,8 +635,8 @@ private com.squareup.okhttp.Call getQueueConfigValidateBeforeCall(
* @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the
* response body
*/
public Object getQueueConfig(String queueType, String queueName) throws ApiException {
ApiResponse<Object> resp = getQueueConfigWithHttpInfo(queueType, queueName);
public Map<String, Object> getQueueConfig(String queueType, String queueName) throws ApiException {
ApiResponse<Map<String, Object>> resp = getQueueConfigWithHttpInfo(queueType, queueName);
return resp.getData();
}

Expand All @@ -649,11 +649,11 @@ public Object getQueueConfig(String queueType, String queueName) throws ApiExcep
* @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the
* response body
*/
private ApiResponse<Object> getQueueConfigWithHttpInfo(String queueType, String queueName)
private ApiResponse<Map<String, Object>> getQueueConfigWithHttpInfo(String queueType, String queueName)
throws ApiException {
com.squareup.okhttp.Call call =
getQueueConfigValidateBeforeCall(queueType, queueName, null, null);
Type localVarReturnType = new TypeReference<Object>() {}.getType();
Type localVarReturnType = new TypeReference<Map<String, Object>>() {}.getType();
return apiClient.execute(call, localVarReturnType);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package io.orkes.conductor.client.api;

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand Down Expand Up @@ -82,13 +83,9 @@ void testKafkaQueueConfiguration() throws Exception {
eventClient.getQueueConfig(queueConfiguration);
});
eventClient.putQueueConfig(queueConfiguration);
String configurationResponse = eventClient.getQueueConfig(queueConfiguration);
assertTrue(
configurationResponse.contains(
"consumer={max.poll.records=1000, bootstrap.servers=localhost:9092, heartbeat.interval.ms=1000}"));
assertTrue(
configurationResponse.contains(
"producer={batch.size=1024, bootstrap.servers=localhost:9092}"));
Map<String, Object> configurationResponse = eventClient.getQueueConfig(queueConfiguration);
assertTrue(configurationResponse.containsKey("consumer"));
assertTrue(configurationResponse.containsKey("producer"));
eventClient.deleteQueueConfig(queueConfiguration);
}

Expand Down

0 comments on commit b5156ae

Please sign in to comment.