diff --git a/lib/modules/measure/MeasureExporter.ts b/lib/modules/measure/MeasureExporter.ts index 1928a5ac..1de28c5f 100644 --- a/lib/modules/measure/MeasureExporter.ts +++ b/lib/modules/measure/MeasureExporter.ts @@ -212,7 +212,7 @@ export class MeasureExporter extends AbstractExporter { ...measureColumns, ]; - const stream = this.getExportStream(result, columns); + const stream = this.getExportStream(result, columns, engineId); await this.sdk.ms.del(this.exportRedisKey(engineId, exportId)); return stream; diff --git a/lib/modules/shared/services/AbstractExporter.ts b/lib/modules/shared/services/AbstractExporter.ts index f0a60a16..f68bcdec 100644 --- a/lib/modules/shared/services/AbstractExporter.ts +++ b/lib/modules/shared/services/AbstractExporter.ts @@ -34,11 +34,19 @@ export interface Column { isIsoDate?: boolean; } +export type ExportStreamAugmenter = ( + result: SearchResult>, + columns: Column[], + engineId: string, +) => Promise; + export abstract class AbstractExporter

{ protected config: ExporterOption = { expireTime: 2 * 60, }; + protected exportStreamAugmenters: ExportStreamAugmenter[] = []; + constructor( protected plugin: DeviceManagerPlugin, protected target: InternalCollection, @@ -142,6 +150,7 @@ export abstract class AbstractExporter

{ async getExportStream( request: SearchResult>, columns: Column[], + engineId: string, ) { const stream = new PassThrough(); @@ -149,6 +158,10 @@ export abstract class AbstractExporter

{ try { stream.write(stringify([columns.map((column) => column.header)])); while (result) { + for (const augmenter of this.exportStreamAugmenters) { + await augmenter(result, columns, engineId); + } + for (const hit of result.hits) { stream.write(stringify([this.formatHit(columns, hit)])); } diff --git a/lib/modules/shared/services/DigitalTwinExporter.ts b/lib/modules/shared/services/DigitalTwinExporter.ts index dbcf6957..0713c60c 100644 --- a/lib/modules/shared/services/DigitalTwinExporter.ts +++ b/lib/modules/shared/services/DigitalTwinExporter.ts @@ -1,22 +1,44 @@ -import { JSONObject } from "kuzzle"; +import { JSONObject, KHit, SearchResult } from "kuzzle"; import { ask } from "kuzzle-plugin-commons"; import { UUID } from "node:crypto"; -import { DigitalTwinContent, flattenObject } from "../"; +import { + AskDigitalTwinLastMeasuresGet, + DigitalTwinContent, + DigitalTwinMeasures, + EmbeddedMeasure, + flattenObject, +} from "../"; import { NamedMeasures } from "../../decoder"; +import { MeasureContent, MeasureOriginDevice } from "../../measure"; import { AskModelMeasureGet, AssetModelContent, DeviceModelContent, } from "../../model"; -import { InternalCollection } from "../../plugin"; -import { AbstractExporter, Column } from "./AbstractExporter"; +import { DeviceManagerPlugin, InternalCollection } from "../../plugin"; +import { AbstractExporter, Column, ExporterOption } from "./AbstractExporter"; interface MeasureColumn extends Column { isMeasure: boolean; } +interface DigitalTwinExtraData { + measures: DigitalTwinMeasures; + lastMeasuredAt: number; +} + export class DigitalTwinExporter extends AbstractExporter { + constructor( + protected plugin: DeviceManagerPlugin, + protected target: InternalCollection, + config: Partial = {}, + ) { + super(plugin, target, config); + + this.exportStreamAugmenters.push(this.addMeasuresToExportStream.bind(this)); + } + protected exportRedisKey(engineId: string, exportId: string) { return `exports:${engineId}:${this.target}:${exportId}`; } @@ -54,7 +76,7 @@ export class DigitalTwinExporter extends AbstractExporter { }, ]; - const stream = this.getExportStream(digitalTwins, columns); + const stream = this.getExportStream(digitalTwins, columns, engineId); await this.sdk.ms.del(this.exportRedisKey(engineId, exportId)); @@ -144,4 +166,57 @@ export class DigitalTwinExporter extends AbstractExporter { return columns; } + + private async addMeasuresToExportStream( + result: SearchResult>, + _: Column[], + engineId: string, + ) { + const type = this.target === InternalCollection.ASSETS ? "asset" : "device"; + + for (const hit of result.hits) { + let lastMeasures: MeasureContent[]; + + try { + lastMeasures = await ask( + `ask:device-manager:${type}:get-last-measures`, + { + digitalTwinId: hit._id, + engineId, + }, + ); + } catch (e) { + continue; + } + + hit._source.measures = lastMeasures.reduce((accumulator, measure) => { + const measureName = + type === "asset" + ? measure.asset.measureName + : (measure.origin as MeasureOriginDevice).measureName; + + const embeddedMeasure: EmbeddedMeasure = { + measuredAt: measure.measuredAt, + name: measureName, + originId: measure.origin._id, + payloadUuids: measure.origin.payloadUuids, + type: measure.type, + values: measure.values, + }; + + return { + ...accumulator, + [measureName]: embeddedMeasure, + }; + }, {}); + + const lastMeasuredAt = Math.max( + ...lastMeasures.map((measure) => measure.measuredAt), + ); + + if (Number.isFinite(lastMeasuredAt)) { + hit._source.lastMeasuredAt = lastMeasuredAt; + } + } + } } diff --git a/tests/scenario/modules/assets/action-export.test.ts b/tests/scenario/modules/assets/action-export.test.ts index 62d32cf6..34adcbb6 100644 --- a/tests/scenario/modules/assets/action-export.test.ts +++ b/tests/scenario/modules/assets/action-export.test.ts @@ -10,7 +10,7 @@ import fixtures from "../../../fixtures/fixtures"; const assetCount = fixtures["engine-ayse"].assets.length / 2; jest.setTimeout(10000); -function getExportedColums(row) { +function getExportedColums(row: string) { const parsedRow = csvParse(row)[0]; return { @@ -20,16 +20,18 @@ function getExportedColums(row) { co2: parsedRow[3], humidity: parsedRow[4], illuminance: parsedRow[5], - position: parsedRow[6], - positionAccuracy: parsedRow[7], - positionAltitude: parsedRow[8], - powerConsumptionWatt: parsedRow[9], - temperature: parsedRow[10], - temperatureExt: parsedRow[11], - temperatureInt: parsedRow[12], - temperatureWeather: parsedRow[13], - lastMeasuredAt: parsedRow[14], - lastMeasuredAtISO: parsedRow[15], + magiculeExt: parsedRow[6], + magiculeInt: parsedRow[7], + position: parsedRow[8], + positionAccuracy: parsedRow[9], + positionAltitude: parsedRow[10], + powerConsumptionWatt: parsedRow[11], + temperature: parsedRow[12], + temperatureExt: parsedRow[13], + temperatureInt: parsedRow[14], + temperatureWeather: parsedRow[15], + lastMeasuredAt: parsedRow[16], + lastMeasuredAtISO: parsedRow[17], }; } @@ -37,6 +39,8 @@ describe("AssetsController:exportMeasures", () => { const sdk = setupHooks(); it("should prepare export of different assets types and return a CSV as stream", async () => { + const measureDate = Date.now(); + await sendDummyTempPositionPayloads(sdk, [ { deviceEUI: "warehouse", @@ -44,7 +48,7 @@ describe("AssetsController:exportMeasures", () => { location: { lat: 42.2, lon: 2.42, accuracy: 2100 }, battery: 0.8, // ? Use date now - 1s to ensure this asset are second in export - measuredAt: Date.now() - 2000, + measuredAt: measureDate - 2000, }, { deviceEUI: "linked2", @@ -52,7 +56,7 @@ describe("AssetsController:exportMeasures", () => { location: { lat: 42.2, lon: 2.42, accuracy: 2100 }, battery: 0.8, // ? Use date now to ensure this asset is first in export - measuredAt: Date.now(), + measuredAt: measureDate, }, ]); await sdk.collection.refresh("engine-ayse", "assets"); @@ -64,9 +68,6 @@ describe("AssetsController:exportMeasures", () => { controller: "device-manager/assets", action: "export", engineId: "engine-ayse", - body: { - sort: { lastMeasuredAt: "desc" }, - }, }); expect(typeof result.link).toBe("string"); @@ -85,48 +86,58 @@ describe("AssetsController:exportMeasures", () => { writeFileSync("./assets.csv", csv.join("")); - expect(csv[0]).toBe( + expect(csv).toHaveLength(assetCount + 1); + + const header = csv.shift(); + + expect(header).toBe( "Model,Reference,brightness.lumens,co2,humidity,illuminance,magiculeExt,magiculeInt,position,position.accuracy,position.altitude,powerConsumption.watt,temperature,temperatureExt,temperatureInt,temperatureWeather,lastMeasuredAt,lastMeasuredAtISO\n", ); - expect(csv).toHaveLength(assetCount + 1); + const rows = csv + .map(getExportedColums) + .sort((a, b) => b.lastMeasuredAt - a.lastMeasuredAt); - const row1 = getExportedColums(csv[1]); + const row1 = rows[0]; expect(row1.model).toBe("Container"); - expect(typeof row1.reference).toBe("string"); + expect(row1.reference).toBe("linked2"); + expect(row1.position).toBe('{"lat":42.2,"lon":2.42}'); + expect(parseFloat(row1.positionAccuracy)).toBe(2100); + expect(parseFloat(row1.temperatureExt)).toBe(23.3); + expect(parseFloat(row1.lastMeasuredAt)).toBe(measureDate); + expect(row1.lastMeasuredAtISO).toBe(new Date(measureDate).toISOString()); + expect(typeof parseFloat(row1.brightnessLumens)).toBe("number"); expect(typeof parseFloat(row1.co2)).toBe("number"); expect(typeof parseFloat(row1.humidity)).toBe("number"); expect(typeof parseFloat(row1.illuminance)).toBe("number"); - expect(typeof row1.position).toBe("string"); - expect(typeof parseFloat(row1.positionAccuracy)).toBe("number"); expect(typeof parseFloat(row1.positionAltitude)).toBe("number"); expect(typeof parseFloat(row1.powerConsumptionWatt)).toBe("number"); expect(typeof parseFloat(row1.temperature)).toBe("number"); - expect(typeof parseFloat(row1.temperatureExt)).toBe("number"); expect(typeof parseFloat(row1.temperatureInt)).toBe("number"); expect(typeof parseFloat(row1.temperatureWeather)).toBe("number"); - expect(typeof parseFloat(row1.lastMeasuredAt)).toBe("number"); - expect(typeof row1.lastMeasuredAtISO).toBe("string"); - const row2 = getExportedColums(csv[2]); + const row2 = rows[1]; expect(row2.model).toBe("Warehouse"); - expect(typeof row2.reference).toBe("string"); + expect(row2.reference).toBe("linked"); + expect(row2.position).toBe('{"lat":42.2,"lon":2.42}'); + expect(parseFloat(row2.positionAccuracy)).toBe(2100); + expect(parseFloat(row2.lastMeasuredAt)).toBe(measureDate - 2000); + expect(row2.lastMeasuredAtISO).toBe( + new Date(measureDate - 2000).toISOString(), + ); + expect(typeof parseFloat(row2.brightnessLumens)).toBe("number"); expect(typeof parseFloat(row2.co2)).toBe("number"); expect(typeof parseFloat(row2.humidity)).toBe("number"); expect(typeof parseFloat(row2.illuminance)).toBe("number"); - expect(typeof row2.position).toBe("string"); - expect(typeof parseFloat(row2.positionAccuracy)).toBe("number"); expect(typeof parseFloat(row2.positionAltitude)).toBe("number"); expect(typeof parseFloat(row2.powerConsumptionWatt)).toBe("number"); expect(typeof parseFloat(row2.temperature)).toBe("number"); expect(typeof parseFloat(row2.temperatureExt)).toBe("number"); expect(typeof parseFloat(row2.temperatureInt)).toBe("number"); expect(typeof parseFloat(row2.temperatureWeather)).toBe("number"); - expect(typeof parseFloat(row2.lastMeasuredAt)).toBe("number"); - expect(typeof row2.lastMeasuredAtISO).toBe("string"); }); }); diff --git a/tests/scenario/modules/devices/action-export.test.ts b/tests/scenario/modules/devices/action-export.test.ts index edfec350..b258b44f 100644 --- a/tests/scenario/modules/devices/action-export.test.ts +++ b/tests/scenario/modules/devices/action-export.test.ts @@ -16,7 +16,7 @@ import fixtures from "../../../fixtures/fixtures"; const deviceCount = fixtures["engine-ayse"].devices.length / 2; jest.setTimeout(10000); -function getExportedColums(row) { +function getExportedColums(row: string) { const parsedRow = csvParse(row)[0]; return { @@ -36,17 +36,19 @@ function getExportedColums(row) { }; } -describe("AssetsController:exportMeasures", () => { +describe("DevicesController:exportMeasures", () => { const sdk = setupHooks(); it("should prepare export of different devices types and return a CSV as stream", async () => { + const measureDate = Date.now(); + await sendDummyTempPayloads(sdk, [ { deviceEUI: "linked1", temperature: 23.3, battery: 0.8, // ? Use date now - 1s to ensure this asset are second in export - measuredAt: Date.now() - 1000, + measuredAt: measureDate - 1000, }, ]); await sendDummyTempPositionPayloads(sdk, [ @@ -56,10 +58,11 @@ describe("AssetsController:exportMeasures", () => { location: { lat: 42.2, lon: 2.42, accuracy: 2100 }, battery: 0.8, // ? Use date now to ensure this asset is first in export - measuredAt: Date.now(), + measuredAt: measureDate, }, ]); await sdk.collection.refresh("engine-ayse", "devices"); + await sdk.collection.refresh("engine-ayse", "measures"); const { result } = await sdk.query< ApiDeviceExportRequest, ApiDeviceExportResult @@ -67,9 +70,6 @@ describe("AssetsController:exportMeasures", () => { controller: "device-manager/devices", action: "export", engineId: "engine-ayse", - body: { - sort: { lastMeasuredAt: "desc" }, - }, }); expect(typeof result.link).toBe("string"); @@ -86,42 +86,52 @@ describe("AssetsController:exportMeasures", () => { response.data.on("end", resolve); }); - expect(csv[0]).toBe( + expect(csv).toHaveLength(deviceCount + 1); + + const header = csv.shift(); + + expect(header).toBe( "Model,Reference,accelerationSensor.x,accelerationSensor.y,accelerationSensor.z,accelerationSensor.accuracy,battery,position,position.accuracy,position.altitude,temperature,lastMeasuredAt,lastMeasuredAtISO\n", ); - expect(csv).toHaveLength(deviceCount + 1); + const rows = csv + .map(getExportedColums) + .sort((a, b) => b.lastMeasuredAt - a.lastMeasuredAt); - const row1 = getExportedColums(csv[1]); + const row1 = rows[0]; expect(row1.model).toBe("DummyTempPosition"); - expect(typeof row1.reference).toBe("string"); + expect(row1.reference).toBe("linked2"); + expect(parseFloat(row1.battery)).toBe(80); + expect(row1.position).toBe('{"lat":42.2,"lon":2.42}'); + expect(parseFloat(row1.positionAccuracy)).toBe(2100); + expect(parseFloat(row1.temperature)).toBe(23.3); + expect(parseFloat(row1.lastMeasuredAt)).toBe(measureDate); + expect(row1.lastMeasuredAtISO).toBe(new Date(measureDate).toISOString()); + expect(typeof parseFloat(row1.accelerationSensorX)).toBe("number"); expect(typeof parseFloat(row1.accelerationSensorY)).toBe("number"); expect(typeof parseFloat(row1.accelerationSensorZ)).toBe("number"); expect(typeof parseFloat(row1.accelerationSensorAccuracy)).toBe("number"); - expect(typeof parseFloat(row1.battery)).toBe("number"); - expect(typeof row1.position).toBe("string"); - expect(typeof parseFloat(row1.positionAccuracy)).toBe("number"); expect(typeof parseFloat(row1.positionAltitude)).toBe("number"); - expect(typeof parseFloat(row1.temperature)).toBe("number"); - expect(typeof parseFloat(row1.lastMeasuredAt)).toBe("number"); - expect(typeof row1.lastMeasuredAtISO).toBe("string"); - const row2 = getExportedColums(csv[1]); + const row2 = rows[1]; + + expect(row2.model).toBe("DummyTemp"); + expect(row2.reference).toBe("linked1"); + expect(parseFloat(row2.battery)).toBe(0.8); + expect(parseFloat(row2.temperature)).toBe(23.3); + expect(parseFloat(row2.lastMeasuredAt)).toBe(measureDate - 1000); + expect(row2.lastMeasuredAtISO).toBe( + new Date(measureDate - 1000).toISOString(), + ); - expect(row2.model).toBe("DummyTempPosition"); - expect(typeof row2.reference).toBe("string"); expect(typeof parseFloat(row2.accelerationSensorX)).toBe("number"); expect(typeof parseFloat(row2.accelerationSensorY)).toBe("number"); expect(typeof parseFloat(row2.accelerationSensorZ)).toBe("number"); expect(typeof parseFloat(row2.accelerationSensorAccuracy)).toBe("number"); - expect(typeof parseFloat(row2.battery)).toBe("number"); expect(typeof row2.position).toBe("string"); expect(typeof parseFloat(row2.positionAccuracy)).toBe("number"); expect(typeof parseFloat(row2.positionAltitude)).toBe("number"); - expect(typeof parseFloat(row2.temperature)).toBe("number"); - expect(typeof parseFloat(row2.lastMeasuredAt)).toBe("number"); - expect(typeof row2.lastMeasuredAtISO).toBe("string"); }); });