Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EventGroupSelectorManager instead of ActorSelectorManager #4066

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,9 @@ public actual enum class SelectInterest(public val flag: Int) {
public val flags: IntArray = values().map { it.flag }.toIntArray()

public val size: Int = values().size

private val byFlag = values().associateBy { it.flag }

public fun byValue(value: Int): SelectInterest = byFlag[value] ?: error("Unknown SelectInterest value: $value")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.selector.eventgroup

import kotlinx.coroutines.suspendCancellableCoroutine
import java.nio.channels.SelectionKey

internal inline val SelectionKey.attachment get() = attachment() as Attachment

/**
* Attachment for SelectionKey
* It contains task for each interest and allows to run them and resume the continuation
*/
internal class Attachment {
private var acceptTask: Task<Any?>? = null
private var readTask: Task<Any?>? = null
private var writeTask: Task<Any?>? = null
private var connectTask: Task<Any?>? = null

suspend fun <T> runTask(interest: Int, task: suspend () -> T): T {
return suspendCancellableCoroutine {
@Suppress("UNCHECKED_CAST")
setContinuationByInterest(interest, Task(it.toResumableCancellable(), task) as Task<Any?>)
}
}

suspend fun runTaskAndResumeContinuation(key: SelectionKey) {
when {
key.isAcceptable -> acceptTask.runAndResume(SelectionKey.OP_ACCEPT)
key.isReadable -> readTask.runAndResume(SelectionKey.OP_READ)
key.isWritable -> writeTask.runAndResume(SelectionKey.OP_WRITE)
key.isConnectable -> connectTask.runAndResume(SelectionKey.OP_CONNECT)
}
}

private suspend fun Task<Any?>?.runAndResume(interest: Int) {
val task = this ?: return
setContinuationByInterest(interest, null)
task.runAndResume()
}

private fun setContinuationByInterest(interest: Int, task: Task<Any?>?) {
when (interest) {
SelectionKey.OP_ACCEPT -> acceptTask = task
SelectionKey.OP_READ -> readTask = task
SelectionKey.OP_WRITE -> writeTask = task
SelectionKey.OP_CONNECT -> connectTask = task
}
}

fun cancel(cause: Throwable? = null) {
acceptTask.cancel(cause)
readTask.cancel(cause)
writeTask.cancel(cause)
connectTask.cancel(cause)
}

private fun Task<*>?.cancel(cause: Throwable? = null) {
this?.continuation?.cancel(cause)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.selector.eventgroup

import java.nio.channels.SocketChannel

/**
* Allows to perform read and write operations on the socket channel,
* which will be submitted as tasks to the event loop and will be suspended until
* they will be executed in the context of the event loop
*/
internal interface Connection {
val channel: SocketChannel

suspend fun <T> performRead(body: suspend (SocketChannel) -> T): T

suspend fun <T> performWrite(body: suspend (SocketChannel) -> T): T

fun close()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.selector.eventgroup

import io.ktor.utils.io.*
import kotlin.coroutines.*

private val MAX_THREADS by lazy {
Runtime.getRuntime().availableProcessors()
.minus(2)
.coerceAtLeast(1)
}

@InternalAPI
public class EventGroupContext(
public val parallelism: Int,
) : CoroutineContext.Element {
override val key: CoroutineContext.Key<*> = Key

public companion object Key : CoroutineContext.Key<EventGroupContext>
}

@InternalAPI
internal fun CoroutineContext.eventGroupParallelism(): Int {
return get(EventGroupContext.Key)?.parallelism ?: MAX_THREADS
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.selector.eventgroup

import io.ktor.network.selector.*
import io.ktor.utils.io.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import java.nio.channels.*
import java.nio.channels.spi.*
import kotlin.coroutines.*

@OptIn(InternalAPI::class)
public class EventGroupSelectorManager(context: CoroutineContext) : SelectorManager {
public val group: EventGroup = EventGroup(context.eventGroupParallelism())

override val coroutineContext: CoroutineContext = context + CoroutineName("eventgroup")

override val provider: SelectorProvider = SelectorProvider.provider()

override fun notifyClosed(selectable: Selectable) {
// whatever
}

override suspend fun select(selectable: Selectable, interest: SelectInterest) {
error("no select in eventgroup")
}

override fun close() {
group.close()
}
}

public class EventGroup(private val maxLoops: Int) {
private val acceptLoop = Eventloop()
private val loopIndex = atomic(0)
private val loops = mutableListOf<Eventloop>()

init {
acceptLoop.run()

repeat(maxLoops - 1) {
val next = Eventloop().apply { run() }
loops.add(next)
}
}

private fun registerAcceptKey(channel: Selectable) = acceptLoop.runOnLoop {
acceptLoop.addInterest(channel, SelectionKey.OP_ACCEPT)
}

internal fun registerChannel(channel: ServerSocketChannel): RegisteredServerChannel {
val selectableChannel = SelectableBase(channel)
val key = registerAcceptKey(selectableChannel)

return RegisteredServerChannelImpl(channel, key)
}

private inner class RegisteredServerChannelImpl(
override val channel: ServerSocketChannel,
private val key: CompletableDeferred<SelectionKey>,
) : RegisteredServerChannel {
override suspend fun acceptConnection(configure: (SocketChannel) -> Unit): ConnectionImpl {
val result = key.await().attachment.runTask(SelectionKey.OP_ACCEPT) {
channel.accept().apply {
configureBlocking(false)
configure(this)
}
}

val nextLoopIndex = loopIndex.getAndIncrement() % (maxLoops - 1)

return ConnectionImpl(result, loops[nextLoopIndex])
}
}

private class ConnectionImpl(
override val channel: SocketChannel,
val loop: Eventloop,
) : Connection {
private val selectable = SelectableBase(channel)

override suspend fun <T> performRead(body: suspend (SocketChannel) -> T): T {
return runTask(SelectionKey.OP_READ) { body(channel) }
}

override suspend fun <T> performWrite(body: suspend (SocketChannel) -> T): T {
return runTask(SelectionKey.OP_WRITE) { body(channel) }
}

override fun close() {
channel.close()
}

private suspend fun <T> runTask(interest: Int, body: suspend () -> T): T {
val key = loop.addInterest(selectable, interest)
return key.attachment.runTask(interest, body).also {
loop.deleteInterest(selectable, interest)
}
}
}

public fun close() {
acceptLoop.close(null)
loops.forEach { it.close(null) }
}
}
104 changes: 104 additions & 0 deletions ktor-network/jvm/src/io/ktor/network/selector/eventgroup/Eventloop.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.selector.eventgroup

import io.ktor.network.selector.*
import kotlinx.coroutines.*
import java.nio.channels.*

internal class Eventloop {
val scope = newThreadContext(nThreads = 1).wrapInScope()

fun run(): Job {
return scope.launch { runLoop() }
Copy link

@andrsuh andrsuh Jun 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if coroutine completes unexpectedly? Can this lead to the situation where we loose event loops one by one silently? Do we need some completion handler that analyse if the completion was controlled or unexpected, notify about this and resurrect coroutine to maintain the constant number of event loops?

}

private val taskQueue = ArrayDeque<Task<*>>()

private val selector = Selector.open()

fun close(cause: Throwable?) {
taskQueue.forEach { it.continuation.cancel(cause) }
selector.close()
}

private suspend fun runLoop() {
while (true) {
runAllPendingTasks()

val n = selector.select(SELECTOR_TIMEOUT_MILLIS)
yield()

if (n == 0) {
continue
}

val selectionKeys = selector.selectedKeys().iterator()
while (selectionKeys.hasNext()) {
val key = selectionKeys.next()
selectionKeys.remove()

try {
if (!key.isValid) continue
key.attachment.runTaskAndResumeContinuation(key)
} catch (e: Throwable) {
key.channel().close()
key.attachment.cancel(e)
}
}
}
}

private suspend fun runAllPendingTasks() {
repeat(taskQueue.size) {
taskQueue.removeFirst().runAndResume()
}
}

internal fun <T> runOnLoop(body: suspend () -> T): CompletableDeferred<T> {
val result = CompletableDeferred<T>()
taskQueue.addLast(Task(result.toResumableCancellable(), body))
return result
}

internal fun addInterest(selectable: Selectable, interest: Int): SelectionKey {
val channel = selectable.channel
val key = channel.keyFor(selector)
selectable.interestOp(SelectInterest.byValue(interest), true)
val ops = selectable.interestedOps

if (key == null) {
if (ops != 0) {
channel.register(selector, ops, Attachment())
}
} else {
if (key.interestOps() != ops) {
key.interestOps(ops)
}
}
return key
}

internal fun deleteInterest(selectable: Selectable, interest: Int) {
val channel = selectable.channel
val key = channel.keyFor(selector)
selectable.interestOp(SelectInterest.byValue(interest), false)
val ops = selectable.interestedOps

if (key == null) {
if (ops != 0) {
channel.register(selector, ops, Attachment())
}
} else {
if (key.interestOps() != ops) {
key.interestOps(ops)
}
}
}

companion object {
private const val SELECTOR_TIMEOUT_MILLIS = 20L
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.selector.eventgroup

import java.net.Socket
import java.nio.channels.ServerSocketChannel
import java.nio.channels.SocketChannel

/**
* Represents a server channel registered to an event loop with OP_ACCEPT interest
*/
internal interface RegisteredServerChannel {
val channel: ServerSocketChannel

/**
* Allows to accept connections on the server socket channel
*/
suspend fun acceptConnection(configure: (SocketChannel) -> Unit = {}): Connection
}
Loading
Loading