うさラボ

お勉強と備忘録

LangGraph入門 ②MCPのtoolsを組み込んだエージェントを作る

概要

前回に引き続きエージェントを作っていきます

usage-automate.hatenablog.com

今回はMCPサーバと連携しTool実行をするエージェントを作成します。

MCPサーバはnetbox-mcps-serverを利用します。

github.com

前提

環境

今回作成したスクリプトを動かすための環境は以下です

項目 バージョン・内容
Python 3.12
langchain-core 0.3.76
langchain-openai 0.3.33
langgraph 0.6.7
langchain-mcp-adapters 0.1.9
netbox-mcp-server -

環境構築

前回と同じ環境を利用するため、langchain-mcp-adaptersとnetbox-mcp-serverのセットアップのみ行います。

uv add langchain-mcp-adapters

NetBoxのMCPサーバを利用するためにGithubからリポジトリをクローンします。

$ git clone https://github.com/netboxlabs/netbox-mcp-server.git
Cloning into 'netbox-mcp-server'...
remote: Enumerating objects: 65, done.
remote: Counting objects: 100% (39/39), done.
remote: Compressing objects: 100% (22/22), done.
remote: Total 65 (delta 23), reused 19 (delta 17), pack-reused 26 (from 1)
Receiving objects: 100% (65/65), 56.72 KiB | 184.00 KiB/s, done.
Resolving deltas: 100% (25/25), done.

NetBoxはNetBoxLabsでFreeTrial版を利用します。
デモデータも入っているのでテストもしやすいです。

https://netboxlabs.com/

[Start for free]から払い出しを実施し、API Tokenの払い出しまで完了させておきます。

スクリプト解説

インポート

前回のスクリプトからMCPを扱うためのMultiServerMCPClientとTool利用をするためのToolNodetools_conditionをインポートします

import asyncio
from typing import Sequence, Optional
from typing_extensions import Annotated, TypedDict

from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.messages import BaseMessage
from langgraph.pregel import Pregel
from langgraph.graph.message import add_messages
from langgraph.graph.state import StateGraph, START, END

# 追加
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import ToolNode, tools_condition

グラフ

エージェントも更新をします。
①call_modelノードではtoolsをbindすることでLLMに利用できるツールを送信することが可能です。

②次にグラフにTool利用のノードとエッジを追加します。
tools_conditionはステートのmessagesに格納された最後の要素にtool用の変数があればtoolを起動するといった処理するノードのようです。 https://github.com/langchain-ai/langgraph/blob/main/libs/prebuilt/langgraph/prebuilt/tool_node.py#L840

③最後に、起動ですがToolの利用には非同期に処理をさせる必要があるため追加しています。

class SampleAgent:
    (省略)
    # ①
    def call_model(self, state: AgentState):
        """モデル呼び出し"""
        chain = self.llm.bind_tools(self.tools)
        response = chain.invoke(state["messages"])
        return {"messages": [response]}

    (省略)
    # ②
    def create_graph(self) -> Pregel:
        """グラフ作成"""
        graph = StateGraph(AgentState)
        graph.add_node("call_model", self.call_model)
        graph.add_node("tools", ToolNode(self.tools))
        graph.add_node("reflection", self.reflection)

        graph.add_edge(START, "call_model")
        graph.add_conditional_edges("call_model", tools_condition)
        graph.add_edge("tools", "reflection")
        graph.add_conditional_edges(
            "reflection", self.should_continue, {"continue": "call_model", "end": END}
        )
        app = graph.compile()
        return app

    (省略)
    # ③
    async def arun(self, input):
        """グラフ起動(Tool利用には非同期が必要)"""
        return await self.app.ainvoke(input)

起動

MCPサーバを起動するにはlangchain_mcp_adaptersライブラリのMultiServerMCPClientを利用します。ClineやClaude DesktopでMCPの設定をするのに似ています。transportにはスクリプトを直接起動するstdioモードを指定します。ほかにもstreamable-httpやsseなどのモードが存在します。MCPサーバがリモートにある場合はstreamable-httpかsseを利用します。

