我已经阅读了许多有关在Firehose中添加换行符的类似问题,但它们都是围绕在源代码中添加换行符的问题。问题是我无权访问源,并且第三方正在将数据管道传输到我们的Kinesis实例,并且无法将'\ n'添加到源。
我尝试使用以下代码进行firehose数据转换:
'use strict';
console.log('Loading function');
exports.handler = (event, context, callback) => {
/* Process the list of records and transform them */
const output = [];
event.records.forEach((record) => {
const results = {
/* This transformation is the "identity" transformation, the data is left intact */
recordId: record.recordId,
result: record.data.event_type === 'alert' ? 'Dropped' : 'Ok',
data: record.data + '\n'
};
output.push(results);
});
console.log(`Processing completed. Successful records ${output.length}.`);
callback(null, { records: output });
};
但是换行符仍然丢失。我也尝试过,JSON.stringify(record.data) + '\n'
但随后出现Invalid output structure
错误。
尝试解码record.data添加新行,然后再次将其编码为base 64。
这是python但想法是一样的
for record in event['records']: payload = base64.b64decode(record['data']) # Do custom processing on the payload here payload = payload + '\n' output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': base64.b64encode(json.dumps(payload)) } output.append(output_record) return {'records': output}
来自@Matt Westlake的评论:
对于那些寻找节点答案的人来说
const data = JSON.parse(newBuffer.from(record.data,'base64')。toString('utf8'));
和
新的Buffer.from(JSON.stringify(data)+'\ n')。toString('base64')
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句