-
Notifications
You must be signed in to change notification settings - Fork 0
/
Netflix Rating System Snowflake
258 lines (204 loc) · 8.78 KB
/
Netflix Rating System Snowflake
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
//==============CREATING SNOWPIPE FOR DYNAMIC USER RATING INFORMATION==========
//=============================================================================
// CREATE WAREHOUSE
//=============================================================================
CREATE WAREHOUSE IF NOT EXISTS NEW_WAREHOUSE
WITH WAREHOUSE_SIZE = XSMALL
SCALING_POLICY = STANDARD
AUTO_RESUME = TRUE
COMMENT = 'To be used for Task of Cleaned Table';
//=============================================================================
// CREATE TRANSIENT DATABASE
//=============================================================================
CREATE OR REPLACE DATABASE NEW_DB;
//=============================================================================
// CREATE SCHEMA
//=============================================================================
CREATE SCHEMA NETFLIX_SCHEMA;
//=============================================================================
// SPECIFY WHICH DB AND SCHEMA TO USE
//=============================================================================
USE NEW_DB.NETFLIX_SCHEMA;
//=============================================================================
// CREATE TRANSIENT STAGING TABLE
//=============================================================================
CREATE OR REPLACE TRANSIENT TABLE TRANSIENT_NETFLIX_RATING_EVENTS(
RAW_DATA VARIANT
);
--TEST
//CREATE OR REPLACE TRANSIENT TABLE TEST_NETFLIX_RATING_EVENTS(
// userId NUMBER,
// movieId NUMBER,
// rating DECIMAL,
// timestamp DATETIME
// );
//=============================================================================
// S3 INTEGRATION (to avoid providing AWS Credentials)
//=============================================================================
CREATE OR REPLACE STORAGE INTEGRATION snowparkdemo_s3_int
TYPE = external_stage
STORAGE_PROVIDER = s3
ENABLED = true
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::745482211359:role/snowflakedemo_role'
STORAGE_ALLOWED_LOCATIONS = ('s3://snowspark-snowflake', 's3://netflix-rating');
//=============================================================================
// CHECK arn and EXTERNAL ID (For updating the TRUST RELATIONSHIP in AWS Role)
//=============================================================================
DESC INTEGRATION snowparkdemo_s3_int;
//=============================================================================
// FILE FORMAT
//=============================================================================
CREATE OR REPLACE FILE FORMAT NEW_DB.NETFLIX_SCHEMA.JSON_FORMAT
TYPE = JSON;
CREATE OR REPLACE file format NEW_DB.NETFLIX_SCHEMA.COMMA_SEPARATED_FORMAT
type = CSV
skip_header = 1
null_if = ('null', 'null')
empty_field_as_null = true;
//=============================================================================
// SETUP EXTERNAL STAGE
//=============================================================================
//FOR REFERENCE TABLE
CREATE OR REPLACE STAGE NEW_DB.NETFLIX_SCHEMA.TRANSIENT_EXT_STAGE
url = 's3://netflix-rating/'
storage_integration = snowparkdemo_s3_int
file_format = NEW_DB.NETFLIX_SCHEMA.JSON_FORMAT;
//
//=============================================================================
// CREATE PIPE
//=============================================================================
//===========For reference table===============================================
CREATE OR REPLACE PIPE NEW_DB.NETFLIX_SCHEMA.TR_PIPE
auto_ingest=true
AS
COPY INTO NEW_DB.NETFLIX_SCHEMA.TRANSIENT_NETFLIX_RATING_EVENTS from @NEW_DB.NETFLIX_SCHEMA.TRANSIENT_EXT_STAGE on_error = continue;
//=============================================================================
// CHECK PIPE FOR NOTIFICATION CHANNEL
//=============================================================================
SHOW PIPES;
SELECT * FROM NEW_DB.NETFLIX_SCHEMA.TRANSIENT_NETFLIX_RATING_EVENTS;
SELECT SYSTEM$PIPE_STATUS('NEW_DB.NETFLIX_SCHEMA.TR_PIPE');
REMOVE @%TRANSIENT_NETFLIX_RATING_EVENTS;
--SELECT * FROM TABLE(COPY_HISTORY(TABLE_NAME =>'SNOWPARK_DEMO_TABLE', START_TIME => '2022-07-08T18:03:52.437Z'));
//=============================================================================
// TO RESUME OR PAUSE PIPE EXECUTION
//=============================================================================
ALTER PIPE NEW_DB.NETFLIX_SCHEMA.TR_PIPE SET PIPE_EXECUTION_PAUSED=False;
ALTER PIPE NEW_DB.NETFLIX_SCHEMA.TR_PIPE REFRESH;
//==============CREATING REQUIRED TABLES AND STREAMS===========================
//=============================================================================
// Set context
//=============================================================================
USE ROLE ACCOUNTADMIN;
USE WAREHOUSE NEW_WAREHOUSE;
USE DATABASE NEW_DB;
USE SCHEMA NETFLIX_SCHEMA;
//=============================================================================
//=============================================================================
// Create tables and add show data
//=============================================================================
// CREATE OR REPLACE TABLE
//TRANSIENT_NETFLIX_RATING_EVENTS(
//RAW_DATA VARIANT); -- Created already
CREATE OR REPLACE TABLE
NETFLIX_SHOWS(
TITLE STRING,
BUDGET NUMBER,
ID NUMBER,
IMDB_ID VARCHAR,
ORIGINAL_LANGUAGE STRING,
POPULARITY DECIMAL,
RELEASE_DATE DATE,
REVENUE NUMBER,
RUNTIME NUMBER,
VOTE_AVERAGE DECIMAL,
VOTE_COUNT NUMBER
);
CREATE OR REPLACE TABLE //END RESULT TABLE
NETFLIX_RATINGS(
SHOW_ID NUMBER,
SHOW_TITLE STRING,
RATING DECIMAL,
USERID NUMBER
);
//=============================================================================
//=============================================================================
// Create streams on the NETFLIX_RATING_EVENTS table
//=============================================================================
CREATE OR REPLACE STREAM STREAM_A ON TABLE TRANSIENT_NETFLIX_RATING_EVENTS;
CREATE OR REPLACE STREAM STREAM_B ON TABLE TRANSIENT_NETFLIX_RATING_EVENTS;
SHOW STREAMS;
//=============================================================================
//=============================================================================
// Modify NETFLIX_RATING_EVENTS and examine streams
//=============================================================================
--INSERT INTO TRANSIENT_NETFLIX_RATING_EVENTS(RAW_DATA) SELECT PARSE_JSON(' { "show_id": 0, "rating": 10 } ');
SELECT * FROM STREAM_A;
SELECT * FROM STREAM_B;
INSERT INTO NETFLIX_RATINGS(SHOW_ID, SHOW_TITLE, RATING, USERID) (
SELECT
NETFLIX_SHOWS.ID AS SHOW_ID,
NETFLIX_SHOWS.TITLE AS SHOW_TITLE,
STREAM_A.RAW_DATA:"rating" AS RATING,
STREAM_A.RAW_DATA:"userId" AS USERID
FROM
NETFLIX_SHOWS
JOIN STREAM_A
ON NETFLIX_SHOWS.ID=STREAM_A.RAW_DATA:"movieId"
WHERE
STREAM_A.RAW_DATA:"rating" IS NOT NULL
);
SELECT * FROM STREAM_A; //STREAMS ONLY FLUSH THEMSELVES OUT WHEN DATA IS BEING SUCKED (READ) FROM IT.
SELECT * FROM STREAM_B;
//=============================================================================
// Automate stream ingestion with a task
//=============================================================================
CREATE OR REPLACE TASK NETFLIX_RATINGS_EVENT_PROCESSOR
WAREHOUSE = NEW_WAREHOUSE
SCHEDULE = 'USING CRON * * * * * America/Chicago' // process new records every minute
WHEN
SYSTEM$STREAM_HAS_DATA('STREAM_A')
AS
INSERT INTO NETFLIX_RATINGS(SHOW_ID, SHOW_TITLE, RATING, USERID) (
SELECT
NETFLIX_SHOWS.ID AS SHOW_ID,
NETFLIX_SHOWS.TITLE AS SHOW_TITLE,
STREAM_A.RAW_DATA:"rating" AS RATING,
STREAM_A.RAW_DATA:"userId" AS USERID
FROM
NETFLIX_SHOWS
JOIN STREAM_A
ON NETFLIX_SHOWS.ID=STREAM_A.RAW_DATA:"movieId"
WHERE
STREAM_A.RAW_DATA:"rating" IS NOT NULL
);
// Tasks are suspended by default. Resume the task so it will run on schedule
ALTER TASK NETFLIX_RATINGS_EVENT_PROCESSOR RESUME;
// add new events
--Upload new file on S3
//Check count to see if data got ingested in Netflix_Ratings Table.
SELECT COUNT(*) FROM NETFLIX_RATINGS;
// Find the best show on netflix
SELECT
SHOW_TITLE,
AVG(RATING) AS AVG_RATING
FROM
NETFLIX_RATINGS
GROUP BY
SHOW_TITLE
ORDER BY
AVG_RATING DESC
LIMIT 1;
//=============================================================================
//=============================================================================
// Cleanup
//=============================================================================
DROP TASK IF EXISTS NETFLIX_RATINGS_EVENT_PROCESSOR;
DROP TABLE IF EXISTS NETFLIX_SHOWS;
DROP TABLE IF EXISTS TRANSIENT_NETFLIX_RATING_EVENTS;
DROP TABLE IF EXISTS NETFLIX_RATINGS;
DROP STREAM IF EXISTS STREAM_A;
DROP STREAM IF EXISTS STREAM_B;
DROP SCHEMA IF EXISTS NETFLIX_SCHEMA;
//=============================================================================
USE NEW_DB.NETFLIX_SCHEMA;