NetBoxのMCPサーバを利用するにはURLと事前に発行したTokenを定義する必要があります。

async def main():
    mcps = MultiServerMCPClient(
        {
            "netbox":{
                "command": "uv",
                "args": [
                    "run",
                    "netbox-mcp-server/server.py",
                ],
                "transport": "stdio",
                "env": {
                    "NETBOX_TOKEN":"XXXXXXXXXXXXXXXXXXXXXXXXXXXX",
                    "NETBOX_URL": "https://XXXXXXXXXXXXXX.netboxapp.com/"
                }
            }
        }
    )
    tools = await mcps.get_tools()
    print("==Tool一覧" + "="*20)
    [print(i.name) for i in tools]
    print("="*30)

    llm = ChatOpenAI(model="gpt-4o")
    agent = SampleAgent(llm, tools)

    result = await agent.arun(
        {
            "messages": (
                "NetBoxで"
                "192.168.1.1ってどのテナントで使ってますか?"
                ),
            "challenge_count": 0,
            "complete": False,
        }
    )
    print("="*30)
    print(result["messages"][-1].content)
    print("="*30)


if __name__ == "__main__":
    asyncio.run(main())

スクリプト全文

スクリプト全文

import asyncio
from typing import Sequence, Optional
from typing_extensions import Annotated, TypedDict

from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.messages import BaseMessage
from langgraph.pregel import Pregel
from langgraph.graph.message import add_messages
from langgraph.graph.state import StateGraph, START, END

# 追加
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import ToolNode, tools_condition

# NOTE: ステート定義
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], add_messages]
    challenge_count: int
    answer: str
    complete: bool

class Reflection(BaseModel):
    advice: str = Field(
        ...,
        description=("内省結果をアドバイスしてください",),
    )
    is_completed: bool = Field(
        ...,
        description="回答の評価結果",
    )


MAX_CHALLENGE_COUNT = 3


class SampleAgent:
    def __init__(self, llm, tools: Optional[list] = []) -> None:
        self.llm = llm
        self.tools = tools
        self.app = self.create_graph()

    def should_continue(self, state: AgentState):
        """継続判断"""
        if state["complete"] or state["challenge_count"] >= MAX_CHALLENGE_COUNT:
            return "end"
        else:
            return "continue"

    def call_model(self, state: AgentState):
        """モデル呼び出し"""
        chain = self.llm.bind_tools(self.tools)
        response = chain.invoke(state["messages"])
        return {"messages": [response]}

    def reflection(self, state: AgentState):
        """内省"""
        parser = PydanticOutputParser(pydantic_object=Reflection)
        format_instructions = parser.get_format_instructions()
        prompt_template = PromptTemplate.from_template(
            "<input>{input}</input>\n"
            "<last_message>{last_message}</last_message>\n"
            "<output_format>{format_instructions}</output_format>\n"
        )
        prompt = prompt_template.partial(format_instructions=format_instructions)
        chain = prompt | self.llm | parser
        response = chain.invoke(
            {
                "last_message": state["messages"][-1].content,
                "input": state["messages"][0].content,
            }
        )

        is_completed = response.is_completed

        return {
            "messages": [response.advice],
            "challenge_count": state["challenge_count"] + 1,
            "complete": is_completed,
        }

    def create_graph(self) -> Pregel:
        """グラフ作成"""
        graph = StateGraph(AgentState)
        graph.add_node("call_model", self.call_model)
        graph.add_node("tools", ToolNode(self.tools))
        graph.add_node("reflection", self.reflection)

        graph.add_edge(START, "call_model")
        graph.add_conditional_edges("call_model", tools_condition)
        graph.add_edge("tools", "reflection")
        graph.add_conditional_edges(
            "reflection", self.should_continue, {"continue": "call_model", "end": END}
        )
        app = graph.compile()
        return app

    def run(self, input):
        """グラフ起動"""
        return self.app.invoke(input)

    async def arun(self, input):
        """グラフ起動(Tool利用には非同期が必要)"""
        return await self.app.ainvoke(input)


