-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add EventGroupSelectorManager instead of ActorSelectorManager
- Loading branch information
1 parent
1eeb7c1
commit 5f423b2
Showing
12 changed files
with
884 additions
and
25 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
63 changes: 63 additions & 0 deletions
63
ktor-network/jvm/src/io/ktor/network/selector/eventgroup/Attachment.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
/* | ||
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. | ||
*/ | ||
|
||
package io.ktor.network.selector.eventgroup | ||
|
||
import kotlinx.coroutines.suspendCancellableCoroutine | ||
import java.nio.channels.SelectionKey | ||
|
||
internal inline val SelectionKey.attachment get() = attachment() as Attachment | ||
|
||
/** | ||
* Attachment for SelectionKey | ||
* It contains task for each interest and allows to run them and resume the continuation | ||
*/ | ||
internal class Attachment { | ||
private var acceptTask: Task<Any?>? = null | ||
private var readTask: Task<Any?>? = null | ||
private var writeTask: Task<Any?>? = null | ||
private var connectTask: Task<Any?>? = null | ||
|
||
suspend fun <T> runTask(interest: Int, task: suspend () -> T): T { | ||
return suspendCancellableCoroutine { | ||
@Suppress("UNCHECKED_CAST") | ||
setContinuationByInterest(interest, Task(it.toResumableCancellable(), task) as Task<Any?>) | ||
} | ||
} | ||
|
||
suspend fun runTaskAndResumeContinuation(key: SelectionKey) { | ||
when { | ||
key.isAcceptable -> acceptTask.runAndResume(SelectionKey.OP_ACCEPT) | ||
key.isReadable -> readTask.runAndResume(SelectionKey.OP_READ) | ||
key.isWritable -> writeTask.runAndResume(SelectionKey.OP_WRITE) | ||
key.isConnectable -> connectTask.runAndResume(SelectionKey.OP_CONNECT) | ||
} | ||
} | ||
|
||
private suspend fun Task<Any?>?.runAndResume(interest: Int) { | ||
val task = this ?: return | ||
setContinuationByInterest(interest, null) | ||
task.runAndResume() | ||
} | ||
|
||
private fun setContinuationByInterest(interest: Int, task: Task<Any?>?) { | ||
when (interest) { | ||
SelectionKey.OP_ACCEPT -> acceptTask = task | ||
SelectionKey.OP_READ -> readTask = task | ||
SelectionKey.OP_WRITE -> writeTask = task | ||
SelectionKey.OP_CONNECT -> connectTask = task | ||
} | ||
} | ||
|
||
fun cancel(cause: Throwable? = null) { | ||
acceptTask.cancel(cause) | ||
readTask.cancel(cause) | ||
writeTask.cancel(cause) | ||
connectTask.cancel(cause) | ||
} | ||
|
||
private fun Task<*>?.cancel(cause: Throwable? = null) { | ||
this?.continuation?.cancel(cause) | ||
} | ||
} |
22 changes: 22 additions & 0 deletions
22
ktor-network/jvm/src/io/ktor/network/selector/eventgroup/Connection.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
/* | ||
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. | ||
*/ | ||
|
||
package io.ktor.network.selector.eventgroup | ||
|
||
import java.nio.channels.SocketChannel | ||
|
||
/** | ||
* Allows to perform read and write operations on the socket channel, | ||
* which will be submitted as tasks to the event loop and will be suspended until | ||
* they will be executed in the context of the event loop | ||
*/ | ||
internal interface Connection { | ||
val channel: SocketChannel | ||
|
||
suspend fun <T> performRead(body: suspend (SocketChannel) -> T): T | ||
|
||
suspend fun <T> performWrite(body: suspend (SocketChannel) -> T): T | ||
|
||
fun close() | ||
} |
28 changes: 28 additions & 0 deletions
28
ktor-network/jvm/src/io/ktor/network/selector/eventgroup/EventGroupContext.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
/* | ||
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. | ||
*/ | ||
|
||
package io.ktor.network.selector.eventgroup | ||
|
||
import io.ktor.utils.io.* | ||
import kotlin.coroutines.* | ||
|
||
private val MAX_THREADS by lazy { | ||
Runtime.getRuntime().availableProcessors() | ||
.minus(2) | ||
.coerceAtLeast(1) | ||
} | ||
|
||
@InternalAPI | ||
public class EventGroupContext( | ||
public val parallelism: Int, | ||
) : CoroutineContext.Element { | ||
override val key: CoroutineContext.Key<*> = Key | ||
|
||
public companion object Key : CoroutineContext.Key<EventGroupContext> | ||
} | ||
|
||
@InternalAPI | ||
internal fun CoroutineContext.eventGroupParallelism(): Int { | ||
return get(EventGroupContext.Key)?.parallelism ?: MAX_THREADS | ||
} |
109 changes: 109 additions & 0 deletions
109
ktor-network/jvm/src/io/ktor/network/selector/eventgroup/EventGroupSelectorManager.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
/* | ||
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. | ||
*/ | ||
|
||
package io.ktor.network.selector.eventgroup | ||
|
||
import io.ktor.network.selector.* | ||
import io.ktor.utils.io.* | ||
import kotlinx.atomicfu.* | ||
import kotlinx.coroutines.* | ||
import java.nio.channels.* | ||
import java.nio.channels.spi.* | ||
import kotlin.coroutines.* | ||
|
||
@OptIn(InternalAPI::class) | ||
public class EventGroupSelectorManager(context: CoroutineContext) : SelectorManager { | ||
public val group: EventGroup = EventGroup(context.eventGroupParallelism()) | ||
|
||
override val coroutineContext: CoroutineContext = context + CoroutineName("eventgroup") | ||
|
||
override val provider: SelectorProvider = SelectorProvider.provider() | ||
|
||
override fun notifyClosed(selectable: Selectable) { | ||
// whatever | ||
} | ||
|
||
override suspend fun select(selectable: Selectable, interest: SelectInterest) { | ||
error("no select in eventgroup") | ||
} | ||
|
||
override fun close() { | ||
group.close() | ||
} | ||
} | ||
|
||
public class EventGroup(private val maxLoops: Int) { | ||
private val acceptLoop = Eventloop() | ||
private val loopIndex = atomic(0) | ||
private val loops = mutableListOf<Eventloop>() | ||
|
||
init { | ||
acceptLoop.run() | ||
|
||
repeat(maxLoops - 1) { | ||
val next = Eventloop().apply { run() } | ||
loops.add(next) | ||
} | ||
} | ||
|
||
private fun registerAcceptKey(channel: Selectable) = acceptLoop.runOnLoop { | ||
acceptLoop.addInterest(channel, SelectionKey.OP_ACCEPT) | ||
} | ||
|
||
internal fun registerChannel(channel: ServerSocketChannel): RegisteredServerChannel { | ||
val selectableChannel = SelectableBase(channel) | ||
val key = registerAcceptKey(selectableChannel) | ||
|
||
return RegisteredServerChannelImpl(channel, key) | ||
} | ||
|
||
private inner class RegisteredServerChannelImpl( | ||
override val channel: ServerSocketChannel, | ||
private val key: CompletableDeferred<SelectionKey>, | ||
) : RegisteredServerChannel { | ||
override suspend fun acceptConnection(configure: (SocketChannel) -> Unit): ConnectionImpl { | ||
val result = key.await().attachment.runTask(SelectionKey.OP_ACCEPT) { | ||
channel.accept().apply { | ||
configureBlocking(false) | ||
configure(this) | ||
} | ||
} | ||
|
||
val nextLoopIndex = loopIndex.getAndIncrement() % (maxLoops - 1) | ||
|
||
return ConnectionImpl(result, loops[nextLoopIndex]) | ||
} | ||
} | ||
|
||
private class ConnectionImpl( | ||
override val channel: SocketChannel, | ||
val loop: Eventloop, | ||
) : Connection { | ||
private val selectable = SelectableBase(channel) | ||
|
||
override suspend fun <T> performRead(body: suspend (SocketChannel) -> T): T { | ||
return runTask(SelectionKey.OP_READ) { body(channel) } | ||
} | ||
|
||
override suspend fun <T> performWrite(body: suspend (SocketChannel) -> T): T { | ||
return runTask(SelectionKey.OP_WRITE) { body(channel) } | ||
} | ||
|
||
override fun close() { | ||
channel.close() | ||
} | ||
|
||
private suspend fun <T> runTask(interest: Int, body: suspend () -> T): T { | ||
val key = loop.addInterest(selectable, interest) | ||
return key.attachment.runTask(interest, body).also { | ||
loop.deleteInterest(selectable, interest) | ||
} | ||
} | ||
} | ||
|
||
public fun close() { | ||
acceptLoop.close(null) | ||
loops.forEach { it.close(null) } | ||
} | ||
} |
104 changes: 104 additions & 0 deletions
104
ktor-network/jvm/src/io/ktor/network/selector/eventgroup/Eventloop.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
/* | ||
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. | ||
*/ | ||
|
||
package io.ktor.network.selector.eventgroup | ||
|
||
import io.ktor.network.selector.* | ||
import kotlinx.coroutines.* | ||
import java.nio.channels.* | ||
|
||
internal class Eventloop { | ||
val scope = newThreadContext(nThreads = 1).wrapInScope() | ||
|
||
fun run(): Job { | ||
return scope.launch { runLoop() } | ||
} | ||
|
||
private val taskQueue = ArrayDeque<Task<*>>() | ||
|
||
private val selector = Selector.open() | ||
|
||
fun close(cause: Throwable?) { | ||
taskQueue.forEach { it.continuation.cancel(cause) } | ||
selector.close() | ||
} | ||
|
||
private suspend fun runLoop() { | ||
while (true) { | ||
runAllPendingTasks() | ||
|
||
val n = selector.select(SELECTOR_TIMEOUT_MILLIS) | ||
yield() | ||
|
||
if (n == 0) { | ||
continue | ||
} | ||
|
||
val selectionKeys = selector.selectedKeys().iterator() | ||
while (selectionKeys.hasNext()) { | ||
val key = selectionKeys.next() | ||
selectionKeys.remove() | ||
|
||
try { | ||
if (!key.isValid) continue | ||
key.attachment.runTaskAndResumeContinuation(key) | ||
} catch (e: Throwable) { | ||
key.channel().close() | ||
key.attachment.cancel(e) | ||
} | ||
} | ||
} | ||
} | ||
|
||
private suspend fun runAllPendingTasks() { | ||
repeat(taskQueue.size) { | ||
taskQueue.removeFirst().runAndResume() | ||
} | ||
} | ||
|
||
internal fun <T> runOnLoop(body: suspend () -> T): CompletableDeferred<T> { | ||
val result = CompletableDeferred<T>() | ||
taskQueue.addLast(Task(result.toResumableCancellable(), body)) | ||
return result | ||
} | ||
|
||
internal fun addInterest(selectable: Selectable, interest: Int): SelectionKey { | ||
val channel = selectable.channel | ||
val key = channel.keyFor(selector) | ||
selectable.interestOp(SelectInterest.byValue(interest), true) | ||
val ops = selectable.interestedOps | ||
|
||
if (key == null) { | ||
if (ops != 0) { | ||
channel.register(selector, ops, Attachment()) | ||
} | ||
} else { | ||
if (key.interestOps() != ops) { | ||
key.interestOps(ops) | ||
} | ||
} | ||
return key | ||
} | ||
|
||
internal fun deleteInterest(selectable: Selectable, interest: Int) { | ||
val channel = selectable.channel | ||
val key = channel.keyFor(selector) | ||
selectable.interestOp(SelectInterest.byValue(interest), false) | ||
val ops = selectable.interestedOps | ||
|
||
if (key == null) { | ||
if (ops != 0) { | ||
channel.register(selector, ops, Attachment()) | ||
} | ||
} else { | ||
if (key.interestOps() != ops) { | ||
key.interestOps(ops) | ||
} | ||
} | ||
} | ||
|
||
companion object { | ||
private const val SELECTOR_TIMEOUT_MILLIS = 20L | ||
} | ||
} |
21 changes: 21 additions & 0 deletions
21
ktor-network/jvm/src/io/ktor/network/selector/eventgroup/RegisteredServerChannel.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
/* | ||
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. | ||
*/ | ||
|
||
package io.ktor.network.selector.eventgroup | ||
|
||
import java.net.Socket | ||
import java.nio.channels.ServerSocketChannel | ||
import java.nio.channels.SocketChannel | ||
|
||
/** | ||
* Represents a server channel registered to an event loop with OP_ACCEPT interest | ||
*/ | ||
internal interface RegisteredServerChannel { | ||
val channel: ServerSocketChannel | ||
|
||
/** | ||
* Allows to accept connections on the server socket channel | ||
*/ | ||
suspend fun acceptConnection(configure: (SocketChannel) -> Unit = {}): Connection | ||
} |
Oops, something went wrong.