Skip to content

Commit

Permalink
feat: stop using measures from assets and devices in exporter (#369)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kuruyia authored Nov 13, 2024
1 parent 4b37c93 commit b49ae3a
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 61 deletions.
2 changes: 1 addition & 1 deletion lib/modules/measure/MeasureExporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ export class MeasureExporter extends AbstractExporter<MeasureExportParams> {
...measureColumns,
];

const stream = this.getExportStream(result, columns);
const stream = this.getExportStream(result, columns, engineId);
await this.sdk.ms.del(this.exportRedisKey(engineId, exportId));

return stream;
Expand Down
13 changes: 13 additions & 0 deletions lib/modules/shared/services/AbstractExporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,19 @@ export interface Column {
isIsoDate?: boolean;
}

export type ExportStreamAugmenter = (
result: SearchResult<KHit<KDocumentContentGeneric>>,
columns: Column[],
engineId: string,
) => Promise<void>;

export abstract class AbstractExporter<P extends ExportParams = ExportParams> {
protected config: ExporterOption = {
expireTime: 2 * 60,
};

protected exportStreamAugmenters: ExportStreamAugmenter[] = [];

constructor(
protected plugin: DeviceManagerPlugin,
protected target: InternalCollection,
Expand Down Expand Up @@ -142,13 +150,18 @@ export abstract class AbstractExporter<P extends ExportParams = ExportParams> {
async getExportStream(
request: SearchResult<KHit<KDocumentContentGeneric>>,
columns: Column[],
engineId: string,
) {
const stream = new PassThrough();

let result = request;
try {
stream.write(stringify([columns.map((column) => column.header)]));
while (result) {
for (const augmenter of this.exportStreamAugmenters) {
await augmenter(result, columns, engineId);
}

for (const hit of result.hits) {
stream.write(stringify([this.formatHit(columns, hit)]));
}
Expand Down
85 changes: 80 additions & 5 deletions lib/modules/shared/services/DigitalTwinExporter.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,44 @@
import { JSONObject } from "kuzzle";
import { JSONObject, KHit, SearchResult } from "kuzzle";
import { ask } from "kuzzle-plugin-commons";
import { UUID } from "node:crypto";

import { DigitalTwinContent, flattenObject } from "../";
import {
AskDigitalTwinLastMeasuresGet,
DigitalTwinContent,
DigitalTwinMeasures,
EmbeddedMeasure,
flattenObject,
} from "../";
import { NamedMeasures } from "../../decoder";
import { MeasureContent, MeasureOriginDevice } from "../../measure";
import {
AskModelMeasureGet,
AssetModelContent,
DeviceModelContent,
} from "../../model";
import { InternalCollection } from "../../plugin";
import { AbstractExporter, Column } from "./AbstractExporter";
import { DeviceManagerPlugin, InternalCollection } from "../../plugin";
import { AbstractExporter, Column, ExporterOption } from "./AbstractExporter";

interface MeasureColumn extends Column {
isMeasure: boolean;
}

interface DigitalTwinExtraData {
measures: DigitalTwinMeasures;
lastMeasuredAt: number;
}

export class DigitalTwinExporter extends AbstractExporter {
constructor(
protected plugin: DeviceManagerPlugin,
protected target: InternalCollection,
config: Partial<ExporterOption> = {},
) {
super(plugin, target, config);

this.exportStreamAugmenters.push(this.addMeasuresToExportStream.bind(this));
}

protected exportRedisKey(engineId: string, exportId: string) {
return `exports:${engineId}:${this.target}:${exportId}`;
}
Expand Down Expand Up @@ -54,7 +76,7 @@ export class DigitalTwinExporter extends AbstractExporter {
},
];

const stream = this.getExportStream(digitalTwins, columns);
const stream = this.getExportStream(digitalTwins, columns, engineId);

await this.sdk.ms.del(this.exportRedisKey(engineId, exportId));

Expand Down Expand Up @@ -144,4 +166,57 @@ export class DigitalTwinExporter extends AbstractExporter {

return columns;
}

private async addMeasuresToExportStream(
result: SearchResult<KHit<DigitalTwinContent & DigitalTwinExtraData>>,
_: Column[],
engineId: string,
) {
const type = this.target === InternalCollection.ASSETS ? "asset" : "device";

for (const hit of result.hits) {
let lastMeasures: MeasureContent[];

try {
lastMeasures = await ask<AskDigitalTwinLastMeasuresGet>(
`ask:device-manager:${type}:get-last-measures`,
{
digitalTwinId: hit._id,
engineId,
},
);
} catch (e) {
continue;
}

hit._source.measures = lastMeasures.reduce((accumulator, measure) => {
const measureName =
type === "asset"
? measure.asset.measureName
: (measure.origin as MeasureOriginDevice).measureName;

const embeddedMeasure: EmbeddedMeasure = {
measuredAt: measure.measuredAt,
name: measureName,
originId: measure.origin._id,
payloadUuids: measure.origin.payloadUuids,
type: measure.type,
values: measure.values,
};

return {
...accumulator,
[measureName]: embeddedMeasure,
};
}, {});

const lastMeasuredAt = Math.max(
...lastMeasures.map((measure) => measure.measuredAt),
);

if (Number.isFinite(lastMeasuredAt)) {
hit._source.lastMeasuredAt = lastMeasuredAt;
}
}
}
}
73 changes: 42 additions & 31 deletions tests/scenario/modules/assets/action-export.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import fixtures from "../../../fixtures/fixtures";
const assetCount = fixtures["engine-ayse"].assets.length / 2;
jest.setTimeout(10000);

function getExportedColums(row) {
function getExportedColums(row: string) {
const parsedRow = csvParse(row)[0];

return {
Expand All @@ -20,39 +20,43 @@ function getExportedColums(row) {
co2: parsedRow[3],
humidity: parsedRow[4],
illuminance: parsedRow[5],
position: parsedRow[6],
positionAccuracy: parsedRow[7],
positionAltitude: parsedRow[8],
powerConsumptionWatt: parsedRow[9],
temperature: parsedRow[10],
temperatureExt: parsedRow[11],
temperatureInt: parsedRow[12],
temperatureWeather: parsedRow[13],
lastMeasuredAt: parsedRow[14],
lastMeasuredAtISO: parsedRow[15],
magiculeExt: parsedRow[6],
magiculeInt: parsedRow[7],
position: parsedRow[8],
positionAccuracy: parsedRow[9],
positionAltitude: parsedRow[10],
powerConsumptionWatt: parsedRow[11],
temperature: parsedRow[12],
temperatureExt: parsedRow[13],
temperatureInt: parsedRow[14],
temperatureWeather: parsedRow[15],
lastMeasuredAt: parsedRow[16],
lastMeasuredAtISO: parsedRow[17],
};
}

describe("AssetsController:exportMeasures", () => {
const sdk = setupHooks();

it("should prepare export of different assets types and return a CSV as stream", async () => {
const measureDate = Date.now();

await sendDummyTempPositionPayloads(sdk, [
{
deviceEUI: "warehouse",
temperature: 23.3,
location: { lat: 42.2, lon: 2.42, accuracy: 2100 },
battery: 0.8,
// ? Use date now - 1s to ensure this asset are second in export
measuredAt: Date.now() - 2000,
measuredAt: measureDate - 2000,
},
{
deviceEUI: "linked2",
temperature: 23.3,
location: { lat: 42.2, lon: 2.42, accuracy: 2100 },
battery: 0.8,
// ? Use date now to ensure this asset is first in export
measuredAt: Date.now(),
measuredAt: measureDate,
},
]);
await sdk.collection.refresh("engine-ayse", "assets");
Expand All @@ -64,9 +68,6 @@ describe("AssetsController:exportMeasures", () => {
controller: "device-manager/assets",
action: "export",
engineId: "engine-ayse",
body: {
sort: { lastMeasuredAt: "desc" },
},
});

expect(typeof result.link).toBe("string");
Expand All @@ -85,48 +86,58 @@ describe("AssetsController:exportMeasures", () => {

writeFileSync("./assets.csv", csv.join(""));

expect(csv[0]).toBe(
expect(csv).toHaveLength(assetCount + 1);

const header = csv.shift();

expect(header).toBe(
"Model,Reference,brightness.lumens,co2,humidity,illuminance,magiculeExt,magiculeInt,position,position.accuracy,position.altitude,powerConsumption.watt,temperature,temperatureExt,temperatureInt,temperatureWeather,lastMeasuredAt,lastMeasuredAtISO\n",
);

expect(csv).toHaveLength(assetCount + 1);
const rows = csv
.map(getExportedColums)
.sort((a, b) => b.lastMeasuredAt - a.lastMeasuredAt);

const row1 = getExportedColums(csv[1]);
const row1 = rows[0];

expect(row1.model).toBe("Container");
expect(typeof row1.reference).toBe("string");
expect(row1.reference).toBe("linked2");
expect(row1.position).toBe('{"lat":42.2,"lon":2.42}');
expect(parseFloat(row1.positionAccuracy)).toBe(2100);
expect(parseFloat(row1.temperatureExt)).toBe(23.3);
expect(parseFloat(row1.lastMeasuredAt)).toBe(measureDate);
expect(row1.lastMeasuredAtISO).toBe(new Date(measureDate).toISOString());

expect(typeof parseFloat(row1.brightnessLumens)).toBe("number");
expect(typeof parseFloat(row1.co2)).toBe("number");
expect(typeof parseFloat(row1.humidity)).toBe("number");
expect(typeof parseFloat(row1.illuminance)).toBe("number");
expect(typeof row1.position).toBe("string");
expect(typeof parseFloat(row1.positionAccuracy)).toBe("number");
expect(typeof parseFloat(row1.positionAltitude)).toBe("number");
expect(typeof parseFloat(row1.powerConsumptionWatt)).toBe("number");
expect(typeof parseFloat(row1.temperature)).toBe("number");
expect(typeof parseFloat(row1.temperatureExt)).toBe("number");
expect(typeof parseFloat(row1.temperatureInt)).toBe("number");
expect(typeof parseFloat(row1.temperatureWeather)).toBe("number");
expect(typeof parseFloat(row1.lastMeasuredAt)).toBe("number");
expect(typeof row1.lastMeasuredAtISO).toBe("string");

const row2 = getExportedColums(csv[2]);
const row2 = rows[1];

expect(row2.model).toBe("Warehouse");
expect(typeof row2.reference).toBe("string");
expect(row2.reference).toBe("linked");
expect(row2.position).toBe('{"lat":42.2,"lon":2.42}');
expect(parseFloat(row2.positionAccuracy)).toBe(2100);
expect(parseFloat(row2.lastMeasuredAt)).toBe(measureDate - 2000);
expect(row2.lastMeasuredAtISO).toBe(
new Date(measureDate - 2000).toISOString(),
);

expect(typeof parseFloat(row2.brightnessLumens)).toBe("number");
expect(typeof parseFloat(row2.co2)).toBe("number");
expect(typeof parseFloat(row2.humidity)).toBe("number");
expect(typeof parseFloat(row2.illuminance)).toBe("number");
expect(typeof row2.position).toBe("string");
expect(typeof parseFloat(row2.positionAccuracy)).toBe("number");
expect(typeof parseFloat(row2.positionAltitude)).toBe("number");
expect(typeof parseFloat(row2.powerConsumptionWatt)).toBe("number");
expect(typeof parseFloat(row2.temperature)).toBe("number");
expect(typeof parseFloat(row2.temperatureExt)).toBe("number");
expect(typeof parseFloat(row2.temperatureInt)).toBe("number");
expect(typeof parseFloat(row2.temperatureWeather)).toBe("number");
expect(typeof parseFloat(row2.lastMeasuredAt)).toBe("number");
expect(typeof row2.lastMeasuredAtISO).toBe("string");
});
});
Loading

0 comments on commit b49ae3a

Please sign in to comment.