文章

通道

内存泄露

对于一个channel来说,如果没有任何goroutine(轻量级线程)引用,gc会对其进行回收,不会引起内存泄露。

1
2
3
4
// goroutine
go func() {
    fmt.Println("hello from a goroutine")
}()

而当goroutine处于接收或发送阻塞状态,channel处于空或满状态时,一直得不到改变,gc则无法回收这类一直处于等待队列中的goroutine,引起内存泄露。

引起内存泄露的几种情况:

  • channel缓存队列已满,接收者goroutine退出,发送者阻塞且所在goroutine无法退出
  • channel缓存队列未满,发送者goroutine退出,接收者阻塞且所在goroutine无法退出
  • channel未初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
//情形1:  channel缓存队列已满,接收者goroutine退出,发送者阻塞且所在goroutine无法退出
func TestName(t *testing.T) {
	channel := make(chan int, 5)
	//sender
	go func() {
		for i := 0; i < 10; i++ {
			channel <- i //阻塞
			time.Sleep(10 * time.Millisecond)
			fmt.Println("sender count:", i)
		}
		fmt.Println("sender exit.") //sender 无法退出
	}()

	//receiver
	go func() {
		i := <-channel //receiver 接收一个退出
		fmt.Println("receive:", i)
		fmt.Println("receiver exit.")
	}()

	for {
		time.Sleep(10 * time.Millisecond)
	}
}

/**
=== RUN   TestName
receive: 0
receiver exit.
sender count: 0
sender count: 1
sender count: 2
sender count: 3
sender count: 4
sender count: 5


*/
// 解决方式
func TestName1(t *testing.T) {
    channel := make(chan int, 5)

    // sender
    go func() {
        channel <- 1
        close(channel)          // 关键:关闭 channel
        fmt.Println("sender exit.")
    }()

    // receiver
    go func() {
        for v := range channel { // range 会在 channel 关闭且读完数据后自动退出
            fmt.Println("receive:", v)
        }
        fmt.Println("receiver exit.")
    }()

    time.Sleep(100 * time.Millisecond) // 等它们跑完
}



// 情形2:  channel缓存队列未满,发送者goroutine退出,接收者阻塞且所在goroutine无法退出
func TestName1(t *testing.T) {
	channel := make(chan int, 5)
	//sender
	go func() {
		channel <- 1 //发一个退出
		fmt.Println("sender exit.")
	}()

	//receiver
	go func() {
		for i := 0; i < 10; i++ {
			i := <-channel //阻塞
			fmt.Println("receive:", i)
		}
		fmt.Println("receiver exit.")
	}()

	for {
		time.Sleep(10 * time.Millisecond)
	}

}

/**
=== RUN   TestName1
sender exit.
receive: 1

*/

// 解决:取消信号. 加一个通知另一段退出的stopChan
func TestName2(t *testing.T) {
	channel := make(chan int, 5)
	stopChan := make(chan struct{}) //无缓冲的channel作为信号量

	//sender
	go func() {
		for i := 0; i < 10; i++ {
			select {
			case <-stopChan: //收到close信号,退出。多个sender时,每个goroutine也都能收到
				fmt.Println("sender exit.")
				return
			case channel <- i:
				time.Sleep(10 * time.Millisecond)
				fmt.Println("sender count:", i)
			}
		}
		fmt.Println("sender exit.")
	}()

	//receiver
	go func() {
		i := <-channel
		close(stopChan)
		fmt.Println("receive:", i)
		fmt.Println("receiver exit.")
	}()

	for {
		time.Sleep(10 * time.Millisecond)
	}

}

备注:

从一个关闭的channel仍能读出数据,当缓冲队列还有数据时,会返回正常值;缓冲队列为空时,则返回channel已关闭。

向一个关闭的channel发送数据则会报Panic

阻塞判断

1
2
3
4
for skuMsg := range skuChan {  // <--- 这里会一直等待直到channel被关闭
    // 处理每个sku消息
}
//当 channel 被关闭且所有数据被消费后,循环自然结束

三种状态下的行为

