diff --git a/CHANGELOG.md b/CHANGELOG.md index 10d6744..92e4b66 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,11 @@ # CHANGELOG +## v3.7.0 + +* Add `SearchFlip::Connection#bulk` to allow more clean bulk indexing to + multiple indices at once + ## v3.6.0 * Support Elasticsearch v8 diff --git a/lib/search_flip/connection.rb b/lib/search_flip/connection.rb index f55d2f3..a6216e4 100644 --- a/lib/search_flip/connection.rb +++ b/lib/search_flip/connection.rb @@ -355,6 +355,51 @@ def index_exists?(index_name) raise e end + # Initiates and yields a bulk object, such that index, import, create, + # update and delete requests can be appended to the bulk request. Please + # note that you need to manually pass the desired index name as well as + # type name (depending on the Elasticsearch version) when using #bulk on a + # connection object or Elasticsearch will return an error. After the bulk + # requests are successfully processed all existing indices will + # subsequently be refreshed when auto_refresh is enabled. + # + # @see SearchFlip::Config See SearchFlip::Config for auto_refresh + # + # @example + # connection = SearchFlip::Connection.new + # + # connection.bulk ignore_errors: [409] do |bulk| + # bulk.create comment.id, CommentIndex.serialize(comment), + # _index: CommentIndex.index_name, version: comment.version, version_type: "external_gte" + # + # bulk.delete product.id, _index: ProductIndex.index_name, routing: product.user_id + # + # # ... + # end + # + # @param options [Hash] Specifies options regarding the bulk indexing + # @option options ignore_errors [Array] Specifies an array of http status + # codes that shouldn't raise any exceptions, like eg 409 for conflicts, + # ie when optimistic concurrency control is used. + # @option options raise [Boolean] Prevents any exceptions from being + # raised. Please note that this only applies to the bulk response, not to + # the request in general, such that connection errors, etc will still + # raise. + + def bulk(options = {}) + default_options = { + http_client: http_client, + bulk_limit: bulk_limit, + bulk_max_mb: bulk_max_mb + } + + SearchFlip::Bulk.new("#{base_url}/_bulk", default_options.merge(options)) do |indexer| + yield indexer + end + + refresh if SearchFlip::Config[:auto_refresh] + end + # Returns the full Elasticsearch type URL, ie base URL, index name with # prefix and type name. # diff --git a/lib/search_flip/index.rb b/lib/search_flip/index.rb index ac9754f..3205805 100644 --- a/lib/search_flip/index.rb +++ b/lib/search_flip/index.rb @@ -601,7 +601,7 @@ def delete(scope, options = {}, additional_index_options = {}) scope end - # Initiates and yields the bulk object, such that index, import, create, + # Initiates and yields a bulk object, such that index, import, create, # update and delete requests can be appended to the bulk request. Sends a # refresh request afterwards if auto_refresh is enabled. # diff --git a/lib/search_flip/version.rb b/lib/search_flip/version.rb index 484049d..96763fc 100644 --- a/lib/search_flip/version.rb +++ b/lib/search_flip/version.rb @@ -1,3 +1,3 @@ module SearchFlip - VERSION = "3.6.0" + VERSION = "3.7.0" end diff --git a/search_flip.gemspec b/search_flip.gemspec index fc0d469..c067053 100644 --- a/search_flip.gemspec +++ b/search_flip.gemspec @@ -15,7 +15,6 @@ Gem::Specification.new do |spec| spec.files = `git ls-files`.split($INPUT_RECORD_SEPARATOR) spec.executables = spec.files.grep(%r{^bin/}) { |f| File.basename(f) } - spec.test_files = spec.files.grep(%r{^(test|spec|features)/}) spec.require_paths = ["lib"] spec.post_install_message = <<~MESSAGE diff --git a/spec/search_flip/connection_spec.rb b/spec/search_flip/connection_spec.rb index fa58853..1bdda4d 100644 --- a/spec/search_flip/connection_spec.rb +++ b/spec/search_flip/connection_spec.rb @@ -315,6 +315,96 @@ end end + describe "#bulk" do + it "imports objects to the specified indices" do + connection = SearchFlip::Connection.new + + bulk = proc do + connection.bulk do |indexer| + indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {} + indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {} + indexer.index 1, { id: 1 }, _index: CommentIndex.index_name, ** connection.version.to_i < 8 ? { _type: CommentIndex.type_name } : {} + end + end + + expect(&bulk).to(change { CommentIndex.total_count }.by(1).and(change { CommentIndex.total_count }.by(1))) + end + + it "raises when no index is given" do + connection = SearchFlip::Connection.new + + bulk = proc do + connection.bulk do |indexer| + indexer.index 1, id: 1 + end + end + + expect(&bulk).to raise_error(SearchFlip::ResponseError) + end + + it "respects options" do + connection = SearchFlip::Connection.new + + connection.bulk do |indexer| + indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {} + indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {} + end + + bulk = proc do + connection.bulk do |indexer| + indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {} + indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {} + end + end + + expect(&bulk).to raise_error(SearchFlip::Bulk::Error) + + bulk = proc do + connection.bulk ignore_errors: [409] do |indexer| + indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {} + indexer.index 2, { id: 2 }, _index: ProductIndex.index_name, version: 1, version_type: "external", ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {} + end + end + + expect(&bulk).not_to(change { ProductIndex.total_count }) + end + + it "passes default options" do + allow(SearchFlip::Bulk).to receive(:new) + + connection = SearchFlip::Connection.new + + connection.bulk do |indexer| + indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {} + end + + expect(SearchFlip::Bulk).to have_received(:new).with( + anything, + http_client: connection.http_client, + bulk_limit: connection.bulk_limit, + bulk_max_mb: connection.bulk_max_mb + ) + end + + it "passes custom options" do + allow(SearchFlip::Bulk).to receive(:new) + + connection = SearchFlip::Connection.new + + options = { + bulk_limit: "bulk limit", + bulk_max_mb: "bulk max mb", + http_client: "http client" + } + + connection.bulk(options) do |indexer| + indexer.index 1, { id: 1 }, _index: ProductIndex.index_name, ** connection.version.to_i < 8 ? { _type: ProductIndex.type_name } : {} + end + + expect(SearchFlip::Bulk).to have_received(:new).with(anything, options) + end + end + describe "#index_url" do it "returns the index url for the specified index" do connection = SearchFlip::Connection.new(base_url: "base_url") diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 1f7ae1c..c8fd3c9 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -16,6 +16,8 @@ TestIndex.delete_index if TestIndex.index_exists? ProductIndex.match_all.delete Product.delete_all + CommentIndex.match_all.delete + Comment.delete_all end end