-
-
Notifications
You must be signed in to change notification settings - Fork 57
/
ChannelFeature.java
164 lines (140 loc) · 6.19 KB
/
ChannelFeature.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package communication;
import org.junit.jupiter.api.Test;
import java.util.concurrent.*;
import java.util.function.Supplier;
public class ChannelFeature {
public static class Channel<T> {
private final BlockingQueue<T> queue;
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
// Private constructor to initialize the BlockingQueue with the specified capacity.
private Channel(int capacity) {
this.queue = new ArrayBlockingQueue<>(capacity);
}
/**
* Static factory method to create a Channel with a default capacity of 1.
*
* @param <T> The type of messages this channel will handle.
* @return A new Channel instance with a capacity of 1.
*/
public static <T> Channel<T> make() {
return new Channel<>(1);
}
/**
* Static factory method to create a Channel with a specified capacity.
*
* @param <T> The type of messages this channel will handle.
* @param capacity The maximum number of messages that can be held in the channel at any one time.
* @return A new Channel instance with the specified capacity.
*/
public static <T> Channel<T> make(int capacity) {
return new Channel<>(capacity);
}
/**
* Sends a message to the channel by placing the result of the supplied function
* into the BlockingQueue. The send operation is performed in a new virtual thread.
* If the queue is full, this method will block until space is available.
*
* @param func A Supplier function that produces the message to be sent to the channel.
* @throws RuntimeException if the thread is interrupted while waiting to send the message.
*/
public void send(Supplier<T> func) {
appendInQueue(func);
}
public void send(T t) {
appendInQueue(() -> t);
}
private void appendInQueue(Supplier<T> func) {
Thread.ofVirtual().start(() -> {
try {
queue.put(func.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore the interrupt status
throw new RuntimeException("Thread was interrupted while sending", e);
}
});
}
/**
* Receives a message from the channel by retrieving and removing the head of the BlockingQueue.
* If the queue is empty, this method will block for the specified timeout duration, waiting for a message.
*
* @param timeout The maximum time to wait for a message.
* @param unit The time unit of the timeout argument.
* @return The message received from the channel.
* @throws RuntimeException if the timeout expires before a message is received or if the thread is interrupted.
*/
public T receive(long timeout, TimeUnit unit) {
try {
T value = queue.poll(timeout, unit);
if (value == null) {
throw new RuntimeException("Timeout waiting for the message");
}
return value;
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore the interrupt status
throw new RuntimeException("Thread was interrupted while receiving", e);
}
}
public T receive() throws InterruptedException {
return queue.take();
}
/**
* Receives a message from the channel asynchronously by using a CompletableFuture that
* will complete when a message is available. This method does not block the main thread and
* returns immediately with a future that can be used to obtain the message once it arrives.
*
* @return A CompletableFuture that will be completed with the received message when it becomes available.
*/
public CompletableFuture<T> receiveAsync() {
return CompletableFuture.supplyAsync(() -> {
try {
return queue.take(); // take() will block the virtual thread but not the platform thread
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore the interrupt status
throw new RuntimeException("Thread was interrupted while receiving", e);
}
}, executor);
}
}
@Test
void runChannel() throws InterruptedException {
Channel<String> channel = Channel.make();
Thread consumer = Thread.ofVirtual().start(() -> {
String message;
try {
message = channel.receive();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(STR."Channel received message: \{message}");
});
Thread.ofVirtual().start(() -> {
try {
Thread.sleep(1000);
channel.send(() -> "Hello from producer");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
consumer.join();
}
@Test
void runChannelMultipleThreads() throws InterruptedException {
Channel<String> channel = Channel.make(2);
// Sending messages to the channel in virtual threads.
channel.send(() -> "Message from producer 1");
channel.send("Message from producer 2");
// Two consumers receiving messages from the channel.
Thread consumer1 = Thread.ofVirtual().start(() -> {
String message = channel.receive(5, TimeUnit.SECONDS);
System.out.println(STR."Channel Consumer 1 received: \{message}");
});
try {
var message = channel.receiveAsync().get();
System.out.println(STR."Channel Consumer 2 received: \{message}");
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
// Wait for both consumers to finish
consumer1.join();
}
}