1
2
3
4
数据可用: 立即获取数据并执行循环体
数据未到达: 阻塞等待直到有数据或 channel 关闭
channel 关闭且空: 循环正常退出
同步机制

生产者-消费者模型

1
2
3
生产者通过 ch <- data 发送数据
消费者通过 for range 接收数据
自动实现了生产者和消费者的同步

关闭信号传递

1
2
3
1. close(channel) 既是关闭信号也是完成信号
2. 每个 channel 都有 defer close(ch) 保证最终会被关闭range, 循环会在 channel 关闭时自动结束,即使暂时没有数据,range 也会等待直到有数据或者 channel 被关闭
3. 消费者通过检测 channel 状态知道生产已完成

内存可见性保障

1
2
3
Go 的 channel 实现提供了必要的内存屏障
确保数据在 goroutine 间正确传递
这种设计使得 for range 循环能够优雅地处理并发数据流,无需额外的同步原语即可实现完整的生产者-消费者模式。

带缓存和无缓存

无缓冲(N=0)——发送端必阻塞

1
2
ch := make(chan item) // 容量 0
ch <- it         // 写端会卡住,直到有人 `<-ch` 读走

特点:

  • 同步操作:发送和接收必须同时就绪才能完成操作
  • 阻塞行为:发送操作会阻塞直到有接收者准备就绪
  • 接收操作:也会阻塞直到有发送者准备就绪
  • 保证顺序:发送和接收严格配对,确保数据传递的同步性

使用场景:

  • 需要严格的同步通信 ]生产者消费者模式中需要精确控制 协程间协调操作

有缓冲(N>0)——发送端先丢进队列,立即返回

1
2
3
ch := make(chan item, 2) // 容量 2
ch <- it1                // 缓冲区空,直接拷贝进去,写端立即返回
ch <- it2                // 缓冲区还有 1 个空位,再拷贝,立即返回

特点:

  • 异步操作:发送方可以在缓存未满时立即发送

  • 非阻塞发送:只要缓存未满,发送不会阻塞

  • 非阻塞接收:只要缓存非空,接收不会阻塞

  • 缓冲作用:可以暂时存储数据,平滑生产者和消费者的速率差异

使用场景:

  • 数据流处理中的临时存储
  • 生产者和消费者速率不匹配的情况
  • 避免不必要的阻塞,提高并发性能

写法

入参:

1
onsaleCh chan <- IterGoodsResult

出参

1
chan SupplierSkuMsg

写入

1
2
3
4
5
ch <- SupplierSkuMsg{
    SkuId:     resWare.GoodsID,
    IsMainSku: true,
    Price:     resWare.Price,
}

读取

1
2
3
4
5
6
7
8
9
10
11
12
13
// 方法1
for skuMsg := range ch { 
    //… 
}
// 方法2
for {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case job := <-jobCh:
         //… 
    }
}

搭配select

使用的典型场景可以一句话概括:“当你想同时等好几个 channel 的收发事件,谁先到就处理谁” 的时候,就必须用 select

基础

1
2
3
4
5
6
7
8
9
10
11
// range
for skuMsg := range ch { //… }  

// 等价于
for {
    v, ok := <-ch
    if !ok {          // 只有 ch 被关闭才能退出
        break
    }
    // …
}

例子

1
2
3
4
{
    for skuMsg := range skuChan1 {  }   // ① 把 ch1 全部消费完
    for skuMsg := range skuChan2 {  }   // ② 再把 ch2 全部消费完
}
1
2
3
4
5
6
7
8
9
10
11
for {
    // 谁先到用谁
    select {
    case skuMsg, ok := <-skuChan1:
        if !ok { skuChan1 = nil; continue }
        ...
    case skuMsg, ok := <-skuChan2:
        if !ok { skuChan2 = nil; continue }
        ...
    }
}

主动取消(context.Done)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//只处理一条就结束 → 不用 for
select {  // select不带default是阻塞的
case <-ctx.Done():
    return ctx.Err()
case job := <-jobCh:
    process(job)   // 只处理一次就返回
}

//想循环不断地从 jobCh 里拿任务,直到上游取消 → 必须包 for
for {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case job := <-jobCh:
        process(job)   // 每条都处理
    }
}

