forked from deephaven/deephaven-core
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This is adding Flight SQL Java ADBC tests, mainly in support of a documentation effort on how to connect different clients to Deephaven Flight SQL, see deephaven/deephaven-docs-community#365
- Loading branch information
1 parent
37b861a
commit 09f8d35
Showing
7 changed files
with
392 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
62 changes: 62 additions & 0 deletions
62
extensions/flight-sql/src/adbcTest/java/io/deephaven/server/DeephavenServerTestBase.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
// | ||
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending | ||
// | ||
package io.deephaven.server; | ||
|
||
import io.deephaven.engine.context.ExecutionContext; | ||
import io.deephaven.io.logger.LogBuffer; | ||
import io.deephaven.io.logger.LogBufferGlobal; | ||
import io.deephaven.server.runner.GrpcServer; | ||
import io.deephaven.server.runner.MainHelper; | ||
import io.deephaven.util.SafeCloseable; | ||
import org.junit.jupiter.api.AfterEach; | ||
import org.junit.jupiter.api.BeforeAll; | ||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Timeout; | ||
|
||
import java.io.IOException; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
@Timeout(30) | ||
public abstract class DeephavenServerTestBase { | ||
|
||
public interface TestComponent { | ||
|
||
GrpcServer server(); | ||
|
||
ExecutionContext executionContext(); | ||
} | ||
|
||
protected TestComponent component; | ||
|
||
private LogBuffer logBuffer; | ||
private SafeCloseable executionContext; | ||
private GrpcServer server; | ||
protected int localPort; | ||
|
||
protected abstract TestComponent component(); | ||
|
||
@BeforeAll | ||
static void setupOnce() throws IOException { | ||
MainHelper.bootstrapProjectDirectories(); | ||
} | ||
|
||
@BeforeEach | ||
void setup() throws IOException { | ||
logBuffer = new LogBuffer(128); | ||
LogBufferGlobal.setInstance(logBuffer); | ||
component = component(); | ||
executionContext = component.executionContext().open(); | ||
server = component.server(); | ||
server.start(); | ||
localPort = server.getPort(); | ||
} | ||
|
||
@AfterEach | ||
void tearDown() throws InterruptedException { | ||
server.stopWithTimeout(10, TimeUnit.SECONDS); | ||
server.join(); | ||
executionContext.close(); | ||
LogBufferGlobal.clear(logBuffer); | ||
} | ||
} |
115 changes: 115 additions & 0 deletions
115
...ons/flight-sql/src/adbcTest/java/io/deephaven/server/flightsql/FlightSqlAdbcTestBase.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
// | ||
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending | ||
// | ||
package io.deephaven.server.flightsql; | ||
|
||
import io.deephaven.server.DeephavenServerTestBase; | ||
import org.apache.arrow.adbc.core.AdbcConnection; | ||
import org.apache.arrow.adbc.core.AdbcDatabase; | ||
import org.apache.arrow.adbc.core.AdbcDriver; | ||
import org.apache.arrow.adbc.core.AdbcException; | ||
import org.apache.arrow.adbc.core.AdbcStatement; | ||
import org.apache.arrow.adbc.driver.flightsql.FlightSqlConnectionProperties; | ||
import org.apache.arrow.adbc.driver.flightsql.FlightSqlDriverFactory; | ||
import org.apache.arrow.memory.BufferAllocator; | ||
import org.apache.arrow.memory.RootAllocator; | ||
import org.apache.arrow.vector.IntVector; | ||
import org.apache.arrow.vector.VectorSchemaRoot; | ||
import org.apache.arrow.vector.ipc.ArrowReader; | ||
import org.apache.arrow.vector.types.Types; | ||
import org.apache.arrow.vector.types.pojo.Field; | ||
import org.apache.arrow.vector.types.pojo.FieldType; | ||
import org.apache.arrow.vector.types.pojo.Schema; | ||
import org.junit.jupiter.api.AfterEach; | ||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Test; | ||
|
||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
public abstract class FlightSqlAdbcTestBase extends DeephavenServerTestBase { | ||
|
||
private static final Map<String, String> DEEPHAVEN_INT = Map.of( | ||
"deephaven:isSortable", "true", | ||
"deephaven:isRowStyle", "false", | ||
"deephaven:isPartitioning", "false", | ||
"deephaven:type", "int", | ||
"deephaven:isNumberFormat", "false", | ||
"deephaven:isStyle", "false", | ||
"deephaven:isDateFormat", "false"); | ||
|
||
BufferAllocator allocator; | ||
AdbcDatabase database; | ||
AdbcConnection connection; | ||
|
||
@BeforeEach | ||
void setUp() throws AdbcException { | ||
final Map<String, Object> options = new HashMap<>(); | ||
AdbcDriver.PARAM_URI.set(options, String.format("grpc://localhost:%d", localPort)); | ||
FlightSqlConnectionProperties.WITH_COOKIE_MIDDLEWARE.set(options, true); | ||
options.put(FlightSqlConnectionProperties.RPC_CALL_HEADER_PREFIX + "Authorization", "Anonymous"); | ||
options.put(FlightSqlConnectionProperties.RPC_CALL_HEADER_PREFIX + "x-deephaven-auth-cookie-request", "true"); | ||
allocator = new RootAllocator(); | ||
database = new FlightSqlDriverFactory().getDriver(allocator).open(options); | ||
connection = database.connect(); | ||
} | ||
|
||
@AfterEach | ||
void tearDown() throws Exception { | ||
connection.close(); | ||
database.close(); | ||
allocator.close(); | ||
} | ||
|
||
@Test | ||
void executeSchema() throws Exception { | ||
final Schema expectedSchema = new Schema(List | ||
.of(new Field("Foo", new FieldType(true, Types.MinorType.INT.getType(), null, DEEPHAVEN_INT), null))); | ||
try (final AdbcStatement statement = connection.createStatement()) { | ||
statement.setSqlQuery("SELECT 42 as Foo"); | ||
assertThat(statement.executeSchema()).isEqualTo(expectedSchema); | ||
} | ||
} | ||
|
||
@Test | ||
void executeQuery() throws Exception { | ||
final Schema expectedSchema = new Schema(List | ||
.of(new Field("Foo", new FieldType(true, Types.MinorType.INT.getType(), null, DEEPHAVEN_INT), null))); | ||
try (final AdbcStatement statement = connection.createStatement()) { | ||
statement.setSqlQuery("SELECT 42 as Foo"); | ||
try (final AdbcStatement.QueryResult result = statement.executeQuery()) { | ||
final ArrowReader reader = result.getReader(); | ||
assertThat(reader.loadNextBatch()).isTrue(); | ||
final VectorSchemaRoot root = reader.getVectorSchemaRoot(); | ||
assertThat(root.getSchema()).isEqualTo(expectedSchema); | ||
final IntVector vector = (IntVector) root.getVector(0); | ||
assertThat(vector.isNull(0)).isFalse(); | ||
assertThat(vector.get(0)).isEqualTo(42); | ||
assertThat(reader.loadNextBatch()).isFalse(); | ||
} | ||
} | ||
} | ||
|
||
@Test | ||
void preparedExecuteQuery() throws Exception { | ||
final Schema expectedSchema = new Schema(List | ||
.of(new Field("Foo", new FieldType(true, Types.MinorType.INT.getType(), null, DEEPHAVEN_INT), null))); | ||
try (final AdbcStatement statement = connection.createStatement()) { | ||
statement.setSqlQuery("SELECT 42 as Foo"); | ||
statement.prepare(); | ||
try (final AdbcStatement.QueryResult result = statement.executeQuery()) { | ||
final ArrowReader reader = result.getReader(); | ||
assertThat(reader.loadNextBatch()).isTrue(); | ||
final VectorSchemaRoot root = reader.getVectorSchemaRoot(); | ||
assertThat(root.getSchema()).isEqualTo(expectedSchema); | ||
final IntVector vector = (IntVector) root.getVector(0); | ||
assertThat(vector.isNull(0)).isFalse(); | ||
assertThat(vector.get(0)).isEqualTo(42); | ||
assertThat(reader.loadNextBatch()).isFalse(); | ||
} | ||
} | ||
} | ||
} |
123 changes: 123 additions & 0 deletions
123
...sions/flight-sql/src/adbcTest/java/io/deephaven/server/flightsql/FlightSqlTestModule.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
// | ||
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending | ||
// | ||
package io.deephaven.server.flightsql; | ||
|
||
import dagger.Module; | ||
import dagger.Provides; | ||
import dagger.multibindings.IntoSet; | ||
import io.deephaven.base.clock.Clock; | ||
import io.deephaven.engine.context.ExecutionContext; | ||
import io.deephaven.engine.updategraph.OperationInitializer; | ||
import io.deephaven.engine.updategraph.UpdateGraph; | ||
import io.deephaven.engine.util.AbstractScriptSession; | ||
import io.deephaven.engine.util.NoLanguageDeephavenSession; | ||
import io.deephaven.engine.util.ScriptSession; | ||
import io.deephaven.server.arrow.ArrowModule; | ||
import io.deephaven.server.auth.AuthorizationProvider; | ||
import io.deephaven.server.config.ConfigServiceModule; | ||
import io.deephaven.server.console.ConsoleModule; | ||
import io.deephaven.server.log.LogModule; | ||
import io.deephaven.server.plugin.PluginsModule; | ||
import io.deephaven.server.session.ExportTicketResolver; | ||
import io.deephaven.server.session.ObfuscatingErrorTransformerModule; | ||
import io.deephaven.server.session.SessionModule; | ||
import io.deephaven.server.session.TicketResolver; | ||
import io.deephaven.server.table.TableModule; | ||
import io.deephaven.server.test.TestAuthModule; | ||
import io.deephaven.server.test.TestAuthorizationProvider; | ||
import io.deephaven.server.util.Scheduler; | ||
|
||
import javax.inject.Named; | ||
import javax.inject.Singleton; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
|
||
@Module(includes = { | ||
ArrowModule.class, | ||
ConfigServiceModule.class, | ||
ConsoleModule.class, | ||
LogModule.class, | ||
SessionModule.class, | ||
TableModule.class, | ||
TestAuthModule.class, | ||
ObfuscatingErrorTransformerModule.class, | ||
PluginsModule.class, | ||
FlightSqlModule.class | ||
}) | ||
public class FlightSqlTestModule { | ||
@IntoSet | ||
@Provides | ||
TicketResolver ticketResolver(ExportTicketResolver resolver) { | ||
return resolver; | ||
} | ||
|
||
@Singleton | ||
@Provides | ||
AbstractScriptSession<?> provideAbstractScriptSession( | ||
final UpdateGraph updateGraph, | ||
final OperationInitializer operationInitializer) { | ||
return new NoLanguageDeephavenSession( | ||
updateGraph, operationInitializer, "non-script-session"); | ||
} | ||
|
||
@Provides | ||
ScriptSession provideScriptSession(AbstractScriptSession<?> scriptSession) { | ||
return scriptSession; | ||
} | ||
|
||
@Provides | ||
@Singleton | ||
ScheduledExecutorService provideExecutorService() { | ||
return Executors.newScheduledThreadPool(1); | ||
} | ||
|
||
@Provides | ||
Scheduler provideScheduler(ScheduledExecutorService concurrentExecutor) { | ||
return new Scheduler.DelegatingImpl( | ||
Executors.newSingleThreadExecutor(), | ||
concurrentExecutor, | ||
Clock.system()); | ||
} | ||
|
||
@Provides | ||
@Named("session.tokenExpireMs") | ||
long provideTokenExpireMs() { | ||
return 60_000_000; | ||
} | ||
|
||
@Provides | ||
@Named("http.port") | ||
int provideHttpPort() { | ||
return 0;// 'select first available' | ||
} | ||
|
||
@Provides | ||
@Named("grpc.maxInboundMessageSize") | ||
int provideMaxInboundMessageSize() { | ||
return 1024 * 1024; | ||
} | ||
|
||
@Provides | ||
AuthorizationProvider provideAuthorizationProvider(TestAuthorizationProvider provider) { | ||
return provider; | ||
} | ||
|
||
@Provides | ||
@Singleton | ||
TestAuthorizationProvider provideTestAuthorizationProvider() { | ||
return new TestAuthorizationProvider(); | ||
} | ||
|
||
@Provides | ||
@Singleton | ||
static UpdateGraph provideUpdateGraph() { | ||
return ExecutionContext.getContext().getUpdateGraph(); | ||
} | ||
|
||
@Provides | ||
@Singleton | ||
static OperationInitializer provideOperationInitializer() { | ||
return ExecutionContext.getContext().getOperationInitializer(); | ||
} | ||
} |
Oops, something went wrong.