Skip to content

Commit

Permalink
Accept query id and slug in QueuedStatement
Browse files Browse the repository at this point in the history
  • Loading branch information
prithvip committed Aug 22, 2024
1 parent 797019c commit 2697a94
Show file tree
Hide file tree
Showing 4 changed files with 254 additions and 2 deletions.
15 changes: 15 additions & 0 deletions presto-docs/src/main/sphinx/rest/statement.rst
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,21 @@ Statement Resource
}
}

.. function:: PUT /v1/statement/{queryId}?slug={slug}

:query query: SQL Query to execute
:query queryId: Query identifier to associate with this query
:query slug: Nonce to associate with this query, that will be required for subsequent requests
:reqheader X-Presto-User: User to execute statement on behalf of (optional)
:reqheader X-Presto-Source: Source of query
:reqheader X-Presto-Catalog: Catalog to execute query against
:reqheader X-Presto-Schema: Schema to execute query against

Submits a statement to Presto for execution. This function is
the analogue of the POST, and behaves exactly the same. The
difference is that a query id and slug can be explicitly provided,
instead of Presto generating it.

.. function:: GET /v1/statement/{queryId}/{token}

:query queryId: The query identifier returned from the initial POST to /v1/statement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
Expand Down Expand Up @@ -99,6 +100,7 @@
import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
import static javax.ws.rs.core.MediaType.TEXT_PLAIN_TYPE;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.CONFLICT;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;

@Path("/")
Expand Down Expand Up @@ -222,6 +224,61 @@ public Response postStatement(
return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build();
}

/**
* HTTP endpoint for submitting queries to the Presto Coordinator.
* Presto performs lazy execution. The submission of a query returns
* a placeholder for the result set, but the query gets
* scheduled/dispatched only when the client polls for results.
* This endpoint accepts a pre-minted queryId and slug, instead of
* generating it.
*
* @param statement The statement or sql query string submitted
* @param queryId Pre-minted query ID to associate with this query
* @param slug Pre-minted slug to protect this query
* @param xForwardedProto Forwarded protocol (http or https)
* @param servletRequest The http request
* @param uriInfo {@link javax.ws.rs.core.UriInfo}
* @return {@link javax.ws.rs.core.Response} HTTP response code
*/
@PUT
@Path("/v1/statement/{queryId}")
@Produces(APPLICATION_JSON)
public Response putStatement(
String statement,
@PathParam("queryId") QueryId queryId,
@QueryParam("slug") String slug,
@DefaultValue("false") @QueryParam("binaryResults") boolean binaryResults,
@HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
@HeaderParam(PRESTO_PREFIX_URL) String xPrestoPrefixUrl,
@Context HttpServletRequest servletRequest,
@Context UriInfo uriInfo)
{
if (isNullOrEmpty(statement)) {
throw badRequest(BAD_REQUEST, "SQL statement is empty");
}

if (queries.containsKey(queryId)) {
Query query = queries.get(queryId);
if (!query.getSlug().equals(slug) || query.getLastToken() != 0) {
throw badRequest(CONFLICT, "Query already exists");
}
}

abortIfPrefixUrlInvalid(xPrestoPrefixUrl);

// TODO: For future cases we may want to start tracing from client. Then continuation of tracing
// will be needed instead of creating a new trace here.
SessionContext sessionContext = new HttpRequestSessionContext(
servletRequest,
sqlParserOptions,
tracerProviderManager.getTracerProvider(),
Optional.of(sessionPropertyManager));
Query query = new Query(statement, sessionContext, dispatchManager, executingQueryResponseProvider, 0, queryId, slug);
queries.put(query.getQueryId(), query);

return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build();
}