多路等待(multiplex)

1
2
3
4
5
6
7
// 读
select {
case v := <-ch1:
    fmt.Println("ch1 先到:", v)
case v := <-ch2:
    fmt.Println("ch2 先到:", v)
}

非阻塞收发(带 default)

消息必须立刻发出去,否则就直接丢,绝对不能阻塞上游的场景

1
2
3
4
5
6
7
8
9
10
ch1 := make(chan string, 1)
ch2 := make(chan string, 1)

// 写
select {
    case ch1 <- msg: //投递
    case ch2 <- msg: //投递
    default: // 实现select非阻塞
      // 缓冲区两个都满,丢弃
}

例子

多路等待(multiplex)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func fastest(ctx context.Context, req Request) (Response, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()          // 1. 确保函数退出时所有 goroutine 都被打断

	type item struct {
		resp Response
		err  error
	}
	ch := make(chan item, 2) // 缓冲 2,防止写端阻塞

	// 2. 并发请求两个副本
	go func() {
		resp, err := callReplica1(ctx, req)
		select {
		case ch <- item{resp, err}:
		case <-ctx.Done(): // 如果已经取消,直接退出
		}
	}()

	go func() {
		resp, err := callReplica2(ctx, req)
		select {
		case ch <- item{resp, err}:
		case <-ctx.Done():
		}
	}()

   // select阻塞 
	// 3. 只要第一个回来就返回,cancel 会通知另一个 goroutine 退出
	select {
	case it := <-ch:
		return it.resp, it.err
	case <-ctx.Done():
		return Response{}, ctx.Err()
	}
}

取消goroutine

方式1 取消信号

假设你有一个主 goroutine 和一个工作 goroutine,主 goroutine 需要通知工作 goroutine 停止处理,并等待工作 goroutine 确认停止。 主 goroutine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
    "fmt"
    "time"
)

func main() {
    closeProcessCh := make(chan struct{})
    closeProcessAckCh := make(chan struct{})

    // 启动工作 goroutine
    go worker(closeProcessCh, closeProcessAckCh)

    // 模拟主 goroutine 的其他操作
    time.Sleep(2 * time.Second)

    // 通知工作 goroutine 停止处理
    closeProcessCh <- struct{}{}     // ① 发送停止信号(阻塞直到worker接收)
    <-closeProcessAckCh           // ② 等待确认(阻塞直到worker发送)

    fmt.Println("工作 goroutine 已经停止")
}

