Skip to content

Commit

Permalink
Support opensearch (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrkamel authored Jan 19, 2024
1 parent d1a94d4 commit c44721b
Show file tree
Hide file tree
Showing 14 changed files with 118 additions and 74 deletions.
45 changes: 34 additions & 11 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,54 @@ jobs:
fail-fast: false
matrix:
elasticsearch:
- plainpicture/elasticsearch:2.4.1_delete-by-query
- elasticsearch:5.4
- docker.elastic.co/elasticsearch/elasticsearch:6.7.0
- docker.elastic.co/elasticsearch/elasticsearch:7.0.0
- docker.elastic.co/elasticsearch/elasticsearch:7.11.2
- docker.elastic.co/elasticsearch/elasticsearch:8.1.1
- image: plainpicture/elasticsearch:2.4.1_delete-by-query
env:
discovery.type: single-node
xpack.security.enabled: false
- image: elasticsearch:5.4
env:
discovery.type: single-node
xpack.security.enabled: false
- image: docker.elastic.co/elasticsearch/elasticsearch:6.7.0
env:
discovery.type: single-node
xpack.security.enabled: false
- image: docker.elastic.co/elasticsearch/elasticsearch:7.0.0
env:
discovery.type: single-node
xpack.security.enabled: false
- image: docker.elastic.co/elasticsearch/elasticsearch:7.11.2
env:
discovery.type: single-node
xpack.security.enabled: false
- image: docker.elastic.co/elasticsearch/elasticsearch:8.1.1
env:
discovery.type: single-node
xpack.security.enabled: false
- image: opensearchproject/opensearch:1.3.14
env:
discovery.type: single-node
plugins.security.disabled: true
- image: opensearchproject/opensearch:2.11.1
env:
discovery.type: single-node
plugins.security.disabled: true
ruby:
- 2.7
- 3.0
- 3.1
- 3.2
services:
elasticsearch:
image: ${{ matrix.elasticsearch }}
env:
discovery.type: single-node
xpack.security.enabled: false
image: ${{ matrix.elasticsearch.image }}
env: ${{ matrix.elasticsearch.env }}
ports:
- 9200:9200
steps:
- uses: actions/checkout@v1
- uses: ruby/setup-ruby@v1
with:
ruby-version: ${{ matrix.ruby }}
- run: gem install bundler
- run: bundle
- run: bundle exec rspec
- run: bundle exec rubocop
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@

# CHANGELOG

## v3.8.0.

* Support Opensearch 1.x and 2.x

## v3.7.2

* Fix wrong AWS signatures by generating the json before passing it to http-rb
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
Using SearchFlip it is dead-simple to create index classes that correspond to
[Elasticsearch](https://www.elastic.co/) indices and to manipulate, query and
aggregate these indices using a chainable, concise, yet powerful DSL. Finally,
SearchFlip supports Elasticsearch 2.x, 5.x, 6.x, 7.x and 8.x. Check section
[Feature Support](#feature-support) for version dependent features.
SearchFlip supports Elasticsearch 2.x, 5.x, 6.x, 7.x and 8.x as well as
Opensearch 1.x and 2.x. Check section [Feature Support](#feature-support) for
version dependent features.

```ruby
CommentIndex.search("hello world", default_field: "title").where(visible: true).aggregate(:user_id).sort(id: "desc")
Expand Down
2 changes: 1 addition & 1 deletion lib/search_flip/aggregation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def to_hash
res[:aggregations] = aggregation_values if aggregation_values

if must_values || must_not_values || filter_values
if target.connection.version.to_i >= 2
if target.connection.distribution || target.connection.version.to_i >= 2
res[:filter] = {
bool: {}
.merge(must_values ? { must: must_values } : {})
Expand Down
2 changes: 1 addition & 1 deletion lib/search_flip/bulk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def upload
return unless parsed_response["errors"]

parsed_response["items"].each do |item|
item.each do |_, element|
item.each_value do |element|
status = element["status"]

next if status.between?(200, 299)
Expand Down
38 changes: 24 additions & 14 deletions lib/search_flip/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,28 @@ def initialize(options = {})
@base_url = options[:base_url] || SearchFlip::Config[:base_url]
@http_client = options[:http_client] || SearchFlip::HTTPClient.new
@bulk_limit = options[:bulk_limit] || SearchFlip::Config[:bulk_limit]
@version_mutex = Mutex.new
end

# Queries and returns the Elasticsearch distribution used.
#
# @example
# connection.distribution # => e.g. "opensearch"
#
# @return [String] The Elasticsearch distribution

def distribution
@distribution ||= SearchFlip::JSON.parse(version_response.to_s)["version"]["distribution"]
end

# Queries and returns the Elasticsearch version used.
#
# @example
# connection.version # => e.g. 2.4.1
# connection.version # => e.g. "2.4.1"
#
# @return [String] The Elasticsearch version

def version
@version_mutex.synchronize do
@version ||= begin
response = http_client.headers(accept: "application/json").get("#{base_url}/")

SearchFlip::JSON.parse(response.to_s)["version"]["number"]
end
end
@version ||= SearchFlip::JSON.parse(version_response.to_s)["version"]["number"]
end

# Queries and returns the Elasticsearch cluster health.
Expand Down Expand Up @@ -64,7 +68,7 @@ def cluster_health
def msearch(criterias)
payload = criterias.flat_map do |criteria|
[
SearchFlip::JSON.generate(index: criteria.target.index_name_with_prefix, **(version.to_i < 8 ? { type: criteria.target.type_name } : {})),
SearchFlip::JSON.generate(index: criteria.target.index_name_with_prefix, **(distribution.nil? && version.to_i < 8 ? { type: criteria.target.type_name } : {})),
SearchFlip::JSON.generate(criteria.request)
]
end
Expand Down Expand Up @@ -300,8 +304,8 @@ def refresh(index_names = nil)
# @return [Boolean] Returns true or raises SearchFlip::ResponseError

def update_mapping(index_name, mapping, type_name: nil)
url = type_name && version.to_i < 8 ? type_url(index_name, type_name) : index_url(index_name)
params = type_name && version.to_f >= 6.7 && version.to_i < 8 ? { include_type_name: true } : {}
url = type_name && distribution.nil? && version.to_i < 8 ? type_url(index_name, type_name) : index_url(index_name)
params = type_name && distribution.nil? && version.to_f >= 6.7 && version.to_i < 8 ? { include_type_name: true } : {}

http_client.put("#{url}/_mapping", params: params, json: mapping)

Expand All @@ -318,8 +322,8 @@ def update_mapping(index_name, mapping, type_name: nil)
# @return [Hash] The current type mapping

def get_mapping(index_name, type_name: nil)
url = type_name && version.to_i < 8 ? type_url(index_name, type_name) : index_url(index_name)
params = type_name && version.to_f >= 6.7 && version.to_i < 8 ? { include_type_name: true } : {}
url = type_name && distribution.nil? && version.to_i < 8 ? type_url(index_name, type_name) : index_url(index_name)
params = type_name && distribution.nil? && version.to_f >= 6.7 && version.to_i < 8 ? { include_type_name: true } : {}

response = http_client.headers(accept: "application/json").get("#{url}/_mapping", params: params)

Expand Down Expand Up @@ -422,5 +426,11 @@ def type_url(index_name, type_name)
def index_url(index_name)
"#{base_url}/#{index_name}"
end

private

def version_response
@version_response ||= http_client.headers(accept: "application/json").get("#{base_url}/")
end
end
end
8 changes: 4 additions & 4 deletions lib/search_flip/criteria.rb
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,8 @@ def delete(params = {})
http_request = connection.http_client
http_request = http_request.timeout(http_timeout_value) if http_timeout_value

if connection.version.to_i >= 5
url = connection.version.to_i < 8 ? target.type_url : target.index_url
if connection.distribution || connection.version.to_i >= 5
url = connection.distribution.nil? && connection.version.to_i < 8 ? target.type_url : target.index_url

http_request.post("#{url}/_delete_by_query", params: request_params.merge(params), json: dupped_request)
else
Expand Down Expand Up @@ -622,15 +622,15 @@ def execute!
json: { scroll: scroll_args[:timeout], scroll_id: scroll_args[:id] }
)
elsif scroll_args
url = connection.version.to_i < 8 ? target.type_url : target.index_url
url = connection.distribution.nil? && connection.version.to_i < 8 ? target.type_url : target.index_url

http_request.post(
"#{url}/_search",
params: request_params.merge(scroll: scroll_args[:timeout]),
json: request
)
else
url = connection.version.to_i < 8 ? target.type_url : target.index_url
url = connection.distribution.nil? && connection.version.to_i < 8 ? target.type_url : target.index_url

http_request.post("#{url}/_search", params: request_params, json: request)
end
Expand Down
8 changes: 4 additions & 4 deletions lib/search_flip/index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ def get_mapping
# equal to _doc.

def include_type_name?
type_name != "_doc" || connection.version.to_i < 7
type_name != "_doc" || (connection.distribution.nil? && connection.version.to_i < 7)
end

# Retrieves the document specified by id from Elasticsearch. Raises
Expand All @@ -455,7 +455,7 @@ def include_type_name?
# @return [Hash] The specified document

def get(id, params = {})
url = connection.version.to_i < 8 ? type_url : "#{index_url}/_doc"
url = connection.distribution.nil? && connection.version.to_i < 8 ? type_url : "#{index_url}/_doc"
response = connection.http_client.headers(accept: "application/json").get("#{url}/#{id}", params: params)

SearchFlip::JSON.parse(response.to_s)
Expand All @@ -474,7 +474,7 @@ def get(id, params = {})
# @return [Hash] The raw response

def mget(request, params = {})
url = connection.version.to_i < 8 ? type_url : index_url
url = connection.distribution.nil? && connection.version.to_i < 8 ? type_url : index_url
response = connection.http_client.headers(accept: "application/json").post("#{url}/_mget", json: request, params: params)

SearchFlip::JSON.parse(response.to_s)
Expand Down Expand Up @@ -633,7 +633,7 @@ def bulk(options = {})
bulk_max_mb: connection.bulk_max_mb
}

url = connection.version.to_i < 8 ? type_url : index_url
url = connection.distribution.nil? && connection.version.to_i < 8 ? type_url : index_url

SearchFlip::Bulk.new("#{url}/_bulk", default_options.merge(options)) do |indexer|
yield indexer
Expand Down
2 changes: 1 addition & 1 deletion lib/search_flip/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module SearchFlip
VERSION = "3.7.2"
VERSION = "3.8.0"
end
2 changes: 1 addition & 1 deletion spec/search_flip/bulk_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@

it "uses the specified http_client" do
product = create(:product)
url = ProductIndex.connection.version.to_i < 8 ? ProductIndex.type_url : ProductIndex.index_url
url = ProductIndex.connection.distribution.nil? && ProductIndex.connection.version.to_i < 8 ? ProductIndex.type_url : ProductIndex.index_url

stub_request(:put, "#{url}/_bulk").with(headers: { "X-Header" => "Value" }).to_return(status: 200, body: "{}")

Expand Down
38 changes: 22 additions & 16 deletions spec/search_flip/connection_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
require File.expand_path("../spec_helper", __dir__)

RSpec.describe SearchFlip::Connection do
describe "#distribution" do
it "reutrns the distribution" do
expect([nil, "opensearch"]).to include(SearchFlip::Connection.new.distribution)
end
end

describe "#version" do
it "returns the version" do
expect(SearchFlip::Connection.new.version).to match(/\A[0-9.]+\z/)
Expand Down Expand Up @@ -92,7 +98,7 @@
it "returns the specified indices" do
connection = SearchFlip::Connection.new

expect(connection.get_indices.to_set { |index| index["index"] }).to eq(["comments", "products"].to_set)
expect(connection.get_indices.map { |index| index["index"] }.grep_v(/^\./).to_set).to eq(["comments", "products"].to_set)
expect(connection.get_indices("com*").map { |index| index["index"] }).to eq(["comments"])
end

Expand Down Expand Up @@ -170,7 +176,7 @@
it "freezes the specified index" do
connection = SearchFlip::Connection.new

if connection.version.to_f >= 6.6 && connection.version.to_i < 8
if connection.distribution.nil? && connection.version.to_f >= 6.6 && connection.version.to_i < 8
begin
connection.create_index("index_name")
connection.freeze_index("index_name")
Expand All @@ -187,7 +193,7 @@
it "unfreezes the specified index" do
connection = SearchFlip::Connection.new

if connection.version.to_f >= 6.6 && connection.version.to_i < 8
if connection.distribution.nil? && connection.version.to_f >= 6.6 && connection.version.to_i < 8
begin
connection.create_index("index_name")
connection.freeze_index("index_name")
Expand Down Expand Up @@ -241,7 +247,7 @@
end

describe "#update_mapping" do
if SearchFlip::Connection.new.version.to_i >= 7
if SearchFlip::Connection.new.then { |connection| connection.distribution || connection.version.to_i >= 7 }
it "updates the mapping of an index without type name" do
begin
connection = SearchFlip::Connection.new
Expand All @@ -266,7 +272,7 @@

connection.create_index("index_name")

if connection.version.to_i < 8
if connection.distribution.nil? && connection.version.to_i < 8
connection.update_mapping("index_name", { "type_name" => mapping }, type_name: "type_name")

expect(connection.get_mapping("index_name", type_name: "type_name")).to eq("index_name" => { "mappings" => { "type_name" => mapping } })
Expand Down Expand Up @@ -321,9 +327,9 @@

bulk = proc do
connection.bulk do |indexer|
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 1, { id: 1 }, _index: CommentIndex.index_name, ** connection.version.to_i < 8 ? { _type: CommentIndex.type_name } : {}
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 1, { id: 1 }, _index: CommentIndex.index_name, ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: CommentIndex.type_name } : {}
end
end

Expand All @@ -346,23 +352,23 @@
connection = SearchFlip::Connection.new

connection.bulk do |indexer|
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
end

bulk = proc do
connection.bulk do |indexer|
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
end
end

expect(&bulk).to raise_error(SearchFlip::Bulk::Error)

bulk = proc do
connection.bulk ignore_errors: [409] do |indexer|
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
end
end

Expand All @@ -375,7 +381,7 @@
connection = SearchFlip::Connection.new

connection.bulk do |indexer|
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
end

expect(SearchFlip::Bulk).to have_received(:new).with(
Expand All @@ -398,7 +404,7 @@
}

connection.bulk(options) do |indexer|
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.distribution.nil? && connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
end

expect(SearchFlip::Bulk).to have_received(:new).with(anything, options)
Expand Down
Loading

0 comments on commit c44721b

Please sign in to comment.