You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
-- 1.create databaseCREATE OR REPLACEDATABASESnowpipe_demo;
-- 2.create a staging table CREATE OR REPLACETABLEStaging_Table (
customer_id Text,
datetime Text,
attd Text,
credit_score Text,
state_id Text,
type Text
);
-- 3. Create a file format
CREATE OR REPLACE FILE FORMAT SNOWPIPE_CSV_Format
TYPE ='CSV'
skip_header =1
field_delimiter =','
FIELD_OPTIONALLY_ENCLOSED_BY ='"';
-- 4. create an external S3 stage
CREATE OR REPLACE STAGE Snowpipe_Demo_Stage
URL="S3landingbucket"
CREDENTIALS = (AWS_KEY_ID ='***************' AWS_SECRET_KEY ='***************+***************')
file_format = SNOWPIPE_CSV_Format;
-- 5. create a snowpipe
CREATE OR REPLACE PIPE Snowpipe_Demo
AUTO_INGEST = TRUE
AS COPY INTO Staging_Table
FROM @Snowpipe_Demo_Stage
FILE_FORMAT = (FORMAT_NAME = SNOWPIPE_CSV_Format);
-- 6.save notification_channel url for S3 Event Notification
SHOW PIPES;
arn:aws:sqs:ap-southeast-2:***************:sf-snowpipe-AIDAXCWW6DZILFHBVQMMV-C8qof2NHZLas0q_FNvbOcw
--------------Table Transformation----------- 7. Create a Target table CREATE OR REPLACETABLETarget_Table(
customer_id Text,
datetime datetime,
attd Float8,
credit_score Text,
state_id Text,
type Text
);
-- 8. Create a View for staging table with data transformationCREATE OR REPLACEVIEWStaging_Table_VIEWAS(
SELECT DISTINCT
CUSTOMER_ID,
CONCAT((split_part(DATETIME, '', 0)||''||split_part(DATETIME, '', 2)))::datetime AS DATETIME,
ATTD::FLOAT8 AS ATTD,
CREDIT_SCORE,
STATE_ID,
TYPE
FROM Staging_Table
);
-- 9. Create STREAM on Staging_Table
CREATE OR REPLACE STREAM Stream_demo ON TABLE Staging_Table;
-- 10. Create STREAM TASK. -- Once there is new data trigger snowpipe, snowpipe will load data into stage table, -- stream task will periodically merge data from stage table to the target table.
CREATE OR REPLACE TASK MERAGE_TASK
WAREHOUSE = COMPUTE_WH
SCHEDULE ='60 minute'
WHEN SYSTEM$STREAM_HAS_DATA('Stream_demo') -- Merage into target table by stream AS
MERGE INTO TARGET_TABLE AS DEST
USING Staging_Table_VIEW AS SRC
ONsrc.customer_id=dest.customer_idANDsrc.datetime=dest.datetimeANDsrc.attd=dest.attdANDsrc.credit_score=dest.credit_scoreANDsrc.state_id=dest.state_idANDsrc.type=dest.type
WHEN NOT matched
THEN INSERT (customer_id, datetime, attd, credit_score, state_id, type)
VALUES (src.customer_id, src.datetime, src.attd, src.credit_score, src.state_id, src.type);
//11.RUN STREAM TASK
ALTER TASK MERAGE_TASK RESUME;