diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index b28cc50c98..1a9e629ca2 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -33,6 +33,23 @@ Notes: See the <> guide. +==== Unreleased + +[float] +===== Breaking changes + +[float] +===== Features + +[float] +===== Bug fixes + +* Fix message handling for tombstone messages in `kafkajs` instrumentation. + ({pull}3985[#3985]) + +[float] +===== Chores + [[release-notes-4.5.2]] ==== 4.5.2 - 2024/04/12 diff --git a/lib/instrumentation/modules/kafkajs.js b/lib/instrumentation/modules/kafkajs.js index e0849412e4..8838f3dd64 100644 --- a/lib/instrumentation/modules/kafkajs.js +++ b/lib/instrumentation/modules/kafkajs.js @@ -138,7 +138,7 @@ module.exports = function (mod, agent, { version, enabled }) { config.captureBody === 'all' || config.captureBody === 'transactions' ) { - messageCtx.body = message.value.toString(); + messageCtx.body = message.value?.toString(); } if (message.headers && config.captureHeaders) { diff --git a/test/instrumentation/modules/kafkajs/fixtures/use-kafkajs-each-message.js b/test/instrumentation/modules/kafkajs/fixtures/use-kafkajs-each-message.js index 1511caae66..0e2ad4e3b7 100644 --- a/test/instrumentation/modules/kafkajs/fixtures/use-kafkajs-each-message.js +++ b/test/instrumentation/modules/kafkajs/fixtures/use-kafkajs-each-message.js @@ -64,7 +64,7 @@ async function useKafkajsClient(kafkaClient, options) { let eachMessagesConsumed = 0; await consumer.run({ eachMessage: async function ({ message }) { - log.info(`message received: ${message.value.toString()}`); + log.info(`message received: ${message.value?.toString()}`); eachMessagesConsumed++; }, }); @@ -76,9 +76,14 @@ async function useKafkajsClient(kafkaClient, options) { data = await producer.send({ topic, messages: [ - { value: 'each message 1', headers: { foo: 'string' } }, - { value: 'each message 2', headers: { foo: Buffer.from('buffer') } }, - { value: 'each message 3', headers: { auth: 'this_is_a_secret' } }, + { value: 'each message 1', headers: { foo: 'foo 1' } }, + { value: 'each message 2', headers: { foo: Buffer.from('foo 2') } }, + { + value: 'each message 3', + headers: { foo: 'foo 3', auth: 'this_is_a_secret' }, + }, + // https://github.com/elastic/apm-agent-nodejs/issues/3980 + { value: null, headers: { foo: 'foo 4' } }, ], }); log.info({ data }, 'messages sent'); diff --git a/test/instrumentation/modules/kafkajs/kafkajs.test.js b/test/instrumentation/modules/kafkajs/kafkajs.test.js index 206496efcc..c3f5fb12bc 100644 --- a/test/instrumentation/modules/kafkajs/kafkajs.test.js +++ b/test/instrumentation/modules/kafkajs/kafkajs.test.js @@ -172,7 +172,7 @@ const testFixtures = [ message: { queue: { name: kafkaTopic }, headers: { - foo: 'buffer', + foo: 'foo 1', traceparent: `00-${tx.trace_id}-${parentId}-01`, tracestate: 'es=s:1', }, @@ -189,7 +189,7 @@ const testFixtures = [ message: { queue: { name: kafkaTopic }, headers: { - foo: 'string', + foo: 'foo 2', traceparent: `00-${tx.trace_id}-${parentId}-01`, tracestate: 'es=s:1', }, @@ -206,6 +206,7 @@ const testFixtures = [ message: { queue: { name: kafkaTopic }, headers: { + foo: 'foo 3', auth: '[REDACTED]', traceparent: `00-${tx.trace_id}-${parentId}-01`, tracestate: 'es=s:1', @@ -214,6 +215,23 @@ const testFixtures = [ }, outcome: 'success', }); + + t.deepEqual(transactions.shift(), { + name: `Kafka RECEIVE from ${kafkaTopic}`, + type: 'messaging', + context: { + service: {}, + message: { + queue: { name: kafkaTopic }, + headers: { + foo: 'foo 4', + traceparent: `00-${tx.trace_id}-${parentId}-01`, + tracestate: 'es=s:1', + }, + }, + }, + outcome: 'success', + }); t.equal(transactions.length, 0, 'all transactions accounted for'); }, }, @@ -665,6 +683,18 @@ const testFixtures = [ }, outcome: 'success', }); + + t.deepEqual(transactions.shift(), { + name: `Kafka RECEIVE from ${kafkaTopic}`, + type: 'messaging', + context: { + service: {}, + message: { + queue: { name: kafkaTopic }, + }, + }, + outcome: 'success', + }); t.equal(transactions.length, 0, 'all transactions accounted for'); }, },