Skip to content

Commit

Permalink
Renamed msg"" to m""
Browse files Browse the repository at this point in the history
  • Loading branch information
propensive committed Jul 1, 2024
1 parent 9872ba6 commit 71675d6
Showing 1 changed file with 44 additions and 44 deletions.
88 changes: 44 additions & 44 deletions src/core/socket.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import java.io as ji
import java.nio.channels as jnc
import java.nio.file as jnf

case class BindError() extends Error(msg"the port was not available for binding")
case class BindError() extends Error(m"the port was not available for binding")

object DomainSocket:
def apply[PathType: GenericPath](path: PathType): DomainSocket = DomainSocket(path.pathText)
Expand All @@ -50,10 +50,10 @@ object Control:
case class Conclude[+StateType](message: Bytes, state: Optional[StateType]) extends Control[StateType]
case object Terminate extends Control[Nothing]
case class Continue[+StateType](state: Optional[StateType] = Unset) extends Control[StateType], Interactive

case class Reply[+StateType](message: Bytes, state: Optional[StateType])
extends Control[StateType], Interactive


object Conclude:
def apply[MessageType, StateType](message: MessageType, state: Optional[StateType] = Unset)
Expand Down Expand Up @@ -84,42 +84,42 @@ object Connectable:
channel.finishConnect()
val out = jnc.Channels.newOutputStream(channel).nn
val in = jnc.Channels.newInputStream(channel).nn

Connection(channel, in, out)

def transmit(connection: Connection, input: Bytes): Unit =
connection.out.write(input.mutable(using Unsafe))
connection.out.flush()

def receive(connection: Connection): LazyList[Bytes] =
connection.in.stream[Bytes]

def close(connection: Connection): Unit = connection.channel.close()

given tcpEndpoint(using Online, Errant[StreamError]): Connectable[Endpoint[TcpPort]] with
type Output = Bytes
type Connection = jn.Socket

def connect(endpoint: Endpoint[TcpPort]): jn.Socket =
jn.Socket(jn.InetAddress.getByName(endpoint.remote.s), endpoint.port.number)

def transmit(socket: jn.Socket, input: Bytes): Unit =
val out = socket.getOutputStream.nn
out.write(input.mutable(using Unsafe))
out.flush()

def close(socket: jn.Socket): Unit = socket.close()

def receive(socket: jn.Socket): LazyList[Bytes] = socket.getInputStream.nn.stream[Bytes]

given tcpPort(using Errant[StreamError]): Connectable[TcpPort] with
type Output = Bytes
type Connection = jn.Socket

def connect(port: TcpPort): jn.Socket = jn.Socket(jn.InetAddress.getLocalHost.nn, port.number)
def close(socket: jn.Socket): Unit = socket.close()
def receive(socket: jn.Socket): LazyList[Bytes] = socket.getInputStream.nn.stream[Bytes]

def transmit(socket: jn.Socket, input: Bytes): Unit =
val out = socket.getOutputStream.nn
out.write(input.mutable(using Unsafe))
Expand All @@ -132,23 +132,23 @@ object Addressable:
def connect(endpoint: Endpoint[UdpPort]): Connection =
val address = jn.InetAddress.getByName(endpoint.remote.s).nn
Connection(address, endpoint.port.number, jn.DatagramSocket())

def transmit(connection: Connection, input: Bytes): Unit =
val packet = jn.DatagramPacket(input.mutable(using Unsafe), input.length, connection.address,
connection.port)

connection.socket.send(packet)

given udpPort: Addressable[UdpPort] with
case class Connection(port: Int, socket: jn.DatagramSocket)

def connect(port: UdpPort): Connection =
Connection(port.number, jn.DatagramSocket())

def transmit(connection: Connection, input: Bytes): Unit =
val packet = jn.DatagramPacket(input.mutable(using Unsafe), input.length, jn.InetAddress.getLocalHost.nn,
connection.port)

connection.socket.send(packet)

trait Addressable[EndpointType]:
Expand All @@ -165,7 +165,7 @@ trait Bindable[SocketType]:
type Binding
type Input
type Output

def bind(socket: SocketType): Binding
def connect(binding: Binding): Input
def transmit(binding: Binding, input: Input, output: Output): Unit
Expand All @@ -192,21 +192,21 @@ object Bindable:
jnc.ServerSocketChannel.open(jn.StandardProtocolFamily.UNIX).nn.tap: channel =>
channel.configureBlocking(true)
channel.bind(address)