工作 goroutine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func worker(closeProcessCh chan struct{}, closeProcessAckCh chan struct{}) {
    for {
        select {
        case <-closeProcessCh:
            // 收到停止信号,进行清理操作
            fmt.Println("工作 goroutine 收到停止信号,开始清理...")
            time.Sleep(1 * time.Second) // 模拟清理操作
            fmt.Println("工作 goroutine 清理完成")

            // 发送确认信号
            closeProcessAckCh <- struct{}{}
            return
        default:
            // 继续处理任务
            fmt.Println("工作 goroutine 正在处理任务...")
            time.Sleep(500 * time.Millisecond)
        }
    }
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
	succPlatGoodsIds := make(map[string]bool)
	failedPlatGoodsIds := make(map[string]bool)	
	syncM := sync.Mutex{}
	runner := gorunner.NewGoRunner()
	runner.GoRun(ctx, func() error {
		for iterResult := range onsaleCh {
			if iterResult.Err != nil {
				abort = true
				return err
			}
			tempV := iterResult.Spu
			_, err := w.container.Spu().AddSpu(ctx, &tempV)
			if err != nil {
				syncM.Lock()
				failedPlatGoodsIds[tempV.PlatGoodsID] = true
				syncM.Unlock()
				continue
			} else {
				logger.Debugf(ctx, "add spu success, goods is %v", tempV.PlatGoodsID)
				syncM.Lock()
				succPlatGoodsIds[tempV.PlatGoodsID] = true
				actualTotal++
				syncM.Unlock()
			}
		}
		return nil
	})

	closeProcessCh := make(chan struct{})
	closeProcessAckCh := make(chan struct{})
	runner.GoRun(ctx, func() error {
		defer func() {
			closeProcessAckCh <- struct{}{}
		}()

		t := time.NewTicker(time.Second * 5)
		defer t.Stop()

		for {
			select {
			case <-t.C:
				syncM.Lock()
				succ := int64(len(succPlatGoodsIds))
				failed := int64(len(failedPlatGoodsIds))
				syncM.Unlock()

				fixTotal := max(total, succ+failed)
				logger.WithFields(taskFields).WithFields(logger.Fields{
					"trueTotal": actualTotal,
					"total":     fixTotal,
					"succeed":   succ,
					"failed":    failed,
				}).Info(ctx, "update task process")
				err = w.TaskProcessReport(fixTotal, succ, failed)
				if err != nil {
					logger.WithFields(taskFields).WithError(err).Error(ctx, "report process failed")
				}
			case <-closeProcessCh:
				return nil
			}
		}
	}, gorunner.WithSkipWait())
	runner.Wait()
	closeProcessCh <- struct{}{}
	<-closeProcessAckCh

方式2 取消上下文

1
onsaleCh 通道是在多个 fetchAndSendSpuItems 调用之间共享的这意味着每个 fetchAndSendSpuItems 函数都会尝试向同一个 onsaleCh 通道发送数据尝试关闭一个已经关闭的通道好像就会触发报错

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
func (a *defaultApi) IterOnsaleGoodsByPage(ctx context.Context, work *Worker, pf platform.TPlatform, shopID string, initialPageNo int64, pageSize int64, total int64) (<-chan IterGoodsResult, error) {
	onsaleCh := make(chan IterGoodsResult, 150)
	subCtx, cancel := context.WithCancel(ctx)   // 取消上下文
	cancelFunc := func() { cancel() }
	runner := gorunner.NewGoRunner()
	var limitCount int64 = 10000

	if total >= limitCount {
		startTime := int64(1575043200)
		endTime := common.LocalTimeNow().Unix()
		isSubdivided, isAbort := efficientSubdivideTime(subCtx, work, pf, shopID, limitCount, initialPageNo, pageSize, startTime, endTime, onsaleCh, &runner, cancelFunc)
		if isAbort {
			cancelFunc()
			return onsaleCh, fmt.Errorf("unable to subdivide time range for shop %s", shopID)
		}
		if !isSubdivided {
			cancelFunc()
			return onsaleCh, fmt.Errorf("subdivision did not return any valid ranges for shop %s", shopID)
		}
	} else if total > 0 {
		runner.GoRun(ctx, func() error {
			return fetchAndSendSpuItems(subCtx, work, pf, shopID, initialPageNo, pageSize, nil, nil, onsaleCh, cancelFunc)
		})
	}

	// 后台等待所有任务完成
	gorunner.GoRun(ctx, func() error {
		defer func() {
			defer cancelFunc()  // 取消
			defer close(onsaleCh) // 完成后关闭 onsaleCh 通道
			logger.Infof(ctx, "IterOnsaleGoodsByPage all goroutines completed for shop %s", shopID)
		}()

		select {
		case <-ctx.Done():
			logger.Warnf(ctx, "Operation cancelled for shop %s", shopID)
			return ctx.Err()
		default:
			runner.Wait()
		}
		return nil
	})

	return onsaleCh, nil
}

// 获取SPU列表并发送到通道  //返回一个通道的函数
func fetchAndSendSpuItems(ctx context.Context, work *Worker, pf platform.TPlatform, shopID string, initialPageNo int64, pageSize int64, beginTime *int64, endTime *int64, onsaleCh chan<- IterGoodsResult, cancelFunc func()) error {
	filter := &spu.SpuQuery{
		ShopId:   shopID,
		Page:     initialPageNo,
		PageSize: pageSize,
	}

	if beginTime != nil && endTime != nil {
		filter.TimeFilters = []query.TimeFilter{
			{
				Start:    strconv.FormatInt(*beginTime, 10),
				End:      strconv.FormatInt(*endTime, 10),
				TimeName: "CreateTime",
			},
		}
	}

	for {
		select {
		case <-ctx.Done():
			onsaleCh <- IterGoodsResult{Err: ctx.Err()}
			return ctx.Err()
		default:
		}

		const maxRetries = 3
		retryBackoff := time.Second
		var spuList []spu.Spu
		var err error
		var i int
		// 重试
		for i = 0; i < maxRetries; i++ {
			spuList, _, err = work.container.Sdk(pf).GetSpuList(ctx, filter)
			if err == nil {
				break
			}
			time.Sleep(retryBackoff)
			logger.Infof(ctx, "failed to get spu list by page, shop_id: %v, page: %v, page_size: %v, err: %v, retrying (%d/%d)",
				filter.ShopId, filter.Page, filter.PageSize, err, i+1, maxRetries)
			retryBackoff *= 2 // 指数退避
		}
		if err != nil {
			logger.Errorf(ctx, "failed to get spu list by page, shop_id: %v, page: %v, page_size: %v, err: %v, retrying (%d/%d)",
				filter.ShopId, filter.Page, filter.PageSize, err, i+1, maxRetries)
			onsaleCh <- IterGoodsResult{Err: err}
			cancelFunc() // 取消所有goroutine的fetchAndSendSpuItems()
			return err
		}

		if len(spuList) == 0 {
			return nil
		}

		for _, spuItem := range spuList {
			s := spuItem
			onsaleCh <- IterGoodsResult{Spu: s, Err: nil}
		}

		filter.Page += 1
		if len(spuList) > 0 {
			time.Sleep(time.Second * 5)
		}
	}
}

// 细分时间区间
func efficientSubdivideTime(ctx context.Context, work *Worker, pf platform.TPlatform, shopID string, limitCount, initialPageNo, pageSize,
	startTime, endTime int64, onsaleCh chan<- IterGoodsResult, runner *gorunner.GoRunner, cancelFunc func()) (bool, bool) {
	if endTime <= startTime {
		return true, false // 参数1: true 表示区间有效; 参数2: true 表示是否终止
	}

	count, err := Instance(pf).GetOnsaleGoodsCountByTimestamp(ctx, work, pf, shopID, 1, 1, startTime, endTime)
	if err != nil {
		logger.Errorf(ctx, "failed to get spu list count by page, shop_id: %v, err: %v", shopID, err)
		onsaleCh <- IterGoodsResult{Err: err}
		return false, true
	}

	if count == 0 {
		return true, false
	}

	if count > 0 && count < limitCount {
		logger.Infof(ctx, "Efficient subdivision: startTime:%s, endTime:%s, count:%d", common.TimestampToStringTime(startTime), common.TimestampToStringTime(endTime), count)
		(*runner).GoRun(ctx, func() error {
			return fetchAndSendSpuItems(ctx, work, pf, shopID, initialPageNo, pageSize, &startTime, &endTime, onsaleCh, cancelFunc)
		})
		return true, false
	}

	if count >= limitCount {
		midTime := (startTime + endTime) / 2
		isLeftValid, isLeftAbort := efficientSubdivideTime(ctx, work, pf, shopID, limitCount, initialPageNo, pageSize, startTime, midTime, onsaleCh, runner, cancelFunc)
		if isLeftAbort {
			return false, true
		}

		isRightValid, isRightAbort := efficientSubdivideTime(ctx, work, pf, shopID, limitCount, initialPageNo, pageSize, midTime, endTime, onsaleCh, runner, cancelFunc)
		if isRightAbort {
			return false, true
		}

		return isLeftValid || isRightValid, false
	}

	return false, false
}

具体调用见: 方式1 取消信号

效果: 调用瞬间, 会打印细分出来的时间区间, 然后后台去执行耗时操作(fetchAndSendSpuItems)

image-20241206144106416

© 2024- lfj