Skip to content

Commit

Permalink
Merge pull request #318 from kuzzleio/feat/optim_migrate_assets
Browse files Browse the repository at this point in the history
fix(asset_migration): fixes multiple assets migration, avoid inconsistencies
  • Loading branch information
Olive3034 authored Oct 20, 2023
2 parents 58cc2bd + 382d0e1 commit 76663bf
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 263 deletions.
344 changes: 135 additions & 209 deletions lib/modules/asset/AssetService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import {
AssetHistoryContent,
AssetHistoryEventMetadata,
} from "./types/AssetHistoryContent";
import { RecoveryQueue } from "../shared/utils/recoveryQueue";
import { ApiAssetMigrateTenantResult } from "./types/AssetApi";

export class AssetService {
private context: PluginContext;
Expand Down Expand Up @@ -279,239 +279,165 @@ export class AssetService {
assetsList: string[],
engineId: string,
newEngineId: string
): Promise<void> {
return lock(`engine:${engineId}:${newEngineId}`, async () => {
const recovery = new RecoveryQueue();
): Promise<ApiAssetMigrateTenantResult> {
let errors = [];
let successes = [];

//Sanity check
if (assetsList.length === 0) {
throw new BadRequestError("No assets to migrate");
}

await lock(`engine:${engineId}:${newEngineId}`, async () => {
if (!user.profileIds.includes("admin")) {
throw new BadRequestError(
`User ${user._id} is not authorized to migrate assets`
);
}

try {
// check if tenant destination of the the same group
const engine = await this.getEngine(engineId);
const newEngine = await this.getEngine(newEngineId);

if (engine.group !== newEngine.group) {
throw new BadRequestError(
`Engine ${newEngineId} is not in the same group as ${engineId}`
);
}

if (assetsList.length === 0) {
throw new BadRequestError("No assets to migrate");
}
// check if tenant destination is in the same group
const engine = await this.getEngine(engineId);
const newEngine = await this.getEngine(newEngineId);

const assets = await this.sdk.document.mGet<AssetContent>(
engineId,
InternalCollection.ASSETS,
assetsList
if (engine.group !== newEngine.group) {
throw new BadRequestError(
`Engine ${newEngineId} is not in the same group as ${engineId}`
);
}

// check if the assets exists in the other engine
const existingAssets = await this.sdk.document.mGet<AssetContent>(
newEngineId,
InternalCollection.ASSETS,
assetsList
);
//First of all, as mCreate seems to be buggy, ensure some assets don't
//already exists in the destination tenant
const assetsCheck = await this.sdk.document.mGet<AssetContent>(
newEngineId,
InternalCollection.ASSETS,
assetsList
);
const assetsCheckedIdExisting = assetsCheck.successes.map((a) => a._id);
errors = [...assetsCheckedIdExisting];

if (existingAssets.successes.length > 0) {
throw new BadRequestError(
`Assets ${existingAssets.successes
.map((asset) => asset._id)
.join(", ")} already exists in engine ${newEngineId}`
);
}
const assetsToMigrate = assets.successes.map((asset) => ({
_id: asset._id,
body: asset._source,
}));
//Get all assets to migrate
const assetsCheckedList = assetsList.filter(
(id) => !assetsCheckedIdExisting.includes(id)
);
const assets = await this.sdk.document.mGet<AssetContent>(
engineId,
InternalCollection.ASSETS,
assetsCheckedList
);
errors = errors.concat(...assets.errors);

const devices = await this.sdk.document.search<AssetContent>(
engineId,
InternalCollection.DEVICES,
{
query: {
bool: {
filter: {
terms: {
assetId: assetsList,
},
},
},
},
}
);
if (assets.successes.length === 0) {
this.context.log.error("No assets found to migrate");
return { errors, successes };
}

// Map linked devices for assets.
const assetLinkedDevices = assets.successes
.filter((asset) => asset._source.linkedDevices.length > 0)
.map((asset) => ({
assetId: asset._id,
linkedDevices: asset._source.linkedDevices,
}));

// Extra recovery step to relink back assets to their devices in case of rollback
recovery.addRecovery(async () => {
// Link the devices to the new assets
const promises: Promise<void>[] = [];

for (const asset of assetLinkedDevices) {
const assetId = asset.assetId;
for (const device of asset.linkedDevices) {
const deviceId = device._id;
const measureNames = device.measureNames;
promises.push(
ask<AskDeviceLinkAsset>(
"ask:device-manager:device:link-asset",
{
assetId,
deviceId,
engineId,
measureNames: measureNames,
user,
}
)
);
}
}
await Promise.all(promises);
});

// detach from current tenant
await Promise.all(
devices.hits.map((device) => {
return ask<AskDeviceDetachEngine>(
"ask:device-manager:device:detach-engine",
{ deviceId: device._id, user }
);
})
);
//Get all the asset content, in order to create thm by batch
const assetsContent = assets.successes.map((asset) => ({
_id: asset._id,
body: asset._source,
}));

//We want to create the new asset with linked devices and groups empty
const assetsContentCopy = _.cloneDeep(assetsContent);
for (const asset of assetsContentCopy) {
asset.body.linkedDevices = [];
asset.body.groups = [];
}

// Attach to new tenant
await Promise.all(
devices.hits.map((device) => {
return ask<AskDeviceAttachEngine>(
"ask:device-manager:device:attach-engine",
{ deviceId: device._id, engineId: newEngineId, user }
);
})
);
//Even if they exist in the destination tenant, try creating them all
//using batch
const assetsCreated = await this.sdk.document.mCreate(
newEngineId,
InternalCollection.ASSETS,
assetsContentCopy
);

// recovery function to reattach devices to the old tenant
recovery.addRecovery(async () => {
await Promise.all(
devices.hits.map((device) => {
return ask<AskDeviceDetachEngine>(
"ask:device-manager:device:detach-engine",
{ deviceId: device._id, user }
);
})
);
//We consider here we will return as success what we have been able
//to create, and related errors
const assetsCreatedId = assetsCreated.successes.map((a) => a._id);
const assetsNotCreatedId = assetsCreated.errors.map(
(a) => a.document._id
);
successes = [...assetsCreatedId];
errors = errors.concat(...assetsNotCreatedId);

//Iterate over all created asset, and migrate each one
for (const asset of assetsCreated.successes) {
//We need to recover the linked devices to this asset,
//because we reset them when we create the asset
const assetOriginal = assets.successes.find((a) => a._id === asset._id);

// get linked devices to this asset, if any
const linkedDevices = assetOriginal._source.linkedDevices.map((d) => ({
_id: d._id,
measureNames: d.measureNames,
}));

await Promise.all(
devices.hits.map((device) => {
return ask<AskDeviceAttachEngine>(
"ask:device-manager:device:attach-engine",
{ deviceId: device._id, engineId, user }
);
})
// ... and iterate over this list
for (const device of linkedDevices) {
// detach linked devices from current tenant (it also unkinks asset)
await ask<AskDeviceDetachEngine>(
"ask:device-manager:device:detach-engine",
{ deviceId: device._id, user }
);
});

// Create the assets in the new tenant
await this.sdk.document.mCreate(
newEngineId,
InternalCollection.ASSETS,
assetsToMigrate
);

recovery.addRecovery(async () => {
await this.sdk.document.mDelete(
newEngineId,
InternalCollection.ASSETS,
assetsList
// ... and attach to new tenant
await ask<AskDeviceAttachEngine>(
"ask:device-manager:device:attach-engine",
{ deviceId: device._id, engineId: newEngineId, user }
);
});

// Delete the assets in the old tenant
await this.sdk.document.mDelete(
engineId,
InternalCollection.ASSETS,
assetsList
);

recovery.addRecovery(async () => {
await this.sdk.document.mCreate(
engineId,
InternalCollection.ASSETS,
assetsToMigrate
// ... and link this device to the asset in the new tenant
await ask<AskDeviceLinkAsset>(
"ask:device-manager:device:link-asset",
{
assetId: asset._id,
deviceId: device._id,
engineId: newEngineId,
measureNames: device.measureNames,
user,
}
);
});

// Link the devices to the new assets
const promises: Promise<void>[] = [];

for (const asset of assetLinkedDevices) {
const assetId = asset.assetId;
for (const device of asset.linkedDevices) {
const deviceId = device._id;
const measureNames = device.measureNames;
promises.push(
ask<AskDeviceLinkAsset>("ask:device-manager:device:link-asset", {
assetId,
deviceId,
engineId: newEngineId,
measureNames: measureNames,
user,
})
);
}
}
}

await Promise.all(promises);

recovery.addRecovery(async () => {
const promiseRecoveries: Promise<void>[] = [];

for (const asset of assetLinkedDevices) {
for (const device of asset.linkedDevices) {
const deviceId = device._id;
promiseRecoveries.push(
ask<AskDeviceUnlinkAsset>(
"ask:device-manager:device:unlink-asset",
{
deviceId,
user,
}
)
);
}
}
// Finally here, we can delete the newly create assets in the source engine !
await this.sdk.document.mDelete(
engineId,
InternalCollection.ASSETS,
assetsCreatedId
);

await Promise.all(promiseRecoveries);
});

// clear the groups
await this.sdk.document.mUpdate<AssetContent>(
newEngineId,
InternalCollection.ASSETS,
assetsList.map((assetId) => ({
_id: assetId,
body: {
groups: [],
},
}))
);
} catch (error) {
await recovery.rollback();
throw new BadRequestError(
`An error occured while migrating assets: ${error}`
);
}
//Refresh ES indexes and collections
const collectionsToRefresh = [
{
collection: InternalCollection.ASSETS,
index: engineId,
},
{
collection: InternalCollection.DEVICES,
index: engineId,
},
{
collection: InternalCollection.ASSETS,
index: newEngineId,
},
{
collection: InternalCollection.DEVICES,
index: newEngineId,
},
{
collection: InternalCollection.DEVICES,
index: this.config.adminIndex,
},
].map(({ index, collection }) => {
return this.sdk.collection.refresh(index, collection);
});

await Promise.all(collectionsToRefresh);
});

return { errors, successes };
}

/**
Expand Down
Loading

0 comments on commit 76663bf

Please sign in to comment.