Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhanced AWS Lambda serverless functionality #2811

Merged
merged 14 commits into from
Aug 19, 2024
Merged
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

## dev

Version <dev> adds experimental OpenSearch instrumentation, updates framework detection, fixes Falcon dispatcher detection, and addresses a JRuby specific concurrency issue.
Version <dev> enhances support for AWS Lambda functions, adds experimental OpenSearch instrumentation, updates framework detection, fixes Falcon dispatcher detection, and addresses a JRuby specific concurrency issue.

- **Feature: Enhanced AWS Lambda function instrumentation**
fallwith marked this conversation as resolved.
Show resolved Hide resolved

When utilized via the latest [New Relic Ruby layer for AWS Lambda](https://layers.newrelic-external.com/), the agent now offers enhanced support for AWS Lambda function instrumentation. The agent's instrumentation for AWS Lambda functions now supports distributed tracing. Web triggered invocations are now identified as being "web" based when an API Gateway call is involved and both API Gateway versions 1.0 and 2.0 are supported. Web based calls have the HTTP method, URI, and status code recorded. The agent will now recognize and report on fully 12 separate AWS resources that are capable of triggering a Lambda function invocation: ALB, API Gateway V1, API Gateway V2, CloudFront, CloudWatch Scheduler, DynamoStreams, Firehose, Kinesis, S3, SES, SNS, and SQS. The type of the triggering resource and its arn will be recorded for each resource and for many of them extra resource specific attributes will be recorded as well. For example, Lambda function invocations triggered by S3 bucket activity will now result in the S3 bucket name being recorded. [PR#2811](https://github.com/newrelic/newrelic-ruby-agent/pull/2811)
fallwith marked this conversation as resolved.
Show resolved Hide resolved

- **Feature: Add experimental OpenSearch instrumentation**

Expand Down
249 changes: 237 additions & 12 deletions lib/new_relic/agent/serverless_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

require 'json'
require 'new_relic/base64'
require 'uri'

require_relative 'serverless_handler_event_sources'

module NewRelic
module Agent
class ServerlessHandler
ATTRIBUTE_ARN = 'aws.lambda.arn'
ATTRIBUTE_COLD_START = 'aws.lambda.coldStart'
ATTRIBUTE_REQUEST_ID = 'aws.requestId'
AGENT_ATTRIBUTE_DESTINATIONS = NewRelic::Agent::AttributeFilter::DST_TRANSACTION_TRACER |
NewRelic::Agent::AttributeFilter::DST_TRANSACTION_EVENTS
EXECUTION_ENVIRONMENT = "AWS_Lambda_ruby#{RUBY_VERSION.rpartition('.').first}".freeze
Expand All @@ -22,25 +22,29 @@ class ServerlessHandler
SUPPORTABILITY_METRIC = 'Supportability/AWSLambda/HandlerInvocation'
FUNCTION_NAME = 'lambda_function'
PAYLOAD_VERSION = ENV.fetch('NEW_RELIC_SERVERLESS_PAYLOAD_VERSION', 2)
DIGIT = /\d/.freeze
fallwith marked this conversation as resolved.
Show resolved Hide resolved
EVENT_SOURCES = NewRelic::Agent::ServerlessHandlerEventSources.to_hash

def self.env_var_set?
ENV.key?(LAMBDA_ENVIRONMENT_VARIABLE)
end

def initialize
@event = nil
@context = nil
@payloads = {}
end

def invoke_lambda_function_with_new_relic(event:, context:, method_name:, namespace: nil)
NewRelic::Agent.increment_metric(SUPPORTABILITY_METRIC)

@context = context
@event, @context = event, context
fallwith marked this conversation as resolved.
Show resolved Hide resolved

NewRelic::Agent::Tracer.in_transaction(category: :other, name: function_name) do
add_agent_attributes
NewRelic::Agent::Tracer.in_transaction(category: category, name: function_name) do
prep_transaction

NewRelic::LanguageSupport.constantize(namespace).send(method_name, event: event, context: context)
process_response(NewRelic::LanguageSupport.constantize(namespace)
.send(method_name, event: event, context: context))
end
ensure
harvest!
Expand Down Expand Up @@ -86,6 +90,12 @@ def error_data(errors)

private

def prep_transaction
process_api_gateway_info
process_headers
add_agent_attributes
end

def harvest!
NewRelic::Agent.instance.harvest_and_send_analytic_event_data
NewRelic::Agent.instance.harvest_and_send_custom_event_data
Expand All @@ -109,6 +119,11 @@ def function_name
ENV.fetch(LAMBDA_ENVIRONMENT_VARIABLE, FUNCTION_NAME)
end

def category
@category ||=
@event&.dig('requestContext', 'http', 'method') || @event&.fetch('httpMethod', nil) ? :web : :other
end

def write_output
string = PAYLOAD_VERSION == 1 ? payload_v1 : payload_v2

Expand All @@ -120,7 +135,7 @@ def write_output
"BEGIN PAYLOAD>>>\n#{string}\n<<<END PAYLOAD"
end

def payload_v1
def payload_v1 # New Relic serverless payload v1
payload_hash = {'metadata' => metadata, 'data' => @payloads}
json = NewRelic::Agent.agent.service.marshaller.dump(payload_hash)
gzipped = NewRelic::Agent::NewRelicService::Encoders::Compressed::Gzip.encode(json)
Expand All @@ -129,14 +144,92 @@ def payload_v1
::JSON.dump(array)
end

def payload_v2
def payload_v2 # New Relic serverless payload v2
json = NewRelic::Agent.agent.service.marshaller.dump(@payloads)
gzipped = NewRelic::Agent::NewRelicService::Encoders::Compressed::Gzip.encode(json)
base64_encoded = NewRelic::Base64.strict_encode64(gzipped)
array = [PAYLOAD_VERSION, LAMBDA_MARKER, metadata, base64_encoded]
::JSON.dump(array)
end

def determine_api_gateway_version
return unless @event

version = @event.fetch('version', '')
if version.start_with?('2.')
return 2
elsif version.start_with?('1.')
return 1
end

headers = headers_from_event
return unless headers

if @event.dig('requestContext', 'http', 'path') && @event.dig('requestContext', 'http', 'method')
2
elsif @event.fetch('path', nil) && @event.fetch('httpMethod', nil)
1
end
end

def process_api_gateway_info
api_v = determine_api_gateway_version
return unless api_v

info = api_v == 2 ? info_for_api_gateway_v2 : info_for_api_gateway_v1
info[:query_parameters] = @event.fetch('queryStringParameters', nil)

@http_method = info[:method]
@http_uri = http_uri(info)
end

def http_uri(info)
return unless info[:host] && info[:path]
fallwith marked this conversation as resolved.
Show resolved Hide resolved

url_str = "https://#{info[:host]}:#{info[:port]}#{info[:path]}"
if info[:query_parameters]
qp = info[:query_parameters].map { |k, v| "#{k}=#{v}" }.join('&')
url_str += "?#{qp}"
end

URI.parse(url_str)
rescue StandardError
end
fallwith marked this conversation as resolved.
Show resolved Hide resolved

def info_for_api_gateway_v2
ctx = @event.fetch('requestContext', nil)
return {} unless ctx

{method: ctx.dig('http', 'method'),
path: ctx.dig('http', 'path'),
host: ctx.fetch('domainName', @event.dig('headers', 'Host')),
port: @event.dig('headers', 'X-Forwarded-Port') || 443}
end

def info_for_api_gateway_v1
headers = headers_from_event
{method: @event.fetch('httpMethod', nil),
path: @event.fetch('path', nil),
host: headers.fetch('Host', nil),
port: headers.fetch('X-Forwarded-Port', 443)}
end

def process_headers
return unless ::NewRelic::Agent.config[:'distributed_tracing.enabled']

headers = headers_from_event
return unless headers && !headers.empty?

dt_headers = headers.fetch(NewRelic::NEWRELIC_KEY, nil)
return unless dt_headers

::NewRelic::Agent::DistributedTracing::accept_distributed_trace_headers(dt_headers, 'Other')
end

def headers_from_event
@headers ||= @event&.dig('requestContext', 'http') || @event&.dig('headers')
end

def use_named_pipe?
return @use_named_pipe if defined?(@use_named_pipe)

Expand All @@ -146,15 +239,142 @@ def use_named_pipe?
def add_agent_attributes
return unless NewRelic::Agent::Tracer.current_transaction

add_agent_attribute(ATTRIBUTE_COLD_START, true) if cold?
add_agent_attribute(ATTRIBUTE_ARN, @context.invoked_function_arn)
add_agent_attribute(ATTRIBUTE_REQUEST_ID, @context.aws_request_id)
add_agent_attribute('aws.lambda.coldStart', true) if cold?
add_agent_attribute('aws.lambda.arn', @context.invoked_function_arn)
add_agent_attribute('aws.requestId', @context.aws_request_id)

add_event_source_attributes
add_http_attributes if api_gateway_event?
end

def add_http_attributes
return unless category == :web

if @http_uri
add_agent_attribute('uri.host', @http_uri.host)
add_agent_attribute('uri.port', @http_uri.port)
if NewRelic::Agent.instance.attribute_filter.allows_key?('http.url', AttributeFilter::DST_SPAN_EVENTS)
add_agent_attribute('http.url', @http_uri.to_s)
end
end

if @http_method
add_agent_attribute('http.method', @http_method)
add_agent_attribute('http.request.method', @http_method)
end
end

def api_gateway_event?
return false unless @event

# '1.0' for API Gateway V1, '2.0' for API Gateway V2
return true if @event.fetch('version', '').start_with?(DIGIT)

return false unless headers_from_event

# API Gateway V1 - look for toplevel 'path' and 'httpMethod' keys if a version is unset
return true if @event.fetch('path', nil) && @event.fetch('httpMethod', nil)

# API Gateway V2 - look for 'requestContext/http' inner nested 'path' and 'method' keys if a version is unset
return true if @event.dig('requestContext', 'http', 'path') && @event.dig('requestContext', 'http', 'method')

false
end

def add_event_source_attributes
arn = event_source_arn
add_agent_attribute('aws.lambda.eventSource.arn', arn) if arn

info = event_source_event_info
return unless info

add_agent_attribute('aws.lambda.eventSource.eventType', info['name'])

info['attributes'].each do |name, elements|
next if elements.empty?

size = false
if elements.last.eql?('#size')
elements = elements.dup
elements.pop
size = true
end
value = @event.dig(*elements)
value = value.size if size
next unless value

add_agent_attribute(name, value)
end
end

def event_source_arn
return unless @event

# SQS/Kinesis Stream/DynamoDB/CodeCommit/S3/SNS
return event_source_arn_for_records if @event.fetch('Records', nil)

# Kinesis Firehose
ds_arn = @event.fetch('deliveryStreamArn', nil) if @event.fetch('records', nil)
return ds_arn if ds_arn

# ELB
elb_arn = @event.dig('requestContext', 'elb', 'targetGroupArn')
return elb_arn if elb_arn

# (other)
es_arn = @event.dig('resources', 0)
return es_arn if es_arn

NewRelic::Agent.logger.debug 'Unable to determine an event source arn'

nil
end

def event_source_event_info
return unless @event

# if every required key for a source is found, consider that source
# to be a match
EVENT_SOURCES.each do |_type, info|
return info unless info['required_keys'].detect { |r| @event.dig(*r).nil? }
end
fallwith marked this conversation as resolved.
Show resolved Hide resolved

nil
end

def event_source_arn_for_records
record = @event['Records'].first
unless record
NewRelic::Agent.logger.debug "Unable to find any records in the event's 'Records' array"
return
end

arn = record.fetch('eventSourceARN', nil) || # SQS/Kinesis Stream/DynamoDB/CodeCommit
record.dig('s3', 'bucket', 'arn') || # S3
record.fetch('EventSubscriptionArn', nil) # SNS

unless arn
NewRelic::Agent.logger.debug "Unable to determine an event source arn from the event's 'Records' array"
end

arn
end

def add_agent_attribute(attribute, value)
NewRelic::Agent::Tracer.current_transaction.add_agent_attribute(attribute, value, AGENT_ATTRIBUTE_DESTINATIONS)
end

def process_response(response)
return response unless category == :web && response.respond_to?(:fetch)

http_status = response.fetch(:statusCode, response.fetch('statusCode', nil))
return unless http_status

add_agent_attribute('http.statusCode', http_status)

response
end

def cold?
return @cold if defined?(@cold)

Expand All @@ -163,7 +383,12 @@ def cold?
end

def reset!
@event = nil
@category = nil
@context = nil
@headers = nil
@http_method = nil
@http_uri = nil
@payloads.replace({})
end
end
Expand Down
Loading
Loading