作者:Chris McPeek 发表于2024年3月6日,内容包含在 、[Amazon Simple NotificationService (SNS)](https://aws.amazon.com/blogs/compute/category/messaging/amazon- simple-notification-service-sns/)、[Amazon Simple Queue Service (SQS)](https://aws.amazon.com/blogs/compute/category/messaging/amazon-simple- queue-service-sqs/)、 和 相关信息。
本文由 Jeff Harman(高级原型架构师)、Vaibhav Shah(高级解决方案架构师)和 Erik Olsen(高级技术客户经理)撰写。
很多行业需要为决策和交易系统提供审计跟踪。人工智能辅助的决策过程需要近实时监控决策系统的全部输入,以防止欺诈、检测模型漂移和歧视。现代系统通常使用更广泛的输入进行决策,包括图像、非结构化文本、历史数值和其他大型数据元素。这些大型数据元素对传统审计系统构成挑战,因为传统系统主要处理相对较小的结构化文本消息。本文展示了如何使用无服务器技术构建一个可靠、高性能、可追溯、耐用的审计处理管道。
在开发审计记录摄取架构时,需要考虑以下四个需求:
需求 | 说明 |
---|---|
审计记录大小 | 存储和管理大型有效载荷(256k – 6 MB),可能包含文本、二进制数据和对其他存储系统的引用。 |
审计可追溯性 | 存储的数据具有完整的有效载荷和外部过程跟踪能力,以便通过基于订阅的事件进行监控。 |
高性能 | 系统的阻止写入时间限于通过网络传输审计记录所需的时间。 |
高数据耐久性 | 一旦系统发送有效载荷收据,因系统故障而导致的有效载荷丢失风险极低。 |
下图展示了满足这些需求的架构,并模拟了审计记录在系统中的流动。
端点发起 API 调用。一个 函数接收该消息,并由 集群提供低延迟的初始存储机制。一旦数据存储在 ElastiCache 中, 工作流便负责协调通信和持久性功能。
订阅者会收到四个 Amazon Simple Notification Service (SNS) 通知,通知内容包括审计记录有效载荷的到达和存储、审计记录元数据的存储,以及审计记录归档的完成。用户可以将 Amazon Simple QueueService (SQS) 队列订阅到 SNS 主题,并利用 机制实现高可靠性。
三个基本处理步骤中的任何故障:摄取、数据归档和元数据归档,将触发一个 SQS死信队列(DLQ)中的消息,消息会包含原始请求和故障原因的说明。若摄取消息功能失败,则调用摄取消息失败处理函数,该函数会将原始参数存储到 S3失败消息存储桶中以供后续分析。
Step Functions工作流提供系统的编排和并行路径执行。下面是详细的工作流,展示了执行流程和通知操作。转换步骤将内部数据结构转换为消费者所需的格式。
。
摄取消息的 Lambda 函数生成一个消息 ID,用于跟踪消息有效载荷的生命周期。它将完整消息存储在 ElastiCache for Redis缓存中。摄取消息 Lambda 函数生成包含上述所有必要元素的内部消息。最后,Lambda 函数处理代码启动 Step Functions工作流,并传入内部消息有效载荷。
如果摄取消息的 Lambda 函数由于任何原因失败,该 Lambda 函数会调用摄取失败处理 Lambda 函数。此 Lambda 函数会将任何可恢复的 incoming 消息数据写入 S3 存储桶,并在摄取消息死信队列中发送通知。
然后 Step Functions 工作流会并行运行三个过程。
在每个 Lambda 函数处理完成后,Lambda 函数会向 SNS 通知主题发送通知,提醒订阅者每个操作已完成。当消息元数据和消息归档 Lambda函数完成后,最终聚合函数会对 DynamoDB 中的元数据进行最终更新,包括 S3 引用信息,移除 ElastiCache Redis 引用。
此实现的源代码可以在 https://github.com/aws-samples/blog-serverless-reliable-messaging 找到。
git clone https://github.com/aws-samples/blog-serverless-reliable-messaging.git
blog_serverless_reliable_messaging
sam build
sam deploy --guided
。系统会询问您提供以下参数:部署堆栈后,您可以使用与部署输出中引用的 API 密钥相对应的 API Gateway 端点进行测试。可以通过 AWS 控制台(通过输出中的链接 -
ApiKeyConsole
)或通过 AWS CLI (通过输出中的 AWS CLI 引用 - APIKeyCLI
)获取 API 密钥。
您还可以通过 Lambda 服务控制台直接测试,调用摄取消息功能。
项目根目录中有一个测试消息文件 test_message.json,用于直接测试摄取函数的 Lambda 功能。
<项目名称> -IngestMessageFunction-xxxxx
" 函数json { "isBase64Encoded": false, "statusCode": 200, "headers": { "Access- Control-Allow-Headers": "Content-Type", "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "OPTIONS,POST" }, "body": "{\"messageID\": \"XXXXXXXXXXXXXX\"}" }
<项目名称>-s3messagearchive-xxxxxx
”中,找到原始 json 有效载荷,其键基于脚本执行的日期和时间,例如:YEAR/MONTH/DAY/HOUR/MINUTE
,文件名称为 messageID
metaDataTable
的 DynamoDB 表中,您应该找到一条记录,其 messageID
与上述 messageID
相等,并包含与有效载荷相关的所有元数据测试文件夹中包含一个 Python 脚本:
<Your API key key here>
和 <Your API Gateway URL here (IngestMessageApi)>
值,以正确对应您的环境python3 -m pip install -r ./test_client/requirements.txt python3 ./test_client/test_client.py
json { "messageID": " XXXXXXXXXXXXXX" }
<项目名称>-s3messagearchive-xxxxxx
”中,应该能够找到原始 json 有效载荷,其键基于脚本执行的日期和时间,例如:YEAR/MONTH/DAY/HOUR/MINUTE,文件名称为 messageID
metaDataTable
的 DynamoDB 表中,您应该找到一条记录,其 messageID
与上述 messageID
相等,并包含与有效载荷相关的所有元数据本文描述了支持大型消息的高可靠性消息系统的架构模式、消息模式和数据结构。使用包括 Lambda 函数、StepFunctions、ElastiCache、DynamoDB 和 S3的无服务器服务,满足现代审计系统可扩展和可靠的要求。本文分享的架构适用于高度监管的环境,可以存储和跟踪比典型日志系统更大的消息,记录大小介于 256k 到 6MB 之间。该架构可作为蓝图,进一步扩展和适应其他无服务器用例。
为了获取无服务器学习资源,请访问 。
标签:,
Leave a Reply