Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add InitFlow and move session states to context #266

Merged
merged 3 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 143 additions & 16 deletions r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/ConnectionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@

package io.asyncer.r2dbc.mysql;

import io.asyncer.r2dbc.mysql.cache.PrepareCache;
import io.asyncer.r2dbc.mysql.codec.CodecContext;
import io.asyncer.r2dbc.mysql.collation.CharCollation;
import io.asyncer.r2dbc.mysql.constant.ServerStatuses;
import io.asyncer.r2dbc.mysql.constant.ZeroDateOption;
import io.r2dbc.spi.IsolationLevel;
import org.jetbrains.annotations.Nullable;

import java.nio.file.Path;
import java.time.Duration;
import java.time.ZoneId;

import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull;
Expand All @@ -37,6 +40,10 @@ public final class ConnectionContext implements CodecContext {

private static final ServerVersion NONE_VERSION = ServerVersion.create(0, 0, 0);

private static final ServerVersion MYSQL_5_7_4 = ServerVersion.create(5, 7, 4);

private static final ServerVersion MARIA_10_1_1 = ServerVersion.create(10, 1, 1, true);

private final ZeroDateOption zeroDateOption;

@Nullable
Expand All @@ -52,16 +59,47 @@ public final class ConnectionContext implements CodecContext {

private Capability capability = Capability.DEFAULT;

private PrepareCache prepareCache;

@Nullable
private ZoneId timeZone;

private String product = "Unknown";

/**
* Current isolation level inferred by past statements.
* <p>
* Inference rules:
* <ol><li>In the beginning, it is also {@link #sessionIsolationLevel}.</li>
* <li>A transaction has began with a {@link IsolationLevel}, it will be changed to the value</li>
* <li>The transaction end (commit or rollback), it will recover to {@link #sessionIsolationLevel}.</li></ol>
*/
private volatile IsolationLevel currentIsolationLevel;

/**
* Session isolation level.
*
* <ol><li>It is applied to all subsequent transactions performed within the current session.</li>
* <li>Calls {@link io.r2dbc.spi.Connection#setTransactionIsolationLevel}, it will change to the value.</li>
* <li>It can be changed within transactions, but does not affect the current ongoing transaction.</li></ol>
*/
private volatile IsolationLevel sessionIsolationLevel;

private boolean lockWaitTimeoutSupported = false;

/**
* Current lock wait timeout in seconds.
*/
private volatile Duration currentLockWaitTimeout;

/**
* Session lock wait timeout in seconds.
*/
private volatile Duration sessionLockWaitTimeout;

/**
* Assume that the auto commit is always turned on, it will be set after handshake V10 request message, or OK
* message which means handshake V9 completed.
* <p>
* It would be updated multiple times, so {@code volatile} is required.
*/
private volatile short serverStatuses = ServerStatuses.AUTO_COMMIT;

Expand All @@ -80,18 +118,50 @@ public final class ConnectionContext implements CodecContext {
}

/**
* Initializes this context.
* Initializes handshake information after connection is established.
*
* @param connectionId the connection identifier that is specified by server.
* @param version the server version.
* @param capability the connection capabilities.
*/
void init(int connectionId, ServerVersion version, Capability capability) {
void initHandshake(int connectionId, ServerVersion version, Capability capability) {
this.connectionId = connectionId;
this.serverVersion = version;
this.capability = capability;
}

/**
* Initializes session information after logged-in.
*
* @param prepareCache the prepare cache.
* @param isolationLevel the session isolation level.
* @param lockWaitTimeoutSupported if the server supports lock wait timeout.
* @param lockWaitTimeout the lock wait timeout.
* @param product the server product name.
* @param timeZone the server timezone.
*/
void initSession(
PrepareCache prepareCache,
IsolationLevel isolationLevel,
boolean lockWaitTimeoutSupported,
Duration lockWaitTimeout,
@Nullable String product,
@Nullable ZoneId timeZone
) {
this.prepareCache = prepareCache;
this.currentIsolationLevel = this.sessionIsolationLevel = isolationLevel;
this.lockWaitTimeoutSupported = lockWaitTimeoutSupported;
this.currentLockWaitTimeout = this.sessionLockWaitTimeout = lockWaitTimeout;
this.product = product == null ? "Unknown" : product;

if (timeZone != null) {
if (isTimeZoneInitialized()) {
throw new IllegalStateException("Connection timezone have been initialized");
}
this.timeZone = timeZone;
}
}

/**
* Get the connection identifier that is specified by server.
*
Expand Down Expand Up @@ -128,6 +198,14 @@ public ZoneId getTimeZone() {
return timeZone;
}

String getProduct() {
return product;
}

PrepareCache getPrepareCache() {
return prepareCache;
}

boolean isTimeZoneInitialized() {
return timeZone != null;
}
Expand All @@ -138,13 +216,6 @@ public boolean isMariaDb() {
return (capability != null && capability.isMariaDb()) || serverVersion.isMariaDb();
}

void initTimeZone(ZoneId timeZone) {
if (isTimeZoneInitialized()) {
throw new IllegalStateException("Connection timezone have been initialized");
}
this.timeZone = timeZone;
}

@Override
public ZeroDateOption getZeroDateOption() {
return zeroDateOption;
Expand All @@ -170,19 +241,23 @@ public int getLocalInfileBufferSize() {
}

/**
* Checks if the server supports lock wait timeout.
* Checks if the server supports InnoDB lock wait timeout.
*
* @return if the server supports lock wait timeout.
* @return if the server supports InnoDB lock wait timeout.
*/
public boolean isLockWaitTimeoutSupported() {
return lockWaitTimeoutSupported;
}

/**
* Enables lock wait timeout supported when loading session variables.
* Checks if the server supports statement timeout.
*
* @return if the server supports statement timeout.
*/
void enableLockWaitTimeoutSupported() {
this.lockWaitTimeoutSupported = true;
public boolean isStatementTimeoutSupported() {
boolean isMariaDb = isMariaDb();
return (isMariaDb && serverVersion.isGreaterThanOrEqualTo(MARIA_10_1_1)) ||
(!isMariaDb && serverVersion.isGreaterThanOrEqualTo(MYSQL_5_7_4));
}

/**
Expand All @@ -202,4 +277,56 @@ public short getServerStatuses() {
public void setServerStatuses(short serverStatuses) {
this.serverStatuses = serverStatuses;
}

IsolationLevel getCurrentIsolationLevel() {
return currentIsolationLevel;
}

void setCurrentIsolationLevel(IsolationLevel isolationLevel) {
this.currentIsolationLevel = isolationLevel;
}

void resetCurrentIsolationLevel() {
this.currentIsolationLevel = this.sessionIsolationLevel;
}

IsolationLevel getSessionIsolationLevel() {
return sessionIsolationLevel;
}

void setSessionIsolationLevel(IsolationLevel isolationLevel) {
this.sessionIsolationLevel = isolationLevel;
}

void setCurrentLockWaitTimeout(Duration timeoutSeconds) {
this.currentLockWaitTimeout = timeoutSeconds;
}

void resetCurrentLockWaitTimeout() {
this.currentLockWaitTimeout = this.sessionLockWaitTimeout;
}

boolean isLockWaitTimeoutChanged() {
return currentLockWaitTimeout != sessionLockWaitTimeout;
}

Duration getSessionLockWaitTimeout() {
return sessionLockWaitTimeout;
}

void setAllLockWaitTimeout(Duration timeoutSeconds) {
this.currentLockWaitTimeout = this.sessionLockWaitTimeout = timeoutSeconds;
}

boolean isInTransaction() {
return (serverStatuses & ServerStatuses.IN_TRANSACTION) != 0;
}

boolean isAutoCommit() {
// Within transaction, autocommit remains disabled until end the transaction with COMMIT or ROLLBACK.
// The autocommit mode then reverts to its previous state.
short serverStatuses = this.serverStatuses;
return (serverStatuses & ServerStatuses.IN_TRANSACTION) == 0 &&
(serverStatuses & ServerStatuses.AUTO_COMMIT) != 0;
}
}

This file was deleted.

Loading
Loading