/**
* HTTP endpoint for re-processing a failed query
* @param queryId Query Identifier of the query to be retried
Expand Down Expand Up @@ -386,21 +443,34 @@ private static final class Query
private final DispatchManager dispatchManager;
private final ExecutingQueryResponseProvider executingQueryResponseProvider;
private final QueryId queryId;
private final String slug = "x" + randomUUID().toString().toLowerCase(ENGLISH).replace("-", "");
private final String slug;
private final AtomicLong lastToken = new AtomicLong();
private final int retryCount;

@GuardedBy("this")
private ListenableFuture<?> querySubmissionFuture;

public Query(String query, SessionContext sessionContext, DispatchManager dispatchManager, ExecutingQueryResponseProvider executingQueryResponseProvider, int retryCount)
{
this(query, sessionContext, dispatchManager, executingQueryResponseProvider, retryCount, dispatchManager.createQueryId(), createSlug());
}

public Query(
String query,
SessionContext sessionContext,
DispatchManager dispatchManager,
ExecutingQueryResponseProvider executingQueryResponseProvider,
int retryCount,
QueryId queryId,
String slug)
{
this.query = requireNonNull(query, "query is null");
this.sessionContext = requireNonNull(sessionContext, "sessionContext is null");
this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null");
this.executingQueryResponseProvider = requireNonNull(executingQueryResponseProvider, "executingQueryResponseProvider is null");
this.queryId = dispatchManager.createQueryId();
this.retryCount = retryCount;
this.queryId = requireNonNull(queryId, "queryId is null");
this.slug = requireNonNull(slug, "slug is null");
}

/**
Expand Down Expand Up @@ -584,6 +654,11 @@ private QueryResults createQueryResults(long token, UriInfo uriInfo, String xFor
dispatchInfo.getWaitingForPrerequisitesTime());
}

private static String createSlug()
{
return "x" + randomUUID().toString().toLowerCase(ENGLISH).replace("-", "");
}

private URI getNextUri(long token, UriInfo uriInfo, String xForwardedProto, String xPrestoPrefixUrl, DispatchInfo dispatchInfo, boolean binaryResults)
{
// if failed, query is complete
Expand Down
110 changes: 110 additions & 0 deletions presto-main/src/test/java/com/facebook/presto/server/TestServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.airlift.http.client.HttpUriBuilder;
import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.http.client.StatusResponseHandler;
import com.facebook.airlift.http.client.UnexpectedResponseException;
import com.facebook.airlift.http.client.jetty.JettyHttpClient;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.testing.Closeables;
Expand All @@ -26,6 +27,7 @@
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.BlockEncodingManager;
import com.facebook.presto.common.type.TimeZoneNotSupportedException;
import com.facebook.presto.execution.QueryIdGenerator;
import com.facebook.presto.execution.buffer.PagesSerdeFactory;
import com.facebook.presto.server.testing.TestingPrestoServer;
import com.facebook.presto.spi.QueryId;
Expand All @@ -48,9 +50,11 @@

import static com.facebook.airlift.http.client.FullJsonResponseHandler.createFullJsonResponseHandler;
import static com.facebook.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
import static com.facebook.airlift.http.client.Request.Builder.fromRequest;
import static com.facebook.airlift.http.client.Request.Builder.prepareGet;
import static com.facebook.airlift.http.client.Request.Builder.prepareHead;
import static com.facebook.airlift.http.client.Request.Builder.preparePost;
import static com.facebook.airlift.http.client.Request.Builder.preparePut;
import static com.facebook.airlift.http.client.StaticBodyGenerator.createStaticBodyGenerator;
import static com.facebook.airlift.http.client.StatusResponseHandler.createStatusResponseHandler;
import static com.facebook.airlift.json.JsonCodec.jsonCodec;
Expand Down Expand Up @@ -262,6 +266,103 @@ public void testQuery()
assertEquals(rows, ImmutableList.of(ImmutableList.of("system")));
}

@Test
public void testQueryWithPreMintedQueryIdAndSlug()
{
QueryId queryId = new QueryIdGenerator().createNextQueryId();
String slug = "xxx";
Request request = preparePut()
.setUri(uriFor("/v1/statement/", queryId, slug))
.setBodyGenerator(createStaticBodyGenerator("show catalogs", UTF_8))
.setHeader(PRESTO_USER, "user")
.setHeader(PRESTO_SOURCE, "source")
.setHeader(PRESTO_CATALOG, "catalog")
.setHeader(PRESTO_SCHEMA, "schema")
.build();

QueryResults queryResults = client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC));

// verify slug in nextUri is same as requested
assertEquals(queryResults.getNextUri().getQuery(), "slug=xxx");

// verify nextUri points to requested query id
assertEquals(queryResults.getNextUri().getPath(), format("/v1/statement/queued/%s/1", queryId));

while (queryResults.getNextUri() != null) {
queryResults = client.execute(prepareGet().setUri(queryResults.getNextUri()).build(), createJsonResponseHandler(QUERY_RESULTS_CODEC));
}

if (queryResults.getError() != null) {
fail(queryResults.getError().toString());
}

// verify query id was passed down properly
assertEquals(server.getDispatchManager().getQueryInfo(queryId).getQueryId(), queryId);
}

@Test
public void testPutStatementIdempotency()
{
QueryId queryId = new QueryIdGenerator().createNextQueryId();
Request request = preparePut()
.setUri(uriFor("/v1/statement/", queryId, "slug"))
.setBodyGenerator(createStaticBodyGenerator("show catalogs", UTF_8))
.setHeader(PRESTO_USER, "user")
.setHeader(PRESTO_SOURCE, "source")
.setHeader(PRESTO_CATALOG, "catalog")
.setHeader(PRESTO_SCHEMA, "schema")
.build();

client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC));
// Execute PUT request again should succeed
QueryResults queryResults = client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC));

while (queryResults.getNextUri() != null) {
queryResults = client.execute(prepareGet().setUri(queryResults.getNextUri()).build(), createJsonResponseHandler(QUERY_RESULTS_CODEC));
}
if (queryResults.getError() != null) {
fail(queryResults.getError().toString());
}
}

@Test(expectedExceptions = UnexpectedResponseException.class, expectedExceptionsMessageRegExp = "Expected response code to be \\[.*\\], but was 409")
public void testPutStatementWithDifferentSlugFails()
{
QueryId queryId = new QueryIdGenerator().createNextQueryId();
Request request = preparePut()
.setUri(uriFor("/v1/statement/", queryId, "slug"))
.setBodyGenerator(createStaticBodyGenerator("show catalogs", UTF_8))
.setHeader(PRESTO_USER, "user")
.setHeader(PRESTO_SOURCE, "source")
.setHeader(PRESTO_CATALOG, "catalog")
.setHeader(PRESTO_SCHEMA, "schema")
.build();
client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC));

Request badRequest = fromRequest(request)
.setUri(uriFor("/v1/statement/", queryId, "different_slug"))
.build();
client.execute(badRequest, createJsonResponseHandler(QUERY_RESULTS_CODEC));
}

@Test(expectedExceptions = UnexpectedResponseException.class, expectedExceptionsMessageRegExp = "Expected response code to be \\[.*\\], but was 409")
public void testPutStatementAfterGetFails()
{
QueryId queryId = new QueryIdGenerator().createNextQueryId();
Request request = preparePut()
.setUri(uriFor("/v1/statement/", queryId, "slug"))
.setBodyGenerator(createStaticBodyGenerator("show catalogs", UTF_8))
.setHeader(PRESTO_USER, "user")
.setHeader(PRESTO_SOURCE, "source")
.setHeader(PRESTO_CATALOG, "catalog")
.setHeader(PRESTO_SCHEMA, "schema")
.build();

QueryResults queryResults = client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC));
client.execute(prepareGet().setUri(queryResults.getNextUri()).build(), createJsonResponseHandler(QUERY_RESULTS_CODEC));
client.execute(request, createJsonResponseHandler(QUERY_RESULTS_CODEC));
}

@Test
public void testTransactionSupport()
{
Expand Down Expand Up @@ -327,4 +428,13 @@ public URI uriFor(String path)
{
return HttpUriBuilder.uriBuilderFrom(server.getBaseUrl()).replacePath(path).build();
}

public URI uriFor(String path, QueryId queryId, String slug)
{
return HttpUriBuilder.uriBuilderFrom(server.getBaseUrl())
.replacePath(path)
.appendPath(queryId.getId())
.addParameter("slug", slug)
.build();
}
}
52 changes: 52 additions & 0 deletions presto-openapi/src/main/resources/queued_statement.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,58 @@ paths:
$ref: './schemas.yaml/#/components/schemas/QueryResults'
'400':
description: Bad request
put:
summary: Submit a new query
description: Submits a new query to the Presto coordinator, with a pre-minted query id and slug
requestBody:
required: true
content:
text/plain:
schema:
type: string
description: The statement or SQL query string to be submitted
parameters:
- name: queryId
in: path
required: true
schema:
type: string
description: The query id to associate with this query
- name: slug
in: query
required: true
schema:
type: string
description: Nonce to associate with this query, which is required for future requests
- name: binaryResults
in: query
required: false
schema:
type: boolean
description: Whether to return results in binary format
- name: X-Forwarded-Proto
in: header
required: false
schema:
type: string
description: Forwarded protocol (http or https)
- name: Presto-Prefix-URL
in: header
required: false
schema:
type: string
description: Prefix URL for Presto
responses:
'200':
description: Query submitted successfully
content:
application/json:
schema:
$ref: './schemas.yaml/#/components/schemas/QueryResults'
'400':
description: Bad request
'409':
description: Conflict, this query already exists
/v1/statement/queued/retry/{queryId}:
get:
summary: Retry a failed query
Expand Down

0 comments on commit 2697a94

Please sign in to comment.