def connect(channel: jnc.ServerSocketChannel): Connection =
val clientChannel: jnc.SocketChannel = channel.accept().nn
val in = jnc.Channels.newInputStream(clientChannel).nn
val out = jnc.Channels.newOutputStream(clientChannel).nn

Connection(in, out)

def transmit(channel: jnc.ServerSocketChannel, connection: Connection, bytes: Bytes): Unit =
connection.out.write(bytes.mutable(using Unsafe))
connection.out.flush()

def stop(channel: jnc.ServerSocketChannel): Unit =
channel.close()

def close(connection: Connection): Unit =
connection.in.close()
connection.out.close()
Expand All @@ -215,60 +215,60 @@ object Bindable:
type Binding = jn.ServerSocket
type Output = Bytes
type Input = jn.Socket

def bind(port: TcpPort): Binding = jn.ServerSocket(port.number)

def connect(binding: Binding): jn.Socket = binding.accept().nn

def transmit(socket: jn.ServerSocket, input: Input, bytes: Bytes): Unit =
input.getOutputStream.nn.write(bytes.mutable(using Unsafe))
input.getOutputStream.nn.flush()

def close(socket: jn.Socket): Unit = socket.close()
def stop(socket: jn.ServerSocket): Unit = socket.close()

given udpPort: Bindable[UdpPort] with
type Binding = jn.DatagramSocket
type Output = UdpResponse
type Input = UdpPacket

def bind(port: UdpPort): Binding = jn.DatagramSocket(port.number)

def connect(binding: Binding): UdpPacket =
val array = new Array[Byte](1472)
val packet = jn.DatagramPacket(array, 1472)
val socket = binding.receive(packet)
val address = packet.getSocketAddress.nn.asInstanceOf[jn.InetSocketAddress]

val ip = (address.getAddress.nn: @unchecked) match
case ip: jn.Inet4Address =>
val bytes: Array[Byte] = ip.getAddress.nn
Ipv4(bytes(0), bytes(1), bytes(2), bytes(3))

case ip: jn.Inet6Address =>
val bytes: Array[Byte] = ip.getAddress.nn
Ipv6(Long(bytes.take(8).immutable(using Unsafe)), Long(bytes.drop(8).immutable(using Unsafe)))

UdpPacket(array.slice(0, packet.getLength).immutable(using Unsafe), ip, UdpPort.unsafe(address.getPort))

def transmit(socket: jn.DatagramSocket, input: UdpPacket, response: UdpResponse): Unit = response match
case UdpResponse.Ignore => ()

case UdpResponse.Reply(data) =>
val sender = input.sender

val ip: jn.InetAddress = (input.sender: @unchecked) match
case ip: Ipv4 =>
val array = Array[Byte](ip.byte0.toByte, ip.byte1.toByte, ip.byte2.toByte, ip.byte3.toByte)
jn.InetAddress.getByAddress(array).nn

case ip: Ipv6 =>
val array = IArray.from(ip.highBits.bits.bytes ++ ip.lowBits.bits.bytes).mutable(using Unsafe)
jn.InetAddress.getByAddress(array).nn

val packet = jn.DatagramPacket(data.mutable(using Unsafe), data.length, ip, input.port.number)
socket.send(packet)

def stop(binding: Binding): Unit = binding.close()
def close(input: UdpPacket): Unit = ()

Expand Down Expand Up @@ -297,7 +297,7 @@ extension [SocketType](socket: SocketType)
: SocketService raises BindError =

val binding = bindable.bind(socket)

val bindLoop = loop:
val connection = bindable.connect(binding)
async(bindable.transmit(binding, connection, lambda(connection)))
Expand All @@ -317,22 +317,22 @@ extension [EndpointType](endpoint: EndpointType)
: StateType =

val connection = connectable.connect(endpoint)

def recur(input: LazyList[Bytes], state: StateType): StateType = input match
case head #:: tail => handle(using state)(receivable.deserialize(head)) match
case Continue(state2) => recur(tail, state2.or(state))
case Terminate => state

case Reply(message, state2) =>
connectable.transmit(connection, message)
recur(tail, state2.or(state))

case Conclude(message, state2) =>
connectable.transmit(connection, message)
state2.or(state)

case _ => state

recur(connectable.receive(connection), initialState).also:
connectable.close(connection)

Expand All @@ -341,4 +341,4 @@ extension [EndpointType](endpoint: EndpointType)
(using Monitor)
: Unit raises StreamError =

addressable.transmit(addressable.connect(endpoint), transmissible.serialize(message))
addressable.transmit(addressable.connect(endpoint), transmissible.serialize(message))

0 comments on commit 71675d6

Please sign in to comment.