Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel execution of multi-process action if express request fails #406

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"@types/nodemailer": "^4.6.5",
"@types/normalize-url": "3.3.0",
"@types/oboe": "^2.0.28",
"@types/p-cancelable": "^1.0.1",
"@types/raven": "^2.1.4",
"@types/request": "^2.48.4",
"@types/request-promise-native": "^1.0.8",
Expand Down Expand Up @@ -77,6 +78,7 @@
"nodemon": "^1.12.1",
"normalize-url": "4.0.0",
"oboe": "^2.1.4",
"p-cancelable": "^2.1.1",
"queue": "^4.4.2",
"raven": "^2.4.0",
"request": "https://github.com/request/request/archive/392db7d127536ff296fb06492db9430790a32d6c.tar.gz",
Expand Down
11 changes: 5 additions & 6 deletions src/actions/amazon/test_amazon_s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import * as Hub from "../../hub"
import { AmazonS3Action } from "./amazon_s3"

import concatStream = require("concat-stream")
import * as Pcancel from "p-cancelable"

const action = new AmazonS3Action()
const cancel: Pcancel<string>[] = []

export function expectAmazonS3Match(thisAction: AmazonS3Action, request: Hub.ActionRequest, match: any) {

Expand All @@ -26,8 +28,7 @@ export function expectAmazonS3Match(thisAction: AmazonS3Action, request: Hub.Act
}))
const stubSuggestedFilename = sinon.stub(request as any, "suggestedFilename")
.callsFake(() => "stubSuggestedFilename")

