Skip to content

Commit

Permalink
Add a way of 'harvesting' a set of files via a list of URLs
Browse files Browse the repository at this point in the history
Since there was too much duplication among the different methods for
CRUDing harvest configurations, these have been combined into a
single controller which dispatches to the appropriate service as
necessary.

On the frontend there is also a new way of resizing certain modal
dialogs, along with some other UI tweaks.

Partial fix for EHRI#1403
  • Loading branch information
mikesname committed Feb 8, 2022
1 parent 36fb615 commit 58a3646
Show file tree
Hide file tree
Showing 48 changed files with 1,468 additions and 673 deletions.
14 changes: 14 additions & 0 deletions conf/evolutions/default/1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,19 @@ CREATE TABLE resourcesync_config (
ON DELETE CASCADE
);

-- Per-dataset configuration for the URL-set ("download") harvester.
-- url_map holds a JSON list of source URLs mapped to target file
-- names (see UrlNameMap in the harvester code). Rows are removed
-- automatically when the parent import_dataset row is deleted.
CREATE TABLE import_url_set_config (
repo_id VARCHAR(50) NOT NULL,
import_dataset_id VARCHAR(50) NOT NULL,
url_map JSONB NOT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
comments TEXT,
-- one URL-set config per (repository, dataset) pair
PRIMARY KEY (repo_id, import_dataset_id),
CONSTRAINT import_url_set_config_repo_id_import_dataset_id
FOREIGN KEY (repo_id, import_dataset_id)
REFERENCES import_dataset (repo_id, id)
ON DELETE CASCADE
);

