Skip to content

Commit

Permalink
Add SearchFlip::Connection#bulk (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrkamel authored Jul 27, 2022
1 parent 4d977f8 commit 8c3f148
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 3 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@

# CHANGELOG

## v3.7.0

* Add `SearchFlip::Connection#bulk` to allow more clean bulk indexing to
multiple indices at once

## v3.6.0

* Support Elasticsearch v8
Expand Down
45 changes: 45 additions & 0 deletions lib/search_flip/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,51 @@ def index_exists?(index_name)
raise e
end

# Initiates and yields a bulk object, such that index, import, create,
# update and delete requests can be appended to the bulk request. Please
# note that you need to manually pass the desired index name as well as
# type name (depending on the Elasticsearch version) when using #bulk on a
# connection object or Elasticsearch will return an error. After the bulk
# requests are successfully processed all existing indices will
# subsequently be refreshed when auto_refresh is enabled.
#
# @see SearchFlip::Config See SearchFlip::Config for auto_refresh
#
# @example
# connection = SearchFlip::Connection.new
#
# connection.bulk ignore_errors: [409] do |bulk|
# bulk.create comment.id, CommentIndex.serialize(comment),
# _index: CommentIndex.index_name, version: comment.version, version_type: "external_gte"
#
# bulk.delete product.id, _index: ProductIndex.index_name, routing: product.user_id
#
# # ...
# end
#
# @param options [Hash] Specifies options regarding the bulk indexing
# @option options ignore_errors [Array] Specifies an array of http status
# codes that shouldn't raise any exceptions, like eg 409 for conflicts,
# ie when optimistic concurrency control is used.
# @option options raise [Boolean] Prevents any exceptions from being
# raised. Please note that this only applies to the bulk response, not to
# the request in general, such that connection errors, etc will still
# raise.

def bulk(options = {})
default_options = {
http_client: http_client,
bulk_limit: bulk_limit,
bulk_max_mb: bulk_max_mb
}

SearchFlip::Bulk.new("#{base_url}/_bulk", default_options.merge(options)) do |indexer|
yield indexer
end

refresh if SearchFlip::Config[:auto_refresh]
end

# Returns the full Elasticsearch type URL, ie base URL, index name with
# prefix and type name.
#
Expand Down
2 changes: 1 addition & 1 deletion lib/search_flip/index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ def delete(scope, options = {}, additional_index_options = {})
scope
end

# Initiates and yields the bulk object, such that index, import, create,
# Initiates and yields a bulk object, such that index, import, create,
# update and delete requests can be appended to the bulk request. Sends a
# refresh request afterwards if auto_refresh is enabled.
#
Expand Down
2 changes: 1 addition & 1 deletion lib/search_flip/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module SearchFlip
VERSION = "3.6.0"
VERSION = "3.7.0"
end
1 change: 0 additions & 1 deletion search_flip.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ Gem::Specification.new do |spec|

spec.files = `git ls-files`.split($INPUT_RECORD_SEPARATOR)
spec.executables = spec.files.grep(%r{^bin/}) { |f| File.basename(f) }
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
spec.require_paths = ["lib"]

spec.post_install_message = <<~MESSAGE
Expand Down
90 changes: 90 additions & 0 deletions spec/search_flip/connection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,96 @@
end
end

describe "#bulk" do
it "imports objects to the specified indices" do
connection = SearchFlip::Connection.new

bulk = proc do
connection.bulk do |indexer|
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 1, { id: 1 }, _index: CommentIndex.index_name, ** connection.version.to_i < 8 ? { _type: CommentIndex.type_name } : {}
end
end

expect(&bulk).to(change { CommentIndex.total_count }.by(1).and(change { CommentIndex.total_count }.by(1)))
end

it "raises when no index is given" do
connection = SearchFlip::Connection.new

bulk = proc do
connection.bulk do |indexer|
indexer.index 1, id: 1
end
end

expect(&bulk).to raise_error(SearchFlip::ResponseError)
end

it "respects options" do
connection = SearchFlip::Connection.new

connection.bulk do |indexer|
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
end

bulk = proc do
connection.bulk do |indexer|
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
end
end

expect(&bulk).to raise_error(SearchFlip::Bulk::Error)

bulk = proc do
connection.bulk ignore_errors: [409] do |indexer|
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
end
end

expect(&bulk).not_to(change { ProductIndex.total_count })
end

it "passes default options" do
allow(SearchFlip::Bulk).to receive(:new)

connection = SearchFlip::Connection.new

connection.bulk do |indexer|
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
end

expect(SearchFlip::Bulk).to have_received(:new).with(
anything,
http_client: connection.http_client,
bulk_limit: connection.bulk_limit,
bulk_max_mb: connection.bulk_max_mb
)
end

it "passes custom options" do
allow(SearchFlip::Bulk).to receive(:new)

connection = SearchFlip::Connection.new

options = {
bulk_limit: "bulk limit",
bulk_max_mb: "bulk max mb",
http_client: "http client"
}

connection.bulk(options) do |indexer|
indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {}
end

expect(SearchFlip::Bulk).to have_received(:new).with(anything, options)
end
end

describe "#index_url" do
it "returns the index url for the specified index" do
connection = SearchFlip::Connection.new(base_url: "base_url")
Expand Down
2 changes: 2 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
TestIndex.delete_index if TestIndex.index_exists?
ProductIndex.match_all.delete
Product.delete_all
CommentIndex.match_all.delete
Comment.delete_all
end
end

Expand Down

0 comments on commit 8c3f148

Please sign in to comment.