return chai.expect(thisAction.validateAndExecute(request)).to.be.fulfilled.then(() => {
return chai.expect(thisAction.validateAndExecute(request, cancel)).to.be.fulfilled.then(() => {
chai.expect(uploadSpy).to.have.been.called
stubClient.restore()
stubSuggestedFilename.restore()
Expand All @@ -49,8 +50,7 @@ describe(`${action.constructor.name} unit tests`, () => {
request.attachment = {}
request.attachment.dataBuffer = Buffer.from("1,2,3,4", "utf8")
request.type = Hub.ActionType.Dashboard

return chai.expect(action.validateAndExecute(request)).to.eventually
return chai.expect(action.validateAndExecute(request, cancel)).to.eventually
.be.rejectedWith("Need Amazon S3 bucket.")
})

Expand All @@ -66,8 +66,7 @@ describe(`${action.constructor.name} unit tests`, () => {
secret_access_key: "mysecret",
region: "us-east-1",
}

return chai.expect(action.validateAndExecute(request)).to.eventually
return chai.expect(action.validateAndExecute(request, cancel)).to.eventually
.be.rejectedWith(
"A streaming action was sent incompatible data. The action must have a download url or an attachment.")
})
Expand Down
4 changes: 3 additions & 1 deletion src/actions/auger/test_auger_train.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as sinon from "sinon"
import * as Hub from "../../hub"
import { Transaction } from "./poller"

import * as Pcancel from "p-cancelable"
import { AugerTrainAction } from "./auger_train"

const action = new AugerTrainAction()
Expand Down Expand Up @@ -56,8 +57,9 @@ describe(`${action.constructor.name} unit tests`, () => {
stubStartProject = sinon.stub(action as any, "startProject")
stubUploadToS3 = sinon.stub(action as any, "uploadToS3")
stubChunkToS3 = sinon.stub(action as any, "chunkToS3")
const cancel: Pcancel<string>[] = []

chai.expect(action.validateAndExecute(request)).to.be.fulfilled.then(() => {
chai.expect(action.validateAndExecute(request, cancel)).to.be.fulfilled.then(() => {
chai.expect(postSpy).to.have.been.called
chai.expect(stubPoller, "stubPoller").to.have.been.called
chai.expect(stubStartProject, "stubStartProject").to.have.been.called
Expand Down
4 changes: 3 additions & 1 deletion src/actions/braze/test_braze.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as sinon from "sinon"

import * as Hub from "../../hub"

import * as Pcancel from "p-cancelable"
import { BrazeAction } from "./braze"

const sampleBrazeData = {
Expand Down Expand Up @@ -34,7 +35,8 @@ function expectBrazeMatch(request: Hub.ActionRequest) {
return {success: true, message: "ok"}
})
const stubPost = sinon.stub(req, "post").callsFake(postSpy)
return chai.expect(action.validateAndExecute(request)).to.be.fulfilled.then(() => {
const cancel: Pcancel<string>[] = []
return chai.expect(action.validateAndExecute(request, cancel)).to.be.fulfilled.then(() => {
chai.expect(postSpy).to.have.been.called
stubPost.restore()
})
Expand Down
4 changes: 3 additions & 1 deletion src/actions/datarobot/test_datarobot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as sinon from "sinon"

import * as Hub from "../../hub"

import * as Pcancel from "p-cancelable"
import { DataRobotAction } from "./datarobot"

const action = new DataRobotAction()
Expand Down Expand Up @@ -38,8 +39,9 @@ describe(`${action.constructor.name} unit tests`, () => {
}
})
stubHttpPost = sinon.stub(httpRequest, "post").callsFake(postSpy)
const cancel: Pcancel<string>[] = []

chai.expect(action.validateAndExecute(request)).to.be.fulfilled.then(() => {
chai.expect(action.validateAndExecute(request, cancel)).to.be.fulfilled.then(() => {
chai.expect(postSpy).to.have.been.called
})
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import * as Hub from "../../hub"

import { DigitalOceanObjectStorageAction } from "./digitalocean_object_storage"

import * as Pcancel from "p-cancelable"
import { expectAmazonS3Match } from "../amazon/test_amazon_s3"

const action = new DigitalOceanObjectStorageAction()
const cancel: Pcancel<string>[] = []

describe(`${action.constructor.name} unit tests`, () => {

Expand All @@ -25,8 +27,7 @@ describe(`${action.constructor.name} unit tests`, () => {
request.formParams = {}
request.attachment = {}
request.attachment.dataBuffer = Buffer.from("1,2,3,4", "utf8")

return chai.expect(action.validateAndExecute(request)).to.eventually
return chai.expect(action.validateAndExecute(request, cancel)).to.eventually
.be.rejectedWith("Need Amazon S3 bucket.")
})

Expand All @@ -44,8 +45,7 @@ describe(`${action.constructor.name} unit tests`, () => {
secret_access_key: "mysecret",
region: "us-east-1",
}

return chai.expect(action.validateAndExecute(request)).to.eventually
return chai.expect(action.validateAndExecute(request, cancel)).to.eventually
.be.rejectedWith(
"A streaming action was sent incompatible data. The action must have a download url or an attachment.")

Expand Down
4 changes: 3 additions & 1 deletion src/actions/dropbox/test_dropbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import * as sinon from "sinon"

import * as Hub from "../../hub"

import * as Pcancel from "p-cancelable"
import {ActionCrypto} from "../../hub"
import { DropboxAction } from "./dropbox"

Expand Down Expand Up @@ -73,7 +74,8 @@ describe(`${action.constructor.name} unit tests`, () => {
.callsFake(() => ({
filesUpload: async () => Promise.reject("reject"),
}))
const resp = action.validateAndExecute(request)
const cancel: Pcancel<string>[] = []
const resp = action.validateAndExecute(request, cancel)
chai.expect(resp).to.eventually.deep.equal({
success: false,
state: {data: "reset"},
Expand Down
41 changes: 16 additions & 25 deletions src/actions/google/analytics/test_data_import.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { expect } from "chai"
import concatStream = require("concat-stream")
import * as gaxios from "gaxios"
import { analytics_v3, google } from "googleapis"
import * as Pcancel from "p-cancelable"
import * as sinon from "sinon"
import * as Hub from "../../../hub"
import { GoogleOAuthHelper } from "../common/oauth_helper"
Expand All @@ -12,6 +13,7 @@ const teastOAuthClientId = "test_oauth_client_id"
const teastOAuthClientSecret = "test_oauth_client_secret"

const action = new GoogleAnalyticsDataImportAction(teastOAuthClientId, teastOAuthClientSecret)
const cancel: Pcancel<string>[] = []

describe(`${action.constructor.name} class`, () => {
// Use a sandbox to avoid interaction with other test files
Expand Down Expand Up @@ -153,7 +155,7 @@ describe(`${action.constructor.name} class`, () => {
+ " MissingAuthError: User state was missing or did not contain tokens & redirect",
})

const actualResponse = await action.validateAndExecute(request)
const actualResponse = await action.validateAndExecute(request, cancel)
expect(actualResponse).to.deep.equal(expectedResponse)
})
}
Expand All @@ -171,7 +173,7 @@ describe(`${action.constructor.name} class`, () => {
+ " MissingRequiredParamsError: Did not receive required form param 'dataSourceCompositeId'",
})

const actualResponse = await action.validateAndExecute(request)
const actualResponse = await action.validateAndExecute(request, cancel)
expect(actualResponse).to.deep.equal(expectedResponse)
})

Expand All @@ -186,7 +188,7 @@ describe(`${action.constructor.name} class`, () => {
+ " MissingRequiredParamsError: Did not receive required form param 'dataSourceSchema'",
})

const actualResponse = await action.validateAndExecute(request)
const actualResponse = await action.validateAndExecute(request, cancel)
expect(actualResponse).to.deep.equal(expectedResponse)
})
})
Expand All @@ -201,8 +203,7 @@ describe(`${action.constructor.name} class`, () => {
const expectedResponse = makeErrorResponse({withReset: false})
expectedResponse.message = `Error during action setup: ${testException.toString()}`

const actualResponse = await action.validateAndExecute(request)

const actualResponse = await action.validateAndExecute(request, cancel)
expect(actualResponse).to.deep.equal(expectedResponse)
})
})
Expand All @@ -218,8 +219,7 @@ describe(`${action.constructor.name} class`, () => {
const expectedResponse = makeErrorResponse({withReset: false})
expectedResponse.message = `Error during data upload step: ${testException.toString()}`

const actualResponse = await action.validateAndExecute(request)

const actualResponse = await action.validateAndExecute(request, cancel)
expect(actualResponse).to.deep.equal(expectedResponse)
})

Expand All @@ -236,9 +236,7 @@ describe(`${action.constructor.name} class`, () => {

uploadDataStub.resolves({data: {id: "NewUploadIdFromTest"}})

const actualResponse = await action.validateAndExecute(request)

// Check the basics
const actualResponse = await action.validateAndExecute(request, cancel)// Check the basics
expect(actualResponse).to.exist
expect(actualResponse.success).to.be.true
expect(gaClientStub).to.be.calledOnce
Expand All @@ -264,8 +262,7 @@ describe(`${action.constructor.name} class`, () => {
const testException = new TestException("message from uploadData test stub")
uploadDataStub.throws(testException)

const actualResponse = await action.validateAndExecute(request)

const actualResponse = await action.validateAndExecute(request, cancel)
expect(actualResponse).to.deep.equal({
success: false,
message: `Error during data upload step: ${testException.toString()}`,
Expand All @@ -282,8 +279,7 @@ describe(`${action.constructor.name} class`, () => {
const testException = new TestException("message from uploadData test stub")
uploadDataStub.throws(testException)

const actualResponse = await action.validateAndExecute(request)

const actualResponse = await action.validateAndExecute(request, cancel)
expect(actualResponse).to.deep.equal({
success: false,
message: `Error during data upload step: ${testException.toString()}`,
Expand All @@ -297,8 +293,7 @@ describe(`${action.constructor.name} class`, () => {

uploadDataStub.resolves({data: {id: "fakeNewIdFromTest"}})

const actualResponse = await action.validateAndExecute(request)

const actualResponse = await action.validateAndExecute(request, cancel)
expect(actualResponse).to.have.nested.property("state.data").that.is.a("string")

const state = JSON.parse(actualResponse.state!.data!)
Expand All @@ -319,8 +314,7 @@ describe(`${action.constructor.name} class`, () => {

uploadDataStub.resolves({data: {id: "fakeNewIdFromTest"}})

const actualResponse = await action.validateAndExecute(request)

const actualResponse = await action.validateAndExecute(request, cancel)
expect(actualResponse.success).to.be.true
expect(deleteUploadDataStub).to.not.be.called
})
Expand All @@ -332,8 +326,7 @@ describe(`${action.constructor.name} class`, () => {

uploadDataStub.resolves({data: {id: "fakeNewIdFromTest"}})

const actualResponse = await action.validateAndExecute(request)

const actualResponse = await action.validateAndExecute(request, cancel)
expect(actualResponse.success).to.be.true
expect(deleteUploadDataStub).to.not.be.called
})
Expand All @@ -343,8 +336,7 @@ describe(`${action.constructor.name} class`, () => {

uploadDataStub.throws(new TestException("message from uploadData test stub"))

const actualResponse = await action.validateAndExecute(request)

const actualResponse = await action.validateAndExecute(request, cancel)
expect(actualResponse.success).to.be.false
expect(deleteUploadDataStub).to.not.be.called
})
Expand All @@ -361,8 +353,7 @@ describe(`${action.constructor.name} class`, () => {
{name: "surprise no id"},
]}})

const actualResponse = await action.validateAndExecute(request)

const actualResponse = await action.validateAndExecute(request, cancel)
expect(actualResponse.success).to.be.true
expect(deleteUploadDataStub).to.be.calledOnce
expect(deleteUploadDataStub.getCall(0).args[0]).to.deep.equal({
Expand All @@ -383,7 +374,7 @@ describe(`${action.constructor.name} class`, () => {
const testException = new TestException("message from uploads.list test stub")
uploadsListStub.throws(testException)

const response = await action.validateAndExecute(request)
const response = await action.validateAndExecute(request, cancel)

expect(response).to.be.an.instanceOf(Hub.ActionResponse)
expect(response).to.have.property("success", false)
Expand Down
13 changes: 7 additions & 6 deletions src/actions/google/drive/sheets/test_google_sheets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import concatStream = require("concat-stream")

import * as Hub from "../../../../hub"

import * as Pcancel from "p-cancelable"
import { ActionCrypto } from "../../../../hub"
import { GoogleSheetsAction } from "./google_sheets"

const action = new GoogleSheetsAction()
const cancel: Pcancel<string>[] = []
action.executeInOwnProcess = false

const stubFileName = "stubSuggestedFilename"
Expand All @@ -34,8 +36,7 @@ function expectGoogleSheetsMatch(request: Hub.ActionRequest, paramsMatch: any) {
create: createSpy,
},
})

return chai.expect(action.validateAndExecute(request)).to.be.fulfilled.then(() => {
return chai.expect(action.validateAndExecute(request, cancel)).to.be.fulfilled.then(() => {
chai.expect(createSpy).to.have.been.called
stubClient.restore()
})
Expand Down Expand Up @@ -171,7 +172,7 @@ describe(`${action.constructor.name} unit tests`, () => {
state_url: "https://looker.state.url.com/action_hub_state/asdfasdfasdfasdf",
state_json: `{"tokens": {"access_token": "token"}, "redirect": "fake.com"}`,
}
chai.expect(action.validateAndExecute(request)).to.eventually.be.fulfilled.then( () => {
chai.expect(action.validateAndExecute(request, cancel)).to.eventually.be.fulfilled.then( () => {
chai.expect(flushSpy).to.have.been.calledOnce
stubDriveClient.restore()
stubSheetClient.restore()
Expand Down Expand Up @@ -246,7 +247,7 @@ describe(`${action.constructor.name} unit tests`, () => {
state_url: "https://looker.state.url.com/action_hub_state/asdfasdfasdfasdf",
state_json: `{"tokens": {"access_token": "token"}, "redirect": "fake.com"}`,
}
const requestResult = action.validateAndExecute(request)
const requestResult = action.validateAndExecute(request, cancel)
chai.expect(requestResult).to.eventually.have.property("success", true)
.then( () => {
chai.expect(flushSpy).to.have.been.calledOnce
Expand Down Expand Up @@ -318,7 +319,7 @@ describe(`${action.constructor.name} unit tests`, () => {
state_url: "https://looker.state.url.com/action_hub_state/asdfasdfasdfasdf",
state_json: `{"tokens": {"access_token": "token"}, "redirect": "fake.com"}`,
}
chai.expect(action.validateAndExecute(request)).to.eventually.be.fulfilled.then( () => {
chai.expect(action.validateAndExecute(request, cancel)).to.eventually.be.fulfilled.then( () => {
chai.expect(sendSpy).to.have.been.calledOnce
chai.expect(overwriteStub).to.not.have.been.called
stubDriveClient.restore()
Expand Down Expand Up @@ -386,7 +387,7 @@ describe(`${action.constructor.name} unit tests`, () => {
state_url: "https://looker.state.url.com/action_hub_state/asdfasdfasdfasdf",
state_json: `{"tokens": {"access_token": "token"}, "redirect": "fake.com"}`,
}
chai.expect(action.validateAndExecute(request)).to.eventually.be.fulfilled.then( () => {
chai.expect(action.validateAndExecute(request, cancel)).to.eventually.be.fulfilled.then( () => {
chai.expect(sendSpy).to.have.been.calledOnce
stubDriveClient.restore()
stubSheetClient.restore()
Expand Down
Loading