Langgraph Human In The Loop with socket
Jaeyoun Nam
Posted on November 13, 2024
langgraph 의 interruption 기능을 통해서 Agent의 수행 중간에 human이 개입할 수 있다는 것을 알았다.
하지만 예시들을 보면 전부 human interaction은 한 셈치고~ 넘어간다. 실제로 User에게 확인을 받으려면 어떻게 해야할까? 크게 세가지 방법이 있을 것 같다.
Langgraph API 서버 사용
langgraph cli 로 langgraph API 서버를 docker로 실행한 후 langgraph SDK로 그래프를 실행하고, 스테이트를 변경하고, 재게하고 할 수 있다.
langgraph에서 제공하는 것들을 제공하는 방법대로 사용해야한다. 뭔간 설정이 많아지고, 내 코드랑 융합하기 까다로울 수 있어보인다.
서버에서 그래프 관리
위의 Langgraph API 서버에서 필요한 부분만 내 커스텀 서버에 구현하는 방법이다. 예를 들어 그래프 실행하면 그래프를 실행한 클라이언트와 그래프 체크포인트를 저장해야하고, 유저의 확인 후에 다시 그래프를 불러와서 유저의 응답에 맞게 상태를 변경해서 재게해야한다.
짜야할게 은근 많을 수도 있다.
소켓 연결
Agent실행 시에 소켓을 연결하고 소켓을 통해서 유저와 인터렉션 하는 것이다. 기존 예시 코드에서 소켓연결과 소켓 통신으로 유저 확인 받는 단계만 추가하면 동작한다.
대신, 글자 타이핑하듯 쳐지는 streaming을 구현하기 까다로울 수도 있다.
소켓 연결로 구현
일단 최대한 복잡성을 늘리지 않는 방향에서 구현을 해보고 싶어서 소켓연결로 구현해보았다.
서버는 NestJs를 사용하고 클라이언트는 NextJs를 사용한다.
서버
일단 Websocket 연결을 위해 Gateway를 만든다. agent/start 시에 커넥션을 만들고 바로 agent를 시행하도록 했다.
@WebSocketGateway({
namespace: "/",
transport: ["websocket", "polling"],
path: "/agent/start",
cors: {
origin: "*",
methods: ["GET", "POST"],
credentials: true,
},
})
export class AgentGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer()
server: Server;
protected readonly logger = new Logger(this.constructor.name);
constructor(
private readonly agentFactory: AgentFactory
) {}
private pendingConfirmations = new Map<string, (response: boolean) => void>();
// Handle new connections
handleConnection(client: Socket) {
console.log(`Client connected: ${client.id}`);
// Option 1: Get actionData from query parameters
const actionData: { agent: AgentName } = client.handshake.query.actionData
? JSON.parse(client.handshake.query.actionData as string)
: null;
if (actionData) {
this.startAgentProcess(client, actionData);
} else {
// If no actionData is provided, you can wait for an event
client.emit("error", "No action data provided");
client.disconnect();
}
}
// Handle disconnections
handleDisconnect(client: Socket) {
console.log(`Client disconnected: ${client.id}`);
this.pendingConfirmations.delete(client.id);
}
// Send confirmation request
async sendConfirmationRequest(clientId: string, data: any): Promise<boolean> {
return new Promise((resolve) => {
this.pendingConfirmations.set(clientId, resolve);
this.server.to(clientId).emit("confirmation_request", data);
// Optional timeout for response
setTimeout(() => {
if (this.pendingConfirmations.has(clientId)) {
this.pendingConfirmations.delete(clientId);
resolve(false); // Default to 'false' if timeout occurs
}
}, 3000000); // 3000 seconds timeout
});
}
// Handle client's confirmation response
@SubscribeMessage("confirmation_response")
handleClientResponse(
@MessageBody() data: { confirmed: boolean },
@ConnectedSocket() client: Socket
) {
const resolve = this.pendingConfirmations.get(client.id);
if (resolve) {
resolve(data.confirmed);
this.pendingConfirmations.delete(client.id);
}
}
// Start the agent process
private async startAgentProcess(
client: Socket,
actionData: { agent: AgentName }
) {
const graph = await this.agentFactory.create({
agentName: actionData.agent,
});
const initialInput = { input: "hello world" };
// Thread
const graphStateConfig = {
configurable: { thread_id: "1" },
streamMode: "values" as const,
};
// Run the graph until the first interruption
for await (const event of await graph.stream(
initialInput,
graphStateConfig
)) {
this.logAndEmit(client, `--- ${event.input} ---`);
}
// Will log when the graph is interrupted, after step 2.
this.logAndEmit(client, "---GRAPH INTERRUPTED---");
const userConfirmed = await this.sendConfirmationRequest(client.id, {
message: "Do you want to proceed with this action?",
actionData,
});
if (userConfirmed) {
// If approved, continue the graph execution. We must pass `null` as
// the input here, or the graph will
for await (const event of await graph.stream(null, graphStateConfig)) {
this.logAndEmit(client, `--- ${event.input} ---`);
}
this.logAndEmit(client, "---ACTION EXECUTED---");
} else {
this.logAndEmit(client, "---ACTION CANCELLED---");
}
// Optionally disconnect the client
client.disconnect();
}
private logAndEmit(client: Socket, message: string) {
console.log(message);
client.emit("message", { message });
}
}
핵심은 간단하다. Socket이 연결되면 바로 agent를 생성하여 수행하고, 수행해서 interrupt 당하면 Client에게 confirmation request message를 보내고 기다린다. confirmation이 resolve되면 이어서 graph를 진행한다.
위 코드에서 사용한 agent는 langgraph 문서에 있는 아래 스텝 1 2 3 을 순차적으로 사용하는 에이전트이다.
const GraphState = Annotation.Root({
input: Annotation<string>,
});
const step1 = (state: typeof GraphState.State) => {
console.log("---Step 1---");
return state;
};
const step2 = (state: typeof GraphState.State) => {
console.log("---Step 2---");
return state;
};
const step3 = (state: typeof GraphState.State) => {
console.log("---Step 3---");
return state;
};
const builder = new StateGraph(GraphState)
.addNode("step1", step1)
.addNode("step2", step2)
.addNode("step3", step3)
.addEdge(START, "step1")
.addEdge("step1", "step2")
.addEdge("step2", "step3")
.addEdge("step3", END);
// Set up memory
const graphStateMemory = new MemorySaver();
const graph = builder.compile({
checkpointer: graphStateMemory,
interruptBefore: ["step3"],
});
return graph;
클라이언트
클라이언트에서는 훅을 만들어서 agent start와 그 상태를 관리한다.
import { useRef, useState } from "react";
import io, { Socket } from "socket.io-client";
export const useAgentSocket = () => {
const socketRef = useRef<Socket | null>(null);
const [confirmationRequest, setConfirmationRequest] = useState<any>(null);
const [messages, setMessages] = useState<string[]>([]);
const connectAndRun = (actionData: any) => {
return new Promise((resolve, reject) => {
socketRef.current = io("http://localhost:8000", {
path: "/agent/start",
transports: ["websocket", "polling"],
query: {
actionData: JSON.stringify(actionData),
},
});
socketRef.current.on("connect", () => {
console.log("Connected:", socketRef.current?.id);
resolve(void 0);
});
socketRef.current.on("connect_error", (error) => {
console.error("Connection error:", error);
reject(error);
});
// Listen for confirmation requests
socketRef.current.on("confirmation_request", (data) => {
setConfirmationRequest(data);
});
// Listen for messages
socketRef.current.on("message", (data) => {
console.log("Received message:", data);
setMessages((prevMessages) => [...prevMessages, data.message]);
});
socketRef.current.on("disconnect", () => {
console.log("Disconnected from server");
});
});
};
const sendConfirmationResponse = (confirmed: boolean) => {
if (socketRef.current) {
socketRef.current.emit("confirmation_response", { confirmed });
setConfirmationRequest(null);
}
};
const disconnectSocket = () => {
if (socketRef.current) {
socketRef.current.disconnect();
}
};
const clearMessages = () => {
setMessages([]);
};
return {
confirmationRequest,
sendConfirmationResponse,
connectAndRun,
disconnectSocket,
messages,
clearMessages,
};
};
커넥션을 맺고, confirmation request가 오면 confirmationRequest 상태를 업데이트한다. UI component에서 confirmationRequest 상태를 보고 유저에게 창을 띄워주면 된다.
Posted on November 13, 2024
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.