|
| 1 | +--- |
| 2 | +title: "以一次 RPC 请求为例探索 MOSN的 工作流程" |
| 3 | +linkTitle: "以一次 RPC 请求为例探索 MOSN 的工作流程" |
| 4 | +date: 2024-03-11 |
| 5 | +author: "[wangchengming666](https://github.com/wangchengming666)" |
| 6 | +description: "本文以工作中非常常见的一个思路为出发点,详细描述了 MOSN 内部网络转发的详细流程,可以帮助小伙伴加深对 MOSN 的理解。" |
| 7 | +aliases: "/zh/blog/posts/mosn-workflow" |
| 8 | +--- |
| 9 | + |
| 10 | +## 1. 前言 |
| 11 | + |
| 12 | +MOSN(Modular Open Smart Network)是一款主要使用 Go 语言开发的云原生网络代理平台,由蚂蚁集团开源并经过双11大促几十万容器的生产级验证。 |
| 13 | +MOSN 为服务提供多协议、模块化、智能化、安全的代理能力,融合了大量云原生通用组件,同时也可以集成 Envoy 作为网络库,具备高性能、易扩展的特点。MOSN 可以和 Istio 集成构建 Service Mesh,也可以作为独立的四、七层负载均衡,API Gateway、云原生 Ingress 等使用。 |
| 14 | + |
| 15 | +MOSN 作为数据面,整体 NET/IO、Protocol、Stream、Proxy 四个层次组成,其中 |
| 16 | +- NET/IO 用于底层的字节流传输 |
| 17 | +- Protocol 用于协议的 decode/encode |
| 18 | +- Stream 用于封装请求和响应,在一个 conn 上做连接复用 |
| 19 | +- Proxy 做 downstream 和 upstream 之间 stream 的转发 |
| 20 | + |
| 21 | +那么 MOSN 是如何工作的呢?下图展示的是使用 Sidecar 方式部署运行 MOSN 的示意图,您可以在配置文件中设置 MOSN 的上游和下游协议,协议可以在 HTTP、HTTP2.0、以及SOFA RPC 等中选择。 |
| 22 | + |
| 23 | +以上内容来自官网 https://mosn.io/ |
| 24 | + |
| 25 | +## 2. RPC 场景下 MOSN 的工作机制 |
| 26 | +RPC 场景下 MOSN 的工作机制示意图如下 |
| 27 | + |
| 28 | + |
| 29 | +我们简单理解一下上面这张图的意义: |
| 30 | +1. Server 端 MOSN 会将自身 ingress 的协议端口写入到注册中心 |
| 31 | +2. Client 端 MOSN 会从注册中心订阅地址列表,第一次订阅也会返回全量地址列表,端口号是 Server 端 ingress 绑定的端口号 |
| 32 | +3. 注册中心会实时推送地址列表变更到 Client 端(全量) |
| 33 | +4. Client 端发起rpc 调用时,请求会被 SDK 打到本地 Client 端 MOSN 的 egress 端口上 |
| 34 | +5. Client 端 MOSN 将 RPC 请求通过网络转发,将流量通过负载均衡转发到某一台 Server 端 MOSN 的 ingress 端口处理 |
| 35 | +6. 最终到了 Server 端 ingress listener,会转发给本地 Server 应用 |
| 36 | +7. 最终会根据原来的 TCP 链路返回 |
| 37 | + |
| 38 | +## 3. 全局视野下的 MOSN 工作流程 |
| 39 | + |
| 40 | + |
| 41 | + |
| 42 | +为了方便大家理解,我将以上时序图内容进行拆分,我们一一攻破。 |
| 43 | + |
| 44 | +### 3.1 建立连接 |
| 45 | +MOSN 在启动期间,会暴露本地 egress 端口接收 Client 的请求。MOSN 会开启 2 个协程,分别死循环去对 TCP 进行读取和写处理。MOSN 会通过读协程获取到请求字节流,进入 MOSN 的协议层处理。 |
| 46 | + |
| 47 | +``` |
| 48 | +// 代码路径 mosn.io/mosn/pkg/network/connection.go |
| 49 | +func (c *connection) Start(lctx context.Context) { |
| 50 | + // udp downstream connection do not use read/write loop |
| 51 | + if c.network == "udp" && c.rawConnection.RemoteAddr() == nil { |
| 52 | + return |
| 53 | + } |
| 54 | + c.startOnce.Do(func() { |
| 55 | + // UseNetpollMode = false |
| 56 | + if UseNetpollMode { |
| 57 | + c.attachEventLoop(lctx) |
| 58 | + } else { |
| 59 | + // 启动读/写循环 |
| 60 | + c.startRWLoop(lctx) |
| 61 | + } |
| 62 | + }) |
| 63 | +} |
| 64 | +
|
| 65 | +func (c *connection) startRWLoop(lctx context.Context) { |
| 66 | + // 标记读循环已经启动 |
| 67 | + c.internalLoopStarted = true |
| 68 | +
|
| 69 | + utils.GoWithRecover(func() { |
| 70 | + // 开始读操作 |
| 71 | + c.startReadLoop() |
| 72 | + }, func(r interface{}) { |
| 73 | + c.Close(api.NoFlush, api.LocalClose) |
| 74 | + }) |
| 75 | + // 省略。。。 |
| 76 | +} |
| 77 | +``` |
| 78 | +### 3.2 Protocol 处理 |
| 79 | +Protocol 作为多协议引擎层,对数据包进行检测,并使用对应协议做 decode/encode 处理。MOSN 会循环解码,一旦收到完整的报文就会创建与其关联的 xstream,用于保持 tcp 连接用于后续响应。 |
| 80 | +``` |
| 81 | +// 代码路径 mosn.io/mosn/pkg/stream/xprotocol/conn.go |
| 82 | +func (sc *streamConn) Dispatch(buf types.IoBuffer) { |
| 83 | + // decode frames |
| 84 | + for { |
| 85 | + // 协议 decode,比如 dubbo、bolt 协议等 |
| 86 | + frame, err := sc.protocol.Decode(streamCtx, buf) |
| 87 | +
|
| 88 | + if frame != nil { |
| 89 | + // 创建和请求 frame 关联的 xstream,用于保持 tcp 连接用于后续响应 |
| 90 | + sc.handleFrame(streamCtx, xframe) |
| 91 | + } |
| 92 | + } |
| 93 | +} |
| 94 | +
|
| 95 | +func (sc *streamConn) handleFrame(ctx context.Context, frame api.XFrame) { |
| 96 | + switch frame.GetStreamType() { |
| 97 | + case api.Request: |
| 98 | + // 创建和请求 frame 关联的 xstream,用于保持 tcp 连接用于后续响应,之后进入 proxy 层 |
| 99 | + sc.handleRequest(ctx, frame, false) |
| 100 | + } |
| 101 | +} |
| 102 | +
|
| 103 | +func (sc *streamConn) handleRequest(ctx context.Context, frame api.XFrame, oneway bool) { |
| 104 | + // 创建和请求 frame 关联的 xstream |
| 105 | + serverStream := sc.newServerStream(ctx, frame) |
| 106 | + // 进入 proxy 层并创建 downstream |
| 107 | + serverStream.receiver = sc.serverCallbacks.NewStreamDetect(serverStream.ctx, sender, span) |
| 108 | + serverStream.receiver.OnReceive(serverStream.ctx, frame.GetHeader(), frame.GetData(), nil) |
| 109 | +} |
| 110 | +``` |
| 111 | +### 3.3 Proxy 层处理 |
| 112 | +proxy 层负责 filter 请求/响应链、路由匹配、负载均衡最终将请求转发到集群的某台机器上。 |
| 113 | +#### 3.3.1 downStream 部分 |
| 114 | +``` |
| 115 | +// 代码路径 mosn.io/mosn/pkg/proxy/downstream.go |
| 116 | +func (s *downStream) OnReceive(ctx context.Context, headers types.HeaderMap, data types.IoBuffer, trailers types.HeaderMap) { |
| 117 | + s.downstreamReqHeaders = headers |
| 118 | + // filter 请求/响应链、路由匹配、负载均衡 |
| 119 | + phase = s.receive(s.context, id, phase) |
| 120 | +} |
| 121 | +
|
| 122 | +func (s *downStream) receive(ctx context.Context, id uint32, phase types.Phase) types.Phase { |
| 123 | + for i := 0; i <= int(types.End-types.InitPhase); i++ { |
| 124 | + s.phase = phase |
| 125 | +
|
| 126 | + switch phase { |
| 127 | +
|
| 128 | + // downstream filter 相关逻辑 |
| 129 | + case types.DownFilter: |
| 130 | + s.printPhaseInfo(phase, id) |
| 131 | + s.tracks.StartTrack(track.StreamFilterBeforeRoute) |
| 132 | +
|
| 133 | + s.streamFilterChain.RunReceiverFilter(s.context, api.BeforeRoute, |
| 134 | + s.downstreamReqHeaders, s.downstreamReqDataBuf, s.downstreamReqTrailers, s.receiverFilterStatusHandler) |
| 135 | + s.tracks.EndTrack(track.StreamFilterBeforeRoute) |
| 136 | +
|
| 137 | + if p, err := s.processError(id); err != nil { |
| 138 | + return p |
| 139 | + } |
| 140 | + phase++ |
| 141 | +
|
| 142 | + // route 相关逻辑 |
| 143 | + case types.MatchRoute: |
| 144 | + s.printPhaseInfo(phase, id) |
| 145 | +
|
| 146 | + s.tracks.StartTrack(track.MatchRoute) |
| 147 | + s.matchRoute() |
| 148 | + s.tracks.EndTrack(track.MatchRoute) |
| 149 | +
|
| 150 | + if p, err := s.processError(id); err != nil { |
| 151 | + return p |
| 152 | + } |
| 153 | + phase++ |
| 154 | + |
| 155 | + // 在集群中选择一个机器、包含cluster和loadblance |
| 156 | + case types.ChooseHost: |
| 157 | + s.printPhaseInfo(phase, id) |
| 158 | +
|
| 159 | + s.tracks.StartTrack(track.LoadBalanceChooseHost) |
| 160 | + // 这里很重要,在选中一个机器之后,这里upstreamRequest对象有两个作用 |
| 161 | + // 1. 这里通过持有downstream保持着对客户端app的tcp引用,用来接收请求 |
| 162 | + // 2. 转发服务端tcp引用,转发客户端app请求以及响应服务端response时的通知 |
| 163 | + s.chooseHost(s.downstreamReqDataBuf == nil && s.downstreamReqTrailers == nil) |
| 164 | + s.tracks.EndTrack(track.LoadBalanceChooseHost) |
| 165 | +
|
| 166 | + if p, err := s.processError(id); err != nil { |
| 167 | + return p |
| 168 | + } |
| 169 | + phase++ |
| 170 | + } |
| 171 | + } |
| 172 | +} |
| 173 | +``` |
| 174 | +#### 3.3.2 upStream 部分 |
| 175 | +至此已经选中一台服务端的机器,开始准备转发。 |
| 176 | +``` |
| 177 | +// 代码路径 mosn.io/mosn/pkg/proxy/upstream.go |
| 178 | +func (r *upstreamRequest) appendHeaders(endStream bool) { |
| 179 | +
|
| 180 | + if r.downStream.oneway { |
| 181 | + _, streamSender, failReason = r.connPool.NewStream(r.downStream.context, nil) |
| 182 | + } else { |
| 183 | + // 会使用 ChooseHost 中选中的机器 host 创建 sender,xstream 是客户端的流对象 |
| 184 | + _, streamSender, failReason = r.connPool.NewStream(r.downStream.context, r) |
| 185 | + } |
| 186 | +} |
| 187 | +``` |
| 188 | +接下来会到达 conn.go 的 handleFrame 的 handleResponse 方法,此时 handleResponse 方法继续调用 downStream 的 receiveData 方法接收数据。 |
| 189 | +``` |
| 190 | +//代码路径 mosn.io/mosn/pkg/stream/xprotocol/conn.go |
| 191 | +func (sc *streamConn) handleFrame(ctx context.Context, frame api.XFrame) { |
| 192 | + switch frame.GetStreamType() { |
| 193 | + case api.Response: |
| 194 | + // 调用 downStream 的 receiveData 方法接收数据 |
| 195 | + // 因为 mosn 在转发之前修改了请求id,因此会重新 encode 请求 |
| 196 | + sc.handleResponse(ctx, frame) |
| 197 | + } |
| 198 | +} |
| 199 | +``` |
| 200 | +一旦准备好转发就会通过 upstreamRequest 选择的下游主机直接发送 write 请求,请求的协程此时会被阻塞。 |
| 201 | +``` |
| 202 | +// 代码路径 mosn.io/mosn/pkg/stream/xprotocol/stream.go |
| 203 | +func (s *xStream) endStream() { |
| 204 | + defer func() { |
| 205 | + if s.direction == stream.ServerStream { |
| 206 | + s.DestroyStream() |
| 207 | + } |
| 208 | + }() |
| 209 | +
|
| 210 | + if log.Proxy.GetLogLevel() >= log.DEBUG { |
| 211 | + log.Proxy.Debugf(s.ctx, "[stream] [xprotocol] connection %d endStream, direction = %d, requestId = %v", s.sc.netConn.ID(), s.direction, s.id) |
| 212 | + } |
| 213 | +
|
| 214 | + if s.frame != nil { |
| 215 | + // replace requestID |
| 216 | + s.frame.SetRequestId(s.id) |
| 217 | + // 因为 mosn 在转发之前修改了请求 id,因此会重新 encode 请求 |
| 218 | + buf, err := s.sc.protocol.Encode(s.ctx, s.frame) |
| 219 | + if err != nil { |
| 220 | + log.Proxy.Errorf(s.ctx, "[stream] [xprotocol] encode error:%s, requestId = %v", err.Error(), s.id) |
| 221 | + s.ResetStream(types.StreamLocalReset) |
| 222 | + return |
| 223 | + } |
| 224 | +
|
| 225 | + tracks := track.TrackBufferByContext(s.ctx).Tracks |
| 226 | +
|
| 227 | + tracks.StartTrack(track.NetworkDataWrite) |
| 228 | + // 一旦准备好转发就会通过upstreamRequest选择的下游主机直接发送 write 请求,请求的协程此时会被阻塞 |
| 229 | + err = s.sc.netConn.Write(buf) |
| 230 | + tracks.EndTrack(track.NetworkDataWrite) |
| 231 | + } |
| 232 | + } |
| 233 | +} |
| 234 | +``` |
| 235 | +### 3.4 准备将响应写回客户端 |
| 236 | +接下来客户端 xstream 将通过读协程接收响应的字节流,proxy.go 的 OnData 方法作为 proxy 层的数据接收点。 |
| 237 | +``` |
| 238 | +// 代码位置 mosn.io/mosn/pkg/proxy/proxy.go |
| 239 | +func (p *proxy) OnData(buf buffer.IoBuffer) api.FilterStatus { |
| 240 | + // 这里会做两件事 |
| 241 | + // 1. 调用 protocol 层进行decode |
| 242 | + // 2. 完成后通知upstreamRequest对象,唤醒downstream阻塞的协程 |
| 243 | + p.serverStreamConn.Dispatch(buf) |
| 244 | +
|
| 245 | + return api.Stop |
| 246 | +} |
| 247 | +
|
| 248 | +// 代码位置 mosn.io/mosn/pkg/proxy/upstream.go |
| 249 | +func (r *upstreamRequest) OnReceive(ctx context.Context, headers types.HeaderMap, data types.IoBuffer, trailers types.HeaderMap) { |
| 250 | + // 结束当前stream |
| 251 | + r.endStream() |
| 252 | +
|
| 253 | + // 唤醒 |
| 254 | + r.downStream.sendNotify() |
| 255 | +} |
| 256 | +``` |
| 257 | +downstream 被唤醒处理收到的响应,重新替换回正确的请求ID,并调用 protocol 层重新编码成字节流写回客户端,最后销毁请求相关的资源,流程执行完毕。 |
| 258 | +``` |
| 259 | +// 比如我的 demo 是 dubbo 协议 |
| 260 | +func encodeFrame(ctx context.Context, frame *Frame) (types.IoBuffer, error) { |
| 261 | +
|
| 262 | + // 1. fast-path, use existed raw data |
| 263 | + if frame.rawData != nil { |
| 264 | + // 1.1 replace requestId |
| 265 | + binary.BigEndian.PutUint64(frame.rawData[IdIdx:], frame.Id) |
| 266 | +
|
| 267 | + // hack: increase the buffer count to avoid premature recycle |
| 268 | + frame.data.Count(1) |
| 269 | + return frame.data, nil |
| 270 | + } |
| 271 | +
|
| 272 | + // alloc encode buffer |
| 273 | + frameLen := int(HeaderLen + frame.DataLen) |
| 274 | + buf := buffer.GetIoBuffer(frameLen) |
| 275 | + // encode header |
| 276 | + buf.WriteByte(frame.Magic[0]) |
| 277 | + buf.WriteByte(frame.Magic[1]) |
| 278 | + buf.WriteByte(frame.Flag) |
| 279 | + buf.WriteByte(frame.Status) |
| 280 | + buf.WriteUint64(frame.Id) |
| 281 | + buf.WriteUint32(frame.DataLen) |
| 282 | + // encode payload |
| 283 | + buf.Write(frame.payload) |
| 284 | + return buf, nil |
| 285 | +} |
| 286 | +``` |
| 287 | +## 4. 总结 |
| 288 | +本文以工作中非常常见的一个思路为出发点,详细描述了 MOSN 内部网络转发的详细流程,可以帮助小伙伴加深对 MOSN 的理解。MOSN 是一款非常优秀的开源产品, |
| 289 | +MOSN 支持多种网络协议(如HTTP/2, gRPC, Dubbo等)并且能够很容易地增加对新协议的支持;MOSN 提供了丰富的流量治理功能,例如限流、熔断、重试、 |
| 290 | +负载均衡等;MOSN 在性能方面进行了大量优化,比如内存零拷贝、自适应缓冲区、连接池、协程池等,这些都有助于提升其在高并发环境下的表现。除此之外 |
| 291 | +MOSN 在连接管理方面,MOSN 设计了多协议连接池;在内存管理方面,MOSN 在 sync.Pool 之上封装了一层资源对的注册管理模块,可以方便的扩展各种类型的 |
| 292 | +对象进行复用和管理。总的来说,MOSN 的设计体现了可扩展性、高性能、安全性、以及对现代云环境的适应性等多方面的考虑。对于开发者来说,深入研究MOSN的 |
| 293 | +代码和架构,无疑可以学到很多关于高性能网络编程和云原生技术的知识。 |
| 294 | + |
| 295 | +- MOSN 官网:[https://mosn.io/](https://mosn.io/) |
| 296 | +- MOSN Github:[https://github.com/mosn/mosn](https://github.com/mosn/mosn) |
0 commit comments