Skip to content

Commit

Permalink
fix(kafkajs): handle kafka tombstone messages (elastic#3985)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-luna authored and fpm-peter committed Aug 20, 2024
1 parent 474963b commit 5467bb5
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 7 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,23 @@ Notes:
See the <<upgrade-to-v4>> guide.
==== Unreleased
[float]
===== Breaking changes
[float]
===== Features
[float]
===== Bug fixes
* Fix message handling for tombstone messages in `kafkajs` instrumentation.
({pull}3985[#3985])
[float]
===== Chores
[[release-notes-4.5.2]]
==== 4.5.2 - 2024/04/12
Expand Down
2 changes: 1 addition & 1 deletion lib/instrumentation/modules/kafkajs.js
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ module.exports = function (mod, agent, { version, enabled }) {
config.captureBody === 'all' ||
config.captureBody === 'transactions'
) {
messageCtx.body = message.value.toString();
messageCtx.body = message.value?.toString();
}

if (message.headers && config.captureHeaders) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async function useKafkajsClient(kafkaClient, options) {
let eachMessagesConsumed = 0;
await consumer.run({
eachMessage: async function ({ message }) {
log.info(`message received: ${message.value.toString()}`);
log.info(`message received: ${message.value?.toString()}`);
eachMessagesConsumed++;
},
});
Expand All @@ -76,9 +76,14 @@ async function useKafkajsClient(kafkaClient, options) {
data = await producer.send({
topic,
messages: [
{ value: 'each message 1', headers: { foo: 'string' } },
{ value: 'each message 2', headers: { foo: Buffer.from('buffer') } },
{ value: 'each message 3', headers: { auth: 'this_is_a_secret' } },
{ value: 'each message 1', headers: { foo: 'foo 1' } },
{ value: 'each message 2', headers: { foo: Buffer.from('foo 2') } },
{
value: 'each message 3',
headers: { foo: 'foo 3', auth: 'this_is_a_secret' },
},
// https://github.com/elastic/apm-agent-nodejs/issues/3980
{ value: null, headers: { foo: 'foo 4' } },
],
});
log.info({ data }, 'messages sent');
Expand Down
34 changes: 32 additions & 2 deletions test/instrumentation/modules/kafkajs/kafkajs.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ const testFixtures = [
message: {
queue: { name: kafkaTopic },
headers: {
foo: 'buffer',
foo: 'foo 1',
traceparent: `00-${tx.trace_id}-${parentId}-01`,
tracestate: 'es=s:1',
},
Expand All @@ -189,7 +189,7 @@ const testFixtures = [
message: {
queue: { name: kafkaTopic },
headers: {
foo: 'string',
foo: 'foo 2',
traceparent: `00-${tx.trace_id}-${parentId}-01`,
tracestate: 'es=s:1',
},
Expand All @@ -206,6 +206,7 @@ const testFixtures = [
message: {
queue: { name: kafkaTopic },
headers: {
foo: 'foo 3',
auth: '[REDACTED]',
traceparent: `00-${tx.trace_id}-${parentId}-01`,
tracestate: 'es=s:1',
Expand All @@ -214,6 +215,23 @@ const testFixtures = [
},
outcome: 'success',
});

t.deepEqual(transactions.shift(), {
name: `Kafka RECEIVE from ${kafkaTopic}`,
type: 'messaging',
context: {
service: {},
message: {
queue: { name: kafkaTopic },
headers: {
foo: 'foo 4',
traceparent: `00-${tx.trace_id}-${parentId}-01`,
tracestate: 'es=s:1',
},
},
},
outcome: 'success',
});
t.equal(transactions.length, 0, 'all transactions accounted for');
},
},
Expand Down Expand Up @@ -665,6 +683,18 @@ const testFixtures = [
},
outcome: 'success',
});

t.deepEqual(transactions.shift(), {
name: `Kafka RECEIVE from ${kafkaTopic}`,
type: 'messaging',
context: {
service: {},
message: {
queue: { name: kafkaTopic },
},
},
outcome: 'success',
});
t.equal(transactions.length, 0, 'all transactions accounted for');
},
},
Expand Down

0 comments on commit 5467bb5

Please sign in to comment.