Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elasticsearch #440

Merged
merged 4 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build-drivers.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ dependencies {
api project(':drivers:linkedbq')
api project(':drivers:atomicq')
api project(':drivers:syncq')
api project(':drivers:elasticsearch')
/* api project(':drivers:sbktemplate') */
/* above line is a signature */
}
2 changes: 2 additions & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,6 @@
<allow pkg="net.rubyeye" />
<allow pkg="software.amazon"/>
<allow pkg="org.openjdk.jmh"/>
<allow pkg="co.elastic.clients" />
<allow pkg="org.elasticsearch.client" />
</import-control>
202 changes: 202 additions & 0 deletions drivers/elasticsearch/README.md

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions drivers/elasticsearch/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
plugins {
id 'java'
}

repositories {
mavenCentral()
}

dependencies {
api project(":sbk-api")

// Add your storage driver specific dependencies here
api 'co.elastic.clients:elasticsearch-java:8.15.0'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/**
* Copyright (c) KMG. All Rights Reserved..
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.sbk.driver.Elasticsearch;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.javaprop.JavaPropsFactory;
import io.sbk.api.DataReader;
import io.sbk.api.DataWriter;
import io.sbk.data.impl.SbkString;
import io.sbk.params.ParameterOptions;
import io.sbk.api.Storage;
import io.sbk.data.DataType;
import io.sbk.params.InputOptions;
import io.sbk.system.Printer;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

import java.io.IOException;
import java.util.Objects;

/**
* Class for Elasticsearch storage driver.
*
* Incase if your data type in other than byte[] (Byte Array)
* then change the datatype and getDataType.
*/
public class Elasticsearch implements Storage<String> {
private final static String CONFIGFILE = "Elasticsearch.properties";
private ElasticsearchConfig config;
private ElasticsearchClient elasticsearchClient;

public static long generateStartKey(int id) {
return (long) id * (long) Integer.MAX_VALUE;
}

@Override
public void addArgs(final InputOptions params) throws IllegalArgumentException {
final ObjectMapper mapper = new ObjectMapper(new JavaPropsFactory())
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
try {
config = mapper.readValue(
Objects.requireNonNull(Elasticsearch.class.getClassLoader().getResourceAsStream(CONFIGFILE)),
ElasticsearchConfig.class);
} catch (Exception ex) {
ex.printStackTrace();
throw new IllegalArgumentException(ex);
}

// change and uncomment the below code as per your driver specific parameters
// params.addOption("param", true, "Elasticsearch parameter, default param: " + config.param);
params.addOption("user", true, "ElasticSearch user : " + config.user);
params.addOption("password", true, "ElasticSearch Password " + config.password);
params.addOption("url", true, "ElasticSearch URL:" + config.url);
params.addOption("index", true, "ElasticSearch Index: " + config.index);
}

@Override
public void parseArgs(final ParameterOptions params) throws IllegalArgumentException {
// change and uncommnet the below code as per your driver specific parameters
// config.param = params.getOptionValue("param", config.param);
config.user = params.getOptionValue("user", config.user);
config.password = params.getOptionValue("password", config.password);
config.url = params.getOptionValue("url", config.url);
config.index = params.getOptionValue("index", config.index);
}

@Override
public void openStorage(final ParameterOptions params) throws IOException {
try {
elasticsearchClient = connect();
Printer.log.info("ElasticSearch Client Connected.....");
String index1 = config.index.trim();
if (!indexExists(index1)) {
createIndex(elasticsearchClient, index1);
}
} catch (ElasticsearchException e ) {
Printer.log.error(e.getMessage());
throw new RuntimeException(e);
}
}

private ElasticsearchClient connect() {
Printer.log.info("Attempting to connect to Elasticsearch...");
try {
final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(config.user, config.password));

RestClientBuilder builder = RestClient.builder(HttpHost.create(config.url))
.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));

RestClient restClient = builder.build();
ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
return new ElasticsearchClient(transport);
} catch (ElasticsearchException e) {
Printer.log.error("Error connecting to Elasticsearch: " + e.getMessage());
throw new RuntimeException(e);
}
}

private boolean indexExists(String indexName) {
try {
BooleanResponse response = elasticsearchClient.
indices().exists(e -> e.index(indexName));
return response.value();
} catch (IOException e) {
e.printStackTrace();
return false;
}
}

