在 AWS Lambda 函数返回响应后

AWS Lambda 函数响应后执行代码

关键要点

  • 可以在 AWS Lambda 函数内进行同步与异步任务结合,通过多种方式实现响应后的代码执行。
  • 可选择将代码拆分为多个函数或使用响应流、Lambda 扩展或自定义运行时来实现。
  • 选择合适的方式取决于用例和需求,包括任务复杂性、部署及成本等。

当您同步调用一个 函数时,通常希望该函数能尽快返回响应。例如,当客户端通过 或调用 Lambda函数时,这种情况十分常见。然而,在某些情况下,您可能需要执行一些额外的工作,这些工作不影响响应,可以在发送响应后异步进行。例如,您可以将数据存储到数据库中或将信息发送到日志系统。

一旦您发送了函数响应,Lambda 服务会冻结运行环境,无法再运行额外代码。即使您为在后台运行任务创建了一个线程,Lambda服务一旦返回处理程序的响应,运行环境也会被冻结,导致线程在下一次调用之前无法继续执行。虽然您可以延迟返回响应,直到所有工作完成,但这种做法可能会对用户体验产生负面影响。

本博客将探讨在函数返回响应后仍可运行的任务。

调用异步 Lambda 函数

第一种选择是将代码拆分为两个函数。第一个函数处理同步代码;第二个函数处理异步代码。在同步功能返回之前,它会异步调用第二个函数,可以直接使用 或间接触发,例如,通过发送消息到。

以下 Python 代码演示了如何实现这一点:


logger = Logger() client = boto3.client('lambda')

def calc_response(event): logger.info(f"[Function] 正在计算响应") time.sleep(1) #
模拟同步工作 return { "message": "来自异步的问候" }

def submit_async_task(response): # 调用异步函数继续 logger.info(f"[Function] 调用异步任务")
client.invoke_async(FunctionName=os.getenv('ASYNC_FUNCTION'),
InvokeArgs=json.dumps(response))

