-
Notifications
You must be signed in to change notification settings - Fork 15
/
index.js
91 lines (86 loc) · 2.25 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
const { ServiceBroker } = require("moleculer");
const ChannelsMiddleware = require("../..").Middleware;
async function main() {
const broker = new ServiceBroker({
nodeID: "channelTest",
// logger: false,
logLevel: "debug",
middlewares: [
ChannelsMiddleware({
schemaProperty: "streamOne",
adapterPropertyName: "streamOneAdapter",
sendMethodName: "sendToStreamOneChannel",
channelHandlerTrigger: "emitStreamOneLocalChannelHandler",
adapter: {
type: "NATS",
options: {
nats: {
// url: process.env.NATS_SERVER,
connectionOptions: {
// debug: true
// user: process.env.NATS_USER,
// pass: process.env.NATS_PASSWORD
},
streamConfig: {
name: "streamOne",
subjects: ["streamOneTopic.*"]
},
consumerOptions: {
config: {
deliver_policy: "new",
ack_policy: "explicit",
max_ack_pending: 1
}
}
},
maxInFlight: 10,
maxRetries: 3,
deadLettering: {
enabled: false,
queueName: "DEAD_LETTER_REG"
}
}
}
})
]
});
broker.createService({
name: "sub",
streamOne: {
"streamOneTopic.>": {
group: "other",
nats: {
consumerOptions: {
config: {
deliver_policy: "new"
}
},
streamConfig: {
// Create a single stream for all topics that match `streamOneTopic.>`
// Note: Will override the streamConfig defined in middleware config
name: "streamOne",
subjects: ["streamOneTopic.>"]
}
},
// This handler will be called for all topics that match `streamOneTopic.>`
async handler(payload) {
console.log(`Processing streamOneTopic: ${JSON.stringify(payload)}}`);
}
}
}
});
await broker.start().delay(2000);
const msg = {
id: 1,
name: "John",
age: 25
};
await broker.sendToStreamOneChannel("streamOneTopic.abc", { ...msg, topic: "abc" });
await broker.Promise.delay(200);
await broker.sendToStreamOneChannel("streamOneTopic.abc.def", { ...msg, topic: "abc.def" });
await broker.Promise.delay(200);
await broker.sendToStreamOneChannel("streamOneTopic.xyz", { ...msg, topic: "xyz" });
await broker.Promise.delay(200);
broker.repl();
}
main().catch(err => console.error(err));