Parquet is an open source columnar file format from the Hadoop ecosystem. It stores nested data structures in a flat columnar layout, which is generally more efficient in storage and query performance than a traditional row-oriented layout. A common pattern is to store Parquet files in S3 and query them with Athena, because a columnar format lets Athena read only the columns a query needs. You can read more about it here.

However, Parquet doesn’t support spaces in column names, which becomes an issue if you are using Kinesis Firehose to stream log data. Logs are typically in JSON format, and a common practice is to convert these JSON logs to Parquet while writing to S3 so that the log data can be queried in Athena. JSON keys are mapped to column names, so if your JSON keys contain spaces, the conversion fails. You can use a transformation Lambda to replace those spaces with underscores in the keys.
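
As a minimal sketch of the idea, independent of Firehose, the snippet here renames keys recursively in a parsed JSON log. The function name replace_spaces_in_keys and the sample log line are made up for illustration; unlike a dict-only walk, this sketch also descends into lists.

```python
import json


def replace_spaces_in_keys(value):
    """Return a copy of value with spaces in every dict key replaced by underscores."""
    if isinstance(value, dict):
        return {
            key.replace(" ", "_"): replace_spaces_in_keys(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        # Lists may hold nested objects whose keys also need renaming
        return [replace_spaces_in_keys(item) for item in value]
    # Scalars (strings, numbers, booleans, None) are returned unchanged
    return value


# Hypothetical log line with spaces in keys at several nesting levels
log_line = '{"request id": "abc", "user info": {"first name": "Ada"}, "tags": [{"tag name": "x"}]}'
cleaned = replace_spaces_in_keys(json.loads(log_line))
print(json.dumps(cleaned, sort_keys=True))
```

One thing to watch for: if a log already contains both "user name" and "user_name", the two keys collapse into one after renaming, and one of the values is lost.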

We will look at a setup where CloudWatch Logs are streamed directly to Kinesis Firehose (follow Example 3, point 12 to create the subscription).

Here, the log format can be plain JSON or the CloudWatch embedded metric format.
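
The embedded metric format needs slightly more care than plain JSON: its _aws.CloudWatchMetrics section refers to top-level keys by name, so renaming the keys alone would leave those references pointing at names that no longer exist. A rough sketch of that problem follows; the log document and the helper names referenced_names and dangling_references are assumptions for illustration.

```python
# Hypothetical embedded metric format log with spaces in a dimension and a metric name
emf_log = {
    "_aws": {
        "Timestamp": 1510109208016,
        "CloudWatchMetrics": [
            {
                "Namespace": "ExampleApp",
                "Dimensions": [["service name"]],
                "Metrics": [{"Name": "processing latency", "Unit": "Milliseconds"}],
            }
        ],
    },
    "service name": "checkout",
    "processing latency": 120,
}


def referenced_names(doc):
    """Collect every dimension and metric name listed under _aws.CloudWatchMetrics."""
    names = set()
    for directive in doc["_aws"]["CloudWatchMetrics"]:
        for dimension_set in directive.get("Dimensions", []):
            names.update(dimension_set)
        for metric in directive.get("Metrics", []):
            names.add(metric["Name"])
    return names


def dangling_references(doc):
    """Return the referenced names that have no matching top-level key."""
    return sorted(name for name in referenced_names(doc) if name not in doc)


# Renaming only the top-level keys breaks the references inside _aws
keys_only = {key.replace(" ", "_"): value for key, value in emf_log.items()}
print(dangling_references(emf_log))
print(dangling_references(keys_only))
```

This is why the metric and dimension names inside _aws have to be rewritten together with the keys they point to.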

Creating the transformation lambda:

Create a Lambda named ‘cloudwatch_logs_processor_python’ with the following code, set the runtime to Python 2.7, and set the timeout to 5 min 20 sec. Python 2.7 is used because the code is an adapted version of a Lambda blueprint written for that runtime.

Note: This Lambda handles only data sent to Firehose by CloudWatch Logs; if your source is different, you will need to adjust the code. Spaces in column names are replaced with underscores. Read the comments in the code to understand how it works.

lambda_function.py

"""
For processing data sent to Firehose by Cloudwatch Logs subscription filters.

Cloudwatch Logs sends to Firehose records that look like this:

{
  "messageType": "DATA_MESSAGE",
  "owner": "123456789012",
  "logGroup": "log_group_name",
  "logStream": "log_stream_name",
  "subscriptionFilters": [
	"subscription_filter_name"
  ],
  "logEvents": [
	{
	  "id": "01234567890123456789012345678901234567890123456789012345",
	  "timestamp": 1510109208016,
	  "message": "log message 1"
	},
	{
	  "id": "01234567890123456789012345678901234567890123456789012345",
	  "timestamp": 1510109208017,
	  "message": "log message 2"
	}
	...
  ]
}

The data is additionally compressed with GZIP.

The code below will:

1) Gunzip the data
2) Parse the json
3) Set the result to Dropped for any record whose messageType is CONTROL_MESSAGE, and to ProcessingFailed for any
   other record whose messageType is not DATA_MESSAGE, thus redirecting the latter to the processing error output.
   Such records do not contain any log events.
4) For records whose messageType is DATA_MESSAGE, extract the individual log events from the logEvents field, and pass
   each one to the transformLogEvent method. You can modify the transformLogEvent method to perform custom
   transformations on the log events.
5) Concatenate the result from (4) together and set the result as the data of the record returned to Firehose. Note that
   this step will not add any delimiters. Delimiters should be appended by the logic within the transformLogEvent
   method.
6) Any additional records which exceed 6MB will be re-ingested back into Firehose.

"""

import boto3
import StringIO
import gzip
import base64
import json

def transform_metrics(cloudwatchmetrics):
	newcloudwatchmetricslist = []
	for index, cloudwatchmetric in enumerate(cloudwatchmetrics):
		newcloudwatchmetric = {
			"Namespace": cloudwatchmetric['Namespace'],
			"Dimensions": [],
			"Metrics": []
		}
		# print(cloudwatchmetric['Namespace'])
		for key,value in cloudwatchmetric.items():
			
			if key == 'Dimensions':
				for eachlist in value:
					newlist = []
					for eachDimension in eachlist:
						newlist.append(eachDimension.replace(' ', '_'))
					newcloudwatchmetric['Dimensions'].append(newlist)
			
			if key == 'Metrics':
				newmetricslist = []
				for eachMetric in value:
					newmetric = {}
					newmetric['Name'] = eachMetric['Name'].replace(' ','_')
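					# Unit is optional in the embedded metric format, so copy it only when present
					if 'Unit' in eachMetric:
						newmetric['Unit'] = eachMetric['Unit']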
					newmetricslist.append(newmetric)
				newcloudwatchmetric['Metrics'] = newmetricslist
		newcloudwatchmetricslist.append(newcloudwatchmetric)
	return newcloudwatchmetricslist

def transform_record(log_json):
	log_transformed = {}
	for k, v in log_json.items():
		if isinstance(v, dict):
			v = transform_record(v)
		log_transformed[k.replace(' ', '_')] = v
	
	if '_aws' in log_transformed:
		transformedmetrics = transform_metrics(log_transformed['_aws']['CloudWatchMetrics'])
		log_transformed['_aws']['CloudWatchMetrics'] = transformedmetrics
	
	return log_transformed

def transformLogEvent(log_event):
	"""Transform each log event.

	Parses the message as JSON, replaces spaces in its keys with underscores, and appends a newline.

	Args:
		log_event (dict): The original log event. Structure is {"id": str, "timestamp": long, "message": str}

	Returns:
		unicode: The transformed log event as a JSON string followed by a newline.
	"""
	log_str = log_event['message'].encode('utf-8')
	log_json = json.loads(log_str)
	log_transformed = json.dumps(transform_record(log_json))
	log_unicode = unicode(log_transformed, "utf-8")
	return log_unicode + "\n"

def processRecords(records):
	for r in records:
		data = base64.b64decode(r['data'])
		striodata = StringIO.StringIO(data)
		with gzip.GzipFile(fileobj=striodata, mode='r') as f:
			data = json.loads(f.read())

		recId = r['recordId']
		"""
		CONTROL_MESSAGE are sent by CWL to check if the subscription is reachable.
		They do not contain actual data.
		"""
		if data['messageType'] == 'CONTROL_MESSAGE':
			yield {
				'result': 'Dropped',
				'recordId': recId
			}
		elif data['messageType'] == 'DATA_MESSAGE':
			data = ''.join([transformLogEvent(e) for e in data['logEvents']])
			data = base64.b64encode(data)
			yield {
				'data': data,
				'result': 'Ok',
				'recordId': recId
			}
		else:
			yield {
				'result': 'ProcessingFailed',
				'recordId': recId
			}


def putRecordsToFirehoseStream(streamName, records, client, attemptsMade, maxAttempts):
	failedRecords = []
	codes = []
	errMsg = ''
	# if put_record_batch throws for whatever reason, response['xx'] will error out, adding a check for a valid
	# response will prevent this
	response = None
	try:
		response = client.put_record_batch(DeliveryStreamName=streamName, Records=records)
	except Exception as e:
		failedRecords = records
		errMsg = str(e)

	# if there are no failedRecords (put_record_batch succeeded), iterate over the response to gather results
	if not failedRecords and response and response['FailedPutCount'] > 0:
		for idx, res in enumerate(response['RequestResponses']):
			# (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
			if 'ErrorCode' not in res or not res['ErrorCode']:
				continue

			codes.append(res['ErrorCode'])
			failedRecords.append(records[idx])

		errMsg = 'Individual error codes: ' + ','.join(codes)

	if len(failedRecords) > 0:
		if attemptsMade + 1 < maxAttempts:
			print('Some records failed while calling PutRecordBatch to Firehose stream, retrying. %s' % (errMsg))
			putRecordsToFirehoseStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
		else:
			raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))


def putRecordsToKinesisStream(streamName, records, client, attemptsMade, maxAttempts):
	failedRecords = []
	codes = []
	errMsg = ''
	# if put_records throws for whatever reason, response['xx'] will error out, adding a check for a valid
	# response will prevent this
	response = None
	try:
		response = client.put_records(StreamName=streamName, Records=records)
	except Exception as e:
		failedRecords = records
		errMsg = str(e)

	# if there are no failedRecords (put_records succeeded), iterate over the response to gather results
	if not failedRecords and response and response['FailedRecordCount'] > 0:
		for idx, res in enumerate(response['Records']):
			# (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
			if 'ErrorCode' not in res or not res['ErrorCode']:
				continue

			codes.append(res['ErrorCode'])
			failedRecords.append(records[idx])

		errMsg = 'Individual error codes: ' + ','.join(codes)

	if len(failedRecords) > 0:
		if attemptsMade + 1 < maxAttempts:
			print('Some records failed while calling PutRecords to Kinesis stream, retrying. %s' % (errMsg))
			putRecordsToKinesisStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
		else:
			raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))


def createReingestionRecord(isSas, originalRecord):
	if isSas:
		return {'data': base64.b64decode(originalRecord['data']), 'partitionKey': originalRecord['kinesisRecordMetadata']['partitionKey']}
	else:
		return {'data': base64.b64decode(originalRecord['data'])}


def getReingestionRecord(isSas, reIngestionRecord):
	if isSas:
		return {'Data': reIngestionRecord['data'], 'PartitionKey': reIngestionRecord['partitionKey']}
	else:
		return {'Data': reIngestionRecord['data']}


def handler(event, context):
	isSas = 'sourceKinesisStreamArn' in event
	streamARN = event['sourceKinesisStreamArn'] if isSas else event['deliveryStreamArn']
	region = streamARN.split(':')[3]
	streamName = streamARN.split('/')[1]
	records = list(processRecords(event['records']))
	projectedSize = 0
	dataByRecordId = {rec['recordId']: createReingestionRecord(isSas, rec) for rec in event['records']}
	putRecordBatches = []
	recordsToReingest = []
	totalRecordsToBeReingested = 0

	for idx, rec in enumerate(records):
		if rec['result'] != 'Ok':
			continue
		projectedSize += len(rec['data']) + len(rec['recordId'])
		# 6000000 instead of 6291456 to leave ample headroom for the stuff we didn't account for
		if projectedSize > 6000000:
			totalRecordsToBeReingested += 1
			recordsToReingest.append(
				getReingestionRecord(isSas, dataByRecordId[rec['recordId']])
			)
			records[idx]['result'] = 'Dropped'
			del(records[idx]['data'])

		# split out the record batches into multiple groups, 500 records at max per group
		if len(recordsToReingest) == 500:
			putRecordBatches.append(recordsToReingest)
			recordsToReingest = []

	if len(recordsToReingest) > 0:
		# add the last batch
		putRecordBatches.append(recordsToReingest)

	# iterate and call putRecordBatch for each group
	recordsReingestedSoFar = 0
	if len(putRecordBatches) > 0:
		client = boto3.client('kinesis', region_name=region) if isSas else boto3.client('firehose', region_name=region)
		for recordBatch in putRecordBatches:
			if isSas:
				putRecordsToKinesisStream(streamName, recordBatch, client, attemptsMade=0, maxAttempts=20)
			else:
				putRecordsToFirehoseStream(streamName, recordBatch, client, attemptsMade=0, maxAttempts=20)
			recordsReingestedSoFar += len(recordBatch)
			print('Reingested %d/%d records out of %d' % (recordsReingestedSoFar, totalRecordsToBeReingested, len(event['records'])))
	else:
		print('No records to be reingested')
	return {"records": records}

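Save the function. Because the entry point in the code is named handler, the function's Handler setting should be lambda_function.handler.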
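
Before wiring the function to a live stream, it can help to try it with a test event shaped like what CloudWatch Logs sends through Firehose: a JSON payload that is gzipped and then base64-encoded. Here is a rough sketch for building such an event and decoding the function's output. It is written for Python 3 on a local machine, not for the Lambda's Python 2.7 runtime, and the helper names, the stream ARN, and the record ID are placeholders.

```python
import base64
import gzip
import json

# Placeholder ARN; only its region and stream name parts are read by the Lambda
EXAMPLE_STREAM_ARN = "arn:aws:firehose:us-east-1:123456789012:deliverystream/example-stream"


def build_test_event(messages, stream_arn=EXAMPLE_STREAM_ARN):
    """Wrap a list of JSON-serializable log messages in a Firehose transformation event."""
    payload = {
        "messageType": "DATA_MESSAGE",
        "owner": "123456789012",
        "logGroup": "log_group_name",
        "logStream": "log_stream_name",
        "subscriptionFilters": ["subscription_filter_name"],
        "logEvents": [
            {"id": str(i), "timestamp": 1510109208016 + i, "message": json.dumps(message)}
            for i, message in enumerate(messages)
        ],
    }
    # CloudWatch Logs gzips the payload; Firehose hands it to Lambda base64-encoded
    compressed = gzip.compress(json.dumps(payload).encode("utf-8"))
    return {
        "deliveryStreamArn": stream_arn,
        "records": [
            {"recordId": "record-0", "data": base64.b64encode(compressed).decode("ascii")}
        ],
    }


def decode_input_record(record):
    """Reverse the base64 and gzip layers of an incoming record."""
    return json.loads(gzip.decompress(base64.b64decode(record["data"])).decode("utf-8"))


def decode_output_record(record):
    """Decode the newline-delimited JSON carried by a record whose result is 'Ok'."""
    text = base64.b64decode(record["data"]).decode("utf-8")
    return [json.loads(line) for line in text.splitlines() if line]


event = build_test_event([{"user name": "ada", "status code": 200}])
print(json.dumps(event)[:120])
```

The printed JSON can be pasted into a test event in the Lambda console, and decode_output_record can then be applied to each returned record to confirm that the keys no longer contain spaces.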

Enable transform source record with AWS Lambda:

On your Firehose stream, enable Transform source records with AWS Lambda and select the Lambda function created above.

Note: Make sure the schema of the Athena table that Firehose references uses the column names as they appear after spaces are replaced with underscores in the JSON log. Otherwise, the affected columns in the converted Parquet files will contain null values.
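
One way to catch such mismatches early is to compare the renamed keys of a sample log against the column names declared in the table. A small sketch, assuming the column list has been copied by hand from the table definition; the function names and sample values are hypothetical, and only top-level keys are checked.

```python
def expected_column_names(sample_log):
    """Top-level column names a log produces once spaces become underscores."""
    return sorted(key.replace(" ", "_") for key in sample_log)


def schema_mismatches(sample_log, table_columns):
    """Report columns the table lacks and columns the log never fills."""
    expected = set(expected_column_names(sample_log))
    declared = set(table_columns)
    return {
        "missing_from_table": sorted(expected - declared),
        "not_in_log": sorted(declared - expected),
    }


# Hypothetical sample log and a table schema that still has a space in one column name
sample_log = {"request id": "abc", "status code": 200, "path": "/"}
table_columns = ["request_id", "status code", "path"]
print(schema_mismatches(sample_log, table_columns))
```

Any name listed under not_in_log is a column that would come back as null in query results.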

After setup, it should look like this:

To check that the data was transformed properly, download a converted Parquet file and open it in this online Parquet viewer. If the data was converted without any loss, build a table in Athena using a crawler, load the partitions, and query your Parquet log data.

I hope this was helpful. Thank you!

This story is authored by Koushik. He is a software engineer specializing in AWS Cloud Services.

Last modified: July 27, 2020
