AWS Machine Learning Blog

Redact sensitive data from streaming data in near-real time using Amazon Comprehend and Amazon Kinesis Data Firehose

August 30, 2023: Amazon Kinesis Data Analytics has been renamed to Amazon Managed Service for Apache Flink. Read the announcement in the AWS News Blog and learn more.

February 9, 2024: Amazon Kinesis Data Firehose has been renamed to Amazon Data Firehose. Read the AWS What’s New post to learn more.

Near-real-time delivery of data and insights enables businesses to rapidly respond to their customers’ needs. Real-time data can come from a variety of sources, including social media, IoT devices, infrastructure monitoring, call center monitoring, and more. Due to the breadth and depth of data being ingested from multiple sources, businesses look for solutions to protect their customers’ privacy and keep sensitive data from being accessed by end systems. You previously had to rely on personally identifiable information (PII) rules engines that could flag false positives or miss data, or you had to build and maintain custom machine learning (ML) models to identify PII in your streaming data. You also needed to implement and maintain the infrastructure necessary to support these engines or models.

To help streamline this process and reduce costs, you can use Amazon Comprehend, a natural language processing (NLP) service that uses ML to find insights and relationships like people, places, sentiments, and topics in unstructured text. You can now use Amazon Comprehend ML capabilities to detect and redact PII in customer emails, support tickets, product reviews, social media, and more. No ML experience is required. For example, you can analyze support tickets and knowledge articles to detect PII entities and redact the text before you index the documents. After that, documents are free of PII entities and users can consume the data. Redacting PII entities helps you protect your customers’ privacy and comply with local laws and regulations.

In this post, you learn how to integrate Amazon Comprehend into your streaming architectures to redact PII entities in near-real time using Amazon Kinesis Data Firehose with AWS Lambda.

This post is focused on redacting data from select fields that are ingested into a streaming architecture using Kinesis Data Firehose, where you want to create, store, and maintain additional derivative copies of the data for consumption by end-users or downstream applications. If you’re using Amazon Kinesis Data Streams or have additional use cases outside of PII redaction, refer to Translate, redact and analyze streaming data using SQL functions with Amazon Kinesis Data Analytics, Amazon Translate, and Amazon Comprehend, where we show how you can use Amazon Kinesis Data Analytics Studio powered by Apache Zeppelin and Apache Flink to interactively analyze, translate, and redact text fields in streaming data.

Solution overview

The following figure shows an example architecture for performing PII redaction of streaming data in near-real time, using Amazon Simple Storage Service (Amazon S3), Kinesis Data Firehose data transformation, Amazon Comprehend, and AWS Lambda. Additionally, we use the AWS SDK for Python (Boto3) for the Lambda functions. As indicated in the diagram, the S3 raw bucket contains non-redacted data, and the S3 redacted bucket contains redacted data after using the Amazon Comprehend DetectPiiEntities API within a Lambda function.
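Before walking through the full function, the following minimal sketch shows the two Amazon Comprehend calls the Lambda function relies on (the sample text and printed values are illustrative):

    import boto3

    comprehend = boto3.client('comprehend')
    text = 'My email is jane@example.com and my phone number is 555-0100.'

    # ContainsPiiEntities labels which PII types appear in the text
    labels = comprehend.contains_pii_entities(Text=text, LanguageCode='en')['Labels']
    print([label['Name'] for label in labels])  # e.g., ['EMAIL', 'PHONE']

    # DetectPiiEntities returns the character offsets of each PII entity,
    # which the redaction logic uses to mask the matching characters
    entities = comprehend.detect_pii_entities(Text=text, LanguageCode='en')['Entities']
    for entity in entities:
        print(entity['Type'], text[entity['BeginOffset']:entity['EndOffset']])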

Costs involved

In addition to Kinesis Data Firehose, Amazon S3, and Lambda costs, this solution will incur usage costs from Amazon Comprehend. The amount you pay is a function of the total number of records, the number of records that contain PII, and the number of characters that are processed by the Lambda function. For more information, refer to Amazon Kinesis Data Firehose pricing, Amazon Comprehend Pricing, and AWS Lambda Pricing.

As an example, let’s assume you have 10,000 log records, and the key value you want to redact PII from is 500 characters long. Out of the 10,000 log records, 50 are identified as containing PII. The cost details are as follows (a short Python walkthrough of the arithmetic follows the breakdown):

Contains PII Cost:

  • Size of each key value = 500 characters (1 unit = 100 characters)
  • Number of units (100 characters) per record (minimum is 3 units) = 5
  • Total units = 10,000 (records) x 5 (units per record) x 1 (Amazon Comprehend requests per record) = 50,000
  • Price per unit = $0.000002
    • Total cost for identifying log records with PII using ContainsPiiEntities API = [number of units] x [cost per unit] = 50,000 x $0.000002 = $0.10

Redact PII Cost:

  • Total units containing PII = 50 (records) x 5 (units per record) x 1 (Amazon Comprehend requests per record) = 250
  • Price per unit = $0.0001
    • Total cost for identifying location of PII using DetectPiiEntities API = [number of units] x [cost per unit] = 250 x $0.0001 = $0.025

Total Cost for identification and redaction:

  • Total cost: $0.10 (validating whether fields contain PII) + $0.025 (redacting fields that contain PII) = $0.125
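The same arithmetic, as a short Python walkthrough (the unit prices are the example rates quoted above, not a pricing reference):

    # Example cost walkthrough; prices are the example rates quoted in this post
    records = 10000              # total log records
    pii_records = 50             # records identified as containing PII
    chars_per_value = 500        # characters in the inspected key value
    units_per_record = max(3, -(-chars_per_value // 100))  # 1 unit = 100 characters, 3-unit minimum

    contains_cost = records * units_per_record * 0.000002   # ContainsPiiEntities: $0.10
    detect_cost = pii_records * units_per_record * 0.0001   # DetectPiiEntities: $0.025
    print(f"${contains_cost + detect_cost:.3f}")            # $0.125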

Deploy the solution with AWS CloudFormation

For this post, we provide an AWS CloudFormation streaming data redaction template, which provides the full details of the implementation to enable repeatable deployments. Upon deployment, this template creates two S3 buckets: one to store the raw sample data ingested from the Amazon Kinesis Data Generator (KDG), and one to store the redacted data. Additionally, it creates a Kinesis Data Firehose delivery stream with Direct PUT as input, and a Lambda function that calls the Amazon Comprehend ContainsPiiEntities and DetectPiiEntities APIs to identify and redact PII data. The Lambda function relies on user input in the environment variables to determine which key values need to be inspected for PII.

The Lambda function in this solution limits payload sizes to 100 KB. If a payload is provided where the text is greater than 100 KB, the Lambda function skips it.

To deploy the solution, complete the following steps (a scripted boto3 alternative is sketched after the list):

  1. Launch the CloudFormation stack in US East (N. Virginia) us-east-1.
  2. Enter a stack name, and leave the other parameters at their defaults.
  3. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create stack.
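The following boto3 sketch mirrors those console steps (the stack name is arbitrary and the TemplateURL is a placeholder; substitute the template link from this post):

    import boto3

    cf = boto3.client('cloudformation', region_name='us-east-1')
    cf.create_stack(
        StackName='comprehend-pii-redaction',                             # any name you choose
        TemplateURL='https://example.com/streaming-data-redaction.yaml',  # placeholder; use the template link from this post
        Capabilities=['CAPABILITY_NAMED_IAM'],                            # the IAM acknowledgement from step 3
    )
    cf.get_waiter('stack_create_complete').wait(StackName='comprehend-pii-redaction')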

Deploy resources manually

If you prefer to build the architecture manually instead of using AWS CloudFormation, complete the steps in this section.

Create the S3 buckets

Create your S3 buckets with the following steps (a boto3 equivalent is sketched after the list):

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. Create one bucket for your raw data and one for your redacted data.
  4. Note the names of the buckets you just created.
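A minimal boto3 sketch of the same steps, assuming illustrative bucket names (S3 bucket names must be globally unique):

    import boto3

    s3 = boto3.client('s3')

    # Bucket names are illustrative; substitute your own
    for bucket in ('my-pii-raw-bucket', 'my-pii-redacted-bucket'):
        s3.create_bucket(Bucket=bucket)
    # Outside us-east-1, create_bucket also requires
    # CreateBucketConfiguration={'LocationConstraint': '<region>'}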

Create the Lambda function

To create and deploy the Lambda function, complete the following steps:

  1. On the Lambda console, choose Create function.
  2. Choose Author from scratch.
  3. For Function Name, enter AmazonComprehendPII-Redact.
  4. For Runtime, choose Python 3.9.
  5. For Architecture, select x86_64.
  6. For Execution role, select Create a new role with Lambda permissions.
  7. After you create the function, enter the following code:
    import json
    import boto3
    import os
    import base64
    
    # Create the Amazon Comprehend client once, outside the handler, so it is
    # reused across invocations instead of being rebuilt for every record
    comprehend_client = boto3.client('comprehend')
    
    def lambda_handler(event, context):
        
        output = []
        
        # Gathers keys from environment variables and makes a list of desired keys to check for PII
        rawkeys = os.environ['keys']
        splitkeys = rawkeys.split(", ")
        print(splitkeys)
        
        for record in event['records']:
            
            # Kinesis Data Firehose delivers record data base64 encoded, so decode it here
            payloadraw = base64.b64decode(record["data"]).decode('utf-8')
            # Loads the decoded payload into json
            payloadjsonraw = json.loads(payloadraw)
            
            # This code handles the logic to check for keys, identify if PII exists, and redact PII if available.
            for i in payloadjsonraw:
                # Checks if the key found in the message matches a key in the redaction list
                if i in splitkeys:
                    print("Redact key found, checking for PII")
                    payload = str(payloadjsonraw[i])
                    # Checks that the UTF-8 text is below the 100 KB limit
                    if len(payload.encode('utf-8')) < 100000:
                        print('Size is less than 100KB, checking if value contains PII')
                        # Runs Comprehend ContainsPiiEntities API call to see if the key value contains PII
                        pii_identified = comprehend_client.contains_pii_entities(Text=payload, LanguageCode='en')
                        
                        # If PII is not found, skip over the key
                        if pii_identified['Labels'] == []:
                            print('No PII found')
                        else:
                            # If PII is found, run through the redaction logic
                            print('PII found, redacting')
                            # Runs Comprehend DetectPiiEntities call to find the exact location of the PII
                            response = comprehend_client.detect_pii_entities(Text=payload, LanguageCode='en')
                            entities = response['Entities']
                            # Creates redacted_payload, which will hold the redacted text
                            redacted_payload = payload
                            # Masks each detected entity with asterisks; the mask is the same
                            # length as the entity, so later offsets remain valid
                            for entity in entities:
                                char_offset_begin = entity['BeginOffset']
                                char_offset_end = entity['EndOffset']
                                redacted_payload = redacted_payload[:char_offset_begin] + '*'*(char_offset_end-char_offset_begin) + redacted_payload[char_offset_end:]
                            # Replaces the original value with the redacted value
                            payloadjsonraw[i] = redacted_payload
                            print(str(payloadjsonraw[i]))
                    else:
                        print('Size is more than 100KB, skipping inspection')
                else:
                    print("Key value not found in redaction list")
            
            redacteddata = json.dumps(payloadjsonraw)
            
            # Adds the inspected record to the output; Firehose expects the transformed
            # data as a base64-encoded string, so decode the bytes b64encode returns
            output_record = {
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': base64.b64encode(redacteddata.encode('utf-8')).decode('utf-8')
            }
            output.append(output_record)
            print(output_record)
            
        print('Successfully processed {} records.'.format(len(event['records'])))
        
        return {'records': output}
  8. Choose Deploy.
  9. In the navigation pane, choose Configuration.
  10. Navigate to Environment variables.
  11. Choose Edit.
  12. For Key, enter keys.
  13. For Value, enter the keys whose values you want to inspect for PII, separated by a comma and space. For example, enter Tweet1, Tweet2 if you’re using the sample test data provided in the next section of this post.
  14. Choose Save.
  15. Navigate to General configuration.
  16. Choose Edit.
  17. Change the value of Timeout to 1 minute.
  18. Choose Save.
  19. Navigate to Permissions.
  20. Choose the role name under Execution Role.
    You’re redirected to the AWS Identity and Access Management (IAM) console.
  21. For Add permissions, choose Attach policies.
  22. Enter Comprehend into the search bar and choose the policy ComprehendFullAccess.
  23. Choose Attach policies.
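Steps 9-23 can also be scripted with boto3; a minimal sketch, assuming the execution role is named AmazonComprehendPII-Redact-role (substitute the actual role name shown on the Permissions tab):

    import boto3

    lambda_client = boto3.client('lambda')
    iam = boto3.client('iam')

    # Set the keys to inspect and raise the timeout to 1 minute
    lambda_client.update_function_configuration(
        FunctionName='AmazonComprehendPII-Redact',
        Environment={'Variables': {'keys': 'Tweet1, Tweet2'}},
        Timeout=60,
    )

    # Attach the Comprehend policy to the function's execution role
    iam.attach_role_policy(
        RoleName='AmazonComprehendPII-Redact-role',   # assumed name; use your role's actual name
        PolicyArn='arn:aws:iam::aws:policy/ComprehendFullAccess',
    )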

Create the Firehose delivery stream

To create your Firehose delivery stream, complete the following steps (a boto3 equivalent is sketched after the list):

  1. On the Kinesis Data Firehose console, choose Create delivery stream.
  2. For Source, select Direct PUT.
  3. For Destination, select Amazon S3.
  4. For Delivery stream name, enter ComprehendRealTimeBlog.
  5. Under Transform source records with AWS Lambda, select Enabled.
  6. For AWS Lambda function, enter the ARN for the function you created, or browse to the function AmazonComprehendPII-Redact.
  7. For Buffer Size, set the value to 1 MB.
  8. For Buffer Interval, leave it as 60 seconds.
  9. Under Destination Settings, select the S3 bucket you created for the redacted data.
  10. Under Backup Settings, select the S3 bucket that you created for the raw records.
  11. Under Permission, either create or update an IAM role, or choose an existing role with the proper permissions.
  12. Choose Create delivery stream.
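A minimal boto3 sketch of the same configuration, in which the bucket, role, and function ARNs are placeholders to replace with your own:

    import boto3

    firehose = boto3.client('firehose')

    firehose.create_delivery_stream(
        DeliveryStreamName='ComprehendRealTimeBlog',
        DeliveryStreamType='DirectPut',
        ExtendedS3DestinationConfiguration={
            'BucketARN': 'arn:aws:s3:::my-pii-redacted-bucket',       # redacted-data bucket
            'RoleARN': 'arn:aws:iam::123456789012:role/firehose-delivery-role',
            'BufferingHints': {'SizeInMBs': 1, 'IntervalInSeconds': 60},
            'ProcessingConfiguration': {
                'Enabled': True,
                'Processors': [{
                    'Type': 'Lambda',
                    'Parameters': [{
                        'ParameterName': 'LambdaArn',
                        'ParameterValue': 'arn:aws:lambda:us-east-1:123456789012:function:AmazonComprehendPII-Redact',
                    }],
                }],
            },
            'S3BackupMode': 'Enabled',
            'S3BackupConfiguration': {
                'BucketARN': 'arn:aws:s3:::my-pii-raw-bucket',        # raw-data backup bucket
                'RoleARN': 'arn:aws:iam::123456789012:role/firehose-delivery-role',
            },
        },
    )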

Deploy the streaming data solution with the Kinesis Data Generator

You can use the Kinesis Data Generator (KDG) to ingest sample data to Kinesis Data Firehose and test the solution. To simplify this process, we provide a Lambda function and CloudFormation template to create an Amazon Cognito user and assign appropriate permissions to use the KDG.

  1. On the Amazon Kinesis Data Generator page, choose Create a Cognito User with CloudFormation. You’re redirected to the AWS CloudFormation console to create your stack.
  2. Provide a user name and password for the user with which you log in to the KDG.
  3. Leave the other settings at their defaults and create your stack.
  4. On the Outputs tab, choose the KDG UI link.
  5. Enter your user name and password to log in.

Send test records and validate redaction in Amazon S3

To test the solution, complete the following steps:

  1. Log in to the KDG URL you created in the previous step.
  2. Choose the Region where the AWS CloudFormation stack was deployed.
  3. For Stream/delivery stream, choose the delivery stream you created (if you used the template, it has the format accountnumber-awscomprehend-blog).
  4. Leave the other settings at their defaults.
  5. For the record template, you can create your own tests, or use the following template. If you’re using the provided sample data for testing, make sure the keys environment variable on the AmazonComprehendPII-Redact Lambda function is set to Tweet1, Tweet2 (whether you created the function manually or deployed it via CloudFormation). The sample test data is below:
    {"User":"12345", "Tweet1":" Good morning, everybody. My name is Van Bokhorst Serdar, and today I feel like sharing a whole lot of personal information with you. Let's start with my Email address SerdarvanBokhorst@dayrep.com. My address is 2657 Koontz Lane, Los Angeles, CA. My phone number is 818-828-6231.", "Tweet2": "My Social security number is 548-95-6370. My Bank account number is 940517528812 and routing number 195991012. My credit card number is 5534816011668430, Expiration Date 6/1/2022, my C V V code is 121, and my pin 123456. Well, I think that's it. You know a whole lot about me. And I hope that Amazon comprehend is doing a good job at identifying PII entities so you can redact my personal information away from this streaming record. Let's check"}
  6. Choose Send Data, and allow a few seconds for records to be sent to your stream.
  7. After a few seconds, stop the KDG generator and check your S3 buckets for the delivered files.
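If you want to send a test record without the KDG, you can also call the Firehose PutRecord API directly; a minimal sketch using a truncated version of the sample payload above:

    import json

    import boto3

    firehose = boto3.client('firehose')

    # Truncated version of the sample record; the full text is shown below
    record = {
        'User': '12345',
        'Tweet1': "Let's start with my Email address SerdarvanBokhorst@dayrep.com.",
        'Tweet2': 'My Social security number is 548-95-6370.',
    }
    firehose.put_record(
        DeliveryStreamName='ComprehendRealTimeBlog',
        Record={'Data': json.dumps(record).encode('utf-8')},
    )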

The following is an example of the raw data in the raw S3 bucket:

{"User":"12345", "Tweet1":" Good morning, everybody. My name is Van Bokhorst Serdar, and today I feel like sharing a whole lot of personal information with you. Let's start with my Email address SerdarvanBokhorst@dayrep.com. My address is 2657 Koontz Lane, Los Angeles, CA. My phone number is 818-828-6231.", "Tweet2": "My Social security number is 548-95-6370. My Bank account number is 940517528812 and routing number 195991012. My credit card number is 5534816011668430, Expiration Date 6/1/2022, my C V V code is 121, and my pin 123456. Well, I think that's it. You know a whole lot about me. And I hope that Amazon comprehend is doing a good job at identifying PII entities so you can redact my personal information away from this streaming record. Let's check"}

The following is an example of the redacted data in the redacted S3 bucket:

{"User":"12345", "Tweet1":"Good morning, everybody. My name is *******************, and today I feel like sharing a whole lot of personal information with you. Let's start with my Email address ****************************. My address is ********************************** My phone number is ************.", "Tweet"2: "My Social security number is ***********. My Bank account number is ************ and routing number *********. My credit card number is ****************, Expiration Date ********, my C V V code is ***, and my pin ******. Well, I think that's it. You know a whole lot about me. And I hope that Amazon comprehend is doing a good job at identifying PII entities so you can redact my personal information away from this streaming record. Let's check"}

The sensitive information has been removed from the redacted messages, providing confidence that you can share this data with end systems.

Cleanup

When you’re finished experimenting with this solution, clean up your resources by using the AWS CloudFormation console to delete all the resources deployed in this example. If you followed the manual steps, you will need to manually delete the two buckets, the AmazonComprehendPII-Redact function, the ComprehendRealTimeBlog stream, the log group for the ComprehendRealTimeBlog stream, and any IAM roles that were created.
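A boto3 sketch of the same cleanup, assuming the illustrative bucket and stack names used earlier in this post (substitute your own; versioned buckets also need their object versions deleted):

    import boto3

    # Empty both buckets first; stack or bucket deletion fails on non-empty buckets
    s3 = boto3.resource('s3')
    for name in ('my-pii-raw-bucket', 'my-pii-redacted-bucket'):   # illustrative names
        s3.Bucket(name).objects.all().delete()

    boto3.client('cloudformation').delete_stack(StackName='comprehend-pii-redaction')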

Conclusion

This post showed you how to integrate PII redaction into your near-real-time streaming architecture and reduce data processing time by performing redaction in flight. In this scenario, you provide the redacted data to your end-users and a data lake administrator secures the raw bucket for later use. You could also build additional processing with Amazon Comprehend to identify tone or sentiment, identify entities within the data, and classify each message.

We provided individual steps for each service as part of this post, and also included a CloudFormation template that allows you to provision the required resources in your account. This template should be used for proof of concept or testing scenarios only. Refer to the developer guides for Amazon Comprehend, Lambda, and Kinesis Data Firehose for any service limits.

To get started with PII identification and redaction, see Personally identifiable information (PII). With the example architecture in this post, you could integrate any of the Amazon Comprehend APIs with near-real-time data using Kinesis Data Firehose data transformation. To learn more about what you can build with your near-real-time data with Kinesis Data Firehose, refer to the Amazon Kinesis Data Firehose Developer Guide. This solution is available in all AWS Regions where Amazon Comprehend and Kinesis Data Firehose are available.


About the authors

Joe Morotti is a Solutions Architect at Amazon Web Services (AWS), helping Enterprise customers across the Midwest US. He has held a wide range of technical roles and enjoys showing customers the art of the possible. In his free time, he enjoys spending quality time with his family exploring new places and overanalyzing his sports team’s performance.

Sriharsh Adari is a Senior Solutions Architect at Amazon Web Services (AWS), where he helps customers work backwards from business outcomes to develop innovative solutions on AWS. Over the years, he has helped multiple customers on data platform transformations across industry verticals. His core areas of expertise include Technology Strategy, Data Analytics, and Data Science. In his spare time, he enjoys playing tennis, binge-watching TV shows, and playing the tabla.