Skip to content

Commit

Permalink
Merge pull request #2811 from newrelic/sselrevres
Browse files Browse the repository at this point in the history
Enhanced AWS Lambda serverless functionality
  • Loading branch information
fallwith authored Aug 19, 2024
2 parents 6bb8311 + f3768db commit 4f76606
Show file tree
Hide file tree
Showing 7 changed files with 653 additions and 14 deletions.
12 changes: 11 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,17 @@

## dev

Version <dev> adds experimental OpenSearch instrumentation, updates framework detection, fixes Falcon dispatcher detection, fixes a bug with Redis instrumentation installation, and addresses a JRuby specific concurrency issue.
Version <dev> enhances support for AWS Lambda functions, adds experimental OpenSearch instrumentation, updates framework detection, fixes Falcon dispatcher detection, fixes a bug with Redis instrumentation installation, and addresses a JRuby specific concurrency issue.

- **Feature: Enhance AWS Lambda function instrumentation**

When utilized via the latest [New Relic Ruby layer for AWS Lambda](https://layers.newrelic-external.com/), the agent now offers enhanced support for AWS Lambda function instrumentation.
* The agent's instrumentation for AWS Lambda functions now supports distributed tracing.
* Web-triggered invocations are now identified as being "web"-based when an API Gateway call is involved, with support for both API Gateway versions 1.0 and 2.0.
* Web-based calls have the HTTP method, URI, and status code recorded.
* The agent now recognizes and reports on 12 separate AWS resources that are capable of triggering a Lambda function invocation: ALB, API Gateway V1, API Gateway V2, CloudFront, CloudWatch Scheduler, DynamoStreams, Firehose, Kinesis, S3, SES, SNS, and SQS.
* The type of the triggering resource and its ARN will be recorded for each resource, and for many of them, extra resource-specific attributes will be recorded as well. For example, Lambda function invocations triggered by S3 bucket activity will now result in the S3 bucket name being recorded.
[PR#2811](https://github.com/newrelic/newrelic-ruby-agent/pull/2811)

- **Feature: Add experimental OpenSearch instrumentation**

Expand Down
248 changes: 236 additions & 12 deletions lib/new_relic/agent/serverless_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

require 'json'
require 'new_relic/base64'
require 'uri'

require_relative 'serverless_handler_event_sources'

module NewRelic
module Agent
class ServerlessHandler
ATTRIBUTE_ARN = 'aws.lambda.arn'
ATTRIBUTE_COLD_START = 'aws.lambda.coldStart'
ATTRIBUTE_REQUEST_ID = 'aws.requestId'
AGENT_ATTRIBUTE_DESTINATIONS = NewRelic::Agent::AttributeFilter::DST_TRANSACTION_TRACER |
NewRelic::Agent::AttributeFilter::DST_TRANSACTION_EVENTS
EXECUTION_ENVIRONMENT = "AWS_Lambda_ruby#{RUBY_VERSION.rpartition('.').first}".freeze
Expand All @@ -22,25 +22,29 @@ class ServerlessHandler
SUPPORTABILITY_METRIC = 'Supportability/AWSLambda/HandlerInvocation'
FUNCTION_NAME = 'lambda_function'
PAYLOAD_VERSION = ENV.fetch('NEW_RELIC_SERVERLESS_PAYLOAD_VERSION', 2)
DIGIT = /\d/
EVENT_SOURCES = NewRelic::Agent::ServerlessHandlerEventSources.to_hash

def self.env_var_set?
ENV.key?(LAMBDA_ENVIRONMENT_VARIABLE)
end

def initialize
@event = nil
@context = nil
@payloads = {}
end

def invoke_lambda_function_with_new_relic(event:, context:, method_name:, namespace: nil)
NewRelic::Agent.increment_metric(SUPPORTABILITY_METRIC)

@context = context
@event, @context = event, context

NewRelic::Agent::Tracer.in_transaction(category: :other, name: function_name) do
add_agent_attributes
NewRelic::Agent::Tracer.in_transaction(category: category, name: function_name) do
prep_transaction

NewRelic::LanguageSupport.constantize(namespace).send(method_name, event: event, context: context)
process_response(NewRelic::LanguageSupport.constantize(namespace)
.send(method_name, event: event, context: context))
end
ensure
harvest!
Expand Down Expand Up @@ -86,6 +90,12 @@ def error_data(errors)

private

def prep_transaction
process_api_gateway_info
process_headers
add_agent_attributes
end

def harvest!
NewRelic::Agent.instance.harvest_and_send_analytic_event_data
NewRelic::Agent.instance.harvest_and_send_custom_event_data
Expand All @@ -109,6 +119,11 @@ def function_name
ENV.fetch(LAMBDA_ENVIRONMENT_VARIABLE, FUNCTION_NAME)
end

def category
@category ||=
@event&.dig('requestContext', 'http', 'method') || @event&.fetch('httpMethod', nil) ? :web : :other
end

def write_output
string = PAYLOAD_VERSION == 1 ? payload_v1 : payload_v2

Expand All @@ -120,7 +135,7 @@ def write_output
"BEGIN PAYLOAD>>>\n#{string}\n<<<END PAYLOAD"
end

def payload_v1
def payload_v1 # New Relic serverless payload v1
payload_hash = {'metadata' => metadata, 'data' => @payloads}
json = NewRelic::Agent.agent.service.marshaller.dump(payload_hash)
gzipped = NewRelic::Agent::NewRelicService::Encoders::Compressed::Gzip.encode(json)
Expand All @@ -129,14 +144,91 @@ def payload_v1
::JSON.dump(array)
end

def payload_v2
def payload_v2 # New Relic serverless payload v2
json = NewRelic::Agent.agent.service.marshaller.dump(@payloads)
gzipped = NewRelic::Agent::NewRelicService::Encoders::Compressed::Gzip.encode(json)
base64_encoded = NewRelic::Base64.strict_encode64(gzipped)
array = [PAYLOAD_VERSION, LAMBDA_MARKER, metadata, base64_encoded]
::JSON.dump(array)
end

def determine_api_gateway_version
return unless @event

version = @event.fetch('version', '')
if version.start_with?('2.')
return 2
elsif version.start_with?('1.')
return 1
end

headers = headers_from_event
return unless headers

if @event.dig('requestContext', 'http', 'path') && @event.dig('requestContext', 'http', 'method')
2
elsif @event.fetch('path', nil) && @event.fetch('httpMethod', nil)
1
end
end

def process_api_gateway_info
api_v = determine_api_gateway_version
return unless api_v

info = api_v == 2 ? info_for_api_gateway_v2 : info_for_api_gateway_v1
info[:query_parameters] = @event.fetch('queryStringParameters', nil)

@http_method = info[:method]
@http_uri = http_uri(info)
end

def http_uri(info)
return unless info[:host] && info[:path]

url_str = "https://#{info[:host]}:#{info[:port]}#{info[:path]}"
if info[:query_parameters]
qp = info[:query_parameters].map { |k, v| "#{k}=#{v}" }.join('&')
url_str += "?#{qp}"
end

URI.parse(url_str)
end

def info_for_api_gateway_v2
ctx = @event.fetch('requestContext', nil)
return {} unless ctx

{method: ctx.dig('http', 'method'),
path: ctx.dig('http', 'path'),
host: ctx.fetch('domainName', @event.dig('headers', 'Host')),
port: @event.dig('headers', 'X-Forwarded-Port') || 443}
end

def info_for_api_gateway_v1
headers = headers_from_event
{method: @event.fetch('httpMethod', nil),
path: @event.fetch('path', nil),
host: headers.fetch('Host', nil),
port: headers.fetch('X-Forwarded-Port', 443)}
end

def process_headers
return unless ::NewRelic::Agent.config[:'distributed_tracing.enabled']

headers = headers_from_event
return unless headers && !headers.empty?

dt_headers = headers.fetch(NewRelic::NEWRELIC_KEY, nil)
return unless dt_headers

::NewRelic::Agent::DistributedTracing::accept_distributed_trace_headers(dt_headers, 'Other')
end

def headers_from_event
@headers ||= @event&.dig('requestContext', 'http') || @event&.dig('headers')
end

def use_named_pipe?
return @use_named_pipe if defined?(@use_named_pipe)

Expand All @@ -146,15 +238,142 @@ def use_named_pipe?
def add_agent_attributes
return unless NewRelic::Agent::Tracer.current_transaction

add_agent_attribute(ATTRIBUTE_COLD_START, true) if cold?
add_agent_attribute(ATTRIBUTE_ARN, @context.invoked_function_arn)
add_agent_attribute(ATTRIBUTE_REQUEST_ID, @context.aws_request_id)
add_agent_attribute('aws.lambda.coldStart', true) if cold?
add_agent_attribute('aws.lambda.arn', @context.invoked_function_arn)
add_agent_attribute('aws.requestId', @context.aws_request_id)

add_event_source_attributes
add_http_attributes if api_gateway_event?
end

def add_http_attributes
return unless category == :web

if @http_uri
add_agent_attribute('uri.host', @http_uri.host)
add_agent_attribute('uri.port', @http_uri.port)
if NewRelic::Agent.instance.attribute_filter.allows_key?('http.url', AttributeFilter::DST_SPAN_EVENTS)
add_agent_attribute('http.url', @http_uri.to_s)
end
end

if @http_method
add_agent_attribute('http.method', @http_method)
add_agent_attribute('http.request.method', @http_method)
end
end

def api_gateway_event?
return false unless @event

# '1.0' for API Gateway V1, '2.0' for API Gateway V2
return true if @event.fetch('version', '').start_with?(DIGIT)

return false unless headers_from_event

# API Gateway V1 - look for toplevel 'path' and 'httpMethod' keys if a version is unset
return true if @event.fetch('path', nil) && @event.fetch('httpMethod', nil)

# API Gateway V2 - look for 'requestContext/http' inner nested 'path' and 'method' keys if a version is unset
return true if @event.dig('requestContext', 'http', 'path') && @event.dig('requestContext', 'http', 'method')

false
end

def add_event_source_attributes
arn = event_source_arn
add_agent_attribute('aws.lambda.eventSource.arn', arn) if arn

info = event_source_event_info
return unless info

add_agent_attribute('aws.lambda.eventSource.eventType', info['name'])

info['attributes'].each do |name, elements|
next if elements.empty?

size = false
if elements.last.eql?('#size')
elements = elements.dup
elements.pop
size = true
end
value = @event.dig(*elements)
value = value.size if size
next unless value

add_agent_attribute(name, value)
end
end

def event_source_arn
return unless @event

# SQS/Kinesis Stream/DynamoDB/CodeCommit/S3/SNS
return event_source_arn_for_records if @event.fetch('Records', nil)

# Kinesis Firehose
ds_arn = @event.fetch('deliveryStreamArn', nil) if @event.fetch('records', nil)
return ds_arn if ds_arn

# ELB
elb_arn = @event.dig('requestContext', 'elb', 'targetGroupArn')
return elb_arn if elb_arn

# (other)
es_arn = @event.dig('resources', 0)
return es_arn if es_arn

NewRelic::Agent.logger.debug 'Unable to determine an event source arn'

nil
end

def event_source_event_info
return unless @event

# if every required key for a source is found, consider that source
# to be a match
EVENT_SOURCES.each_value do |info|
return info unless info['required_keys'].detect { |r| @event.dig(*r).nil? }
end

nil
end

def event_source_arn_for_records
record = @event['Records'].first
unless record
NewRelic::Agent.logger.debug "Unable to find any records in the event's 'Records' array"
return
end

arn = record.fetch('eventSourceARN', nil) || # SQS/Kinesis Stream/DynamoDB/CodeCommit
record.dig('s3', 'bucket', 'arn') || # S3
record.fetch('EventSubscriptionArn', nil) # SNS

unless arn
NewRelic::Agent.logger.debug "Unable to determine an event source arn from the event's 'Records' array"
end

arn
end

def add_agent_attribute(attribute, value)
NewRelic::Agent::Tracer.current_transaction.add_agent_attribute(attribute, value, AGENT_ATTRIBUTE_DESTINATIONS)
end

def process_response(response)
return response unless category == :web && response.respond_to?(:fetch)

http_status = response.fetch(:statusCode, response.fetch('statusCode', nil))
return unless http_status

add_agent_attribute('http.statusCode', http_status)

response
end

def cold?
return @cold if defined?(@cold)

Expand All @@ -163,7 +382,12 @@ def cold?
end

def reset!
@event = nil
@category = nil
@context = nil
@headers = nil
@http_method = nil
@http_uri = nil
@payloads.replace({})
end
end
Expand Down
Loading

0 comments on commit 4f76606

Please sign in to comment.