private void createIndex(ElasticsearchClient client, String indexName) {
try {
CreateIndexRequest createIndexRequest = CreateIndexRequest.of(c -> c.index(indexName));
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest);

if (createIndexResponse.acknowledged()) {
Printer.log.info(indexName + "Created Successfully");
} else {
Printer.log.info("Index creation was not acknowledged.");
}
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void closeStorage(final ParameterOptions params) throws IOException {
try {
elasticsearchClient._transport().close();
} catch (ElasticsearchException e) {
e.printStackTrace();
Printer.log.error("Failed to close the connection");
}

}

@Override
public DataWriter<String> createWriter(final int id, final ParameterOptions params) {
return new ElasticsearchWriter(id, params, config, elasticsearchClient);
}

@Override
public DataReader<String> createReader(final int id, final ParameterOptions params) {
return new ElasticsearchReader(id, params, config, elasticsearchClient);
}

@Override
public DataType<String> getDataType() {
return new SbkString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Copyright (c) KMG. All Rights Reserved..
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/

package io.sbk.driver.Elasticsearch;

/**
* Class for Elasticsearch storage configuration.
*/
public class ElasticsearchConfig {
// Add Elasticsearch Storage driver configuration parameters
public String user;
public String password;
public String url;
public String index;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Copyright (c) KMG. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.sbk.driver.Elasticsearch;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch.core.GetRequest;
import co.elastic.clients.elasticsearch.core.GetResponse;
import io.sbk.params.ParameterOptions;
import io.sbk.api.Reader;
import io.sbk.system.Printer;

import java.io.IOException;
import java.util.Map;

/**
* Class for Elasticsearch Reader.
*/
public class ElasticsearchReader implements Reader<String> {
private final ElasticsearchConfig config;
private final ElasticsearchClient client;
private long id;

public ElasticsearchReader(int readerId, ParameterOptions params, ElasticsearchConfig config, ElasticsearchClient client) {
this.id = Elasticsearch.generateStartKey(readerId);
this.config = config;
this.client = client;
}

@Override
public String read() throws IOException {
try {
GetRequest request = GetRequest.of(g -> g
.index(config.index.trim())
.id(String.valueOf(id++))
);
GetResponse<Map> response = client.get(request, Map.class);
return response.fields().toString();
} catch (ElasticsearchException e) {
Printer.log.error("Elastic Search: recordRead failed !");
throw new IOException(e);
}
}

@Override
public void close() throws IOException {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Copyright (c) KMG. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.sbk.driver.Elasticsearch;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import io.sbk.params.ParameterOptions;
import io.sbk.api.Writer;
import io.sbk.system.Printer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;


/**
* Class for Elasticsearch Writer.
*/
public class ElasticsearchWriter implements Writer<String> {
private final ElasticsearchConfig config;
private final ElasticsearchClient client;
private long id;

public ElasticsearchWriter(int writerID, ParameterOptions params, ElasticsearchConfig config, ElasticsearchClient client) {
this.id = Elasticsearch.generateStartKey(writerID);
this.config = config;
this.client = client;
}

@Override
public CompletableFuture writeAsync(String data) throws IOException {
try {
writeData(data);
} catch (ElasticsearchException ex ) {
Printer.log.error("Elastic Search: recordWrite failed !");
throw new IOException(ex);
}
return null;
}

@Override
public void sync() throws IOException {
}

@Override
public void close() throws IOException {
}


private void writeData(String data) {
Map<String, String> document = new HashMap<>();
document.put("data", data);
try {
IndexRequest<Map<String, String>> request = IndexRequest.of(i -> i
.index(config.index.trim())
.id(String.valueOf(id++))
.document(document)
);
client.index(request);
} catch (ElasticsearchException | IOException ex ) {
Printer.log.error("Elastic Search: recordWrite failed !");
throw new RuntimeException(ex);
}
}
}
13 changes: 13 additions & 0 deletions drivers/elasticsearch/src/main/resources/Elasticsearch.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#Copyright (c) KMG. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0

# Elasticsearch storage driver default Properties/parameters
user= rakesh
password= jain@88
url= http://localhost:9200
index=sbkj
1 change: 1 addition & 0 deletions settings-drivers.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,6 @@ include 'drivers:conqueue'
include 'drivers:linkedbq'
include 'drivers:atomicq'
include 'drivers:syncq'
include 'drivers:elasticsearch'
/* include 'drivers:sbktemplate' */
/* above line is a signature */
Loading