From 0ca699fcde72cbe69a47cf935001e0245964d8f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20T=C3=B6rcsv=C3=A1ri?= Date: Sat, 29 Jun 2024 22:45:54 +0200 Subject: [PATCH] added new type `reduce` --- README.md | 6 ++++++ package.json | 4 ++-- src/app.ts | 1 + src/handleMessage.ts | 32 ++++++++++++++++++++------------ src/operators.ts | 12 +++++++++++- src/types.ts | 10 +++++++--- 6 files changed, 47 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 17fbaa2..3fc710f 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,12 @@ When it gets an element in each topic, it calls the `template`, emits the output The `template` will get a `{messages: []}` object, the indexes will match to the topic indexes. +### Reduce + +Gets the messages from the given topic(s), and the old state from the last emitted message, and combines them. +The `template` will get a `{message: {}, state: {}}` object. +`toTopic` should be used, `toTopicTemplate` will not work! + ### Constants You can add commonly used constants as a "transformation". diff --git a/package.json b/package.json index 4969ccf..cb07b9d 100644 --- a/package.json +++ b/package.json @@ -8,8 +8,8 @@ }, "main": "src/app.ts", "scripts": { - "test": "`npm bin`/ts-interface-builder src/types.ts && nyc ./node_modules/.bin/_mocha 'test/**/*.test.ts'", - "build": "`npm bin`/ts-interface-builder src/types.ts && npx tsc" + "test": "`npx ts-interface-builder src/types.ts && nyc ./node_modules/.bin/_mocha 'test/**/*.test.ts'", + "build": "npx ts-interface-builder src/types.ts && npx tsc" }, "keywords": [], "author": "tg44", diff --git a/src/app.ts b/src/app.ts index c859b0f..578e9a9 100644 --- a/src/app.ts +++ b/src/app.ts @@ -30,6 +30,7 @@ const metricsData: Map = new Map() const publishers: {io: AllSupportedIOs, publisher?: PublishFunc}[] = [] const globalPublisher = (topic: string, message: string) => { + mqttData.set(topic, message) publishers.forEach(p => { if(p.publisher && topic.startsWith(p.io.topicPrefix || "")) { const t = p.io.topicPrefix ? topic.replace(p.io.topicPrefix, "") : topic diff --git a/src/handleMessage.ts b/src/handleMessage.ts index c9c5df9..6a58612 100644 --- a/src/handleMessage.ts +++ b/src/handleMessage.ts @@ -5,6 +5,24 @@ import {hasKey} from "./utils"; export type PublishFunc = (topic: string, message: string) => void +export const parseMessage = (message: string, topic: string, configs: AllSupportedOps[], isVerbose: boolean): undefined | Record => { + let msg: any; + try { + msg = JSON.parse(message) + msg = fixupMessageForJsoneKeys(msg) + if(isVerbose) { + console.info("") + console.info("Message from topic " + topic) + console.info(" parsed: " + JSON.stringify(configs, null, 2)) + } + } catch (error) { + console.error('Json parse on topic ' + topic + ' message was; ' + message) + console.error(error); + return undefined; + } + return msg; +} + export const handleMessage = ( topic: string, message: string, @@ -18,18 +36,8 @@ export const handleMessage = ( isVerbose: boolean ) => { const configs = transforms.filter(element => element.fromTopics.some(subscription => mqttMatch(subscription, topic))); - let msg: any; - try { - msg = JSON.parse(message) - msg = fixupMessageForJsoneKeys(msg) - if(isVerbose) { - console.info("") - console.info("Message from topic " + topic) - console.info(" parsed: " + JSON.stringify(configs, null, 2)) - } - } catch (error) { - console.error('Json parse on topic ' + topic + ' message was; ' + message) - console.error(error); + const msg = parseMessage(message, topic, configs, isVerbose) + if(msg === undefined) { return } if(isVerbose) { diff --git a/src/operators.ts b/src/operators.ts index ff0c5ff..424c182 100644 --- a/src/operators.ts +++ b/src/operators.ts @@ -1,6 +1,6 @@ import jsone from "json-e"; import {CollectOps, TransformationOps, FilterOps, OnceOps, RepeatOps, AllSupportedOps} from "./types"; -import {PublishFunc} from "./handleMessage"; +import {parseMessage, PublishFunc} from "./handleMessage"; export const evaluateTransformAndEmitLogic = (c: AllSupportedOps, mqttData: Map, timerData: Map, additionalConstants: object, mqttPublish: PublishFunc, isVerbose: boolean) => { if(c.emitType === 'repeat') { @@ -36,6 +36,16 @@ export const evaluateTransformAndEmitLogic = (c: AllSupportedOps, mqttData: Map< if(hasAllDataPredicate(c, allData)){ mapAndEmit(c, {...additionalConstants, ...{messages: allData}}, mqttPublish, isVerbose) } + } else if(c.emitType === 'reduce') { + if(typeof c.toTopicTemplate === 'string') { + const data = mqttData.get(c.id+c.fromTopics[0]) + const stateData = parseMessage(mqttData.get(c.toTopicTemplate), c.toTopicTemplate, [c], isVerbose) + if(data) { + mapAndEmit(c, {...additionalConstants, ...{message: data, state: stateData ?? {}}}, mqttPublish, isVerbose) + } + } else { + console.error('ReduceOps toTopicTemplate must be string') + } } else { // @ts-ignore console.info('Non valid emit type used in ' + c.id + ' (' + c.emitType + ')'); diff --git a/src/types.ts b/src/types.ts index 377eaf3..9032c09 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,4 +1,4 @@ -type EmitType = 'constant' | 'repeat' | 'once' | 'map' | 'filter' | 'collect' | 'zipLast' | 'combineLatest' +type EmitType = 'constant' | 'repeat' | 'once' | 'map' | 'filter' | 'collect' | 'zipLast' | 'combineLatest' | 'reduce' type IOType = 'webserver' | 'mqtt' | 'hookCall' interface Transformer { @@ -12,14 +12,14 @@ interface Transformation { id: number, fromTopics: string[], topicKeyToMessage?: string, - toTopicTemplate: string, + toTopicTemplate: string | object, emitType: EmitType, wrapper?: string, useConstants?: object, useMetrics?: object, } -export type TransformationOps = RepeatOps | OnceOps | MapOps | CollectOps | ZipLastOps | CombineLatestOps +export type TransformationOps = RepeatOps | OnceOps | MapOps | CollectOps | ZipLastOps | CombineLatestOps | ReduceOps export type AllSupportedOps = FilterOps | TransformationOps export type AllSupportedConfigs = ConstantDef | AllSupportedOps export type AllSupportedIOs = MqttIO | HookCallIO | WebserverIO @@ -45,6 +45,10 @@ export interface MapOps extends Transformation, Transformer { emitType: "map", } +export interface ReduceOps extends Transformation, Transformer { + emitType: "reduce", +} + export interface FilterOps extends Transformation, Filter { emitType: "filter", }