async def main():
    mcps = MultiServerMCPClient(
        {
            "netbox":{
                "command": "uv",
                "args": [
                    "run",
                    "netbox-mcp-server/server.py",
                ],
                "transport": "stdio",
                "env": {
                    "NETBOX_TOKEN":"XXXXXXXXXXXXXXXXXXXXXXXXXXXX",
                    "NETBOX_URL": "https://XXXXXXXXXXXXXX.netboxapp.com/"
                }
            }
        }
    )
    tools = await mcps.get_tools()
    print("==Tool一覧" + "="*20)
    [print(i.name) for i in tools]
    print("="*30)

    llm = ChatOpenAI(model="gpt-4o")
    agent = SampleAgent(llm, tools)

    result = await agent.arun(
        {
            "messages": (
                "NetBoxで"
                "192.168.1.1ってどのテナントで使ってますか?"
                ),
            "challenge_count": 0,
            "complete": False,
        }
    )
    print("="*30)
    print(result["messages"][-1].content)
    print("="*30)


if __name__ == "__main__":
    asyncio.run(main())

出力

コードを追加したスクリプトを実行します。 今回は、192.168.1.1に紐づいたテナントを聞いています。 (NetBoxで..とつけているのが小賢しいですね)

NetBoxで192.168.1.1ってどのテナントで使ってますか?

事前に答えを確認しておきます。

Consultingというテナントと紐づいているようです。

実行して確認します。

$ uv run main.py 
Using selector: EpollSelector
Received message: root=InitializedNotification(method='notifications/initialized', params=None, jsonrpc='2.0')
Received message: <mcp.shared.session.RequestResponder object at 0x7472f44e91f0>
Processing request of type ListToolsRequest
Dispatching request of type ListToolsRequest
Response sent
==Tool一覧====================
netbox_get_objects
netbox_get_object_by_id
netbox_get_changelogs
==============================
Using selector: EpollSelector
Received message: root=InitializedNotification(method='notifications/initialized', params=None, jsonrpc='2.0')
Received message: <mcp.shared.session.RequestResponder object at 0x712d91b0b0b0>
Processing request of type CallToolRequest
Dispatching request of type CallToolRequest
Tool cache miss for netbox_get_objects, refreshing cache
Response sent
Using selector: EpollSelector
Received message: root=InitializedNotification(method='notifications/initialized', params=None, jsonrpc='2.0')
Received message: <mcp.shared.session.RequestResponder object at 0x701c5f34afc0>
Processing request of type CallToolRequest
Dispatching request of type CallToolRequest
Tool cache miss for netbox_get_objects, refreshing cache
Starting new HTTPS connection (1): zwpr4045.cloud.netboxapp.com:443
https://zwpr4045.cloud.netboxapp.com:443 "GET /api/ipam/ip-addresses/?address=192.168.1.1 HTTP/1.1" 200 None
Response sent
Received message: <mcp.shared.session.RequestResponder object at 0x701c5f4ebc50>
Processing request of type ListToolsRequest
Dispatching request of type ListToolsRequest
Response sent
==============================
IPアドレス192.168.1.1は、'Consulting'というテナントで使用されています。
==============================

MCPからツールを取得
netbox_mcp_serversで提供されているツールが取得できていることが確認できます。

==Tool一覧====================
netbox_get_objects
netbox_get_object_by_id
netbox_get_changelogs
==============================

回答
ツールを使って生成したであろう回答が確認できました。

==============================
IPアドレス192.168.1.1は、'Consulting'というテナントで使用されています。
==============================

そのほかの出力はMCPサーバ(スクリプト)の起動ログや通信ログになります。

回答は想定通りもらえました。

LangSmithを確認してみます

Tool利用が一度失敗していますね。プロンプトの調整することで精度をあげることはできそうです。

内省後の再実施でToolの実行が成功していることも確認できました。

まとめ

MCPサーバと連携して動くエージェントを作成することができました。
前回に引き続き今回作った内容もLangGraphのライブラリ内で提供されているcreate_react_agentでカバーできる範囲です。今後はもう少し複雑なエージェントにも挑戦してみたいと思います。