Skip to content

Commit

Permalink
CTR make DRC remote traversal request the server to bulk results by d…
Browse files Browse the repository at this point in the history
…efault, and update token naming to be format-consistent
  • Loading branch information
xiazcy committed Nov 27, 2024
1 parent 038ba3b commit 56c72f6
Show file tree
Hide file tree
Showing 22 changed files with 85 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public void init(final Connection connection) {

httpCompressionDecoder = new HttpContentDecompressionHandler();
gremlinRequestEncoder = new HttpGremlinRequestEncoder(cluster.getSerializer(), cluster.getRequestInterceptors(),
cluster.isUserAgentOnConnectEnabled(), cluster.isBulkingEnabled(), connection.getUri());
cluster.isUserAgentOnConnectEnabled(), cluster.isBulkResultsEnabled(), connection.getUri());
gremlinResponseDecoder = new HttpGremlinResponseDecoder(cluster.getSerializer());
if (cluster.getIdleConnectionTimeout() > 0) {
final int idleConnectionTimeout = (int) (cluster.getIdleConnectionTimeout() / 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Requ
options.getG().ifPresent(g -> request.addG(g));
options.getLanguage().ifPresent(lang -> request.addLanguage(lang));
options.getMaterializeProperties().ifPresent(mp -> request.addMaterializeProperties(mp));
options.getBulked().ifPresent(bulked -> request.addBulkedResult(Boolean.parseBoolean(bulked)));
options.getBulkResults().ifPresent(bulked -> request.addBulkResults(Boolean.parseBoolean(bulked)));

return submitAsync(request.create());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private static Builder getBuilderFromSettings(final Settings settings) {
.connectionSetupTimeoutMillis(settings.connectionPool.connectionSetupTimeoutMillis)
.idleConnectionTimeoutMillis(settings.connectionPool.idleConnectionTimeout)
.enableUserAgentOnConnect(settings.enableUserAgentOnConnect)
.enableBulkedResult(settings.enableBulkedResult)
.bulkResults(settings.bulkResults)
.validationRequest(settings.connectionPool.validationRequest);

if (!settings.auth.type.isEmpty()) {
Expand Down Expand Up @@ -475,10 +475,10 @@ public boolean isUserAgentOnConnectEnabled() {
}

/**
* Checks if cluster is configured to send bulked results
* Checks if cluster is configured to bulk results
*/
public boolean isBulkingEnabled() {
return manager.isBulkedResultEnabled();
public boolean isBulkResultsEnabled() {
return manager.isBulkResultsEnabled();
}

public final static class Builder {
Expand Down Expand Up @@ -513,7 +513,7 @@ public final static class Builder {
private long connectionSetupTimeoutMillis = Connection.CONNECTION_SETUP_TIMEOUT_MILLIS;
private long idleConnectionTimeoutMillis = Connection.CONNECTION_IDLE_TIMEOUT_MILLIS;
private boolean enableUserAgentOnConnect = true;
private boolean enableBulkedResult = false;
private boolean bulkResults = false;

private Builder() {
addInterceptor(SERIALIZER_INTERCEPTOR_NAME,
Expand Down Expand Up @@ -891,10 +891,10 @@ public Builder enableUserAgentOnConnect(final boolean enableUserAgentOnConnect)

/**
* Configures whether cluster will enable result bulking to optimize performance.
* @param enableBulkedResult true enables bulking.
* @param bulkResults true enables bulking.
*/
public Builder enableBulkedResult(final boolean enableBulkedResult) {
this.enableBulkedResult = enableBulkedResult;
public Builder bulkResults(final boolean bulkResults) {
this.bulkResults = bulkResults;
return this;
}

Expand Down Expand Up @@ -966,7 +966,7 @@ class Manager {
private final int port;
private final String path;
private final boolean enableUserAgentOnConnect;
private final boolean enableBulkedResult;
private final boolean bulkResults;

private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();

Expand All @@ -979,7 +979,7 @@ private Manager(final Builder builder) {
this.contactPoints = builder.getContactPoints();
this.interceptors = builder.interceptors;
this.enableUserAgentOnConnect = builder.enableUserAgentOnConnect;
this.enableBulkedResult = builder.enableBulkedResult;
this.bulkResults = builder.bulkResults;

connectionPoolSettings = new Settings.ConnectionPoolSettings();
connectionPoolSettings.maxSize = builder.maxConnectionPoolSize;
Expand Down Expand Up @@ -1147,8 +1147,8 @@ public boolean isUserAgentOnConnectEnabled() {
/**
* Checks if cluster is configured to send bulked results
*/
public boolean isBulkedResultEnabled() {
return enableBulkedResult;
public boolean isBulkResultsEnabled() {
return bulkResults;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.Optional;

import static org.apache.tinkerpop.gremlin.util.Tokens.ARGS_BATCH_SIZE;
import static org.apache.tinkerpop.gremlin.util.Tokens.BULKED;
import static org.apache.tinkerpop.gremlin.util.Tokens.BULK_RESULTS;
import static org.apache.tinkerpop.gremlin.util.Tokens.ARGS_EVAL_TIMEOUT;
import static org.apache.tinkerpop.gremlin.util.Tokens.ARGS_G;
import static org.apache.tinkerpop.gremlin.util.Tokens.ARGS_LANGUAGE;
Expand All @@ -49,7 +49,7 @@ public final class RequestOptions {
private final Long timeout;
private final String language;
private final String materializeProperties;
private final String bulkedResult;
private final String bulkResults;

private RequestOptions(final Builder builder) {
this.graphOrTraversalSource = builder.graphOrTraversalSource;
Expand All @@ -58,7 +58,7 @@ private RequestOptions(final Builder builder) {
this.timeout = builder.timeout;
this.language = builder.language;
this.materializeProperties = builder.materializeProperties;
this.bulkedResult = builder.bulkedResult;
this.bulkResults = builder.bulkResults;
}

public Optional<String> getG() {
Expand All @@ -83,7 +83,7 @@ public Optional<String> getLanguage() {

public Optional<String> getMaterializeProperties() { return Optional.ofNullable(materializeProperties); }

public Optional<String> getBulked() { return Optional.ofNullable(bulkedResult); }
public Optional<String> getBulkResults() { return Optional.ofNullable(bulkResults); }

public static Builder build() {
return new Builder();
Expand All @@ -103,10 +103,12 @@ public static RequestOptions getRequestOptions(final GremlinLang gremlinLang) {
builder.materializeProperties((String) options.get(ARGS_MATERIALIZE_PROPERTIES));
if (options.containsKey(ARGS_LANGUAGE))
builder.language((String) options.get(ARGS_LANGUAGE));
if (options.containsKey(BULKED))
builder.withBulkedResult((boolean) options.get(BULKED));

if (options.containsKey(BULK_RESULTS))
builder.bulkResults((boolean) options.get(BULK_RESULTS));
}
// request the server to bulk results by default when using DRC through request options
if (builder.bulkResults == null)
builder.bulkResults(true);

final Map<String, Object> parameters = gremlinLang.getParameters();
if (parameters != null && !parameters.isEmpty()) {
Expand All @@ -122,7 +124,7 @@ public static final class Builder {
private Long timeout = null;
private String materializeProperties = null;
private String language = null;
private String bulkedResult;
private String bulkResults = null;

/**
* The aliases to set on the request.
Expand Down Expand Up @@ -187,10 +189,10 @@ public Builder materializeProperties(final String materializeProperties) {
}

/**
* Enables or disables bulked result on the server.
* Sets the bulkResults flag to be sent on the request. A value of turn will enable server to bulk results.
*/
public Builder withBulkedResult(final boolean bulked) {
this.bulkedResult = String.valueOf(bulked);
public Builder bulkResults(final boolean bulking) {
this.bulkResults = String.valueOf(bulking);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public final class Settings {
/**
* Toggles if result from server is bulked. Default is false.
*/
public boolean enableBulkedResult = false;
public boolean bulkResults = false;

/**
* Read configuration from a file into a new {@link Settings} object.
Expand Down Expand Up @@ -132,8 +132,8 @@ public static Settings from(final Configuration conf) {
if (conf.containsKey("enableUserAgentOnConnect"))
settings.enableUserAgentOnConnect = conf.getBoolean("enableUserAgentOnConnect");

if (conf.containsKey("enableBulkedResult"))
settings.enableBulkedResult = conf.getBoolean("enableBulkedResult");
if (conf.containsKey("bulkResults"))
settings.bulkResults = conf.getBoolean("bulkResults");

if (conf.containsKey("hosts"))
settings.hosts = conf.getList("hosts").stream().map(Object::toString).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,17 @@ public final class HttpGremlinRequestEncoder extends MessageToMessageEncoder<Req

private final MessageSerializer<?> serializer;
private final boolean userAgentEnabled;
private final boolean bulkedResultEnabled;
private final boolean bulkResults;
private final List<Pair<String, ? extends RequestInterceptor>> interceptors;
private final URI uri;

public HttpGremlinRequestEncoder(final MessageSerializer<?> serializer,
final List<Pair<String, ? extends RequestInterceptor>> interceptors,
final boolean userAgentEnabled, boolean bulkedResultEnabled, final URI uri) {
final boolean userAgentEnabled, boolean bulkResults, final URI uri) {
this.serializer = serializer;
this.interceptors = interceptors;
this.userAgentEnabled = userAgentEnabled;
this.bulkedResultEnabled = bulkedResultEnabled;
this.bulkResults = bulkResults;
this.uri = uri;
}

Expand All @@ -88,8 +88,8 @@ protected void encode(final ChannelHandlerContext channelHandlerContext, final R
if (userAgentEnabled) {
headersMap.put(HttpRequest.Headers.USER_AGENT, UserAgent.USER_AGENT);
}
if (bulkedResultEnabled) {
headersMap.put(Tokens.BULKED, "true");
if (bulkResults) {
headersMap.put(Tokens.BULK_RESULTS, "true");
}
HttpRequest gremlinRequest = new HttpRequest(headersMap, requestMessage, uri);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void shouldCreateFromConfiguration() {
conf.setProperty("serializer.className", "my.serializers.MySerializer");
conf.setProperty("serializer.config.any", "thing");
conf.setProperty("enableUserAgentOnConnect", false);
conf.setProperty("enableBulkedResult", true);
conf.setProperty("bulkResults", true);
conf.setProperty("connectionPool.enableSsl", true);
conf.setProperty("connectionPool.keyStore", "server.jks");
conf.setProperty("connectionPool.keyStorePassword", "password2");
Expand Down Expand Up @@ -84,7 +84,7 @@ public void shouldCreateFromConfiguration() {
assertEquals("my.serializers.MySerializer", settings.serializer.className);
assertEquals("thing", settings.serializer.config.get("any"));
assertEquals(false, settings.enableUserAgentOnConnect);
assertTrue(settings.enableBulkedResult);
assertTrue(settings.bulkResults);
assertThat(settings.connectionPool.enableSsl, is(true));
assertEquals("server.jks", settings.connectionPool.keyStore);
assertEquals("password2", settings.connectionPool.keyStorePassword);
Expand Down
2 changes: 1 addition & 1 deletion gremlin-python/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ services:
&& python3 ./setup.py build
&& python3 ./setup.py test
&& python3 ./setup.py install
&& radish -f dots -e -t -b ./radish ./gremlin-test --user-data='serializer=application/vnd.graphbinary-v4.0' --user-data='bulked=true'
&& radish -f dots -e -t -b ./radish ./gremlin-test --user-data='serializer=application/vnd.graphbinary-v4.0' --user-data='bulking=true'
&& radish -f dots -e -t -b ./radish ./gremlin-test --user-data='serializer=application/vnd.graphbinary-v4.0'
&& radish -f dots -e -t -b ./radish ./gremlin-test --user-data='serializer=application/vnd.gremlin-v4.0+json';
EXIT_CODE=$$?; chown -R `stat -c "%u:%g" .` .; exit $$EXIT_CODE"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ def __init__(self, url, traversal_source, protocol_factory=None,
request_serializer=serializer.GraphBinarySerializersV4(),
response_serializer=None, interceptors=None, auth=None,
headers=None, enable_user_agent_on_connect=True,
enable_bulked_result=False, **transport_kwargs):
bulk_results=False, **transport_kwargs):
log.info("Creating Client with url '%s'", url)

self._closed = False
self._url = url
self._headers = headers
self._enable_user_agent_on_connect = enable_user_agent_on_connect
self._enable_bulked_result = enable_bulked_result
self._bulk_results = bulk_results
self._traversal_source = traversal_source
if "max_content_length" not in transport_kwargs:
transport_kwargs["max_content_length"] = 10 * 1024 * 1024
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class Connection:

def __init__(self, url, traversal_source, protocol, transport_factory,
executor, pool, headers=None, enable_user_agent_on_connect=True,
enable_bulked_result=False):
bulk_results=False):
self._url = url
self._headers = headers
self._traversal_source = traversal_source
Expand All @@ -40,9 +40,9 @@ def __init__(self, url, traversal_source, protocol, transport_factory,
self._enable_user_agent_on_connect = enable_user_agent_on_connect
if self._enable_user_agent_on_connect:
self.__add_header(useragent.userAgentHeader, useragent.userAgent)
self._enable_bulked_result = enable_bulked_result
if self._enable_bulked_result:
self.__add_header("bulked", "true")
self._bulk_results = bulk_results
if self._bulk_results:
self.__add_header("bulkResults", "true")

def connect(self):
if self._transport:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self, url, traversal_source="g", protocol_factory=None,
request_serializer=serializer.GraphBinarySerializersV4(),
response_serializer=None, interceptors=None, auth=None,
headers=None, enable_user_agent_on_connect=True,
enable_bulked_result=False, **transport_kwargs):
bulk_results=False, **transport_kwargs):
log.info("Creating DriverRemoteConnection with url '%s'", str(url))
self.__url = url
self.__traversal_source = traversal_source
Expand All @@ -47,7 +47,7 @@ def __init__(self, url, traversal_source="g", protocol_factory=None,
self.__auth = auth
self.__headers = headers
self.__enable_user_agent_on_connect = enable_user_agent_on_connect
self.__enable_bulked_result = enable_bulked_result
self.__bulk_results = bulk_results
self.__transport_kwargs = transport_kwargs

if response_serializer is None:
Expand All @@ -62,7 +62,7 @@ def __init__(self, url, traversal_source="g", protocol_factory=None,
interceptors=interceptors, auth=auth,
headers=headers,
enable_user_agent_on_connect=enable_user_agent_on_connect,
enable_bulked_result=enable_bulked_result,
bulk_results=bulk_results,
**transport_kwargs)
self._url = self._client._url
self._traversal_source = self._client._traversal_source
Expand Down Expand Up @@ -123,6 +123,9 @@ def extract_request_options(gremlin_lang):
for os in gremlin_lang.options_strategies:
request_options.update({token: os.configuration[token] for token in Tokens
if token in os.configuration})
# request the server to bulk results by default when using drc through request options
if 'bulkResults' not in request_options:
request_options['bulkResults'] = True
if gremlin_lang.parameters is not None and len(gremlin_lang.parameters) > 0:
request_options["params"] = gremlin_lang.parameters

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@
'RequestMessage', ['fields', 'gremlin'])

Tokens = ['batchSize', 'bindings', 'g', 'gremlin', 'language',
'evaluationTimeout', 'materializeProperties', 'timeoutMs', 'userAgent', 'bulked']
'evaluationTimeout', 'materializeProperties', 'timeoutMs', 'userAgent', 'bulkResults']
4 changes: 2 additions & 2 deletions gremlin-python/src/main/python/radish/terrain.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ def __create_remote(server_graph_name):
else:
raise ValueError('serializer not found - ' + world.config.user_data["serializer"])

bulked = world.config.user_data["bulked"] == "true" if "bulked" in world.config.user_data else False
bulking = world.config.user_data["bulking"] == "true" if "bulking" in world.config.user_data else False

return DriverRemoteConnection(test_no_auth_url, server_graph_name,
request_serializer=s, response_serializer=s,
enable_bulked_result=bulked)
bulk_results=bulking)
9 changes: 4 additions & 5 deletions gremlin-python/src/main/python/tests/driver/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import pytest
from gremlin_python.driver.client import Client
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.driver.protocol import GremlinServerError
from gremlin_python.driver.request import RequestMessage
from gremlin_python.driver.serializer import GraphBinarySerializersV4
Expand Down Expand Up @@ -266,7 +265,7 @@ def test_client_gremlin_lang_request_options_with_binding(client):
g = GraphTraversalSource(Graph(), TraversalStrategies())
# Note that bindings for constructed traversals is done via Parameter only
t = g.with_('language', 'gremlin-lang').V(Parameter.var('x', [1, 2, 3])).count()
request_opts = DriverRemoteConnection.extract_request_options(t.gremlin_lang)
request_opts = {'language': 'gremlin-lang', 'params': {'x': [1, 2, 3]}}
message = create_basic_request_message(t)
result_set = client.submit(message, request_options=request_opts)
assert result_set.all().result()[0] == 3
Expand Down Expand Up @@ -381,7 +380,7 @@ def thread_run(tr, result_list):
def test_client_gremlin_lang_with_short(client):
g = GraphTraversalSource(Graph(), TraversalStrategies())
t = g.with_('language', 'gremlin-lang').V().has('age', short(16)).count()
request_opts = DriverRemoteConnection.extract_request_options(t.gremlin_lang)
request_opts = {'language': 'gremlin-lang'}
message = create_basic_request_message(t)
result_set = client.submit(message, request_options=request_opts)
results = []
Expand All @@ -393,7 +392,7 @@ def test_client_gremlin_lang_with_short(client):
def test_client_gremlin_lang_with_long(client):
g = GraphTraversalSource(Graph(), TraversalStrategies())
t = g.V().has('age', long(851401972585122)).count()
request_opts = DriverRemoteConnection.extract_request_options(t.gremlin_lang)
request_opts = {}
message = create_basic_request_message(t)
result_set = client.submit(message, request_options=request_opts)
results = []
Expand All @@ -405,7 +404,7 @@ def test_client_gremlin_lang_with_long(client):
def test_client_gremlin_lang_with_bigint(client):
g = GraphTraversalSource(Graph(), TraversalStrategies())
t = g.with_('language', 'gremlin-lang').V().has('age', bigint(0x1000_0000_0000_0000_0000)).count()
request_opts = DriverRemoteConnection.extract_request_options(t.gremlin_lang)
request_opts = {'language': 'gremlin-lang'}
message = create_basic_request_message(t)
result_set = client.submit(message, request_options=request_opts)
results = []
Expand Down
Loading

0 comments on commit 56c72f6

Please sign in to comment.