PendingWrite三元组的第二部分表示写入的Channel但是对于一些特殊的场景比如出错、无写入、中断和恢复它们的值不再是一个普通的Channel名称而是使用如下的值__error__执行Node对应的任务出现异常__no_writes__Node任务成功执行但是没有执行针对Channel的输出__interrupt__任务中断__resume__表示恢复执行提供的数据接下来我们两个例子来产生上述这几种特殊的Pending Write。我们先来模拟出错的场景如下面的代码片段所示我们执行的Pregel对象具有一个唯一的Node它的处理函数直接抛出一个异常。/* by 01130.hk - online tools website : 01130.hk/zh/keyboardcode.html */ from langgraph.pregel import Pregel, NodeBuilder from langgraph.channels import LastValue from langgraph.checkpoint.memory import InMemorySaver from typing import Any def handle(args: dict[str, Any])-None: raise Exception(manllually raised exception) node NodeBuilder().subscribe_to(start).do(handle) app Pregel( nodes{body: node}, channels{start: LastValue(str)}, checkpointerInMemorySaver(), input_channels[start], output_channels[], ) config {configurable: {thread_id: 123}} try: result app.invoke({start: begin}, configconfig) except Exception as ex: print(fCaught exception:{ex} ) (_, _, _, _, pending_writes) app.checkpointer.get_tuple(config) print(pending_writes)我们在try/except块中完成针对Pregel的调用并捕捉和输出得到的异常信息。接下来我们调用Checkpointer一个InMemorySaver对象的get_tuple方法得到对应的CheckpointTuple元组然后将pending_writes部分输出出来。从如下所示的输出结果可以看出这个Pending Write三元组的Channel名称被设置为__error__整个Exception对象成为了写入的内容。Caught exception:manllually raised exception [(f9ff1e88-4d82-f417-ad11-8fd870bfe647, __error__, Exception(manllually raised exception))]由于并不是所有的Node都有向Channel写入执行结果的需求所以只要处理函数成功执行即使没有Channel输出的行为该任务的状态也会被视为成功Checkpointer只是采用不同的形式来记录这种不需要写入的Pending Write。如下的这个程序不仅仅演示了这种无输出写入的场景还同时模拟了中断和恢复。/* by 01130.hk - online tools website : 01130.hk/zh/keyboardcode.html */ from langgraph.pregel import Pregel, NodeBuilder from langgraph.channels import LastValue from langgraph.checkpoint.memory import InMemorySaver from typing import Any from langgraph.types import Command, interrupt def foo(args: dict[str, Any]) - list[str]: resume1 interrupt(1st interrupt) assert resume1 1st resume resume2 interrupt(2nd interrupt) assert resume2 2nd resume resume3 interrupt(3rd interrupt) assert resume3 3rd resume return [resume1, resume2, resume3] def bar(args: dict[str, Any]) - None: pass app Pregel( nodes{ foo: NodeBuilder().subscribe_only(start).do(foo).write_to(output), bar: NodeBuilder().subscribe_only(start).do(bar), }, channels{ start: LastValue(str), output: LastValue(list[str]), }, input_channels[start], output_channels[output], checkpointerInMemorySaver(), ) config {configurable: {thread_id: 123}} result app.invoke(input{start: begin}, configconfig, stream_modetasks) (_, _, _, _, pending_writes) app.checkpointer.get_tuple(config) print(fAfter invoke:\n{pending_writes}) app.invoke(inputCommand(resume1st resume), configconfig) (_, _, _, _, pending_writes) app.checkpointer.get_tuple(config) print(f\nAfter resume 1:\n{pending_writes}) app.invoke(inputCommand(resume2nd resume), configconfig) (_, _, _, _, pending_writes) app.checkpointer.get_tuple(config) print(f\nAfter resume 2:\n{pending_writes}) result app.invoke(inputCommand(resume3rd resume), configconfig) assert result {output: [1st resume, 2nd resume, 3rd resume]} (_, _, _, _, pending_writes) app.checkpointer.get_tuple(config) print(f\nAfter resume 3:\n{pending_writes})如上面的代码片段所示我们为Pregel提供了两个并行执行的节点foo和bar其中bar对应的函数并未执行任何有效操作也没有任何的输出。我们为节点foo对应的处理函数制造了三次人为中断所以需要至少四次调用才能结束。我们在创建的RunnableConfig对象中提供了统一的Thread ID并将它作为后续方法调用的参数。针对Pregel的三次调用第一次是为常规调用后面两次分别是针对两次中断的恢复调用。我们在每次调用后得到并输出Checkpointer记录下来的Pending Write。从如下的输出结果可以看出第一次常规调用后 节点foo停在第一个中断处节点bar成功执行但没有输出所以Checkpointer将它们作为Pending Write记录下来Channel名称分别是__interrupt__和__no_writes__前者的写入内容是一个Interrupt对象它具有我们指定的值“1st interrupt”。我们也看到了Interrupt对象具有一个唯一标识在恢复调用时我们可以利用此标识为其指定针对性的恢复数据Command(resume{id:resume value)。After invoke: [(8d407c25-02f6-9101-d1b8-5a99c247edde, __interrupt__, [Interrupt(value1st interrupt, id5603cdf275d8b8ba0633d272fa176fd3)]), (22507855-e257-1b5b-eb1a-3c3fb0a071e9, __no_writes__, None)] After resume 1: [(8d407c25-02f6-9101-d1b8-5a99c247edde, __interrupt__, [Interrupt(value2nd interrupt, id5603cdf275d8b8ba0633d272fa176fd3)]), (22507855-e257-1b5b-eb1a-3c3fb0a071e9, __no_writes__, None), (00000000-0000-0000-0000-000000000000, __resume__, 1st resume), (8d407c25-02f6-9101-d1b8-5a99c247edde, __resume__, [1st resume])] After resume 2: [(8d407c25-02f6-9101-d1b8-5a99c247edde, __interrupt__, [Interrupt(value3rd interrupt, id5603cdf275d8b8ba0633d272fa176fd3)]), (22507855-e257-1b5b-eb1a-3c3fb0a071e9, __no_writes__, None), (00000000-0000-0000-0000-000000000000, __resume__, 2nd resume), (8d407c25-02f6-9101-d1b8-5a99c247edde, __resume__, [1st resume, 2nd resume])] After resume 3: []针对第一个中断的恢复调用后节点foo停在第二个中断处此时Checkpointer会创建两个新的Pending Write持久化我们提供的Resume Value“1st resume”它的Channel名称就是__resume__但为什么是两个呢这实际上反映了 Pregel 处理外部指令注入与Node内部消费的同步机制。第一个被称为全局Resume ValueGlobal Resume Value, 它代表从外部通过Command(resume...)注入到图中的原始指令。由于它不是由图内Node产生的因此 Task ID 为空它是唤醒整个暂停状态的“总开关”。第二个节点foo对全局Resume Value的消费记录所以具有一个明确的Taks ID。当节点foo被唤醒并执行到interrupt行时它会从全局Resume Value读取数据。为了保证幂等性和可回溯性系统会将拿走了哪个Resume Value记录在它的任务路径下。针对Resume的冗余设置是为了解决重入与回溯问题。全局记录证明了用户确实提供了这个值。Node记录证明了这个值确实被这个特定的interrupt函数调用消费了。一个Node内部可以连续调用多次interrupt函数系统需要按顺序记录该Node消费过的所有Resume Value以便在“时间旅行”或重试时能够精确对齐。当我们调用interrupt函数实施人为中断时底层实际上会抛出一个GraphInterrupt异常Pregel通过捕获这个异常进而生成针对性的PendingWrite所以针对同一个任务有可能有一个唯一的中断类型的PendingWrite。由于恢复执行总是会从头执行Node函数所以基于中断的PendingWrite并不会恢复执行造成任何影响。所以当我们完成第二次恢复调用后持久化的中断PendingWrite反映的是针对第二次interrupt函数的调用对应Interrupt对象的值为2nd interrupt。Resume Value必须按照顺序提供因为每遇到一个interrpt函数的调用都会利用前面介绍过的计算器提供的索引从Resume Value列表中读取Resume Value作为该调用的返回值所以持久化的第二个基于恢复的PendingWrite对应的值变成了包含两个Resume Value的列表[1st resume, 2nd resume]。在针对第三个中断的恢复执行结束后fooNode完成了它的执行任务而bar对应的任务本身就是成功状态所以整个Superstep顺利结束自然也就不存在Pending Write了。