CREATE TABLE harvest_event (
id SERIAL PRIMARY KEY,
repo_id VARCHAR(50) NOT NULL,
Expand Down Expand Up @@ -357,6 +370,7 @@ DROP TABLE IF EXISTS import_config CASCADE;
-- Teardown: dependent tables are dropped before import_dataset, which
-- they reference (e.g. import_url_set_config's FK above); CASCADE
-- guards against any remaining dependent objects.
DROP TABLE IF EXISTS data_transformation CASCADE;
DROP TABLE IF EXISTS transformation_config CASCADE;
DROP TABLE IF EXISTS harvest_event CASCADE;
DROP TABLE IF EXISTS import_url_set_config CASCADE;
DROP TABLE IF EXISTS resourcesync_config CASCADE;
DROP TABLE IF EXISTS oaipmh_config CASCADE;
DROP TABLE IF EXISTS import_dataset CASCADE;
Expand Down
12 changes: 12 additions & 0 deletions etc/db_migrations/20220207_add_import_url_set_config_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- Migration 20220207: configuration table for the URL-set harvester.
-- url_map is a JSON mapping of source URLs to target file names;
-- rows are cascade-deleted with their parent import_dataset.
CREATE TABLE import_url_set_config (
    repo_id           VARCHAR(50) NOT NULL,
    import_dataset_id VARCHAR(50) NOT NULL,
    url_map           JSONB       NOT NULL,
    created           TIMESTAMP   NOT NULL DEFAULT CURRENT_TIMESTAMP,
    comments          TEXT,
    PRIMARY KEY (repo_id, import_dataset_id),
    CONSTRAINT import_url_set_config_repo_id_import_dataset_id
        FOREIGN KEY (repo_id, import_dataset_id)
            REFERENCES import_dataset (repo_id, id)
            ON DELETE CASCADE
);
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ case class ResourceSyncHarvester (client: ResourceSyncClient, storage: FileStora
// Either the hash doesn't match or the file's not there yet
// so upload it now...
case _ =>
val bytes = client.get(item)
val bytes = client.get(job.data.config, item)
storage.putBytes(
path,
bytes,
Expand Down
135 changes: 135 additions & 0 deletions modules/admin/app/actors/harvesting/UrlSetHarvester.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package actors.harvesting

import actors.LongRunningJob.Cancel
import actors.harvesting.Harvester.HarvestJob
import akka.actor.Status.Failure
import akka.actor.{Actor, ActorLogging, ActorRef}
import akka.stream.scaladsl.Source
import akka.util.ByteString
import models.{BasicAuthConfig, UrlNameMap, UrlSetConfig, UserProfile}
import play.api.http.HeaderNames
import play.api.libs.ws.{WSAuthScheme, WSClient}
import services.storage.FileStorage

import java.time.{Duration, LocalDateTime}
import scala.concurrent.Future.{successful => immediate}
import scala.concurrent.{ExecutionContext, Future}


object UrlSetHarvester {

// Internal message we send ourselves to drive the fetch loop:
// `urls` is the remaining work queue, `count` the number of files
// processed so far, and `fresh` the subset of those that were
// actually (re-)uploaded rather than skipped as unchanged.
sealed trait UrlSetAction
private case class Fetch(urls: List[UrlNameMap], count: Int, fresh: Int) extends UrlSetAction

/**
* A description of an URL set harvest task.
*
* @param config the endpoint configuration, including the URL/name
*               mappings and optional basic-auth credentials
* @param prefix the path prefix on which to save files, after
*               which the item identifier will be appended
*/
case class UrlSetHarvesterData(
config: UrlSetConfig,
prefix: String,
)

/**
* A single harvest job with a unique ID.
*/
case class UrlSetHarvesterJob(repoId: String, datasetId: String, jobId: String, data: UrlSetHarvesterData)
extends HarvestJob
}

/**
* Actor that harvests a fixed set of URLs (with mapped target file
* names) into file storage, skipping any file whose stored "hash"
* metadata matches the server-reported ETag.
*
* Operates as a state machine: the initial receive waits for a
* UrlSetHarvesterJob, then switches to `running` and feeds itself
* one `Fetch` message per remaining URL.
*/
case class UrlSetHarvester (client: WSClient, storage: FileStorage)(
implicit userOpt: Option[UserProfile], ec: ExecutionContext) extends Actor with ActorLogging {
import Harvester._
import UrlSetHarvester._
import akka.pattern.pipe

override def receive: Receive = {
// Start the initial harvest: report the total work up front, then
// kick off the fetch loop with the full URL list.
case job: UrlSetHarvesterJob =>
val msgTo = sender()
context.become(running(job, msgTo, 0, 0, LocalDateTime.now()))
msgTo ! Starting
// NOTE(review): the total reported uses `urlMap.size` while the
// work list uses `urls` — confirm these always have equal length.
msgTo ! ToDo(job.data.config.urlMap.size)
self ! Fetch(job.data.config.urls.toList, 0, 0)
}


// The harvest is running. The `done`/`fresh` parameters are the
// totals reported on cancellation; in-progress counts otherwise
// travel inside the Fetch messages, whose pattern variables
// deliberately shadow these parameters.
def running(job: UrlSetHarvesterJob, msgTo: ActorRef, done: Int, fresh: Int, start: LocalDateTime): Receive = {
// Harvest an individual item: record progress via become, then copy
// the file asynchronously, piping the next Fetch back to ourselves
// so items are processed strictly one at a time.
case Fetch(item :: rest, count, fresh) =>
log.debug(s"Calling become with new total: $count")
context.become(running(job, msgTo, count, fresh, start))

copyItem(job, item).map { case (name, isFresh) =>
msgTo ! DoneFile(name)
Fetch(rest, count + 1, if (isFresh) fresh + 1 else fresh)
}.pipeTo(self)

// Finished harvesting this resource list
case Fetch(Nil, done, fresh) =>
msgTo ! Completed(done, fresh, time(start))

// Cancel harvest: takes effect between items; an in-flight
// copyItem future is not aborted.
case Cancel =>
msgTo ! Cancelled(done, fresh, time(start))

// A failed copyItem future arrives here via pipeTo
case Failure(e) =>
msgTo ! e

case m =>
log.error(s"Unexpected message: $m: ${m.getClass}")
}

/**
* Copy a single URL to storage at `prefix + name`, unless the stored
* file's "hash" metadata already matches the server's ETag.
*
* @return the display name, prefixed "+ " if (re-)uploaded or "~ "
*         if skipped as unchanged, plus a freshness flag
*/
private def copyItem(job: UrlSetHarvesterJob, item: UrlNameMap): Future[(String, Boolean)] = {
// The storage path is the job prefix plus the mapped file name
// (not anything derived from the URL itself)
val name = item.name
val path = job.data.prefix + name

// Attach basic-auth credentials when the config provides them
val req = job.data.config.auth.fold(client.url(item.url)) { case BasicAuthConfig(username, password) =>
client.url(item.url).withAuth(username, password, WSAuthScheme.BASIC)
}

// HEAD first: obtain the ETag and content type without downloading
// the body, so unchanged files can be skipped cheaply
req.head().flatMap { headReq =>
val etag: Option[String] = headReq.headerValues(HeaderNames.ETAG).headOption
val ct: Option[String] = headReq.headerValues(HeaderNames.CONTENT_TYPE).headOption

// file metadata stored alongside the object; "hash" is only
// present when the server supplied an ETag
val meta = Map(
"source" -> "download",
"download-endpoint" -> item.url,
"download-job-id" -> job.jobId,
) ++ etag.map(tag => "hash" -> tag)

log.debug(s"Item: $meta")

storage.info(path).flatMap {

// If it exists and matches we've got nowt to do..
// (the contains() guard prevents a missing hash matching a
// missing ETag, since None == None would otherwise be true)
case Some((_, userMeta)) if userMeta.contains("hash") && userMeta.get("hash") == etag =>
immediate(("~ " + name, false))

// Either the hash doesn't match or the file's not there yet
// so upload it now...
case _ =>
val bytes: Future[Source[ByteString, _]] = req.get().map(r => r.bodyAsSource)
bytes.flatMap { src =>
storage.putBytes(
path,
src,
ct,
meta = meta
).map { _ => ("+ " + name, true) }
}
}
}
}

// Elapsed whole seconds since `from`
private def time(from: LocalDateTime): Long =
Duration.between(from, LocalDateTime.now()).toMillis / 1000
}
82 changes: 73 additions & 9 deletions modules/admin/app/assets/css/datasets.scss
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,41 @@ $active-table-row: #e7f1ff;
background-color: rgba(0, 0, 0, 0.1);
}

// Resizable modal dialogs: the content pane has a minimum height and
// its body stretches as a flex column to fill the available space.
.modal-content.resizable {
  min-height: 30rem;

  > .modal-body {
    display: flex;
    flex-direction: column;
    flex: 1;
  }
}

// Small drag handle anchored to the bottom-right corner of the modal;
// the triangular grip itself is drawn with the :after pseudo-element.
.modal-resize-handle {
  position: absolute;
  right: 0;
  bottom: 0;
  display: inline;
  width: 1rem;
  height: 1rem;
  cursor: nwse-resize;

  &:after {
    content: '';
    display: block;
    // border trick: transparent sides + visible bottom form a triangle
    border-left: 5px solid transparent;
    border-right: 5px solid transparent;
    border-bottom: 5px solid $ehri-border-gray;
    width: 2px;
    height: 5px;
    position: absolute;
    pointer-events: none;
    right: -1px;
    bottom: 1px;
    transform: rotate(135deg);
  }
}

#stage-tabs,
#dataset-manager-tabs
{
Expand Down Expand Up @@ -531,26 +566,26 @@ $active-table-row: #e7f1ff;
border: 1px solid $ehri-border-gray;
}

.xquery-editor-data {
// Scrollable data area shared by the tabular (grid-based) editors,
// e.g. the xquery and URL-set editors.
.tabular-editor-data {
@extend %expanding-column;
@extend %overflow-contents;
overflow-x: unset;
}

.xquery-editor-toolbar {
// Toolbar row above the tabular editor; contents align to the right.
.tabular-editor-toolbar {
display: flex;
justify-content: right;
}

.xquery-editor-toolbar-info {
// Muted helper text pushed to the far end of the toolbar.
.tabular-editor-toolbar-info {
align-self: center;
margin-left: auto;
font-size: $font-size-xs;
color: $text-muted;
padding: $margin-xs;
}

.xquery-editor-header {
.tabular-editor-header {
position: sticky;
top: 0;
background-color: $white;
Expand All @@ -560,18 +595,43 @@ $active-table-row: #e7f1ff;
}
}

.xquery-editor-header,
.xquery-editor-mappings {
// Common grid layout for tabular editor headers and mapping rows.
// Editors needing a different column count (e.g. the URL-set editor)
// override grid-template-columns with a more specific selector.
.tabular-editor-header,
.tabular-editor-mappings {
  font-size: $font-size-xs;
  display: grid;
  grid-template-columns: repeat(4, 1fr);

  // highlight for the currently selected cell/row
  .selected {
    background-color: lighten($blue, 60%);
  }
}

// The xquery editor keeps the default four-column layout; restated
// here so its specificity matches the per-editor overrides.
.xquery-editor {
  .tabular-editor-header,
  .tabular-editor-mappings {
    display: grid;
    grid-template-columns: repeat(4, 1fr);
  }
}


// The URL-set editor: a flex column that fills the remaining space
// in its (resizable) modal container.
.urlset-editor {
display: flex;
flex-direction: column;
flex: 1;
}

// Input area of the URL-set editor; presumably where the URL list is
// entered — confirm against the component template.
.urlset-editor-input {
display: flex;
flex: 1;
background-color: $gray-100;
margin-bottom: $margin-sm;
}


// The URL-set editor shows two columns (wider URL, narrower name)
// rather than the default four-column tabular layout.
.urlset-editor .tabular-editor-header,
.urlset-editor .tabular-editor-mappings {
display: grid;
grid-template-columns: 2fr 1fr;
}

.xslt-editor {
position: relative;
@extend %expanding-column;
Expand Down Expand Up @@ -883,6 +943,10 @@ $active-table-row: #e7f1ff;
padding: $margin-md;
}

// Let the options form grow to fill a resizable modal's body.
.resizable .options-form {
flex: 1;
}

.options-form .small.form-text {
color: $text-muted;
}
Expand Down
4 changes: 2 additions & 2 deletions modules/admin/app/assets/js/datasets/__mocks__/api.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {ResourceSyncConfig, FileList} from "../types";
import {FileList, HarvestConfig} from "../types";

export class DatasetManagerApi {
constructor(service: object, repoId: string) {
Expand Down Expand Up @@ -26,7 +26,7 @@ export class DatasetManagerApi {
)
}

getSyncConfig(ds: string): Promise<ResourceSyncConfig | null> {
// Mock implementation: the test double holds no stored harvest
// configuration, so this always resolves to null regardless of `ds`.
getHarvestConfig(ds: string): Promise<HarvestConfig | null> {
    return new Promise((resolve) => resolve(null));
}
}
Loading

0 comments on commit 58a3646

Please sign in to comment.