Context上下文
Context
Context 概要
Go 1.7 标准库引入 Context,中文名为上下文,是一个跨 API 和进程用来传递截止日期、取消信号和请求范围的值的接口。
context.Context 定义如下:
1
2
3
4
5
6
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
Deadline()返回一个完成工作的截止时间,表示上下文应该被取消的时间。如果 ok==false 表示没有设置截止时间。
Done()返回一个 Channel,这个 Channel 会在当前工作完成时被关闭,表示上下文应该被取消。如果无法取消此上下文,则 Done 可能返回 nil。多次调用 Done 方法会返回同一个 Channel。
- Err()返回 Context 结束的原因,它只会在 Done 方法对应的 Channel 关闭时返回非空值。如果 Context 被取消,会返回context.Canceled 错误;如果 Context 超时,会返回context.DeadlineExceeded错误。
- Value()从 Context 中获取键对应的值。如果未设置 key 对应的值则返回 nil。以相同 key 多次调用会返回相同的结果。
两个创建空 Context 的函数:
1
2
3
4
5
6
7
// TODO 返回一个非 nil 但空的上下文。
// 当不清楚要使用哪种上下文或无可用上下文尚应使用 context.TODO。
func TODO() Context
// Background 返回一个非 nil 但空的上下文。
// 它不会被 cancel,没有值,也没有截止时间。它通常由 main 函数、初始化和测试使用,并作为处理请求的顶级上下文。
func Background() Context
四个基于父级创建不同类型 Context 的函数:
1
2
3
4
5
6
7
8
9
10
11
// WithCancel 基于父级创建一个具有 Done channel 的 context
func WithCancel(parent Context) (Context, CancelFunc)
// WithDeadline 基于父级创建一个不晚于 d 结束的 context
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
// WithTimeout 等同于 WithDeadline(parent, time.Now().Add(timeout))
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
// WithValue 基于父级创建一个包含指定 key 和 value 的 context
func WithValue(parent Context, key, val interface{}) Context
Context 目的
在 Go 的服务里,通常每来一个请求都会启动若干个goroutine 同时工作:有些执行业务逻辑,有些去数据库拿数据,有些调用下游接口获取相关数据…
协程 a 生 b c d,c 生 e,e 生 f。父协程与子孙协程之间是关联在一起的,他们需要共享请求的相关信息,比如用户登录态,请求超时时间等。如何将这些协程联系在一起,context 应运而生。
要将这些协程关联在一起的原因: 以超时为例,当请求被取消或是处理时间太长,这有可能是使用者关闭了浏览器或是已经超过了请求方规定的超时时间,请求方直接放弃了这次请求结果。此时所有正在为这个请求工作的 goroutine 都需要快速退出,因为它们的“工作成果”不再被需要了。在相关联的 goroutine 都退出后,系统就可以回收相关资源了。
Context 的作用是为了在一组 goroutines 间传递上下文信息(cancel signal,deadline,request-scoped value)以达到对它们的管理控制。
Context 实现
context 包中定义了 Context 接口后,并且给出了四个实现,分别是:
- emptyCtx
- cancelCtx
- timerCtx
- valueCtx
可以根据不同场景选择使用不同的 Context。
Context 用法
使用建议
一般情况下,使用Background()获取一个空的 Context 作为根节点,有了根结点 Context,便可以根据不同的业务场景选择使用如下四个函数创建对应类型的子 Context。
1
2
3
4
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context
1.不要将 Context 塞到结构体里;直接将 Context 类型作为函数的第一参数,且命名为 ctx。
2.不要向函数传入一个 nil Context,如果你实在不知道传哪个 Context 请传 context.TODO。
3.不要把本应该作为函数参数的数据放到 Context 中传给函数,Context 只存储请求范围内在不同进程和 API 间共享的数据(如登录信息 Cookie)。
4.同一个 context 可能会被传递到多个 goroutine,context 是并发安全的。
这些 context 函数主要用于在 Go 程序中管理请求的生命周期,特别是在并发或网络编程中,确保资源在适当时机释放,避免阻塞或过时的操作。它们的使用场景如下:
WithCancel:- 用于创建一个可以手动取消的上下文。通常在需要控制操作是否被取消时使用。
- 例子:当你有多个 goroutine,并且希望能够在某个时刻取消所有 goroutine 时使用。
1 2 3 4 5 6 7
goCopy Codectx, cancel := context.WithCancel(parentCtx) go func() { // 执行任务 if someCondition { cancel() // 取消任务 } }()- 如果
ctx是一个不支持取消的上下文(例如context.Background()或context.TODO()),则ctx.Done()永远不会关闭,相关逻辑不会生效。 - 如果
ctx是由context.WithCancel、context.WithDeadline或context.WithTimeout创建的,则ctx.Done()可以正常工作,关闭 channel 表示上下文被取消或超时。
WithDeadline:- 用于设置一个具体的截止时间,超过这个时间,操作会被自动取消。适用于需要在特定时间之前完成的任务。
- 例子:执行一个网络请求时,确保不会超时。
1 2
goCopy Codectx, cancel := context.WithDeadline(parentCtx, time.Now().Add(5*time.Second)) defer cancel()
WithTimeout:- 与
WithDeadline类似,但它是基于持续时间来设置超时,而不是固定时间点。适用于需要设定最大执行时间的场景。 - 例子:执行一个数据库查询时,设置最大超时时间。
1 2
goCopy Codectx, cancel := context.WithTimeout(parentCtx, 5*time.Second) defer cancel()
- 与
WithValue:- 用于在上下文中传递数据,适合跨多个函数或 goroutine 传递请求相关的信息(如用户认证信息、请求 ID)。
- 例子:在 HTTP 请求处理链中传递用户 ID。
1
goCopy Codectx := context.WithValue(parentCtx, "userID", 123)
这些函数让你能够灵活地管理上下文的生命周期,控制任务的取消和超时,并在请求上下文中传递必要的数据。
传递共享的数据
现实场景中可能是从一个 HTTP 请求中获取到 Request-ID。所以,下面这个样例可能更适合:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
const requestIDKey int = 0
// 中间件:提取 Request ID 并注入上下文
func WithRequestID(next http.Handler) http.Handler {
return http.HandlerFunc(
func(rw http.ResponseWriter, req *http.Request) {
// 从 header 中提取 request-id
reqID := req.Header.Get("X-Request-ID")
// 创建 valueCtx。使用自定义的类型,不容易冲突
ctx := context.WithValue(
req.Context(), requestIDKey, reqID)
// 创建新的请求
req = req.WithContext(ctx)
// 调用 HTTP 处理函数
next.ServeHTTP(rw, req)
}
)
}
// 从上下文中获取 Request ID
// 获取 request-id
func GetRequestID(ctx context.Context) string {
ctx.Value(requestIDKey).(string)
}
// 处理请求
func Handle(rw http.ResponseWriter, req *http.Request) {
reqID, ok := GetRequestID(req.Context())
if !ok {
reqID = "unknown" // 处理缺失的情况
}
// 继续处理请求,例如记录日志
rw.Write([]byte("Request ID: " + reqID))
}
func main() {
handler := WithRequestID(http.HandlerFunc(Handle))
http.ListenAndServe("/", handler)
}
取消 goroutine
Context 的作用是为了在一组 goroutine 间传递上下文信息,其中便包括取消信号。取消信号可用于通知相关的 goroutine 终止执行,避免无效操作。
设想一个场景:打开外卖的订单页,地图上显示外卖小哥的位置,而且是每秒更新 1 次。客户端向后台发起 WebSocket 连接请求后,后台启动一个协程,每隔 1 秒计算 1 次小哥的位置,并发送给客户端(现实中可能是轮询)。如果用户退出此页面,则后台需要“取消”此过程,退出 goroutine,系统回收资源。
后端可能的实现如下:
1
2
3
4
5
6
7
func Perform() {
for {
calculatePos()
sendResult()
time.Sleep(time.Second)
}
}
如果需要实现“取消”功能,并且在不了解 Context 功能的前提下,可能会这样做:给函数增加一个指针型 bool 变量,在 for 语句的开始处判断 bool 变量是发由 true 变为 false,如果改变,则退出循环。
上面给出的简单做法,可以实现想要的效果,但是并不优雅。并且一旦通知的信息多了之后,函数入参就会臃肿复杂。
优雅的做法是使用 Context。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func Perform(ctx context.Context) {
for {
calculatePos()
sendResult()
select {
case <-ctx.Done():
// 被取消,直接返回
return
case <-time.After(time.Second):
// block 1 秒钟
}
}
}
主流程可能是这样的:
1
2
3
4
5
6
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
go Perform(ctx)
// ...
// 收到 App 端退出页面的通知,调用 cancel 函数。
cancel()
注意一个细节,WithTimeout 函数返回的 Context 和 CancelFunc 是分开的。Context 本身并没有取消函数,这样做的原因是取消函数只能由外层函数调用,防止子结点 Context 调用取消函数,从而严格控制信息的流向:由父结点 Context 流向子结点 Context。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main
import (
"context"
"fmt"
"time"
)
func main() {
// 创建一个 5 秒超时的上下文
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 模拟一个耗时操作
go func() {
time.Sleep(2 * time.Second)
cancel() // 手动取消上下文
}()
//time.After(3 * time.Second):如果 3 秒后没有其他事件发生,打印 "操作完成"。ctx.Done():如果上下文被取消(无论是因为超时还是手动取消),打印 "操作取消" 并输出取消的错误信息。
select {
case <-time.After(3 * time.Second):
fmt.Println("操作完成")
case <-ctx.Done(): // 上下文被取消
fmt.Println("操作取消:", ctx.Err())
}
}
防止 goroutine 泄漏
goroutine 还是会自己执行完,最后返回,只不过会多浪费一些系统资源。这里给出一个如果不用 context 取消,goroutine 就会泄漏的例子(源自Using contexts to avoid leaking goroutines)。
1
2
3
4
5
6
7
8
9
10
11
12
13
// gen 是一个整数生成器且会泄漏 goroutine
func gen() <-chan int {
ch := make(chan int)
go func() {
var n int
for {
ch <- n
n++
time.Sleep(time.Second)
}
}()
return ch
}
上面的生成器会启动一个具有无限循环的 goroutine,调用者会从信道这些值,直到 n 等于 5。
1
2
3
4
5
6
for n := range gen() {
fmt.Println(n)
if n == 5 {
break
}
}
当 n == 5 的时候,直接 break 掉。那么 gen 函数的协程就会无限循环,永远不会停下来。发生了 goroutine 泄漏。
我们可以使用 Context 主动通知 gen 函数的协程停止执行,阻止泄漏。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func gen(ctx context.Context) <-chan int {
ch := make(chan int)
go func() {
var n int
for {
select {
case <-ctx.Done():
return // 当 ctx 结束时避免 goroutine 泄漏
case ch <- n:
n++
}
}
}()
return ch
}
现在,调用方可以在完成后向生成器发送信号。调用 cancel 函数后,内部 goroutine 将返回。
1
2
3
4
5
6
7
8
9
10
11
12
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // make sure all paths cancel the context to avoid context leak
for n := range gen(ctx) {
fmt.Println(n)
if n == 5 {
cancel()
break
}
}
// ...
参考:
补充:
1
2
3
4
5
6
7
8
9
10
11
newCtx := octrace.CopySpanToNewCtx(ctx) // 防止影响其他
gorunner.GoRun(newCtx, func() error {
instancesId := librds.GetRdsInstancesId(proto.PlatformDY, req.AppName)
if _, err = api.GetInstance().DdpAddShop(newCtx, req.AppName, _token.PlatShopID, instancesId, nil); err != nil {
logger.Errorf(newCtx, "failed to add rds shop: %v", err)
}
if err = librdswhite.SetRdsWhiteShopPlatUserIds(newCtx, proto.PlatformDY, _token.PlatShopID, true, -1); err != nil {
logger.Errorf(newCtx, "failed to set rds white shop: %v", err)
}
return nil
})
补充
context 上下文管理
主要功能: - 全局变量设置 - 通知goroutine退出
通知goroutine退出
使用context.Done()通道是否有数据判断超时时间
- 如果到达超时时间,ctx就会往超时通道放一个数据
- ctx.Done()是从超时通道中获取一个消息,如果获取到了,说明超时时间已经到了。
- 然后执行该请求的关闭操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package main
import (
"io/ioutil"
"time"
"net/http"
"context"
"fmt"
)
type Result struct{
r *http.Response
err error
}
func process(){
// 生成一个context超时多对象
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
// 最后关闭这个对象
defer cancel()
//生成http的相关对象,和返回结果的管道
tr := &http.Transport{}
client := &http.Client{Transport: tr}
resultChan := make(chan Result, 1)
// 生成一个http请求的对象
req, err := http.NewRequest("GET", "https://www.google.com.hk", nil)
if err != nil{
fmt.Println("http request failed, err:", err)
return
}
// 使用goroutine 执行开始请求并把结果返回到resultChan通道中
go func(){
resp, err := client.Do(req)
pack := Result{r: resp, err: err}
resultChan <- pack
}()
// 检查两个通道判断请求的结果
// ctx.Done()是一个超时的通道,如果之行后就会在指定的超时时间点放入一个消息到这个通道中
// resultChan 是请求结果的通道,如果这个通道有数据说明请求结果完成。
select {
case <- ctx.Done():
// 是否能在超时时间通道获取数据,如果获取到就直接把该请求取消。
tr.CancelRequest(req)
res := <- resultChan
fmt.Println("Time out, err:", res.err)
case res := <- resultChan :
// 如果有数据打印请求的数据
defer res.r.Body.Close()
out, _ := ioutil.ReadAll(res.r.Body)
fmt.Printf("server Respones :%s", out)
}
}
func main(){
process()
}
请求超时运行结果:
1
Time out, err: Get https://www.google.com.hk: net/http: request canceled while waiting for connection
数据的设置和继承
继承是在实例化新的对象时,引用已有的对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main
import (
"context"
"fmt"
)
func process(ctx context.Context) {
ret, ok := ctx.Value("trace_id").(int)
if !ok {
ret = 654321
}
fmt.Printf("ret:%d\n", ret)
user_id, ok := ctx.Value("user_id").(string)
if !ok {
fmt.Println("Can't get user_id")
}
fmt.Println("user_id:", user_id)
return
}
func main() {
// context.Background() 可以理解为一个基类
ctx := context.WithValue(context.Background(), "trace_id", 123456)
// 之后再设置可以引用上面的ctx(继承)
ctx = context.WithValue(ctx, "user_id", "9fd14d74-9fde-48ff-a060-b18d27cdf201")
process(ctx)
}
运行:
1
2
3
ret:123456
user_id: 9fd14d74-9fde-48ff-a060-b18d27cdf201
1234
主程序完成后自动关闭所有的goroutine
context.WithCanceln 自动通知goroutine退出,
- 就是在cancle()的时候,自动向chan发送一个信号数据,
- goroutine通过ctx.Done()获取数据后自动退出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main
import (
"time"
"fmt"
"context"
)
func gen(ctx context.Context) <-chan int{
dst := make(chan int)
n := 1
go func(){
for{
select{
case <- ctx.Done():
fmt.Println(n, "exit")
return
case dst <- n:
n++
}
}
}()
return dst
}
func test(){
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
intChan := gen(ctx)
for n := range intChan{
fmt.Println(n)
if n == 5{
break
}
}
}
func main(){
test()
time.Sleep(time.Hour)
}
生命周期
HTTP 框架层(Gin/Echo/Fasthttp 等)
1 2
收到 TCP → 创建 requestCtx → 调 handler → handler return → 框架执行 cancel(requestCtx) → 关闭连接/回写响应 → 生命周期结束
这个
requestCtx就是gin.Context.Request.Context()、echo.Context.Request().Context(),你拿到的就是它。链路追踪(OpenTracing/Jaeger/Zipkin)
1 2
框架在 ① 处创建 span → 注入到 ctx → handler return → 中间件 defer span.Finish() → span 置为 finished
因此 goroutine 里再拿同一条 span 去做 HTTP 调用,会立刻得到
context canceled或ErrSpanFinished。gRPC Server
1
建立 stream → 创建 ctx → 业务逻辑 return → gRPC 框架 cancel(ctx)
和 HTTP 完全一样。
自己手动开的 goroutine
1
go func(ctx context.Context) { … }(ctx)这个 ctx 和主线程共享,主线程 return 后它被 cancel,goroutine 就跪。
如何验证 ctx 什么时候结束
1
2
3
4
go func(ctx context.Context) {
<-ctx.Done()
fmt.Println("ctx done:", ctx.Err()) // 会打印 context canceled
}(ctx)
总结
- 只要 ctx 是从 HTTP/gRPC 请求带进来的,生命周期就只在「一次请求」里。
- 想让后台任务长期跑,必须
context.Background()(或context.TODO())再自建 span / metadata。
不可以。 context.WithCancel(ctx) 只是把「父 ctx 的取消信号」继续向下传递,再加一个可由你手动触发的 cancel() 而已。 如果父 ctx 就是 HTTP 请求带来的那个,那么:
- 当 HTTP handler 返回,父 ctx 会被框架 cancel;
WithCancel得到的子 ctx 也会立刻跟着 Done;- 你的 goroutine 依旧活不了。
正确做法
拿一个全新的根(与请求生命周期完全脱钩)
1 2
bgCtx := context.Background() // 如果还需要 trace / metadata,手动复制
可选:再给它加自己的超时、取消
1
controlCtx, cancel := context.WithCancel(context.Background())
1 2 3
bgCtx := context.Background() bgCtx, cancel := context.WithTimeout(bgCtx, 30*time.Minute) defer cancel() // 视业务在合适时候调用
启动 goroutine
1 2 3
go func(ctx context.Context) { ... }(bgCtx)
一句话: 想让后台任务长期跑,根必须是 context.Background()(或 context.TODO()),而不能复用 HTTP/gRPC 请求里的 ctx。
或者
1
2
3
4
5
6
7
8
9
10
baseCtx := context.WithoutCancel(ctx)
newCtx := octrace.CopySpanToNewCtx(baseCtx)
gorunner.GoRun(newCtx, func() error {
// 调接口 robot-base-llm-service:9011/get_answer
if err := getAnswer(newCtx, req); err != nil {
logger.Errorf(newCtx, "getAnswer 调用失败: %v", err)
return err
}
return nil
})
使用 context.WithoutCancel 确保 newCtx 不会因为原请求上下文取消(如客户端断开连接)而被取消。 使用 octrace.CopySpanToNewCtx 保留 trace 信息,方便链路追踪。 这进一步保证了 getAnswer 的执行 不会受到原请求生命周期的影响
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
func (hdl *AdapterHdl) AuthEventHdl() AuthEventHdl {
return func(ctx context.Context, req *proto.AuthEventReq, rsp *proto.AuthEventRes) error {
logger.Infof(ctx, "wxxd authEventReq details - ShopId: %s, OrderId: %s, CycleDays: %d, CycleStartTs: %d, CycleEndTs: %d, TotalPrice: %f",
req.ShopId, req.OrderId, req.CycleDays, req.CycleStartTs, req.CycleEndTs, req.TotalPrice)
confBase := config.GetInstance()
platShopId := req.ShopId
// 推送订购
if err := publishOrder(ctx, req, platShopId, confBase); err != nil {
logger.Errorf(ctx, "send subcibe info error: %v", err)
return err
}
// 在后台处理获取产品链接
newCtx := context.Background()
gorunner.GoRun(newCtx, func() error {
return handlePublishAndLink(newCtx, req, platShopId, confBase)
})
return nil
}
}
// 处理推送和获取链接的完整流程
func handlePublishAndLink(ctx context.Context, req *proto.AuthEventReq, platShopId string, confBase *config.Config) error {
// 推送订购
if err := publishOrder(ctx, req, platShopId, confBase); err != nil {
logger.Errorf(ctx, "publish order failed: %v", err)
return err
}
// 等待10秒后获取产品链接
time.Sleep(proto.PRODUCT_LINK_WAIT_TIME * time.Second)
// 获取产品链接
redirectUrl, lastErr := getProductLinkWithRetry(ctx, platShopId, confBase.MarketOrderConfig.AppKey)
if lastErr != nil {
logger.Warnf(ctx, "get product link failed, retry publish order: %v", lastErr)
// 重新推送订购
if retryErr := publishOrder(ctx, req, platShopId, confBase); retryErr != nil {
logger.Errorf(ctx, "retry publish order failed: %v", retryErr)
return retryErr
}
// 再次等待并获取产品链接
time.Sleep(proto.PRODUCT_LINK_WAIT_TIME * time.Second)
redirectUrl, lastErr = getProductLinkWithRetry(ctx, platShopId, confBase.MarketOrderConfig.AppKey)
if lastErr != nil {
logger.Errorf(ctx, "get product link failed after retry: %v", lastErr)
return lastErr
}
}
logger.Infof(ctx, "get product link success: %s", redirectUrl)
return nil
}
func publishOrder(ctx context.Context, req *proto.AuthEventReq, platShopId string, confBase *config.Config) error {
orderMsg := &libshop.AppOrderMessage{
ID: req.OrderId,
PlatShopID: platShopId,
Platform: proto.PlatformWXXD,
AppKey: confBase.MarketOrderConfig.AppKey,
CycleDays: req.CycleDays,
CycleStartTs: req.CycleStartTs / 1000,
CycleEndTs: req.CycleEndTs / 1000,
VersionCode: confBase.MarketOrderConfig.VersionCode,
OrderStatus: 1,
OrderCreateTs: time.Now().Unix(),
Ext: make(map[string]interface{}),
ItemNum: 0,
PlatUid: platShopId,
Buyer: "",
PayAmount: req.TotalPrice,
}
return libshop.PublishOrder(ctx, &confBase.MarketOrderConfig, orderMsg)
}
func getProductLinkWithRetry(ctx context.Context, platShopId string, appKey string) (string, error) {
var redirectUrl string
lastErr := retry.Do(func() error {
shopClient := libshop.GetInstance()
var err error
redirectUrl, err = shopClient.GetProductLink(ctx, proto.PlatformWXXD, platShopId, appKey)
if err != nil {
return err
}
if redirectUrl == "" {
return fmt.Errorf("empty redirect url")
}
return nil
},
retry.Attempts(3),
retry.Delay(2*time.Second),
retry.DelayType(retry.CombineDelay(retry.BackOffDelay, retry.RandomDelay)),
retry.MaxDelay(10*time.Second),
retry.LastErrorOnly(true),
)
return redirectUrl, lastErr
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (hdl *AdapterHdl) SpiSend() func(context.Context, *proto.ISVEvaluationCallbackReq, *proto.ISVEvaluationCallbackRes) error {
return func(ctx context.Context, req *proto.ISVEvaluationCallbackReq, rsp *proto.ISVEvaluationCallbackRes) error {
// 保存答案
{
baseCtx := context.WithoutCancel(ctx)
newCtx := octrace.CopySpanToNewCtx(baseCtx)
gorunner.GoRun(newCtx, func() error {
_ = mongo.SaveISVAnswer(newCtx, req)
return nil
})
}
goodsResponse, err := api.GetInstance().ISVEvaluationCallback(ctx, req)
if err != nil {
return err
}
*rsp = *goodsResponse
return nil
}
} 能否100%保证 // 保存答案
{
baseCtx := context.WithoutCancel(ctx)
newCtx := octrace.CopySpanToNewCtx(baseCtx)
gorunner.GoRun(newCtx, func() error {
_ = mongo.SaveISVAnswer(newCtx, req)
return nil
})
}不会影响到SpiSend接口
1
2
3
4
base := context.WithoutCancel(ctx) // 脱离请求生命周期
spanCtx := octrace.CopySpanToNewCtx(base)
newCtx, cancel := context.WithTimeout(spanCtx, 10*time.Second)
defer cancel()
✅ 使用 context.WithoutCancel(ctx) 的作用
它创建一个不受父 context 取消影响的新 context(但仍保留 values,如 trace ID)。
这意味着:即使 ctx(父 context)被取消,newCtx 仍然可以继续运行。
1
2
baseCtx := context.WithoutCancel(ctx) // 第1步:去除取消信号
newCtx := octrace.CopySpanToNewCtx(baseCtx) // 第2步:复制 span 到新 context
1
2
3
4
5
6
原 ctx (可取消)
↓ 调用 Cancel()
× 取消信号被阻断
baseCtx (不可取消) ← context.WithoutCancel()
↓
newCtx (不可取消) ← octrace.CopySpanToNewCtx()
注意: 容易出现oom, 即使goroutine执行完毕:
newCtx 仍然存活(因为WithoutCancel)
上下文包含trace信息、metadata、span等数据
这些对象无法被垃圾回收
❌ 不使用 context.WithoutCancel(ctx) 的后果
如果你直接写:
1
2
3
spanCtx := octrace.CopySpanToNewCtx(ctx)
newCtx, cancel := context.WithTimeout(spanCtx, 10*time.Second)
defer cancel()
1
2
3
4
5
6
ctx (父,可取消)
↓ 调用 Cancel() ─────────┐
取消信号传播 │
spanCtx (继承取消信号) ←─────┘ // CopySpanToNewCtx 只是复制 span,不阻断取消
↓
newCtx (WithTimeout 包装) ───→ 会收到取消信号!
那么:
newCtx的取消链会继承自ctx。- 如果
ctx被取消(比如请求提前返回、上游超时),newCtx也会被取消。 - 这可能导致你设置的 10 秒超时提前失效。
✅ 总结:能不能不用?
| 场景 | 是否可以用 ctx 直接替代 base |
|---|---|
你希望 newCtx 的 10 秒超时不受父 context 取消影响 | ❌ 不能用 ctx,必须用 WithoutCancel |
你接受父 context 取消时,newCtx 也一起被取消 | ✅ 可以省略 WithoutCancel |
✅ 建议
如果你是为了后台任务、日志上传、清理操作等场景,不希望被父请求的生命周期影响,那就必须用 context.WithoutCancel。
方案对比
| 方案 | 优点 | 缺点 |
|---|---|---|
| context.WithoutCancel(ctx) | 异步操作永不取消 | 内存泄漏严重 |
| octrace.CopySpanToNewCtx(ctx) | 无内存泄漏 | 异步操作会被父context取消 |
| 带超时的独立上下文 | ✅ 无内存泄漏 ✅ 异步操作独立 ✅ 有合理超时 | 需要手动管理cancel |
// 方案1:如果需要独立控制,自己创建可取消的 context
1
2
3
baseCtx := context.WithoutCancel(ctx)
newCtx, cancel := context.WithCancel(baseCtx) // 自己管理生命周期
defer cancel() // 或根据业务逻辑调用
// 方案2:如果确实需要继承原 ctx 的取消,但保留 span
1
2
// 不要调用 WithoutCancel,直接复制 span
newCtx := octrace.CopySpanToNewCtx(ctx) // 会继承取消信号
推荐写法
版本B(最佳):goroutine 内创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func pushPulsarAsync(ctx context.Context, msg *pb2.AdapterMsg) {
//gorunner.GoRun 的实现 goroutine 不会被 ctx 主动取消,
gorunner.GoRun(ctx, func() error {
// ✅ 先阻断父取消,再复制 span
baseCtx := context.WithoutCancel(ctx) // 意义不大的时候不要使用,
spanCtx := octrace.CopySpanToNewCtx(baseCtx)
// 现在 asyncCtx 只受 20s 超时控制,不受父取消影响
asyncCtx, cancel := context.WithTimeout(spanCtx, 20*time.Second)
defer cancel()
client_business.PushPulsar(asyncCtx, msg)
return nil
})
}
T+0s: 提交到 gorunner 队列(无资源占用) T+5s: 队列调度,开始执行回调 T+5s: 创建 asyncCtx(20秒计时开始) T+25s: 超时(实际执行了完整的20秒)
版本A:上下文在 goroutine 外创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func pushPulsarAsync(ctx context.Context, msg *pb2.AdapterMsg) {
// 1️⃣ 立即创建 asyncCtx(20秒计时开始)
baseCtx := context.WithoutCancel(ctx) //意义不大的时候不要使用,
spanCtx := octrace.CopySpanToNewCtx(baseCtx)
asyncCtx, cancel := context.WithTimeout(spanCtx, 20*time.Second)
// 2️⃣ 提交到 gorunner(可能排队等待)
gorunner.GoRun(ctx, func() error {
defer cancel()
// 3️⃣ 实际执行时,可能已经过去 5 秒
// 但只剩 15 秒可用(20秒已流逝)
client_business.PushPulsar(asyncCtx, msg)
return nil
})
}
T+0s: 创建 asyncCtx(20秒计时开始) T+0s: 提交到 gorunner 队列 T+5s: 队列调度,开始执行回调(只剩 15 秒) T+20s: asyncCtx 超时取消(实际只执行了 15 秒)
导致 OOM(内存溢出)
问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
newCtx := octrace.CopySpanToNewCtx(baseCtx)
switch msgAdapter.GetMsgType() {
case model.BuyerMsg:
aiAnswer, ok := req.(*pb.ChatMessage)
if !ok {
return fmt.Errorf("req type error")
}
logger.Infof(ctx, "原始消息buyer:%+v", *aiAnswer)
ret.PlatMsgId = aiAnswer.PlatMsgId
ExtInfo.RequestID = aiAnswer.RequestId
if aipaasDispatchMindFlow(ctx, aiAnswer.ShopId) {
isSendBusiness = false
}
pushPulsarAsync(newCtx, &pb2.AdapterMsg{
MsgType: pb2.AdapterMsgType_BUYER_MSG,
Data: &pb2.AdapterMsg_ChatMessage{
ChatMessage: aiAnswer.ChatMessage,
},
})
if !isSendBusiness {
return nil
}
err := client_business.Answer(ctx, aiAnswer.ChatMessage, rspInfo)
if err != nil {
return err
}
func pushPulsarAsync(ctx context.Context, msg *pb2.AdapterMsg) {
gorunner.GoRun(ctx, func() error {
client_business.PushPulsar(ctx, msg)
return nil
})
}
func PushPulsar(ctx context.Context, msg *msgAdapterPb.AdapterMsg) {
PushPulsarJson(ctx, msg)
data, err := proto.Marshal(msg)
if err != nil {
logger.Errorf(ctx, "json marshal error: %v", err)
return
}
err = pulsar.SendAsyncByte(ctx, "msg_adapter_msg", data)
if err != nil {
logger.Errorf(ctx, "send pulsar error: %v", err)
return
}
return
}
主要问题分析
1. 上下文管理不当
1
2
3
4
5
6
7
8
9
10
baseCtx := context.WithoutCancel(ctx) // 创建了一个永不取消的上下文
newCtx := octrace.CopySpanToNewCtx(baseCtx)
func pushPulsarAsync(ctx context.Context, msg *pb2.AdapterMsg) {
gorunner.GoRun(ctx, func() error {
client_business.PushPulsar(ctx, msg) // ctx被goroutine持有
return nil // goroutine函数结束
})
// goroutine结束,但ctx的引用仍然存在
}
使用 context.WithoutCancel(ctx) 创建了一个永不取消的上下文
这意味着即使原始请求(父)被取消(客户端断开连接、超时等),异步推送 Pulsar 的 goroutine 仍然会继续运行
在高并发场景下,会积累大量不会被垃圾回收的 goroutine, 高并发下的积累效应
每个消息 = 一个goroutine = 一个永不释放的上下文
1000 QPS × 上下文大小(几KB) × 运行时间(小时) = 大量内存
2. goroutine 泄漏风险
1
pushPulsarAsync(newCtx, &pb2.AdapterMsg{...})
个消息都会通过 gorunner.GoRun 启动新的 goroutine:
如果 Pulsar 服务器响应慢或网络问题,这些 goroutine 会阻塞
没有 goroutine 池限制,无限创建 goroutine
goroutine 持有上下文引用,导致内存无法释放
3. 异步操作无限制
每个消息分支都调用 pushPulsarAsync
高峰期可能同时创建数千个 goroutine
每个 goroutine 占用 ~2-8KB 栈空间,容易导致内存爆炸
解决方案建议
1. 修复上下文管理
1
2
// 替换现有的上下文创建
newCtx := octrace.CopySpanToNewCtx(ctx) // 直接使用原始上下文
2. 添加 goroutine 池限制
考虑使用 sync.Pool 或第三方 goroutine 池库来限制并发数量。
3. 添加超时控制
1
2
3
4
5
timeoutCtx, cancel := context.WithTimeout(newCtx, 30*time.Second)
defer cancel()
gorunner.GoRun(timeoutCtx, func() error {
// 推送逻辑
})
4. 使用缓冲通道
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var pulsarChan = make(chan *pb2.AdapterMsg, 1000) // 限制缓冲区大小
func pushPulsarAsync(msg *pb2.AdapterMsg) {
select {
case pulsarChan <- msg:
// 成功入队
default:
// 队列满,丢弃或记录错误
logger.Warnf(ctx, "pulsar queue full, dropping message")
}
}
// 单独的消费者 goroutine 处理队列
go func() {
for msg := range pulsarChan {
client_business.PushPulsar(ctx, msg)
}
}()
核心问题:context.WithoutCancel 导致的 goroutine 永生是主要元凶,建议优先修复这个问题。
双层异步是合理的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func pushPulsarAsync(ctx context.Context, msg *pb2.AdapterMsg) {
gorunner.GoRun(ctx, func() error {
client_business.PushPulsar(ctx, msg)
return nil
})
}
func PushPulsar(ctx context.Context, msg *msgAdapterPb.AdapterMsg) {
PushPulsarJson(ctx, msg)
data, err := proto.Marshal(msg)
if err != nil {
logger.Errorf(ctx, "json marshal error: %v", err)
return
}
err = pulsar.SendAsyncByte(ctx, "msg_adapter_msg", data)
if err != nil {
logger.Errorf(ctx, "send pulsar error: %v", err)
return
}
return
}
func PushPulsarJson(ctx context.Context, msg *msgAdapterPb.AdapterMsg) {
op := protojson.MarshalOptions{
UseProtoNames: true,
UseEnumNumbers: true,
}
data, err := op.Marshal(msg)
if err != nil {
logger.Errorf(ctx, "json marshal error: %v", err)
return
}
err = pulsar.SendAsyncByte(ctx, "msg_adapter_msg_json", data)
if err != nil {
logger.Errorf(ctx, "send pulsar error: %v", err)
return
}
return
}
异步层次分析
第一层异步 (gorunner.GoRun):
1
2
3
4
gorunner.GoRun(asyncCtx, func() error {
client_business.PushPulsar(asyncCtx, msg) // 所有操作都在goroutine中
return nil
})
第二层异步 (pulsar.SendAsyncByte):
1
2
3
4
5
// 在goroutine中执行
PushPulsarJson(ctx, msg) // 同步:JSON序列化
data, err := proto.Marshal(msg) // 同步:protobuf序列化
err = pulsar.SendAsyncByte(...) // 异步:网络发送
err = pulsar.SendAsyncByte(...) // 异步:再次网络发送
为什么需要双重异步
序列化开销:proto.Marshal 和 JSON 序列化是CPU密集型操作
错误处理:日志记录、错误检查等同步操作
客户端准备:GetPublisher() 等可能有同步开销
网络准备:连接池获取、消息封装等
性能优势
| 场景 | 直接调用 | 单层异步 | 双重异步 |
|---|---|---|---|
| 主流程阻塞 | 高 | 中 | 无 |
| CPU累积 | 高 | 高 | 低 |
| 响应时间 | 慢 | 中 | 快 |
| 错误隔离 | 无 | 中 | 好 |
实际效果
1
2
3
4
5
6
7
8
// 主流程:立即返回,不等待任何操作
pushPulsarAsync(ctx, msg)
return // 主业务立即完成
// 后台goroutine:异步处理所有耗时操作
goroutine: {
序列化消息 → 发送到Pulsar → 发送到Pulsar JSON → 完成
}
这种设计确保了:
✅ 主业务流程零阻塞
✅ 所有耗时操作异步化
✅ 高并发下的稳定性能
✅ 完善的错误隔离
结论:这种双重异步设计是经过深思熟虑的架构模式,能在高并发场景下提供最佳的性能和用户体验。
由 Go GC(垃圾回收) 管理,无人引用时自动回收
1
2
3
baseCtx := context.WithoutCancel(ctx) // ① 创建
newCtx := octrace.CopySpanToNewCtx(baseCtx) // ② 包装
pushPulsarAsync(newCtx, msg) // ③ 传入 goroutine
1
2
3
4
5
6
7
8
goroutine 堆栈(长期存在,因为 gorunner 是线程池)
↓ 长期引用
newCtx
↓ 父节点
baseCtx
↓ 父节点(通过 WithoutCancel 的内部机制)
ctx(原始请求上下文)
↓ 可能引用大量请求相关数据...
为什么 “永不取消” 导致无法回收
| 正常 context | WithoutCancel context |
|---|---|
超时/取消后,Done() 关闭 | Done() 永远不会关闭 |
| 生命周期明确 | 生命周期无限 |
| 可以被及时回收 | 必须等待所有引用释放 |
1
2
3
4
5
6
7
8
// WithoutCancel 的实现(简化)
func WithoutCancel(parent Context) Context {
return &withoutCancelCtx{parent} // 持有 parent 引用!
}
type withoutCancelCtx struct {
c Context // 这个字段一直存在
}
baseCtx 持有了 ctx,newCtx 持有了 baseCtx,goroutine 持有了 newCtx…
泄漏场景
1
2
3
4
5
6
7
8
9
10
// 每次请求都执行
func HandleRequest(ctx context.Context, msg *Msg) {
baseCtx := context.WithoutCancel(ctx)
newCtx := octrace.CopySpanToNewCtx(baseCtx)
pushPulsarAsync(newCtx, msg) // goroutine 长期持有
}
// 假设 gorunner 是固定线程池,goroutine 永不退出
// 结果:每次请求的数据都通过 ctx → baseCtx → newCtx 被长期引用
// 内存持续增长,无法回收,最终 OOM
解决方案
✅ 方案 1:在 goroutine 内部创建(推荐)
1
2
3
4
5
6
7
8
9
10
11
12
13
func pushPulsarAsync(parentCtx context.Context, msg *pb2.AdapterMsg) {
// 只传递必要信息,不传递 ctx
gorunner.GoRun(nil, func() error {
// 内部创建,内部释放
baseCtx := context.WithoutCancel(parentCtx) // 阻断取消
spanCtx := octrace.CopySpanToNewCtx(baseCtx)
asyncCtx, cancel := context.WithTimeout(spanCtx, 20*time.Second)
defer cancel()
client_business.PushPulsar(asyncCtx, msg)
return nil
})
}
关键点: parentCtx 只在创建瞬间使用,创建后不再被持有。
✅ 方案 2:只复制必要数据,不保留 ctx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func pushPulsarAsync(ctx context.Context, msg *pb2.AdapterMsg) {
// 提前提取需要的数据
traceID := octrace.GetTraceID(ctx)
gorunner.GoRun(nil, func() error {
// 用提取的数据重建,不持有原 ctx
newCtx := octrace.WithTraceID(context.Background(), traceID)
asyncCtx, cancel := context.WithTimeout(newCtx, 20*time.Second)
defer cancel()
client_business.PushPulsar(asyncCtx, msg)
return nil
})
}
对比总结
| 写法 | 是否泄漏 | 原因 |
|---|---|---|
外部 WithoutCancel + CopySpan + 传入 goroutine | ❌ 泄漏 | goroutine 长期持有引用链 |
| 内部创建 + 使用 + 释放 | ✅ 安全 | 创建后原 ctx 不再被持有 |
| 提取数据 + 重建上下文 | ✅ 安全 | 完全不持有原 ctx |
一句话结论
是的,你的写法会导致内存泄漏!
newCtx通过baseCtx间接持有原始ctx,而 goroutine 长期不退出 → 引用永不释放 → 无法 GC。 必须将WithoutCancel和CopySpan移到 goroutine 内部执行,确保原 ctx 只被短暂使用。
octrace
ExtractTraceID
octrace.ExtractTraceID(ctx).String() 的作用是: 功能说明
- 提取追踪ID:从上下文(context)中提取分布式追踪系统的追踪ID
- 转换为字符串:将提取出的追踪ID转换为字符串格式
使用场景
- 日志关联:将追踪ID记录到日志中,便于跨服务的问题排查
- 监控追踪:在微服务架构中跟踪单个请求的完整调用链路
- 调试分析:帮助开发人员理解请求在不同服务间的流转情况
