diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml
new file mode 100644
index 0000000..1767f54
--- /dev/null
+++ b/.github/workflows/deploy.yml
@@ -0,0 +1,54 @@
+name: Deploy Lambda
+
+on:
+  push:
+    branches:
+      - main
+  pull_request:
+    branches:
+      - main
+  workflow_dispatch:
+
+jobs:
+  deploy:
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v4
+
+      - name: Set up Node.js
+        uses: actions/setup-node@v4
+        with:
+          node-version: 20
+
+      - name: Set up Python
+        uses: actions/setup-python@v5
+        with:
+          python-version: "3.11"
+
+      - name: Run Python unit tests
+        run: |
+          pip install -r requirements.txt
+          pytest
+
+      - name: Set up AWS configuration
+        uses: aws-actions/configure-aws-credentials@v4
+        with:
+          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
+          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
+          aws-region: us-west-2
+
+      - name: Install Serverless Framework
+        run: npm i -g serverless@3.x
+
+      - name: Deploy Lambda function
+        # Expose the Lambda's secrets to the ${env:...} lookups in serverless.yml.
+        env:
+          aws_access_key: ${{ secrets.AWS_ACCESS_KEY }}
+          aws_secret_key: ${{ secrets.AWS_SECRET_KEY }}
+          opensearch_host: ${{ secrets.OPENSEARCH_HOST }}
+          opensearch_username: ${{ secrets.OPENSEARCH_USERNAME }}
+          opensearch_password: ${{ secrets.OPENSEARCH_PASSWORD }}
+        run: |
+          # Pushes to main deploy to prod; pull requests deploy to dev.
+          if [ "${{ github.ref }}" = "refs/heads/main" ]; then
+            npx serverless deploy --stage prod --verbose
+            npx serverless info --stage prod
+          else
+            npx serverless deploy --stage dev --verbose
+            npx serverless info --stage dev
+          fi
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..45c34aa
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,22 @@
+# Distribution / packaging
+.Python
+env/
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+*.egg-info/
+.installed.cfg
+*.egg
+*/__pycache__/
+src/__pycache__/
+
+# Serverless directories
+.serverless
diff --git a/README.md b/README.md
index b8be7a4..25d7337 100644
--- a/README.md
+++ b/README.md
@@ -1,52 +1,82 @@
-# CloudFront Logs Parser Lambda for OpenSearch
-
-This repository contains code to parse CloudFront logs from an S3 bucket and send the parsed data to OpenSearch.
-
-## How to Use
-
-### Step 1: Package Installation
-
-```bash
-mkdir package
-pip3 install --target ./package opensearch-py user-agents
-cd package
-zip -r ../my_deployment_package.zip .
-cd ..
-zip my_deployment_package.zip lambda_function.py
-```
-
-### Step 2: AWS Lambda Deployment
-Upload the generated zip file (my_deployment_package.zip) to AWS Lambda.
-Set the environment variables described in Step 3.
-
-### Step 3: AWS and OpenSearch Credentials
-Set the following AWS and OpenSearch credentials as environment variables:
-
-```
-aws_access_key = ''
-aws_secret_key = ''
-opensearch_host = ''
-opensearch_username = ''
-opensearch_password = ''
-```
-
-### Step 4: Set Up S3 Event Trigger
-To automatically trigger the Lambda function when new CloudFront logs are added to your S3 bucket, follow these steps:
-
-1. Navigate to the AWS S3 console.
-2. Select the S3 bucket containing your CloudFront logs.
-3. Go to the "Properties" tab and click on "Events."
-4. Add a new event configuration with the following settings:
-   - Event Name: Choose a descriptive name (e.g., "CloudFrontLogsEvent").
-   - Events: Select the "PUT" event.
-   - Prefix: (Optional) Specify a prefix if your CloudFront logs are stored in a specific folder within the bucket.
-   - Suffix: (Optional) Specify a suffix if your CloudFront logs have a specific file extension.
-5. Click "Add" to save the configuration.
-
-Now, whenever new CloudFront logs are added to the specified S3 bucket, the Lambda function will be automatically triggered to parse and send the data to OpenSearch.
-
-### Next TODO
-1. Use serverless to deploy the lambda
-2. Add test cases to the code
-3. Create alerting when lambda fails
+# Serverless Framework AWS Python Example
+
+This template demonstrates how to deploy a Python function running on AWS Lambda using the traditional Serverless Framework. The deployed function does not include any event definitions or any kind of persistence (database). For more advanced configurations, check out the [examples repo](https://github.com/serverless/examples/), which includes integrations with SQS and DynamoDB as well as examples of functions triggered in a `cron`-like manner. For details about the configuration of specific `events`, please refer to our [documentation](https://www.serverless.com/framework/docs/providers/aws/events/).
+
+## Usage
+
+### Deployment
+
+To deploy the example, run the following command:
+
+```
+$ serverless deploy
+```
+
+After running deploy, you should see output similar to:
+
+```bash
+Deploying aws-python-project to stage dev (us-east-1)
+
+✔ Service deployed to stack aws-python-project-dev (112s)
+
+functions:
+  hello: aws-python-project-dev-hello (1.5 kB)
+```
+
+### Invocation
+
+After successful deployment, you can invoke the deployed function by using the following command:
+
+```bash
+serverless invoke --function hello
+```
+
+This should result in a response similar to the following:
+
+```json
+{
+  "statusCode": 200,
+  "body": "{\"message\": \"Go Serverless v3.0! Your function executed successfully!\", \"input\": {}}"
+}
+```
+
+### Local development
+
+You can invoke your function locally by using the following command:
+
+```bash
+serverless invoke local --function hello
+```
+
+This should result in a response similar to the following:
+
+```
+{
+  "statusCode": 200,
+  "body": "{\"message\": \"Go Serverless v3.0! Your function executed successfully!\", \"input\": {}}"
+}
+```
+
+### Bundling dependencies
+
+If you would like to include third-party dependencies, you will need to use a plugin called `serverless-python-requirements`. You can set it up by running the following command:
+
+```bash
+serverless plugin install -n serverless-python-requirements
+```
+
+Running the above will automatically add `serverless-python-requirements` to the `plugins` section of your `serverless.yml` file and add it as a `devDependency` to the `package.json` file (the `package.json` file will be created automatically if it doesn't already exist). You can then add your dependencies to the `requirements.txt` file (`Pipfile` and `pyproject.toml` are also supported but require additional configuration), and they will be automatically injected into the Lambda package during the build process. For more details about the plugin's configuration, please refer to the [official documentation](https://github.com/UnitedIncome/serverless-python-requirements).
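+
+For reference, this repository's own `serverless.yml` configures the plugin as follows (a minimal excerpt; the file itself holds the full configuration):
+
+```yml
+plugins:
+  - serverless-python-requirements
+
+custom:
+  pythonRequirements:
+    fileName: requirements-lambda.txt
+```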
diff --git a/README_old.md b/README_old.md
new file mode 100644
index 0000000..b8be7a4
--- /dev/null
+++ b/README_old.md
@@ -0,0 +1,52 @@
+# CloudFront Logs Parser Lambda for OpenSearch
+
+This repository contains code to parse CloudFront logs from an S3 bucket and send the parsed data to OpenSearch.
+
+## How to Use
+
+### Step 1: Package Installation
+
+```bash
+mkdir package
+pip3 install --target ./package opensearch-py user-agents
+cd package
+zip -r ../my_deployment_package.zip .
+cd ..
+zip my_deployment_package.zip lambda_function.py
+```
+
+### Step 2: AWS Lambda Deployment
+Upload the generated zip file (my_deployment_package.zip) to AWS Lambda.
+Set the environment variables described in Step 3.
+
+### Step 3: AWS and OpenSearch Credentials
+Set the following AWS and OpenSearch credentials as environment variables:
+
+```
+aws_access_key = ''
+aws_secret_key = ''
+opensearch_host = ''
+opensearch_username = ''
+opensearch_password = ''
+```
+
+### Step 4: Set Up S3 Event Trigger
+To automatically trigger the Lambda function when new CloudFront logs are added to your S3 bucket, follow these steps:
+
+1. Navigate to the AWS S3 console.
+2. Select the S3 bucket containing your CloudFront logs.
+3. Go to the "Properties" tab and click on "Events."
+4. Add a new event configuration with the following settings:
+   - Event Name: Choose a descriptive name (e.g., "CloudFrontLogsEvent").
+   - Events: Select the "PUT" event.
+   - Prefix: (Optional) Specify a prefix if your CloudFront logs are stored in a specific folder within the bucket.
+   - Suffix: (Optional) Specify a suffix if your CloudFront logs have a specific file extension.
+5. Click "Add" to save the configuration.
+
+Now, whenever new CloudFront logs are added to the specified S3 bucket, the Lambda function will be automatically triggered to parse and send the data to OpenSearch.
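+
+If you deploy with the Serverless Framework instead of wiring the trigger by hand (see the TODO below), the same event can be declared in configuration. This sketch mirrors the `serverless.yml` added to this repo; the bucket name and prefix are placeholders:
+
+```yml
+functions:
+  parseCloudfrontLogs:
+    handler: src/lambda_handler.lambda_handler
+    events:
+      - s3:
+          bucket: your-s3-bucket-name
+          event: s3:ObjectCreated:*
+          rules:
+            - prefix: path/to/your/logs/
+            - suffix: .gz
+```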
+
+### Next TODO
+1. Use serverless to deploy the lambda
+2. Add test cases to the code
+3. Create alerting when lambda fails
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000..ab4c198
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,2 @@
+[pytest]
+# Put the repo root on sys.path so `src.*` imports resolve (pytest >= 7).
+pythonpath = .
diff --git a/requirements-lambda.txt b/requirements-lambda.txt
new file mode 100644
index 0000000..6cbfe42
--- /dev/null
+++ b/requirements-lambda.txt
@@ -0,0 +1,2 @@
+opensearch-py
+user-agents
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..9335912
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,4 @@
+boto3
+opensearch-py
+user-agents
+pytest
diff --git a/serverless.yml b/serverless.yml
new file mode 100644
index 0000000..835e1c7
--- /dev/null
+++ b/serverless.yml
@@ -0,0 +1,44 @@
+frameworkVersion: "3"
+
+service: cloudfront-logs-parser-lambda-opensearch
+useDotenv: true
+
+provider:
+  name: aws
+  runtime: python3.11
+  environment:
+    PYTHONPATH: ./src
+    # Lowercase names match the os.environ[...] lookups in src/lambda_handler.py;
+    # values come from the CI environment or a local .env file (useDotenv).
+    aws_access_key: ${env:aws_access_key}
+    aws_secret_key: ${env:aws_secret_key}
+    opensearch_host: ${env:opensearch_host}
+    opensearch_username: ${env:opensearch_username}
+    opensearch_password: ${env:opensearch_password}
+  region: us-west-2
+
+
+package:
+  patterns:
+    # exclude everything
+    - "!./**"
+    # include only what's needed
+    - src/cloudfront_parser.py
+    - src/lambda_handler.py
+    - src/version.py
+
+functions:
+  parseCloudfrontLogs:
+    handler: src/lambda_handler.lambda_handler
+    events:
+      - s3:
+          bucket: your-s3-bucket-name
+          event: s3:ObjectCreated:*
+          rules:
+            - prefix: path/to/your/logs/
+            - suffix: .gz
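+          # NOTE: by default the Serverless Framework creates this bucket as
+          # part of the stack; if the logs bucket already exists, add
+          # `existing: true` to this s3 event instead.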
+
+plugins:
+  - serverless-python-requirements
+
+custom:
+  pythonRequirements:
+    # Bundle only the runtime dependencies; boto3 ships with the Lambda
+    # runtime and pytest is only needed for tests.
+    fileName: requirements-lambda.txt
diff --git a/src/__init__.py b/src/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/cloudfront_lambda.py b/src/cloudfront_parser.py
similarity index 69%
rename from cloudfront_lambda.py
rename to src/cloudfront_parser.py
index f41f8a0..2e3304d 100644
--- a/cloudfront_lambda.py
+++ b/src/cloudfront_parser.py
@@ -1,12 +1,7 @@
-import boto3
-import gzip
-import json
+import urllib.parse
 import re
 from datetime import datetime
-from opensearchpy import OpenSearch
-import urllib.parse
 import user_agents
-import os
 
 
 def parse_cloudfront_logs(log_file):
@@ -150,63 +145,3 @@ def parse_cloudfront_logs(log_file):
         bulk_logs.append({"index": {"_index": opensearch_index}})
         bulk_logs.append(log_data)
     return bulk_logs
-
-
-def lambda_handler(event, context):
-    try:
-        # AWS credentials from env variables
-        aws_access_key = os.environ['aws_access_key']
-        aws_secret_key = os.environ['aws_secret_key']
-
-        # OpenSearch credentials from env variables
-        opensearch_host = os.environ['opensearch_host']
-        opensearch_username = os.environ['opensearch_username']
-        opensearch_password = os.environ['opensearch_password']
-
-
-        # Get the S3 bucket and key from the S3 event
-        bucket = event['Records'][0]['s3']['bucket']['name']
-        key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
-
-        # Initialize S3 client
-        s3_client = boto3.client('s3', aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key)
-
-        opensearch = OpenSearch(
-            hosts=[{'host': opensearch_host, 'port': 443}],
-            http_auth=(opensearch_username, opensearch_password),
-            scheme="https",
-            timeout=30,
-            max_retries=3,
-            retry_on_timeout=True
-        )
-
-        try:
-            object = s3_client.get_object(Bucket=bucket, Key=key)["Body"]
-            batch_size = 2000
-            with gzip.open(object) as log_file:
-                log_data = parse_cloudfront_logs(log_file)
-
-                # Loop through the array in batches of 2000
-                for i in range(0, len(log_data), batch_size):
-                    batch = log_data[i:i + batch_size]
-                    opensearch.bulk(body=batch)
-
-
-        except Exception as e:
-            return {
-                'statusCode': 500,
-                'body': json.dumps(f'Error: {e}')
-            }
-
-
-        return {
-            'statusCode': 200,
-            'body': json.dumps(f'CloudFront logs for ingested into OpenSearch successfully!')
-        }
-
-    except Exception as e:
-        print(f"Unexpected error: {e}")
-        return {
-            'statusCode': 500,
-            'body': json.dumps(f'Error: {e}')
-        }
diff --git a/src/lambda_handler.py b/src/lambda_handler.py
new file mode 100644
index 0000000..efdb08f
--- /dev/null
+++ b/src/lambda_handler.py
@@ -0,0 +1,66 @@
+import json
+import boto3
+import os
+import urllib.parse
+import gzip
+from opensearchpy import OpenSearch
+from src.cloudfront_parser import parse_cloudfront_logs
+
+
+def lambda_handler(event, context):
+    try:
+        # AWS credentials from env variables
+        aws_access_key = os.environ['aws_access_key']
+        aws_secret_key = os.environ['aws_secret_key']
+
+        # OpenSearch credentials from env variables
+        opensearch_host = os.environ['opensearch_host']
+        opensearch_username = os.environ['opensearch_username']
+        opensearch_password = os.environ['opensearch_password']
+
+        # Get the S3 bucket and key from the S3 event
+        bucket = event['Records'][0]['s3']['bucket']['name']
+        key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
+
+        # Initialize the S3 client
+        s3_client = boto3.client('s3', aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key)
+
+        opensearch = OpenSearch(
+            hosts=[{'host': opensearch_host, 'port': 443}],
+            http_auth=(opensearch_username, opensearch_password),
+            use_ssl=True,
+            timeout=30,
+            max_retries=3,
+            retry_on_timeout=True
+        )
+
+        try:
+            body = s3_client.get_object(Bucket=bucket, Key=key)["Body"]
+            batch_size = 2000
+            with gzip.open(body) as log_file:
+                log_data = parse_cloudfront_logs(log_file)
+
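+                # parse_cloudfront_logs returns a flat list of alternating
+                # bulk entries ({"index": ...} action, then the log document),
+                # so batch_size must stay even to keep each pair together.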
+                # Loop through the array in batches of 2000
+                for i in range(0, len(log_data), batch_size):
+                    batch = log_data[i:i + batch_size]
+                    opensearch.bulk(body=batch)
+
+        except Exception as e:
+            return {
+                'statusCode': 500,
+                'body': json.dumps(f'Error: {e}')
+            }
+
+        return {
+            'statusCode': 200,
+            'body': json.dumps('CloudFront logs ingested into OpenSearch successfully!')
+        }
+
+    except Exception as e:
+        print(f"Unexpected error: {e}")
+        return {
+            'statusCode': 500,
+            'body': json.dumps(f'Error: {e}')
+        }
diff --git a/src/version.py b/src/version.py
new file mode 100644
index 0000000..5becc17
--- /dev/null
+++ b/src/version.py
@@ -0,0 +1 @@
+__version__ = "1.0.0"
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/fixtures/sample_cloudfront_logs.gz b/tests/fixtures/sample_cloudfront_logs.gz
new file mode 100644
index 0000000..5699566
Binary files /dev/null and b/tests/fixtures/sample_cloudfront_logs.gz differ
diff --git a/tests/test_cloudfront_parser.py b/tests/test_cloudfront_parser.py
new file mode 100644
index 0000000..d0c7b8c
--- /dev/null
+++ b/tests/test_cloudfront_parser.py
@@ -0,0 +1,19 @@
+import gzip
+
+import pytest
+
+from src.cloudfront_parser import parse_cloudfront_logs
+
+
+@pytest.fixture(scope="module")
+def sample_log_file_path():
+    return "tests/fixtures/sample_cloudfront_logs.gz"
+
+
+def test_parse_cloudfront_logs(sample_log_file_path):
+    with gzip.open(sample_log_file_path, 'rb') as f:
+        logs = parse_cloudfront_logs(f)
+
+    # logs alternates bulk actions and documents; index 1 is the first document.
+    assert logs[1]['x-edge-location'] == 'ATL56-C2'
+    assert logs[1]['cs-method'] == 'GET'
+    assert logs[1]['cs-uri-stem'] == '/api/auth/user'
+    assert logs[1]['sc-bytes'] == 252
diff --git a/tests/test_lambda_handler.py b/tests/test_lambda_handler.py
new file mode 100644
index 0000000..131554a
--- /dev/null
+++ b/tests/test_lambda_handler.py
@@ -0,0 +1,53 @@
+from unittest.mock import patch, MagicMock
+
+import pytest
+
+from src.lambda_handler import lambda_handler
+
+
+@pytest.fixture
+def mock_environment_variables():
+    return {
+        'aws_access_key': 'fake_access_key',
+        'aws_secret_key': 'fake_secret_key',
+        'opensearch_host': 'fake_host',
+        'opensearch_username': 'fake_username',
+        'opensearch_password': 'fake_password',
+    }
+
+
+@pytest.fixture
+def event():
+    return {
+        'Records': [{
+            's3': {
+                'bucket': {'name': 'test_bucket'},
+                'object': {'key': 'test_key.gz'}
+            }
+        }]
+    }
+
+
+@pytest.fixture
+def context():
+    return {}
+
+
+def test_lambda_handler_success(event, context, mock_environment_variables):
+    # Stub out S3, OpenSearch, gzip, and the parser so only the handler's
+    # control flow is exercised.
+    with patch.dict('os.environ', mock_environment_variables):
+        with patch('src.lambda_handler.boto3.client') as mock_s3_client, \
+             patch('src.lambda_handler.OpenSearch') as mock_opensearch, \
+             patch('src.lambda_handler.gzip.open'), \
+             patch('src.lambda_handler.parse_cloudfront_logs') as mock_parse:
+            mock_s3_client.return_value.get_object.return_value = {"Body": MagicMock()}
+            mock_parse.return_value = [{"index": {"_index": "cloudfront-logs"}}, {"cs-method": "GET"}]
+            mock_opensearch.return_value.bulk.return_value = None
+            result = lambda_handler(event, context)
+
+    assert result['statusCode'] == 200
+    assert 'CloudFront logs ingested into OpenSearch successfully!' in result['body']
+
+
+def test_lambda_handler_missing_environment_variables(event, context):
+    with patch.dict('os.environ', {}, clear=True):
+        result = lambda_handler(event, context)
+
+    assert result['statusCode'] == 500