「eino」(发音类似 "I know")是 CloudWeGo 团队开源的 Go 语言 LLM 应用开发框架,基于 Apache 2.0 许可证。框架设计参考了 LangChain、LlamaIndex 等开源框架,针对 Go 语言特性进行了优化,强调 「简洁性、可扩展性和可靠性」。
┌─────────────────────────────────────────────────────────────────────┐
│ 用户代码 │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ Flow 层 (预制流程) │
│ - ReAct Agent、RAG Retriever 等开箱即用的高层抽象 │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ Compose 层 (编排引擎) │
│ ┌─────────┐ ┌─────────┐ ┌──────────┐ │
│ │ Chain │ │ Graph │ │ Workflow │ ← 三种编排模式 │
│ └─────────┘ └─────────┘ └──────────┘ │
│ │ │ │ │
│ └──────────────┼───────────────┘ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Runnable │ ← 统一执行抽象 │
│ └─────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Components │ │ Callbacks │ │ Schema │
│ (组件接口) │ │ (回调系统) │ │ (数据结构) │
└─────────────┘ └─────────────┘ └─────────────┘
│
┌───────────────┼───────────────┬───────────────┐
▼ ▼ ▼ ▼
┌────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ChatModel│ │ Tool │ │Retriever │ │Embedding │
└────────┘ └──────────┘ └──────────┘ └──────────┘
schema/)Schema 层定义了框架的核心数据结构,是组件间通信的基础。
schema/message.go)// Message 是 ChatModel 交互的核心数据结构
type Message struct {
Role RoleType // system/user/assistant/tool
Content string // 文本内容
MultiContent []ChatMessagePart // 多模态内容(图片/音频/视频)
ToolCalls []ToolCall // 工具调用请求
ToolCallID string // 工具响应ID
ResponseMeta *ResponseMeta // 模型响应元信息(token用量等)
Extra map[string]any // 扩展字段
}
// 支持的角色类型
const (
Assistant RoleType = "assistant"// 模型响应
User RoleType = "user" // 用户输入
System RoleType = "system" // 系统提示
Tool RoleType = "tool" // 工具输出
)
「关键特性」:
schema/stream.go)// StreamReader 流式读取器
type StreamReader[T any] struct {
ch <-chan T // 数据通道
closed *atomic.Bool // 关闭标志
err error // 错误信息
// ...
}
// StreamWriter 流式写入器
type StreamWriter[T any] struct {
ch chan<- T
closed *atomic.Bool
// ...
}
// 创建管道
func Pipe[T any](cap int) (*StreamReader[T], *StreamWriter[T])
// 流操作
func (sr *StreamReader[T]) Recv() (T, error) // 接收数据
func (sr *StreamReader[T]) Copy(n int) []*StreamReader[T] // 复制流
func (sr *StreamReader[T]) Close() // 关闭流
「设计亮点」:
components/)Components 层定义了所有组件的标准接口。
components/
├── model/ # ChatModel 接口
├── tool/ # Tool 接口
├── prompt/ # ChatTemplate 接口
├── retriever/ # Retriever 接口
├── embedding/ # Embedding 接口
├── indexer/ # Indexer 接口
└── document/ # Loader/Transformer 接口
const (
ComponentOfPrompt = "ChatTemplate"
ComponentOfChatModel = "ChatModel"
ComponentOfEmbedding = "Embedding"
ComponentOfIndexer = "Indexer"
ComponentOfRetriever = "Retriever"
ComponentOfLoader = "Loader"
ComponentOfTransformer = "DocumentTransformer"
ComponentOfTool = "Tool"
)
components/model/interface.go)// BaseChatModel 基础对话模型接口
type BaseChatModel interface {
// Generate 同步生成
Generate(ctx context.Context, input []*schema.Message, opts ...Option) (*schema.Message, error)
// Stream 流式生成
Stream(ctx context.Context, input []*schema.Message, opts ...Option) (*schema.StreamReader[*schema.Message], error)
}
// ToolCallingChatModel 支持工具调用的模型
type ToolCallingChatModel interface {
BaseChatModel
WithTools(tools []*schema.ToolInfo) (ToolCallingChatModel, error)
}
components/tool/interface.go)// BaseTool 基础工具接口
type BaseTool interface {
Info(ctx context.Context) (*schema.ToolInfo, error)
}
// InvokableTool 可调用工具
type InvokableTool interface {
BaseTool
InvokableRun(ctx context.Context, argumentsInJSON string, opts ...Option) (string, error)
}
// StreamableTool 流式工具
type StreamableTool interface {
BaseTool
StreamableRun(ctx context.Context, argumentsInJSON string, opts ...Option) (*schema.StreamReader[string], error)
}
// Retriever 检索器接口
type Retriever interface {
Retrieve(ctx context.Context, query string, opts ...Option) ([]*schema.Document, error)
}
// Embedder 嵌入接口
type Embedder interface {
EmbedStrings(ctx context.Context, texts []string, opts ...Option) ([][]float64, error)
}
// Indexer 索引器接口
type Indexer interface {
Store(ctx context.Context, docs []*schema.Document, opts ...Option) ([]string, error)
}
// ChatTemplate 提示词模板接口
type ChatTemplate interface {
Format(ctx context.Context, vs map[string]any, opts ...Option) ([]*schema.Message, error)
}
compose/)Compose 层是 eino 的核心,提供三种编排模式和统一的 Runnable 抽象。
compose/runnable.go)// Runnable 是所有可执行单元的核心抽象
type Runnable[I, O any] interface {
// Invoke 同步调用:单进单出
Invoke(ctx context.Context, input I, opts ...Option) (O, error)
// Stream 流式输出:单进流出
Stream(ctx context.Context, input I, opts ...Option) (*schema.StreamReader[O], error)
// Collect 流式输入:流进单出
Collect(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (O, error)
// Transform 流转流:流进流出
Transform(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (*schema.StreamReader[O], error)
}
「四种数据流模式」:
模式 | 输入 | 输出 | 说明 |
|---|---|---|---|
Invoke | I | O | 单进单出 |
Stream | I | StreamReader[O] | 单进流出 |
Collect | StreamReader[I] | O | 流进单出 |
Transform | StreamReader[I] | StreamReader[O] | 流进流出 |
「自动转换机制」:
只实现 Stream → 自动生成 Invoke(合并流输出)
只实现 Invoke → 自动生成 Stream(单值包装)
只实现 Transform → 自动生成 Collect/Stream/Invoke
compose/graph.go)// 创建图
graph := NewGraph[InputType, OutputType](opts...)
// 添加节点
graph.AddChatModelNode("model", chatModel)
graph.AddToolsNode("tools", toolsNode)
graph.AddLambdaNode("process", lambda)
graph.AddRetrieverNode("retriever", retriever)
// 添加边
graph.AddEdge(START, "model")
graph.AddEdge("model", "tools")
graph.AddEdge("tools", END)
// 添加条件分支
graph.AddBranch("model", NewGraphBranch(
func(ctx context.Context, msg *schema.Message) (string, error) {
iflen(msg.ToolCalls) > 0 {
return"tools", nil
}
return END, nil
},
map[string]bool{"tools": true, END: true},
))
// 编译执行
runnable, err := graph.Compile(ctx)
result, err := runnable.Invoke(ctx, input)
「图运行模式」:
「执行流程」:
┌─────────────────────────────────────────────────────────────────┐
│ Graph.Compile() │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. buildEdges() // 构建边关系 │
│ 2. buildNodes() // 构建节点 │
│ 3. checkDAG() // 检查是否为 DAG │
│ 4. topologicalSort() // 拓扑排序 │
│ 5. → CompiledGraph // 返回可执行图 │
│ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ CompiledGraph.Invoke() │
├─────────────────────────────────────────────────────────────────┤
│ │
│ START → node1 → branch → node2 → ... → END │
│ │ │
│ └→ node3 (条件分支) │
│ │
└─────────────────────────────────────────────────────────────────┘
compose/chain.go)// Builder 模式的流畅 API
chain := NewChain[string, *schema.Message]()
chain.AppendChatTemplate(template).
AppendChatModel(model).
AppendToolsNode(tools).
AppendLambda(postProcess)
// 支持并行节点
chain.AppendParallel(
NewChain[I, O]().AppendLambda(func1),
NewChain[I, O]().AppendLambda(func2),
)
// 支持条件分支
chain.AppendBranch(condition, branches)
// 编译执行
runnable, err := chain.Compile(ctx)
result, err := runnable.Invoke(ctx, input)
「执行流程」:
输入 → [Template] → [ChatModel] → [ToolsNode] → [Lambda] → 输出
compose/workflow.go)// 声明式工作流
wf := NewWorkflow[Input, Output]()
// 添加节点
modelNode := wf.AddChatModelNode("model", chatModel)
processNode := wf.AddLambdaNode("process", lambda)
outputNode := wf.AddLambdaNode("output", outputLambda)
// 声明依赖和字段映射
processNode.AddInput("model", MapFields("content", "query"))
outputNode.AddInput("process", MapFields("result", "data"))
// 设置输出
wf.End().AddInput("output")
// 编译执行
runnable, err := wf.Compile(ctx)
「Workflow 特点」:
compose/field_mapping.go)eino 提供了 「三种数据流转方式」,根据不同编排方式选择:
Chain 中的节点 「输出直接作为下一节点的输入」,前提是类型兼容:
// Chain[输入类型, 输出类型]
chain := NewChain[map[string]any, *Message]().
AppendChatTemplate(prompt). // map[string]any → []*Message
AppendChatModel(model). // []*Message → *Message
Compile(ctx)
节点输出 → 下一节点输入,「类型必须匹配」,在编译期检查。
graph := NewGraph[map[string]any, *Message]()
graph.AddEdge("node_template", "node_model") // 输出直接传入
AddEdge 要求:「前节点输出类型 = 后节点输入类型」
这是 eino 独特的解决方案,当 「节点类型不一致」 时,使用 「FieldMapping」 进行字段映射:
wf := NewWorkflow[InputStruct, OutputStruct]()
// 节点 A 输出:struct { Name string; Age int }
// 节点 B 输入:struct { UserName string; UserAge int }
// 显式映射字段
nodeB := wf.AddLambdaNode("B", myLambda)
nodeB.AddInput("A",
MapFields("Name", "UserName"), // A.Name → B.UserName
MapFields("Age", "UserAge"), // A.Age → B.UserAge
)
「FieldMapping 的几种方式」:
字段到字段「多节点输出汇聚示例」:
type MergedInput struct {
FromA string
FromB int
}
nodeC := wf.AddLambdaNode("C", func(ctx context.Context, in MergedInput) (out, error) {
// in.FromA 来自节点 A
// in.FromB 来自节点 B
})
nodeC.AddInput("A", ToField("FromA")) // A 的输出 → C.FromA
nodeC.AddInput("B", ToField("FromB")) // B 的输出 → C.FromB
// validateFieldMapping 在编译期校验映射合法性
func validateFieldMapping(predecessorType, successorType reflect.Type, mappings []*FieldMapping) error {
// 检查字段是否存在
// 检查类型是否可赋值
// 不匹配则编译报错
}
// fieldMap 在运行时执行实际的字段提取和组装
func fieldMap(mappings []*FieldMapping) func(any) (map[string]any, error) {
// 根据 mapping 从前节点输出提取字段
// 组装成 map[string]any
// 再转换为后节点的输入类型
}
「核心特点」:eino 通过 「FieldMapping」 在节点类型不一致时做显式字段映射,并在编译期验证映射合法性,实现「显式映射、编译期安全」。
compose/branch.go)// 单选分支
branch := NewGraphBranch(
func(ctx context.Context, in T) (string, error) {
return"nodeA", nil// 返回下一个节点key
},
map[string]bool{"nodeA": true, "nodeB": true},
)
// 多选分支(并行执行多个分支)
multiBranch := NewGraphMultiBranch(
func(ctx context.Context, in T) (map[string]bool, error) {
returnmap[string]bool{"nodeA": true, "nodeB": true}, nil
},
endNodes,
)
// 流式分支(可以只读第一个chunk来决策)
streamBranch := NewStreamGraphBranch(
func(ctx context.Context, sr *schema.StreamReader[T]) (string, error) {
first, _ := sr.Recv()
return decideByFirst(first), nil
},
endNodes,
)
compose/state.go)// 创建带状态的图
graph := NewGraph[I, O](
WithGenLocalState(func(ctx context.Context) *MyState {
return &MyState{Count: 0}
}),
)
// 状态前置处理器
preHandler := NewStatePreHandler(func(ctx context.Context, input I, state *MyState) (I, error) {
state.Count++
return input, nil
})
// 在节点中安全访问状态
ProcessState[*MyState](ctx, func(ctx context.Context, state *MyState) error {
state.Data = "updated"
returnnil
})
compose/types_lambda.go)// 同步 Lambda
InvokableLambda(func(ctx context.Context, input string) (string, error) {
return input + " processed", nil
})
// 流式输出 Lambda
StreamableLambda(func(ctx context.Context, input string) (*schema.StreamReader[string], error) {
// 返回流式输出
})
// 流式输入 Lambda
CollectableLambda(func(ctx context.Context, input *schema.StreamReader[string]) (string, error) {
// 收集流式输入
})
// 流转流 Lambda
TransformableLambda(func(ctx context.Context, input *schema.StreamReader[I]) (*schema.StreamReader[O], error) {
// 流转流
})
// 完整 Lambda(实现全部四种模式)
AnyLambda(invoke, stream, collect, transform)
compose/tool_node.go)type ToolsNodeConfig struct {
Tools []tool.BaseTool // 工具列表
UnknownToolsHandler func(...) // 未知工具处理
ExecuteSequentially bool // 顺序执行(默认并行)
}
// 创建
toolsNode, err := NewToolNode(ctx, &config)
// 执行
// Input: *schema.Message (包含 ToolCalls)
// Output: []*schema.Message (每个工具的响应)
「关键设计」:
callbacks/)callbacks/interface.go)// Handler 回调处理器接口
type Handler interface {
OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
OnError(ctx context.Context, info *RunInfo, err error) context.Context
OnStartWithStreamInput(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context
OnEndWithStreamOutput(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context
}
// RunInfo 运行信息
type RunInfo struct {
Name string // 节点名称
Type string // 组件实现类型
Component components.Component // 组件类型枚举
}
callbacks/handler_builder.go)handler := NewHandlerBuilder().
OnStartFn(func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context {
log.Printf("Start: %s, Type: %s", info.Name, info.Type)
return ctx
}).
OnEndFn(func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context {
log.Printf("End: %s", info.Name)
return ctx
}).
OnErrorFn(func(ctx context.Context, info *RunInfo, err error) context.Context {
log.Printf("Error in %s: %v", info.Name, err)
return ctx
}).
Build()
// 使用回调
runnable.Invoke(ctx, input, WithCallbacks(handler))
时机 | 说明 |
|---|---|
OnStart | 节点执行开始 |
OnEnd | 节点执行结束 |
OnError | 发生错误 |
OnStartWithStreamInput | 流式输入开始 |
OnEndWithStreamOutput | 流式输出结束 |
// 添加全局回调(进程启动时)
callbacks.AppendGlobalHandlers(handler1, handler2)
flow/)flow/agent/react/)agent, err := react.NewAgent(ctx, &react.AgentConfig{
ToolCallingModel: chatModel,
ToolsConfig: compose.ToolsNodeConfig{
Tools: tools,
},
MaxStep: 12,
MessageModifier: func(ctx context.Context, msgs []*schema.Message) []*schema.Message {
// 添加系统提示等
returnappend([]*schema.Message{systemMessage}, msgs...)
},
})
// 同步执行
response, err := agent.Generate(ctx, []*schema.Message{schema.UserMessage("query")})
// 流式执行
stream, err := agent.Stream(ctx, messages)
「ReAct 工作流」:
┌──────────────────┐
│ │
START ──► ChatModel ──► Branch ──► ToolsNode ──┘
│
└──► END (无工具调用时)
「eino 是 LLM 应用开发 SDK」,提供组件抽象和编排能力:
┌───────────────────────────────────────────────────────────────┐
│ 用户代码 │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 使用 eino 组件和编排能力构建 LLM 应用 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Flow 层:ReAct Agent、RAG 等预制流程 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Compose 层:Chain / Graph / Workflow 编排 │ │
│ │ ← 业务主要关注这一层 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Components 层:标准化组件接口 │ │
│ └─────────────────────────────────────────────────────────┘ │
└───────────────────────────────────────────────────────────────┘
「适用场景」:
「核心理念」:Runnable 抽象 + 流式优先 + 类型安全
┌─────────────────────────────────────────┐
│ 统一 Runnable 抽象 │
│ └─ 四种流式范式自动互转 │
│ └─ 组件接口标准化 │
│ └─ 泛型类型安全 │
└─────────────────────────────────────────┘
「关键设计决策」:
决策 | 理由 |
|---|---|
不内置 Server | SDK 定位,用户自由选择服务框架 |
Runnable 四范式 | 覆盖所有流式场景,自动转换 |
三种编排模式 | Chain 简单直观,Graph 灵活强大,Workflow 声明式 |
泛型组件接口 | 编译期类型检查,减少运行时错误 |
回调而非 Event | 同步回调更简单,足够可观测 |
「优势」:
「局限」:
掌握这条主线,就理解了框架的设计精髓。