Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Code optimized to forward logs efficiently #87

Merged
merged 3 commits into from
Apr 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 66 additions & 61 deletions BlobForwarder/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,29 +42,58 @@ module.exports = async function main(context, myBlob) {
context.log.warn('logs format is invalid');
return;
}
let compressedPayload;
let payloads = generatePayloads(buffer, context);
for (const payload of payloads) {
try {
compressedPayload = await compressData(JSON.stringify(payload));
try {
await retryMax(httpSend, NR_MAX_RETRIES, NR_RETRY_INTERVAL, [
let logLines = appendMetaDataToAllLogLines(buffer);
await compressAndSend(logLines, context);
};

/**
* Compress and send logs with Promise
* @param {Object[]} data - array of JSON object containing log message and meta data
* @param {Object} context - context object passed while invoking this function
* @returns {Promise} A promise that resolves when logs are successfully sent.
*/

function compressAndSend(data, context) {
Sivakumar3695 marked this conversation as resolved.
Show resolved Hide resolved
return compressData(JSON.stringify(getPayload(data, context)))
.then((compressedPayload) => {
if (compressedPayload.length > NR_MAX_PAYLOAD_SIZE) {
if (data.length === 1) {
Sivakumar3695 marked this conversation as resolved.
Show resolved Hide resolved
context.log.error(
'Cannot send the payload as the size of single line exceeds the limit'
);
return;
}

let halfwayThrough = Math.floor(data.length / 2);

let arrayFirstHalf = data.slice(0, halfwayThrough);
let arraySecondHalf = data.slice(halfwayThrough, data.length);

return Promise.all([
compressAndSend(arrayFirstHalf, context),
compressAndSend(arraySecondHalf, context),
]);
jsubirat marked this conversation as resolved.
Show resolved Hide resolved
} else {
return retryMax(httpSend, NR_MAX_RETRIES, NR_RETRY_INTERVAL, [
compressedPayload,
context,
]);
context.log('Logs payload successfully sent to New Relic.');
} catch (e) {
context.log.error(
'Max retries reached: failed to send logs payload to New Relic'
);
context.log.error('Exception: ', JSON.stringify(e));
])
.then(() =>
context.log('Logs payload successfully sent to New Relic.')
)
.catch((e) => {
context.log.error(
'Max retries reached: failed to send logs payload to New Relic'
);
context.log.error('Exception: ', JSON.stringify(e));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure we won't log headers (api key) here? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think headers like api-key would be part of error response from the logs api. And so, the error object would not contain such details I believe. Can you please suggest me on how to check this scenario - max retries reached error?

});
}
} catch (e) {
})
.catch((e) => {
context.log.error('Error during payload compression.');
context.log.error('Exception: ', JSON.stringify(e));
}
}
};
});
}

function compressData(data) {
return new Promise((resolve, reject) => {
Expand All @@ -78,8 +107,21 @@ function compressData(data) {
});
}

function generatePayloads(logs, context) {
const common = {
function appendMetaDataToAllLogLines(logs) {
return logs.map((log) => addMetadata(log));
}

function getPayload(logs, context) {
return [
{
common: getCommonAttributes(context),
logs: logs,
},
];
}

function getCommonAttributes(context) {
return {
attributes: {
plugin: {
type: NR_LOGS_SOURCE,
Expand All @@ -92,34 +134,6 @@ function generatePayloads(logs, context) {
tags: getTags(),
},
};
let payload = [
{
common: common,
logs: [],
},
];
let payloads = [];

logs.forEach((logLine) => {
const log = addMetadata(logLine);
if (
JSON.stringify(payload).length + JSON.stringify(log).length <
NR_MAX_PAYLOAD_SIZE
) {
payload[0].logs.push(log);
} else {
payloads.push(payload);
payload = [
{
common: common,
logs: [],
},
];
payload[0].logs.push(log);
}
});
payloads.push(payload);
return payloads;
}

function getTags() {
Expand Down Expand Up @@ -209,25 +223,16 @@ function transformData(logs, context) {
}

function parseData(logs, context) {
let newLogs = logs;

if (!Array.isArray(logs)) {
try {
newLogs = JSON.parse(logs); // for strings let's see if we can parse it into Object
return JSON.parse(logs); // for strings let's see if we can parse it into Object
} catch {
context.log.warn('cannot parse logs to JSON');
}
} else {
newLogs = logs.map((log) => {
// for arrays let's see if we can parse it into array of Objects
try {
return JSON.parse(log);
} catch {
return log;
}
});
} else if (typeof logs[0] === 'object' && logs[0] !== null) {
return logs.map((log) => JSON.parse(log));
}
return newLogs;
return logs;
}

function httpSend(data, context) {
Expand Down
Loading