Skip to content

Commit

Permalink
fix: snowpipe streaming users table skipping
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Oct 30, 2024
1 parent 3a09181 commit c9c93e3
Show file tree
Hide file tree
Showing 22 changed files with 296 additions and 20 deletions.
29 changes: 28 additions & 1 deletion src/v0/destinations/snowpipe_streaming/transform.js
Original file line number Diff line number Diff line change
@@ -1 +1,28 @@
module.exports = require('../snowflake/transform');
const transform = require('../snowflake/transform');
const { processWarehouseMessage } = require('../../../warehouse');

const provider = 'snowpipe_streaming';

function process(event) {
const whSchemaVersion = event.request.query.whSchemaVersion || 'v1';
const whIDResolve = event.request.query.whIDResolve === 'true' || false;
const whStoreEvent = event.destination.Config.storeFullEvent === true;
const destJsonPaths = event.destination?.Config?.jsonPaths || '';
return processWarehouseMessage(event.message, {
metadata: event.metadata,
whSchemaVersion,
whStoreEvent,
whIDResolve,
getDataTypeOverride: transform.getDataTypeOverride,
provider,
sourceCategory: event.metadata ? event.metadata.sourceCategory : null,
destJsonPaths,
destConfig: event.destination?.Config,
});
}

module.exports = {
provider,
process,
getDataTypeOverride: transform.getDataTypeOverride,
};
93 changes: 93 additions & 0 deletions src/warehouse/config/ReservedKeywords.json
Original file line number Diff line number Diff line change
Expand Up @@ -1519,6 +1519,99 @@
"WHERE": true,
"WITH": true
},
"SNOWPIPE_STREAMING": {
"ACCOUNT": true,
"ALL": true,
"ALTER": true,
"AND": true,
"ANY": true,
"AS": true,
"BETWEEN": true,
"BY": true,
"CASE": true,
"CAST": true,
"CHECK": true,
"COLUMN": true,
"CONNECT": true,
"CONNECTION": true,
"CONSTRAINT": true,
"CREATE": true,
"CROSS": true,
"CURRENT": true,
"CURRENT_DATE": true,
"CURRENT_TIME": true,
"CURRENT_TIMESTAMP": true,
"CURRENT_USER": true,
"DATABASE": true,
"DELETE": true,
"DISTINCT": true,
"DROP": true,
"ELSE": true,
"EXISTS": true,
"FALSE": true,
"FOLLOWING": true,
"FOR": true,
"FROM": true,
"FULL": true,
"GRANT": true,
"GROUP": true,
"GSCLUSTER": true,
"HAVING": true,
"ILIKE": true,
"IN": true,
"INCREMENT": true,
"INNER": true,
"INSERT": true,
"INTERSECT": true,
"INTO": true,
"IS": true,
"ISSUE": true,
"JOIN": true,
"LATERAL": true,
"LEFT": true,
"LIKE": true,
"LOCALTIME": true,
"LOCALTIMESTAMP": true,
"MINUS": true,
"NATURAL": true,
"NOT": true,
"NULL": true,
"OF": true,
"ON": true,
"OR": true,
"ORDER": true,
"ORGANIZATION": true,
"QUALIFY": true,
"REGEXP": true,
"REVOKE": true,
"RIGHT": true,
"RLIKE": true,
"ROW": true,
"ROWS": true,
"SAMPLE": true,
"SCHEMA": true,
"SELECT": true,
"SET": true,
"SOME": true,
"START": true,
"TABLE": true,
"TABLESAMPLE": true,
"THEN": true,
"TO": true,
"TRIGGER": true,
"TRUE": true,
"TRY_CAST": true,
"UNION": true,
"UNIQUE": true,
"UPDATE": true,
"USING": true,
"VALUES": true,
"VIEW": true,
"WHEN": true,
"WHENEVER": true,
"WHERE": true,
"WITH": true
},
"CLICKHOUSE": {},
"S3_DATALAKE": {
"ALL": true,
Expand Down
2 changes: 1 addition & 1 deletion src/warehouse/identity.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ const _ = require('lodash');
const { InstrumentationError } = require('@rudderstack/integrations-lib');
const { getVersionedUtils } = require('./util');

const identityEnabledWarehouses = ['snowflake', 'bq'];
const identityEnabledWarehouses = ['snowflake', 'snowpipe_streaming', 'bq'];
const versionedMergePropColumns = {};
const versionedMergeRuleTableNames = {};

Expand Down
13 changes: 10 additions & 3 deletions src/warehouse/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ function stringLikeObjectToString(obj) {
*/
function getColumns(options, event, columnTypes) {
const columns = {};
const uuidTS = options.provider === 'snowflake' ? 'UUID_TS' : 'uuid_ts';
const uuidTS = (options.provider === 'snowflake' || options.provider === 'snowpipe_streaming') ? 'UUID_TS' : 'uuid_ts';
columns[uuidTS] = 'datetime';
// add loaded_at for bq to be segment compatible
if (options.provider === 'bq') {
Expand Down Expand Up @@ -377,6 +377,7 @@ function getColumns(options, event, columnTypes) {

const fullEventColumnTypeByProvider = {
snowflake: 'json',
snowpipe_streaming: 'json',
rs: 'text',
bq: 'string',
postgres: 'json',
Expand Down Expand Up @@ -613,6 +614,13 @@ function enhanceContextWithSourceDestInfo(message, metadata) {
message.context = context;
}

function shouldSkipUsersTable(options) {
return options.provider === 'snowpipe_streaming' ||
options.destConfig?.skipUsersTable ||
options.integrationOptions?.skipUsersTable ||
false;
}

function processWarehouseMessage(message, options) {
const utils = getVersionedUtils(options.whSchemaVersion);
options.utils = utils;
Expand All @@ -638,8 +646,7 @@ function processWarehouseMessage(message, options) {
const eventType = message.type?.toLowerCase();
const skipTracksTable =
options.destConfig?.skipTracksTable || options.integrationOptions.skipTracksTable || false;
const skipUsersTable =
options.destConfig?.skipUsersTable || options.integrationOptions.skipUsersTable || false;
const skipUsersTable = shouldSkipUsersTable(options);
const skipReservedKeywordsEscaping =
options.integrationOptions.skipReservedKeywordsEscaping || false;

Expand Down
4 changes: 2 additions & 2 deletions src/warehouse/v1/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ function safeTableName(options, name = '') {
if (tableName === '') {
throw new TransformationError('Table name cannot be empty.');
}
if (provider === 'snowflake') {
if (provider === 'snowflake' || provider === 'snowpipe_streaming') {
tableName = tableName.toUpperCase();
} else if (provider === 'postgres') {
tableName = tableName.substr(0, 63);
Expand Down Expand Up @@ -41,7 +41,7 @@ function safeColumnName(options, name = '') {
if (columnName === '') {
throw new TransformationError('Column name cannot be empty.');
}
if (provider === 'snowflake') {
if (provider === 'snowflake' || provider === 'snowpipe_streaming') {
columnName = columnName.toUpperCase();
} else if (provider === 'postgres') {
columnName = columnName.substr(0, 63);
Expand Down
7 changes: 7 additions & 0 deletions test/__tests__/data/warehouse/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -2193,6 +2193,13 @@ function output(eventType, provider) {
if (provider === "snowflake") {
return _.cloneDeep(sampleEvents[eventType].output.snowflake);
}
if (provider === "snowpipe_streaming") {
if (eventType === 'identify') {
return _.cloneDeep([sampleEvents[eventType].output.snowflake[0]]);
} else {
return _.cloneDeep(sampleEvents[eventType].output.snowflake);
}
}
if (provider === "s3_datalake") {
return _.cloneDeep(sampleEvents[eventType].output.s3_datalake);
}
Expand Down
13 changes: 13 additions & 0 deletions test/__tests__/data/warehouse/integration_options_events.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ const sampleEvents = {
jsonPaths: ["tMap"]
}
},
SNOWPIPE_STREAMING: {
options: {
skipTracksTable: true,
useBlendoCasing: true,
jsonPaths: ["tMap"]
}
},
S3_DATALAKE: {
options: {
skipReservedKeywordsEscaping: true
Expand Down Expand Up @@ -1639,6 +1646,12 @@ function opOutput(eventType, provider) {
switch (provider) {
case "snowflake":
return _.cloneDeep(sampleEvents[eventType].output.snowflake);
case "snowpipe_streaming":
if (eventType === 'users') {
return _.cloneDeep([sampleEvents[eventType].output.snowflake[0]]);
} else {
return _.cloneDeep(sampleEvents[eventType].output.snowflake);
}
case "s3_datalake":
return _.cloneDeep(sampleEvents[eventType].output.s3_datalake);
case "rs":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ module.exports = {
]
}
},
SNOWPIPE_STREAMING: {
options: {
jsonPaths: [
"testMap.nestedMap",
"ctestMap.cnestedMap",
]
}
},
S3_DATALAKE: {
options: {
skipReservedKeywordsEscaping: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ module.exports = {
]
}
},
SNOWPIPE_STREAMING: {
options: {
jsonPaths: [
"PMap.nestedMap",
"CMap.nestedMap",
]
}
},
S3_DATALAKE: {
options: {
skipReservedKeywordsEscaping: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ module.exports = {
]
}
},
SNOWPIPE_STREAMING: {
options: {
jsonPaths: [
"testMap.nestedMap",
"ctestMap.cnestedMap",
]
}
},
GCS_DATALAKE: {
options: {
skipReservedKeywordsEscaping: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ module.exports = {
]
}
},
SNOWPIPE_STREAMING: {
options: {
jsonPaths: [
"UPMap.nestedMap",
"CTMap.nestedMap",
"TMap.nestedMap",
"CMap.nestedMap",
]
}
},
S3_DATALAKE: {
options: {
skipReservedKeywordsEscaping: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ module.exports = {
]
}
},
SNOWPIPE_STREAMING: {
options: {
jsonPaths: [
"testMap.nestedMap",
"ctestMap.cnestedMap",
]
}
},
S3_DATALAKE: {
options: {
skipReservedKeywordsEscaping: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ module.exports = {
]
}
},
SNOWPIPE_STREAMING: {
options: {
jsonPaths: [
"testMap.nestedMap",
".ctestMap.cnestedMap",
]
}
},
S3_DATALAKE: {
options: {
skipReservedKeywordsEscaping: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,15 @@ module.exports = {
]
}
},
SNOWPIPE_STREAMING: {
options: {
jsonPaths: [
"UPMap.nestedMap",
"PMap.nestedMap",
"CMap.nestedMap",
]
}
},
S3_DATALAKE: {
options: {
skipReservedKeywordsEscaping: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ module.exports = {
jsonPaths: ["alias.traits.testMap.nestedMap", "alias.context.ctestMap.cnestedMap"]
}
},
SNOWPIPE_STREAMING: {
options: {
jsonPaths: ["alias.traits.testMap.nestedMap", "alias.context.ctestMap.cnestedMap"]
}
},
S3_DATALAKE: {
options: {
skipReservedKeywordsEscaping: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ module.exports = {
]
}
},
SNOWPIPE_STREAMING: {
options: {
jsonPaths: [
"extract.properties.PMap.nestedMap",
"extract.context.CMap.nestedMap",
]
}
},
S3_DATALAKE: {
options: {
skipReservedKeywordsEscaping: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ module.exports = {
jsonPaths: ["group.traits.testMap.nestedMap", "group.context.ctestMap.cnestedMap"]
}
},
SNOWPIPE_STREAMING: {
options: {
jsonPaths: ["group.traits.testMap.nestedMap", "group.context.ctestMap.cnestedMap"]
}
},
GCS_DATALAKE: {
options: {
skipReservedKeywordsEscaping: true
Expand Down
Loading

0 comments on commit c9c93e3

Please sign in to comment.