Skip to content

Commit

Permalink
added new type reduce
Browse files Browse the repository at this point in the history
  • Loading branch information
tg44 committed Jun 29, 2024
1 parent 340d1be commit b5da11a
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 18 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ When it gets an element in each topic, it calls the `template`, emits the output

The `template` will get a `{messages: []}` object, the indexes will match to the topic indexes.

### Reduce

Gets the messages from the given topic(s), and the old state from the last emitted message, and combines them.
The `template` will get a `{message: {}, state: {}}` object.
`toTopic` should be used, `toTopicTemplate` will not work!

### Constants

You can add commonly used constants as a "transformation".
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
},
"main": "src/app.ts",
"scripts": {
"test": "`npm bin`/ts-interface-builder src/types.ts && nyc ./node_modules/.bin/_mocha 'test/**/*.test.ts'",
"build": "`npm bin`/ts-interface-builder src/types.ts && npx tsc"
"test": "npx ts-interface-builder src/types.ts && nyc ./node_modules/.bin/_mocha 'test/**/*.test.ts'",
"build": "npx ts-interface-builder src/types.ts && npx tsc"
},
"keywords": [],
"author": "tg44",
Expand Down
1 change: 1 addition & 0 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const metricsData: Map<string, MetricsData> = new Map()
const publishers: {io: AllSupportedIOs, publisher?: PublishFunc}[] = []

const globalPublisher = (topic: string, message: string) => {
mqttData.set(topic, message)
publishers.forEach(p => {
if(p.publisher && topic.startsWith(p.io.topicPrefix || "")) {
const t = p.io.topicPrefix ? topic.replace(p.io.topicPrefix, "") : topic
Expand Down
32 changes: 20 additions & 12 deletions src/handleMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,24 @@ import {hasKey} from "./utils";

export type PublishFunc = (topic: string, message: string) => void

export const parseMessage = (message: string, topic: string, configs: AllSupportedOps[], isVerbose: boolean): undefined | Record<string,any> => {
let msg: any;
try {
msg = JSON.parse(message)
msg = fixupMessageForJsoneKeys(msg)
if(isVerbose) {
console.info("")
console.info("Message from topic " + topic)
console.info(" parsed: " + JSON.stringify(configs, null, 2))
}
} catch (error) {
console.error('Json parse on topic ' + topic + ' message was; ' + message)
console.error(error);
return undefined;
}
return msg;
}

export const handleMessage = (
topic: string,
message: string,
Expand All @@ -18,18 +36,8 @@ export const handleMessage = (
isVerbose: boolean
) => {
const configs = transforms.filter(element => element.fromTopics.some(subscription => mqttMatch(subscription, topic)));
let msg: any;
try {
msg = JSON.parse(message)
msg = fixupMessageForJsoneKeys(msg)
if(isVerbose) {
console.info("")
console.info("Message from topic " + topic)
console.info(" parsed: " + JSON.stringify(configs, null, 2))
}
} catch (error) {
console.error('Json parse on topic ' + topic + ' message was; ' + message)
console.error(error);
const msg = parseMessage(message, topic, configs, isVerbose)
if(msg === undefined) {
return
}
if(isVerbose) {
Expand Down
12 changes: 11 additions & 1 deletion src/operators.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import jsone from "json-e";
import {CollectOps, TransformationOps, FilterOps, OnceOps, RepeatOps, AllSupportedOps} from "./types";
import {PublishFunc} from "./handleMessage";
import {parseMessage, PublishFunc} from "./handleMessage";

export const evaluateTransformAndEmitLogic = (c: AllSupportedOps, mqttData: Map<string, any>, timerData: Map<number, object>, additionalConstants: object, mqttPublish: PublishFunc, isVerbose: boolean) => {
if(c.emitType === 'repeat') {
Expand Down Expand Up @@ -36,6 +36,16 @@ export const evaluateTransformAndEmitLogic = (c: AllSupportedOps, mqttData: Map<
if(hasAllDataPredicate(c, allData)){
mapAndEmit(c, {...additionalConstants, ...{messages: allData}}, mqttPublish, isVerbose)
}
} else if(c.emitType === 'reduce') {
if(typeof c.toTopicTemplate === 'string') {
const data = mqttData.get(c.id+c.fromTopics[0])
const stateData = parseMessage(mqttData.get(c.toTopicTemplate), c.toTopicTemplate, [c], isVerbose)
if(data) {
mapAndEmit(c, {...additionalConstants, ...{message: data, state: stateData ?? {}}}, mqttPublish, isVerbose)
}
} else {
console.error('ReduceOps toTopicTemplate must be string')
}
} else {
// @ts-ignore
console.info('Non valid emit type used in ' + c.id + ' (' + c.emitType + ')');
Expand Down
10 changes: 7 additions & 3 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
type EmitType = 'constant' | 'repeat' | 'once' | 'map' | 'filter' | 'collect' | 'zipLast' | 'combineLatest'
type EmitType = 'constant' | 'repeat' | 'once' | 'map' | 'filter' | 'collect' | 'zipLast' | 'combineLatest' | 'reduce'
type IOType = 'webserver' | 'mqtt' | 'hookCall'

interface Transformer {
Expand All @@ -12,14 +12,14 @@ interface Transformation {
id: number,
fromTopics: string[],
topicKeyToMessage?: string,
toTopicTemplate: string,
toTopicTemplate: string | object,
emitType: EmitType,
wrapper?: string,
useConstants?: object,
useMetrics?: object,
}

export type TransformationOps = RepeatOps | OnceOps | MapOps | CollectOps | ZipLastOps | CombineLatestOps
export type TransformationOps = RepeatOps | OnceOps | MapOps | CollectOps | ZipLastOps | CombineLatestOps | ReduceOps
export type AllSupportedOps = FilterOps | TransformationOps
export type AllSupportedConfigs = ConstantDef | AllSupportedOps
export type AllSupportedIOs = MqttIO | HookCallIO | WebserverIO
Expand All @@ -45,6 +45,10 @@ export interface MapOps extends Transformation, Transformer {
emitType: "map",
}

export interface ReduceOps extends Transformation, Transformer {
emitType: "reduce",
}

export interface FilterOps extends Transformation, Filter {
emitType: "filter",
}
Expand Down

0 comments on commit b5da11a

Please sign in to comment.