Golang扫盲式学习——GO并发
并发与并行😣
并发与并行的概念和区别
并行:同一个时间段内多个任务同时在不同的CPU核心上执行。强调同一时刻多个任务之间的”同时执行“。
并发:同一个时间段内多个任务都在进展。强调多个任务间的”交替执行“。
随着硬件水平的提高,现在的终端主机都是多个CPU,每个CPU都是多核结构。当多个CPU同时运行起来,跑不同的任务,这属于并行;在一个CPU里的多个核心里同时运行不同的任务,同样也属于并行。而并发是关注一个核心里的多个任务,这时需要交替执行,就是并发。
CPU是计算单元,有数据才能进行计算。当一个任务被网络I/O阻塞,CPU没有数据,就会处于等待。显然,若是能够将等待的时间利用起来,资源利用率会提高。因此,并发处理的主要目的是提高CPU和资源的利用率。
Go语言并发与CPU核心的关系
- Go并发是基于Goroutine和Channel实现的。
Goroutine是Go语言的并发执行单元,Channel用于Goroutine之间的通信与同步。
- Goroutine的执行依赖于操作系统的线程调度。
Goroutine自身不具备执行上下文,它必须依存在操作系统线程上才可以真正执行。当一个Goroutine被创建时,Go runtime会自动选择一个空闲的操作系统线程,将这个Goroutine的执行上下文绑定到该线程上。
上图中G表示Goroutine,P表示一个调度的上下文(包含了运行 Goroutine 的资源),M表示一个OS线程。一个操作系统线程可以同时关联多个Goroutine,这些Goroutine会被Go runtime高效地在该线程上调度执行。但任意时刻只会有一个Goroutine获得线程的执行权进行运行(G0就是获得线程权的Goroutine)。当关联的操作系统线程终止时,绑定在该线程上的所有Goroutine也会被终止。
- 在单核CPU上,即使有许多Goroutine,同一时刻也只能有一个Goroutine真正在CPU上运行。
单核CPU同一时刻只能执行一个线程。即使有许多Goroutine,也只有获得CPU执行权的那个Goroutine在真正运行,其他Goroutine会被挂起,等待下次被调度执行。
- 在多核CPU上,操作系统可以将不同的Goroutine直接调度到不同的CPU核心上运行。
这样多个Goroutine就可以同时真正运行,实现并行执行。此时Go并发程序可以发挥多核CPU的强大计算能力。Go runtime会有智能的调度策略,将Goroutine均匀地分布在所有CPU核心上或者以负载均衡的方式进行调度,这取决于Goroutine的数量和系统的CPU核心数。
GMP模型如上图所示。图中涉及5个重要的实体。
- 全局队列(run queue):存放等待运行的Goroutine。
- 本地队列(local queue):和P连接的队列,存放的也是等待运行的Goroutine,存放数量有限,一般不超过256个。新建Goroutine时,优先在本地队列存放,然后再放置全局队列。
- P:被称为处理器,包含了运行 Goroutine 的资源。一个Goroutine想要运行,必须先获取P。P的数量是可配置的,最多有
GOMAXPROCS
个。 - G:指代Goroutine,会被Go runtime智能化调度。当一个线程M空闲时,首先会从全局队列里获取Goroutine,若全局队列为空,则会从周边的线程”偷“一半Goroutine放到本地队列。
- M:线程由OS调度器分配到CPU的核上执行。当一个线程阻塞时,会导致和该线程的其它Goroutine”饿死“。此时,Go runtime会解绑P和M,将一个新的M分配给P避免”饿死“情况发生。
Goroutine与OS线程的区别
- 生命周期不同
Goroutine由Go runtime 管理生命周期,创建和销毁由runtime调度完成。OS线程是由操作系统内核来管理生命周期。
- 调度不同
Goroutine由Go runtime的调度器进行调度。OS线程的调度是由操作系统内核根据时间片进行的。
- 关联关系不同
每个OS线程与一个Goroutine关联,但一个Goroutine不一定对应一个OS线程,多个Goroutine可能对应同一个OS线程。Go runtime会动态地将Goroutine映射到线程上。
- 资源消耗不同
创建和维护OS线程需要较多资源,而Goroutine的资源消耗很小。一个应用程序可以同时存在成千上万个Goroutine,但OS线程数目通常较小。
- 通信方式不同
Goroutine之间通信使用Channel,而OS线程通常使用共享内存来通信。
- 并发数不同
一个Go程序的并发度可以达到上百万,这是由于Goroutine的高效率实现。OS线程难以达到如此高的并发度。
Goroutine🤔
Goroutine的概念与特点
概念:Goroutine是一个轻量级的执行单元,用于执行并发任务。多个Goroutine可以在同一地址空间中执行,且Go runtime会管理其生命周期。Goroutine通过Channel进行通信。
特点:
- 轻量级:创建和维护Goroutine的开销很小,一个程序可以同时存在成千上万个Goroutine。
- 并发执行:Goroutine允许程序利用多核CPU的优势进行并发执行。
- 自动调度:Goroutine的调度完全由Go运行时进行管理,开发者不需要关心底层细节。
- 资源共享:多个Goroutine可以访问共享的内存资源,这使得Goroutine之间可以高效地通信与协作。
- 无需回收:Goroutine不需要手动回收即可释放资源,运行时会自动回收结束的Goroutine。
- 动态扩展:一个Go程序可以从几个Goroutine开始,然后动态地创建更多Goroutine来利用多核资源。
- 高并发:利用Goroutine可以轻易地编写高并发程序,一个服务器程序可以同时接待成千上万个客户端。
- Channel通信:Goroutine之间可以通过Channel进行高效的消息通信与同步,这使得编写并发程序变得简单。
创建Goroutine的语法
func(){}():第一个func() {...} 定义了一个匿名函数(anonymous function)。第二个()代表调用这个匿名函数。
package main
import (
"fmt"
"time"
)
func main() {
// 创建一个Goroutine
go func() {
fmt.Println("Hello from Goroutine!")
}()
time.Sleep(1)
// 主Goroutine
fmt.Println("Hello from main!")
}
Goroutine的调度与上下文切换
为了让不同的Goroutine有机会运行,runtime会在Goroutine之间进行上下文切换。当一个Goroutine运行一定时间或遇到channel操作时,会主动交出线程的执行权,这时runtime会从其他挂起的Goroutine中选择一个继续运行。上下文切换涉及到保存当前运行Goroutine的程序计数器、堆栈指针等上下文信息,并恢复下一个要运行的Goroutine的上下文信息,这个过程需要一定的时间开销。
Goroutine存在的内存问题及解决方案
存在的问题:
栈溢出:每个Goroutine都有一个私有的栈,默认栈大小为2MB,如果函数调用太深会导致栈溢出。
堆溢出:如果Goroutine中分配过多的堆对象,也会导致内存溢出。
内存泄露:如果Goroutine退出时没有释放之前分配的内存,会导致这部分内存泄漏。
解决方案:
- Goroutine栈大小
方法一:设置runtime.GOMAXPROCS(n, stackSize)
,其中,n指定要使用的P的个数,设置为0表示使用所有CPU核心,stackSize指定新的默认Goroutine栈大小,单位为字节。
方法二:创建Goroutine时,指定栈的大小。go func(params) { /* ... */ }(stackSize, params)
,stackSize必须是第一个参数,在params之前指定。注意:stackSize的参数只在编译时起作用,用于指定Goroutine的栈大小,在被调用的函数内部,它无法访问这个参数。
- 避免无限递归
在Goroutine中调用无限递归函数会引起栈溢出,应该避免这种情况发生。
- 设置垃圾回收阈值
可以通过runtime.GOGC来设置垃圾回收器的阈值,触发更频繁的垃圾回收来避免堆溢出,例如runtime.GOGC=200
设置垃圾回收阈值为200。
- 手动回收堆内存
当一个Goroutine结束时,其私有栈内存会被收回,但堆内存不会自动回收。因此,需要在Goroutine结束前手动回收不再使用的堆内存,否在会发生内存泄露。
go func() {
// 分配一些堆空间
buf := make([]byte, 100)
// 使用buf...
// Goroutine结束前手动回收buf
buf = nil
}()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done() // Goroutine结束 decrement WaitGroup计数
// 分配内存并使用...
buf := make([]byte, 100)
// 使用buf...
buf = nil // 手动回收内存
}()
wg.Wait() // 等待Goroutine结束
Channel👍
Channel的概念与特点
Channels are not closed by default. They need to be closed explicitly with the
Close
method to indicate that no more values will be sent on the channel.
Channel是一个通信机制,它可以使多个Goroutine之间相互发送数据。Channel允许任意两个Goroutine通过它异步地传递信息。
特点:
方向性:Channel可以是双向的(default)或单向的(指定方向时)。单向Channel按发送/接收方向分为发送(chan<-)和接收(<-chan)Channel。
类型安全:Channel在声明时需指定元素类型,之后只能传送该类型的元素。这保证了Channel通信的类型安全。
FIFO:Channel实现了先入先出的规则,发送的元素按顺序被接收。
阻塞:向一个满的Channel发送数据会导致发送方阻塞,从一个空的Channel接收数据会导致接收方阻塞。
缓冲:可以指定Channel的缓冲区大小,向一个未满的缓冲Channel发送数据不会阻塞。
关闭:关闭的Channel无法再发送数据,但可以继续从中接收数据。向关闭的Channel发送数据会panic。
无缓冲或满的Channel导致Goroutine阻塞,这可用于实现同步和协作。
Channel支持for range形式的接收,这会不断接收Channel的数据知道它被关闭。
Channel可用于函数间传递数据,实现异步执行的函数之间的数据通信。
无缓冲Channel与有缓冲Channel
- 无缓冲Channel
unbuffered channel
就是缓冲大小为 0 的 channel
,无缓冲区的 channel 本身是不存放数据的,在发送和接收都会被阻塞。也就是相当于,你现在是一个 send
身份,但是当另外一个没有 receive
你发送的值之前,你一直处于阻塞(等待接收)状态;就好比你递东西给别人,别人没接,你就要一直举着东西。相反,如果你现在是一个 receive
身份,你就会一直阻塞(等待发送)状态,在你拿到值之前,你会一直等待。就好比你准备要接东西,别人迟迟不给你,你就要一直等着。
package main
import (
"fmt"
)
func main() {
ch := make(chan struct{})
go func() {
defer close(ch)
v := <-ch
fmt.Printf("receive a struct: %v\n", v)
}()
ch <- struct{}{}
fmt.Println("send a struct")
}
- 有缓冲Channel
有缓冲的Channel就是设置了一个buffersize,作为缓冲区的大小。
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int, 1) // 设置buffersize大小为1
go func() {
defer close(ch)
defer fmt.Println("我关闭了")
ch <- 1
ch <- 2
fmt.Println("send a struct")
}()
time.Sleep(5 * time.Second)
for {
v, ok := <-ch
if !ok {
fmt.Println("channel closed")
break
}
fmt.Printf("received a struct %v\n", v)
}
}
// output
received a struct 1
received a struct 2
send a struct
我关闭了
channel closed
创建Channel时,当size ==1时,相当于size==0,表示创建一个无缓冲的通道。所以,当向通道传入一个1时,缓冲区就满了。此时,必须等接收端将1读走,才能继续向channel发送2。
将buffersize修改为2时,表示创建一个有缓冲的通道。发送者会一次性将1和2发送到通道里,然后就关闭Channel的写入端。睡眠5秒后,就由接收端将结果读出。
// output
send a struct
我关闭了
received a struct 1
received a struct 2
channel closed
应用:实现一个生产者消费者模型
package main
import (
"fmt"
"time"
)
func producer(out chan<- int) {
for i := 0; i < 10; i++ {
out <- i // 生产一个数据,发送到Channel
fmt.Println("生产者:", i)
}
close(out)
}
func consumer(in <-chan int) {
for num := range in { // 从Channel中接收数据
fmt.Println("消费者:", num) // 消费数据
}
}
func main() {
// 创建一个管道
ch := make(chan int)
go producer(ch) // 启动生产者Goroutine
go consumer(ch) // 启动消费者Goroutine
go consumer(ch) // 启动消费者Goroutine
time.Sleep(5 * time.Second) // 等待
}
Channel超时机制
为什么需要Channel超时机制
使用 select 可以实现 channel 超时,这可以解决以下问题:
- 防止协程阻塞:如果只读取 channel,并且 channel 没有被关闭,协程会一直阻塞。使用超时可以防止协程无限阻塞。
- 实现重试:在超时后可以重新初始化 channel 读取,实现重试的效果。
- 限定操作时间:我们可以给定一个时间限制,如果在限定时间内通信未成功,则做默认操作。
select语句中的case语法
select {
case <communication 1>:
<statements> // 执行当 communication 1 成功时
case <communication 2>:
<statements>
// ...
default:
<statements> // 默认执行的语句,当没有case可以继续时执行
}
select 的一些规则:
如果多个case同时满足,select会随机选择一个执行。
如果没有case满足,则会执行default分支。如果default也不存在,select将阻塞,直到某个通信操作成功。
必须有case和communication,不能一个空的select。
每个case实际上是一个通信操作(channel接收/发送),要求必须为channel。
package main
import (
"fmt"
"time"
)
func main() {
c1 := make(chan string)
c2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
c1 <- "one"
}()
go func() {
time.Sleep(2 * time.Second)
c2 <- "two"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-c1:
fmt.Println("received", msg1)
case msg2 := <-c2:
fmt.Println("received", msg2)
}
}
}
应用 select实现Channel超时
func main() {
retry := 3 // 设置重试次数
success := make(chan struct{})
for {
select {
case <-success: // 操作成功
println("success")
return
case <-time.After(1 * time.Second): // 超时后重试
if retry > 0 {
retry--
println("retry...")
continue
}
println("timeout, exit")
return
}
}
}
这里我们实现了重试的逻辑,在 1 秒后会触发超时并重试,一直重试 3 次。如果 3 次后还是超时,则退出操作。
timer和tickers
package main
import (
"fmt"
"time"
)
func main() {
timer1 := time.NewTimer(2 * time.Second)
<-timer1.C
fmt.Println("Timer 1 fired")
timer2 := time.NewTimer(time.Second)
go func() {
<-timer2.C
fmt.Println("Timer 2 fired")
}()
stop2 := timer2.Stop()
if stop2 {
fmt.Println("Timer 2 stopped")
}
time.Sleep(5 * time.Second)
}
在代码中,我们首先创建了一个 Timer
对象 timer1
,它将在 2 秒钟后触发。我们使用 <-timer1.C
语法来等待 timer1
的触发事件,并在触发后输出一条消息。
接着,我们又创建了另一个 Timer
对象 timer2
,它将在 1 秒钟后触发。不同的是,我们使用一个新的 goroutine 来等待 timer2
的触发事件,并在触发后输出一条消息。注意,在我们启动的 goroutine 中,我们使用了 <-timer2.C
语法来等待 timer2
的触发事件。
然后,我们调用了 timer2.Stop()
方法来停止 timer2
的触发事件。由于我们在一个 goroutine 中等待 timer2
的触发事件,所以在调用 Stop()
方法后,我们需要判断 timer2
是否成功停止,并输出一条相应的提示信息。
最后,我们使用 time.Sleep()
函数来让程序休眠 5 秒钟,以等待所有定时器事件的完成。
package main
import (
"fmt"
"time"
)
func main() {
ticker := time.NewTicker(500 * time.Millisecond)
done := make(chan bool)
go func() {
for {
select {
case <-done:
return
case t := <-ticker.C:
fmt.Println("Tick at", t)
}
}
}()
time.Sleep(1600 * time.Millisecond)
ticker.Stop()
done <- true
fmt.Println("Ticker stopped")
}
在代码中,我们首先创建了一个 Ticker
对象 ticker
,它将每隔 500 毫秒触发一次。我们使用 <-ticker.C
语法来等待 ticker
的触发事件,并在触发后输出一条消息。
接着,我们启动了一个 goroutine,用于等待 ticker
的触发事件。在 goroutine 中,我们使用 select
语句来等待两个事件:done
事件和 ticker.C
事件。当接收到 done
事件时,我们就退出 goroutine,并结束 Ticker
的定时器事件。当接收到 ticker.C
事件时,我们就执行相应的操作,输出一条消息。
在主函数中,我们使用 time.Sleep()
函数让程序休眠 1.6 秒钟,以等待一些 ticker.C
事件的触发。然后,我们调用了 ticker.Stop()
方法来停止 ticker
的定时器事件,这将导致 ticker.C
事件的停止。接着,我们向 done
通道发送了一个值,以通知 goroutine 退出,并输出一条相应的提示信息。
示例:一个基于超时的并发TCP服务器
package main
import (
"fmt"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}
func main() {
const numJobs = 5
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
for a := 1; a <= numJobs; a++ {
<-results
}
}
这段代码演示了 Go 语言中的经典并发模式——工作池模式。
在代码中,我们定义了一个 worker()
函数,它将在一个 goroutine 中运行。worker()
函数接受两个通道参数:jobs
和 results
。jobs
通道用于接收任务,results
通道用于发送任务结果。
在 worker()
函数中,我们使用 range
循环来遍历 jobs
通道中的所有任务。对于每个任务,我们输出一条消息表示当前 worker 开始执行任务,并在任务中使用 time.Sleep()
函数来模拟一些工作,然后再输出一条消息表示任务执行完成,并将任务结果发送到 results
通道中。
在主函数中,我们首先创建了两个通道:jobs
和 results
。然后,我们启动了 3 个 goroutine 来执行 worker()
函数,并将它们的 jobs
和 results
通道绑定到相应的通道。
接着,我们将 5 个任务发送到 jobs
通道中,并在发送完毕后关闭 jobs
通道,以通知 worker 所有任务都已经发送完毕。
最后,我们从 results
通道中接收所有的任务结果,并输出相应的信息。由于 results
通道的缓冲区大小为 5,因此我们可以一次性接收所有的任务结果。