def handler(event, context): logger.info(f"[Function] 收到事件:
{json.dumps(event)}")

    
    
    response = calc_response(event)
    
    # 完成响应计算,提交异步任务
    submit_async_task(response)
    
    # 将响应返回给客户端
    logger.info(f"[Function] 返回响应给客户端")
    return {
        "statusCode": 200,
        "body": json.dumps(response)
    }
    

以下是执行异步工作的 Lambda 函数:


logger = Logger()

def handler(event, context): logger.info(f"[异步任务] 开始异步任务:
{json.dumps(event)}") time.sleep(3) # 模拟异步工作 logger.info(f"
允许开发者在收到响应的第一字节后即开始流式发送响应,而无需等待整个响应的完成。您通常在需要缩短或在响应大于6MB(Lambda 响应有效负载大小限制)的情况下使用响应流。

通过这种方法,函数可以使用响应流发送响应,并在发送响应的最后一个字节后继续运行代码。这样,客户端会收到响应,而 Lambda 函数也可以继续运行。

以下 Node.js 代码示例演示如何实现此功能:

```javascript import { Logger } from '@aws-lambda-powertools/logger';

const logger = new Logger();

export const handler = awslambda.streamifyResponse(async (event,
responseStream, _context) => { logger.info("[Function] 收到事件: ", event);

    
    
    // 处理事件
    let response = await calc_response(event);
    
    // 将响应返回给客户端
    logger.info("[Function] 返回响应给客户端");
    responseStream.setContentType('application/json');
    responseStream.write(response);
    responseStream.end();
    
    await async_task(response);
    

});

const calc_response = async (event) => { logger.info("[Function] 正在计算响应");
await sleep(1); // 模拟同步工作

    
    
    return {
        message: "来自流式的问候"
    };
    

};

const async_task = async (response) => { logger.info("[异步任务] 开始异步任务"); awaitsleep(3); // 模拟异步工作 logger.info("[异步任务] 完成"); };

const sleep = async (sec) => { return new Promise((resolve) => {
setTimeout(resolve, sec * 1000); }); }; ```

## 使用 Lambda 扩展

 允许增强 Lambda函数,与您偏好的监控、可观察性、安全和治理工具集成。您还可以使用扩展在后台运行自己的代码,以便在您的函数返回响应给客户端后继续运行。

Lambda 扩展分为两种类型:外部和内部扩展。外部扩展作为与执行环境分开的进程运行。Lambda 函数可以通过/tmp文件夹中的文件或使用本地网络(例如通过 HTTP 请求)与扩展进行通信。您必须将外部扩展打包为 Lambda 层。

内部扩展作为运行处理程序的同一进程中的独立线程运行。处理程序可以通过任何进程内机制进行通信,例如内部队列。以下示例显示了一个内部扩展,这是处理程序进程中的专用线程。

当 Lambda 服务调用函数时,它还会通知所有扩展。只有在 Lambda 函数返回响应并且所有扩展向运行时发出信号表示它们完成时,Lambda服务才冻结执行环境。通过这种方式,函数可以使用扩展独立运行任务,扩展在任务处理完成后通知 Lambda 运行时。这使得执行环境在任务完成之前保持活跃。

以下 Python 代码示例将扩展代码隔离到自己的文件中,处理程序导入并使用它来运行后台任务:

```python import json import time import async_processor as ap fromaws_lambda_powertools import Logger

logger = Logger()

def calc_response(event): logger.info(f"[Function] 正在计算响应") time.sleep(1) #
模拟同步工作 return { "message": "来自扩展的问候" }

# 这是在处理程序代码调用submit_async_task 之后执行的函数

# 它可以在函数返回后继续运行

def async_task(response): logger.info(f"[异步任务] 开始异步任务:
{json.dumps(response)}") time.sleep(3) # 模拟异步工作 logger.info(f"[异步任务] 完成")

def handler(event, context): logger.info(f"[Function] 收到事件:
{json.dumps(event)}")

    
    
    # 计算响应
    response = calc_response(event)
    
    # 完成响应计算
    # 调用异步处理器继续
    logger.info(f"[Function] 调用扩展中的异步任务")
    ap.start_async_task(async_task, response)
    
    # 将响应返回给客户端
    logger.info(f"[Function] 返回响应给客户端")
    return {
        "statusCode": 200,
        "body": json.dumps(response)
    }
    

以下 Python 代码演示如何实现运行后台任务的扩展:


logger = Logger() LAMBDA_EXTENSION_NAME = "AsyncProcessor"

# 由处理程序使用的内部队列,用于通知扩展可以开始处理异步任务。

async_tasks_queue = queue.Queue()

def start_async_processor(): # 注册内部扩展 logger.debug(f"[{LAMBDA_EXTENSION_NAME}]
正在与 Lambda 服务注册...") response = requests.post(
url=f"http://{os.environ['AWS_LAMBDA_RUNTIME_API']}/2020-01-01/extension/register",
json={'events': ['INVOKE']}, headers={'Lambda-Extension-Name':
LAMBDA_EXTENSION_NAME} ) ext_id = response.headers['Lambda-Extension-
Identifier'] logger.debug(f"[{LAMBDA_EXTENSION_NAME}] 注册成功,ID: {ext_id}")

    
    
    def process_tasks():
        while True:
            logger.debug(f"[{LAMBDA_EXTENSION_NAME}] 等待调用事件...")
            response = requests.get(
                url=f"http://{os.environ['AWS_LAMBDA_RUNTIME_API']}/2020-01-01/extension/event/next",
                headers={'Lambda-Extension-Identifier': ext_id},
                timeout=None
            )
    
            # 从内部队列获取下一个任务
            logger.debug(f"[{LAMBDA_EXTENSION_NAME}] 唤醒,等待处理处理程序的异步任务")
            async_task, args = async_tasks_queue.get()
    
            if async_task is None:
                # 没有任务可运行
                logger.debug(f"[{LAMBDA_EXTENSION_NAME}] 收到空任务。忽略。")
            else:
                # 调用任务
                logger.debug(f"[{LAMBDA_EXTENSION_NAME}] 从处理器收到异步任务,开始任务。")
                async_task(args)
    
            logger.debug(f"[{LAMBDA_EXTENSION_NAME}] 完成任务处理")
    
    # 在单独的线程中启动处理扩展事件
    threading.Thread(target=process_tasks, daemon=True, name='AsyncProcessor').start()
    

# 由函数使用,用于表示有任务需要由异步任务处理器执行

def start_async_task(async_task=None, args=None):
async_tasks_queue.put((async_task, args))

# 启动异步任务处理器

start_async_processor() ```

## 使用自定义运行时

Lambda 支持多种内置运行时:Python、Node.js、Java、.NET 和 Ruby。Lambda也支持自定义运行时,这使您可以使用所需的任何其他编程语言开发 Lambda 函数。

当您调用使用自定义运行时的 Lambda 函数时,Lambda服务触发名为“bootstrap”的进程,其中包含您的自定义代码。自定义代码需要与进行交互。它调用/next 端点以获取下一个调用的信息。此 API调用是阻塞的,它会一直等待直到请求到达。当函数处理完请求后,它必须调用/response 端点将响应发送回客户端,然后必须再次调用/next端点以等待下一个调用。Lambda 在您调用 /next 之后冻结执行环境,直到请求到达。

使用这种方法,您可以在调用 /response、将响应发送回客户端后,在调用 /next 之前运行异步任务,指示处理完成。

以下 Python 代码示例将自定义运行时代码隔离到自己的文件中,函数导入并使用它与运行时 API 交互:

```python import time import json import runtime_interface as rt fromaws_lambda_powertools import Logger

logger = Logger()

def calc_response(event): logger.info(f"[Function] 正在计算响应") time.sleep(1) #
模拟同步工作 return { "message": "来自自定义的问候" }

def async_task(response): logger.info(f"[异步任务] 开始异步任务:
{json.dumps(response)}") time.sleep(3) # 模拟异步工作 logger.info(f"[异步任务] 完成")

def main(): # 您可以在这里添加初始化代码

    
    
    # 以下循环无限运行,等待下一个调用并发送响应回客户端
    while True:
        requestId, event = rt.get_next()
        logger.info(f"[Function] 收到事件: {json.dumps(event)}")
    
        # 计算响应
        response = calc_response(event)
    
        # 完成计算响应,发送响应给客户端
        logger.info(f"[Function] 返回响应给客户端")
        rt.send_response(requestId, {
            "statusCode": 200,
            "body": json.dumps(response)
        })
    
        logger.info(f"[Function] 正在调用异步任务")
        async_task(response)
    

main() ```

以下 Python 代码演示如何与运行时 API 进行交互:

```python import requests import os from aws_lambda_powertools import Logger

logger = Logger() run_time_endpoint = os.environ['AWS_LAMBDA_RUNTIME_API']

def get_next(): logger.debug("[自定义运行时] 等待调用...") request = requests.get(
url=f"http://{run_time_endpoint}/2018-06-01/runtime/invocation/next",
timeout=None ) event = request.json() requestId = request.headers["Lambda-
Runtime-Aws-Request-Id"] return requestId, event

def send_response(requestId, response): logger.debug("[自定义运行时] 发送响应")
requests.post(
url=f"http://{run_time_endpoint}/2018-06-01/runtime/invocation/{requestId}/response",
json=response, timeout=None ) ```

## 结论

本博客展示了在 Lambda 函数中结合同步和异步任务的四种方式,允许您在函数向客户端返回响应后继续运行任务。以下表格总结了每种解决方案的优缺点:

**异步调用** | **响应流** | **Lambda 扩展** | **自定义运行时**  
---|---|---|---  
**复杂性** | 实现较简单 | 最容易实现 | 实现复杂,需与扩展 API 和独立线程交互  
**部署** | 需要两个工件:同步函数和异步函数 | 一个包含所有代码的单一部署工件 | 一个包含所有代码的单一部署工件  
**成本** | 成本最高,因额外调用和两个函数的总时长高于单一函数 | 成本最低 | 成本最低  
**启动异步任务** | 在处理程序返回之前 | 在处理调用中的任何时间 | 在处理调用中的任何时间  
**限制** | 发送给异步函数的有效负载不能超过256 KB | 仅支持 Node.js 和自定义运行时。要求,不能与 API Gateway 一起使用,始终公开 | –  
**额外好处** | 同步和异步代码更好地解耦 | 能够分阶段发送响应。支持大于6 MB 的有效负载(需额外费用) | 异步任务在自己的线程中运行,这可以降低总时长和成本  
**失败时的重试** | 由 Lambda 服务管理 | 由开发者负责 | 由开发者负责  
  
选择正确的方案取决于您的用例。如果您使用 Node.js 编写函数并通过 Lambda 函数 URL调用它,请使用响应流。这是最简单的实现方式,也是最具成本效益的方式。

如果异步任务可能发生故障(例如,数据库不可访问),并且您必须确保任务完成,请使用异步 Lambda 调用方法。Lambda服务会重试您的异步函数,直到其成功为止。如果所有重试都失败,最后会触发 Lambda 目标,您可以采取相应措施。

如果您需要自定义运行时,因为您需要使用 Lambda 默认不支持的编程语言,请使用自定义运行时选项。否则,请使用 Lambda扩展选项。虽然实现更复杂,但它成本效益高,可以将代码打包为单个工件,并在向客户端发送响应之前开始处理异步任务。

欲了解更多无服务学习资源,请访问。

标签:,


Leave a Reply

Required fields are marked *