DynamoDB Streams captures item-level changes in the DynamoDB table. A Kinesis Data Stream and a Kinesis Data Firehose delivery stream then carry those changes to an S3 bucket, and a Lambda function transforms each record before Firehose writes it to S3.
Create a customers table with customers_id as the partition key.
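The console steps are not shown here, but the equivalent setup can be scripted with boto3. This is a minimal sketch under assumptions: the on-demand billing mode, the region, the account ID, and the Kinesis stream ARN (which must already exist) are placeholders, not values from the walkthrough.

import boto3

dynamodb = boto3.client("dynamodb")

# Create the table with customers_id (a number) as the partition key.
dynamodb.create_table(
    TableName="customers",
    AttributeDefinitions=[{"AttributeName": "customers_id", "AttributeType": "N"}],
    KeySchema=[{"AttributeName": "customers_id", "KeyType": "HASH"}],
    BillingMode="PAY_PER_REQUEST",  # assumption: on-demand capacity keeps the sketch simple
)
dynamodb.get_waiter("table_exists").wait(TableName="customers")

# Send the table's item-level changes to an existing Kinesis data stream
# (the ARN below is a placeholder for the stream that Firehose reads from).
dynamodb.enable_kinesis_streaming_destination(
    TableName="customers",
    StreamArn="arn:aws:kinesis:us-east-1:123456789012:stream/customers-changes",
)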
# This Firehose transformation function appends a newline to each record
# coming from the Kinesis data stream, so the objects written to S3 are
# newline-delimited JSON.
import base64


def lambda_handler(event, context):
    output = []  # keep the buffer local so records never leak across invocations
    for record in event['records']:
        # Firehose delivers the record data base64-encoded.
        payload = base64.b64decode(record['data']).decode('utf-8')
        print('payload:', payload)
        # Append a newline so each change lands on its own line in S3.
        row_w_newline = payload + "\n"
        # Re-encode to base64 and decode to str so the response is JSON-serializable.
        data = base64.b64encode(row_w_newline.encode('utf-8')).decode('utf-8')
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': data,
        })
    print('Processed {} records.'.format(len(event['records'])))
    return {'records': output}
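To sanity-check the transform before deploying it, you can feed the handler a hand-built event. The sample record below is purely illustrative; it just mimics the shape Firehose sends (a 'records' list whose entries carry 'recordId' and base64 'data').

import base64
import json

sample_item = {"customers_id": 1, "name": "Baba Li", "age": 20, "gender": "M"}
test_event = {
    "records": [
        {
            "recordId": "1",
            "data": base64.b64encode(json.dumps(sample_item).encode("utf-8")).decode("utf-8"),
        }
    ]
}

result = lambda_handler(test_event, None)
# Each output record now decodes to the original JSON followed by a newline.
print(base64.b64decode(result["records"][0]["data"]).decode("utf-8"))

The newline matters because Firehose concatenates the records it buffers into a single S3 object; without a delimiter, downstream tools cannot split the object back into individual rows.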
Configure the Firehose buffer size and buffer interval so that data is delivered to S3 whenever the buffer reaches 1 MB or 60 seconds have elapsed, whichever comes first.
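If you script the delivery stream instead of using the console, the same buffering hints can be set with boto3. This is a sketch under assumptions: the delivery stream name, the role and bucket ARNs, and the Lambda ARN are placeholders.

import boto3

firehose = boto3.client("firehose")

firehose.create_delivery_stream(
    DeliveryStreamName="customers-changes-to-s3",
    DeliveryStreamType="KinesisStreamAsSource",
    KinesisStreamSourceConfiguration={
        "KinesisStreamARN": "arn:aws:kinesis:us-east-1:123456789012:stream/customers-changes",
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-read-kinesis",
    },
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-write-s3",
        "BucketARN": "arn:aws:s3:::customers-changes-bucket",
        # Deliver when 1 MB has accumulated or 60 seconds have passed.
        "BufferingHints": {"SizeInMBs": 1, "IntervalInSeconds": 60},
        "ProcessingConfiguration": {
            "Enabled": True,
            "Processors": [
                {
                    "Type": "Lambda",
                    "Parameters": [
                        {
                            "ParameterName": "LambdaArn",
                            "ParameterValue": "arn:aws:lambda:us-east-1:123456789012:function:add-newline",
                        }
                    ],
                }
            ],
        },
    },
)

With the pipeline in place, insert a few sample customers through the PartiQL editor, update one of them, and query the table: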
INSERT INTO "customers" VALUE {'customers_id': 1, 'name': 'Baba Li', 'age': 20, 'gender': 'M'}
INSERT INTO "customers" VALUE {'customers_id': 2, 'name': 'Lucky Bill', 'age': 24, 'gender': 'M'}
INSERT INTO "customers" VALUE {'customers_id': 3, 'name': 'Mom Ma', 'age': 50, 'gender': 'F'}
INSERT INTO "customers" VALUE {'customers_id': 4, 'name': 'Locker Su', 'age': 30, 'gender': 'M'}
INSERT INTO "customers" VALUE {'customers_id': 5, 'name': 'Abdel ly', 'age': 41, 'gender': 'F'}
INSERT INTO "customers" VALUE {'customers_id': 6, 'name': 'Abou Sar', 'age': 35, 'gender': 'F'}
UPDATE "customers" SET age=26 WHERE customers_id=3
SELECT * FROM "customers";
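The same PartiQL statements can also be issued from code with the DynamoDB execute_statement API. A minimal sketch, showing only one insert for brevity (the literals simply mirror the rows above):

import boto3

dynamodb = boto3.client("dynamodb")

statements = [
    "INSERT INTO \"customers\" VALUE {'customers_id': 1, 'name': 'Baba Li', 'age': 20, 'gender': 'M'}",
    "UPDATE \"customers\" SET age=26 WHERE customers_id=1",
]
for statement in statements:
    dynamodb.execute_statement(Statement=statement)

# Read the table back; items come back in DynamoDB's typed attribute format.
response = dynamodb.execute_statement(Statement='SELECT * FROM "customers"')
for item in response["Items"]:
    print(item)

Each insert and update flows through the stream, gets a newline appended by the Lambda transform, and lands in the S3 bucket once the 1 MB or 60 second buffer condition is met.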