Skip to content

Commit

Permalink
When batch-copying import configs, also copy properties files
Browse files Browse the repository at this point in the history
This went down a bit of a rabbit hole and ended up involving a lot
of changes to achieve idempotent copying. Might want to rework the
storage API to not always use multi-part upload, since multipart
uploads produce eTags that differ from those of single-part uploads.
  • Loading branch information
mikesname committed Oct 7, 2023
1 parent 9fb47a6 commit 94a7084
Show file tree
Hide file tree
Showing 11 changed files with 153 additions and 18 deletions.
6 changes: 5 additions & 1 deletion modules/admin/app/assets/js/datasets/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {apiCall} from "./common";
import {
Cleanup,
CleanupSummary,
ConvertConfig,
ConvertConfig, CopyResult,
Coreference,
DataTransformation,
DataTransformationInfo,
Expand Down Expand Up @@ -84,6 +84,10 @@ export class DatasetManagerApi {
return apiCall(this.service.ImportFiles.fileUrls(this.repoId, ds, stage), paths);
}

copyFile(ds: string, stage: string, key: string, toDs: string, toName?: string, versionId?: string): Promise<CopyResult> {
  // Server-side copy of a single file into another dataset (same stage),
  // optionally renaming it and/or selecting a specific version.
  const route = this.service.ImportFiles.copyFile(this.repoId, ds, stage, key, toDs, toName, versionId);
  return apiCall(route);
}

uploadHandle(ds: string, stage: string, fileSpec: FileToUpload): Promise<{presignedUrl: string}> {
  // Ask the server for a presigned URL to which the described file can be PUT.
  const endpoint = this.service.ImportFiles.uploadHandle(this.repoId, ds, stage);
  return apiCall(endpoint, fileSpec);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,13 @@ export default {
let others: ImportDataset[] = this.datasets.filter(s => s.id !== this.copyFrom);
this.inProgress = true;
for (let set of others) {
await this.api.saveConvertConfig(set.id, config)
if (this.cancelled) {
if (this.throwOnError) {
throw new CancelledTask();
}
break;
}
await this.api.saveConvertConfig(set.id, config);
this.println("Copied", set.name);
}
this.cleanup();
Expand All @@ -258,9 +264,27 @@ export default {
let others: ImportDataset[] = this.datasets.filter(s => s.id !== this.copyFrom);
this.inProgress = true;
for (let set of others) {
if (this.cancelled) {
if (this.throwOnError) {
throw new CancelledTask();
}
break;
}
let existing = await this.api.getImportConfig(set.id);
let newConfig = {...config, batchSize: existing?.batchSize} as ImportConfig;
await this.api.saveImportConfig(set.id, newConfig);
if (config.properties) {
// We need to copy the properties file
this.println("Copying properties file to", set.name + "...")
let copyRes = await this.api.copyFile(this.copyFrom, this.config.config, config.properties, set.id);
if (!copyRes.message) {
this.println("Error copying properties file to", set.name);
} else {
this.println("...", copyRes.message);
}
} else {
this.println("No properties file to copy for", config);
}
this.println("Copied", set.name);
}
this.cleanup();
Expand Down
5 changes: 5 additions & 0 deletions modules/admin/app/assets/js/datasets/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ export interface FileInfo {
versions: FileList,
}

export interface CopyResult {
  // Human-readable status, e.g. "File copied", "File updated", or
  // "File with identical contents already exists".
  message: string,
  // Presigned URL of the destination file. NB: the server serialises this
  // field under the JSON key "uri" (see ImportFiles.copyFile), not "url" —
  // the previous declaration of `url` never matched the runtime payload.
  uri: string
}

export interface XmlValidationError {
line: number,
pos: number,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ case class ImportDatasets @Inject()(
controllers.datasets.routes.javascript.ImportFiles.deleteFiles,
controllers.datasets.routes.javascript.ImportFiles.uploadHandle,
controllers.datasets.routes.javascript.ImportFiles.fileUrls,
controllers.datasets.routes.javascript.ImportFiles.copyFile,
controllers.datasets.routes.javascript.HarvestConfigs.harvest,
controllers.datasets.routes.javascript.HarvestConfigs.get,
controllers.datasets.routes.javascript.HarvestConfigs.save,
Expand Down
38 changes: 35 additions & 3 deletions modules/admin/app/controllers/datasets/ImportFiles.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import akka.util.ByteString
import controllers.AppComponents
import controllers.base.{AdminController, ApiBodyParsers}
import controllers.generic._
import models.{FileStage, _}
import models._
import play.api.cache.AsyncCacheApi
import play.api.http.{ContentTypes, HeaderNames}
import play.api.libs.json.{Format, Json, Writes}
import play.api.libs.streams.Accumulator
import play.api.libs.ws.WSClient
import play.api.mvc._
import services.data.DataHelpers
import services.datasets.ImportDatasetService
Expand All @@ -20,8 +21,8 @@ import services.storage.FileStorage

import java.net.URI
import javax.inject._
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}


case class FileToUpload(name: String, `type`: String, size: Long)
Expand All @@ -47,7 +48,8 @@ case class ImportFiles @Inject()(
importLogService: ImportLogService,
asyncCache: AsyncCacheApi,
datasets: ImportDatasetService,
)(implicit mat: Materializer) extends AdminController with ApiBodyParsers with StorageHelpers with Update[Repository] {
ws: WSClient
)(implicit mat: Materializer, executionContext: ExecutionContext) extends AdminController with ApiBodyParsers with StorageHelpers with Update[Repository] {

def listFiles(id: String, ds: String, stage: FileStage.Value, path: Option[String], from: Option[String]): Action[AnyContent] = EditAction(id).async { implicit request =>
storage.listFiles(prefix = Some(prefix(id, ds, stage) + path.getOrElse("")),
Expand All @@ -73,6 +75,36 @@ case class ImportFiles @Inject()(
Ok(Json.toJson(result))
}

/**
  * Copy a file from one dataset to another within the same stage, optionally
  * renaming it. The operation is idempotent: if the destination already holds
  * an object with the same eTag as the source, no copy is performed.
  *
  * Responses: 201 Created (new file), 200 OK (overwritten or already
  * identical), 404 Not Found (source missing).
  */
def copyFile(id: String, ds: String, stage: FileStage.Value, fileName: String, toDs: String, toName: Option[String], versionId: Option[String]): Action[AnyContent] = EditAction(id).async { implicit request =>
  val fromPath = prefix(id, ds, stage) + fileName
  val toPath = prefix(id, toDs, stage) + toName.getOrElse(fileName)

  // Perform the actual storage-level copy, wrapping the resulting URI in a
  // response with the given status and message.
  def doCopy(status: Status, message: String): Future[Result] =
    storage.copyFile(fromPath, toPath)
      .map(uri => status(Json.obj("message" -> message, "uri" -> uri)))

  storage.info(fromPath, versionId).flatMap {
    case Some((srcMeta, _)) =>
      storage.info(toPath).flatMap {
        // Destination already has identical contents: skip the copy.
        // NB: eTag comparison only works if both objects were uploaded the
        // same way — multipart and single-part uploads yield different eTags.
        case Some((dstMeta, _)) if srcMeta.eTag.isDefined && srcMeta.eTag == dstMeta.eTag =>
          Future.successful(Ok(Json.obj(
            "message" -> "File with identical contents already exists",
            "uri" -> storage.uri(dstMeta.key, contentType = dstMeta.contentType, duration = 2.hours))))
        case Some(_) => doCopy(Ok, "File updated")
        case _ => doCopy(Created, "File copied")
      }

    case _ => Future.successful(NotFound)
  }
}

private def streamToStorage(id: String, ds: String, stage: FileStage.Value, fileName: String): BodyParser[Source[ByteString, _]] = BodyParser { implicit r =>
Accumulator.source[ByteString]
.mapFuture(src => storage.putBytes(s"${prefix(id, ds, stage)}$fileName", src, r.contentType))
Expand Down
1 change: 1 addition & 0 deletions modules/admin/conf/datasets.routes
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ DELETE /api/:id/files/delete/:ds/:stage @controllers.datasets.
POST /api/:id/files/upload/:ds/:stage @controllers.datasets.ImportFiles.uploadHandle(id: String, ds: String, stage: models.FileStage.Value)
POST /api/:id/files/validate/:ds/:stage @controllers.datasets.ImportFiles.validateFiles(id: String, ds: String, stage: models.FileStage.Value)
POST /api/:id/files/urls/:ds/:stage @controllers.datasets.ImportFiles.fileUrls(id: String, ds: String, stage: models.FileStage.Value)
POST /api/:id/files/copy/:ds/:stage @controllers.datasets.ImportFiles.copyFile(id: String, ds: String, stage: models.FileStage.Value, fileName: String, toDs: String, toName: Option[String] ?= None, versionId: Option[String] ?= None)

GET /api/:id/transformations @controllers.datasets.DataTransformations.list(id: String)
POST /api/:id/transformations @controllers.datasets.DataTransformations.create(id: String, generic: Boolean ?= false)
Expand Down
9 changes: 9 additions & 0 deletions modules/portal/app/services/storage/FileStorage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,13 @@ trait FileStorage {
* @return an optional pair of metadata and a byte source
*/
def fromUri(uri: URI): Future[Option[(FileMeta, Source[ByteString, _])]]

/**
 * Copy a file from one location to another within the same storage
 * classifier (bucket).
 *
 * NOTE(review): implementations appear to overwrite any existing file at
 * `toPath` unconditionally — confirm this is the intended contract.
 *
 * @param path the path of the file to copy
 * @param toPath the path to copy the file to
 * @return the URI of the copied file
 */
def copyFile(path: String, toPath: String): Future[URI]
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,22 @@ case class S3CompatibleFileStorage(
else Future.successful(Option.empty)
}

override def copyFile(path: String, toPath: String): Future[URI] = {
// Server-side S3 object copy within this bucket ($name).
// NOTE(review): CopyObjectRequest.copySource takes a "bucket/key" string
// that the SDK expects to be URL-encoded — keys containing special
// characters may fail; the sourceBucket/sourceKey builder methods handle
// encoding. TODO confirm SDK version supports them.
val request = CopyObjectRequest.builder()
.copySource(s"$name/$path")
.destinationBucket(name)
.destinationKey(toPath)
.build()
// NOTE(review): the return value of copyObject is discarded. If `client`
// is the async SDK client this method resolves before the copy completes
// and silently swallows failures — confirm the client is synchronous.
client.copyObject(request)
Future.successful(uri(toPath))
}

private def infoToMeta(path: String, meta: ObjectMetadata): FileMeta = FileMeta(
name,
path,
java.time.Instant.ofEpochMilli(meta.lastModified.clicks),
meta.getContentLength,
meta.eTag,
meta.eTag, // NB: the eTag does NOT have quotes!
meta.contentType,
meta.versionId
)
Expand Down
60 changes: 48 additions & 12 deletions test/integration/admin/ImportFilesSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import models.{FileStage, _}
import org.apache.commons.codec.digest.DigestUtils
import play.api.http.{ContentTypes, HeaderNames, MimeTypes, Writeable}
import play.api.libs.json.Json
import play.api.libs.ws.WSResponse
import play.api.mvc.{Headers, Result}
import play.api.test.FakeRequest
import services.storage.{FileList, FileMeta}
Expand Down Expand Up @@ -35,20 +36,22 @@ class ImportFilesSpec extends IntegrationTestRunner with ResourceUtils {
|""".stripMargin)
private val repoId = "r1"
private val datasetId = "default"
private val stage = FileStage.Input
private val fileStage = FileStage.Input
private val testFileName = "test.xml"

private def testFilePath(implicit app: play.api.Application): String = {
val bucket = app.configuration.get[String]("storage.dam.classifier")
val host = app.configuration.get[String]("storage.dam.config.endpoint-url")
val testPrefix = s"$host/$bucket/$hostInstance/ingest-data/$repoId/$datasetId/$stage/"
val testPrefix = s"$host/$bucket/$hostInstance/ingest-data/$repoId/$datasetId/$fileStage/"
testPrefix + testFileName
}

private implicit val writeBytes: Writeable[ByteString] = new Writeable[ByteString](s => s, Some(ContentTypes.XML))

private def putFile(name: String, payload: ByteString = testPayload)(implicit app: play.api.Application): Future[Result] = {
FakeRequest(repoDataRoutes.uploadStream(repoId, datasetId, stage, name))
private def putFile(name: String, payload: ByteString = testPayload, ds: String = datasetId, stage: FileStage.Value = fileStage)(
implicit app: play.api.Application): Future[Result] = {

FakeRequest(repoDataRoutes.uploadStream(repoId, ds, stage, name))
.withHeaders(Headers(
HeaderNames.CONTENT_TYPE -> ContentTypes.XML,
HeaderNames.HOST -> "localhost"
Expand All @@ -57,10 +60,26 @@ class ImportFilesSpec extends IntegrationTestRunner with ResourceUtils {
.callWith(payload)
}

private def putFileNoMultiPart(name: String, payload: ByteString = testPayload, ds: String = datasetId, stage: FileStage.Value = fileStage)(
  implicit app: play.api.Application): Future[WSResponse] = {
  // Upload via a presigned URL with a plain single-part PUT, so the stored
  // object's eTag matches that of a non-multipart upload.
  val wsClient = app.injector.instanceOf[play.api.libs.ws.WSClient]

  // First obtain a presigned upload URL from the handle endpoint...
  val handleResp = FakeRequest(repoDataRoutes.uploadHandle(repoId, ds, stage))
    .withUser(privilegedUser)
    .callWith(Json.toJson(FileToUpload(name = name, `type` = ContentTypes.XML, size = payload.size)))
  val presignedUrl = contentAsJson(handleResp).as[Map[String, String]].apply("presignedUrl")

  // ...then PUT the payload directly against storage.
  wsClient.url(presignedUrl)
    .withHttpHeaders(CONTENT_TYPE -> ContentTypes.XML)
    .put(payload)
}

"Import Files API" should {

"provide PUT urls" in new ITestApp {
val r = FakeRequest(repoDataRoutes.uploadHandle(repoId, datasetId, stage)).withUser(privilegedUser).callWith(
val r = FakeRequest(repoDataRoutes.uploadHandle(repoId, datasetId, fileStage)).withUser(privilegedUser).callWith(
Json.toJson(FileToUpload(
name = testFileName,
`type` = ContentTypes.XML,
Expand All @@ -80,7 +99,7 @@ class ImportFilesSpec extends IntegrationTestRunner with ResourceUtils {

"fetch data" in new ITestApp {
await(putFile(testFileName))
val r = FakeRequest(repoDataRoutes.download(repoId, datasetId, stage, testFileName))
val r = FakeRequest(repoDataRoutes.download(repoId, datasetId, fileStage, testFileName))
.withUser(privilegedUser)
.call()

Expand All @@ -90,7 +109,7 @@ class ImportFilesSpec extends IntegrationTestRunner with ResourceUtils {

"list files" in new ITestApp {
await(putFile(testFileName))
val r = FakeRequest(repoDataRoutes.listFiles(repoId, datasetId, stage))
val r = FakeRequest(repoDataRoutes.listFiles(repoId, datasetId, fileStage))
.withUser(privilegedUser)
.call()

Expand All @@ -101,7 +120,7 @@ class ImportFilesSpec extends IntegrationTestRunner with ResourceUtils {

"get file info" in new ITestApp {
await(putFile(testFileName))
val r = FakeRequest(repoDataRoutes.info(repoId, datasetId, stage, testFileName))
val r = FakeRequest(repoDataRoutes.info(repoId, datasetId, fileStage, testFileName))
.withUser(privilegedUser)
.call()

Expand All @@ -113,7 +132,7 @@ class ImportFilesSpec extends IntegrationTestRunner with ResourceUtils {
"delete files" in new ITestApp {
await(putFile(testFileName))
await(putFile(testFileName + "2"))
val r = FakeRequest(repoDataRoutes.deleteFiles(repoId, datasetId, stage))
val r = FakeRequest(repoDataRoutes.deleteFiles(repoId, datasetId, fileStage))
.withUser(privilegedUser)
.callWith(Json.arr(testFileName))

Expand All @@ -123,17 +142,34 @@ class ImportFilesSpec extends IntegrationTestRunner with ResourceUtils {
"delete all files" in new ITestApp {
await(putFile(testFileName))
await(putFile(testFileName + "2"))
val r = FakeRequest(repoDataRoutes.deleteFiles(repoId, datasetId, stage))
val r = FakeRequest(repoDataRoutes.deleteFiles(repoId, datasetId, fileStage))
.withUser(privilegedUser)
.callWith(Json.arr())

contentAsJson(r) must_== Json.obj("deleted" -> 2)
}

"copy files in an idempotent manner" in new ITestApp {
  // Upload without multipart so the source eTag is comparable with the
  // destination's eTag after a server-side copy.
  await(putFileNoMultiPart(testFileName, ds = "ds1", stage = FileStage.Config))

  // First copy: destination does not exist yet, so we expect 201 Created.
  // All parameters are carried in the query string, so no request body is
  // needed (the previous Json.arr body was silently ignored by the action).
  val r = FakeRequest(repoDataRoutes.copyFile(repoId, "ds1", FileStage.Config, fileName = testFileName, toDs = "ds2"))
    .withUser(privilegedUser)
    .call()
  status(r) must_== CREATED
  val data = contentAsJson(r).as[Map[String, String]]
  data.get("message") must beSome("File copied")

  // Second copy: identical contents already present, so the copy is a no-op.
  val r2 = FakeRequest(repoDataRoutes.copyFile(repoId, "ds1", FileStage.Config, fileName = testFileName, toDs = "ds2"))
    .withUser(privilegedUser)
    .call()
  status(r2) must_== OK
  val data2 = contentAsJson(r2).as[Map[String, String]]
  data2.get("message") must beSome("File with identical contents already exists")
}

"validate files" in new ITestApp {
await(putFile(testFileName))
val tag = DigestUtils.md5Hex(testPayload.utf8String)
val r = FakeRequest(repoDataRoutes.validateFiles(repoId, datasetId, stage))
val r = FakeRequest(repoDataRoutes.validateFiles(repoId, datasetId, fileStage))
.withUser(privilegedUser)
.callWith(Json.obj(tag -> testFileName))

Expand All @@ -153,7 +189,7 @@ class ImportFilesSpec extends IntegrationTestRunner with ResourceUtils {
}

"give JSON errors for invalid payloads" in new ITestApp {
val r = FakeRequest(repoDataRoutes.validateFiles(repoId, datasetId, stage))
val r = FakeRequest(repoDataRoutes.validateFiles(repoId, datasetId, fileStage))
.withUser(privilegedUser)
.callWith(Json.arr("not", "an", "object"))

Expand Down
5 changes: 5 additions & 0 deletions test/services/storage/MockFileStorage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,9 @@ case class MockFileStorage(name: String, db: collection.mutable.Map[String, Seq[
override def setVersioned(enabled: Boolean) = Future.successful(())

override def isVersioned = Future.successful(true)

override def copyFile(path: String, toPath: String): Future[URI] = {
  // Simulate a server-side copy by re-uploading the source file's bytes
  // under the destination key; fails if the source does not exist.
  getF(path) match {
    case Some(Version(_, _, data)) => putBytes(toPath, Source.single(data), None, meta = Map.empty)
    case None => Future.failed(new Exception("File not found"))
  }
}
}
8 changes: 8 additions & 0 deletions test/services/storage/S3CompatibleFileStorageSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class S3CompatibleFileStorageSpec extends PlaySpecification with TestConfigurati

def putTestItems: (FileStorage, Seq[String]) = {
val storage = S3CompatibleFileStorage(config.get[Config]("storage.test"))
await(storage.deleteFilesWithPrefix(""))
await(storage.setVersioned(enabled = true))
val urls = paths.map { path =>
await(storage.putBytes(path, bytes, public = true)).toString
Expand Down Expand Up @@ -121,5 +122,12 @@ class S3CompatibleFileStorageSpec extends PlaySpecification with TestConfigurati
val versions = await(storage.listVersions("baz"))
versions.files.size must beGreaterThan(2)
}

"copy items" in {
  // Copying "baz" to "baz2" should add exactly one new key to the bucket.
  val (storage, _) = putTestItems
  await(storage.copyFile("baz", "baz2"))
  val listing = await(storage.listFiles())
  listing.files.map(_.key).sorted must_== Seq("bar", "baz", "baz2", "eggs", "spam")
}
}
}

0 comments on commit 94a7084

Please sign in to comment.