channel通道

通道可以被认为是Goroutines通信的管道。类似于管道中的水从一端到另一端的流动,数据可以从一端发送到另一端,通过通道接收。

在前面讲Go语言的并发时候,我们就说过,当多个Goroutine想实现共享数据的时候,虽然也提供了传统的同步机制,但是Go语言强烈建议的是使用Channel通道来实现Goroutines之间的通信。

“不要通过共享内存来通信,而应该通过通信来共享内存” 这是一句风靡golang社区的经典语

Go语言中,要传递某个数据给另一个goroutine(协程),可以把这个数据封装成一个对象,然后把这个对象的指针传入某个channel中,另外一个goroutine从这个channel中读出这个指针,并处理其指向的内存对象。Go从语言层面保证同一个时间只有一个goroutine能够访问channel里面的数据,为开发者提供了一种优雅简单的工具,所以Go的做法就是使用channel来通信,通过通信来传递内存数据,使得内存数据在不同的goroutine中传递,而不是使用共享内存来通信。

什么是通道

通道的概念

通道是什么,通道就是goroutine之间的通道。它可以让goroutine之间相互通信。

每个通道都有与其相关的类型。该类型是通道允许传输的数据类型。(通道的零值为nil。nil通道没有任何用处,因此通道必须使用类似于map和切片的方法来定义。)

通道的声明

声明一个通道和定义一个变量的语法一样:

1
2
3
4
//声明通道
var 通道名 chan 数据类型
//创建通道:如果通道为nil(就是不存在),就需要先创建通道
通道名 = make(chan 数据类型)

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
package main

import "fmt"

func main() {
var a chan int
if a == nil {
fmt.Println("channel 是 nil 的, 不能使用,需要先创建通道。。")
a = make(chan int)
fmt.Printf("数据类型是: %T", a)
}
}

运行结果:

1
2
channel 是 nil 的, 不能使用,需要先创建通道。。
数据类型是: chan int

也可以简短的声明:

1
a := make(chan int) 

channel的数据类型

channel是引用类型的数据,在作为参数传递的时候,传递的是内存地址。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main

import (
"fmt"
)

func main() {
ch1 := make(chan int)
fmt.Printf("%T,%p\n",ch1,ch1)

test1(ch1)

}

func test1(ch chan int){
fmt.Printf("%T,%p\n",ch,ch)
}

运行结果:

1
2
chan int,0xc00001e180
chan int,0xc00001e180

我们能够看到,ch和ch1的地址是一样的,说明它们是同一个通道。

通道的注意点

Channel通道在使用的时候,有以下几个注意点:

  • 1.用于goroutine,传递消息的。

  • 2.通道,每个都有相关联的数据类型,
    nil chan,不能使用,类似于nil map,不能直接存储键值对

  • 3.使用通道传递数据:<-
    chan <- data,发送数据到通道。向通道中写数据
    data <- chan,从通道中获取数据。从通道中读数据

  • 4.阻塞:
    发送数据:chan <- data,阻塞的,直到另一条goroutine,读取数据来解除阻塞
    读取数据:data <- chan,也是阻塞的。直到另一条goroutine,写出数据解除阻塞。

  • 5.本身channel就是同步的,意味着同一时间,只能有一条goroutine来操作。

最后:通道是goroutine之间的连接,所以通道的发送和接收必须处在不同的goroutine中。

通道的使用语法

发送和接收

发送和接收的语法:

1
2
data := <- a // read from channel a  
a <- data // write to channel a

在通道上箭头的方向指定数据是发送还是接收。

另外:

1
v, ok := <- a //从一个channel中读取

发送和接收默认是阻塞的

一个通道发送和接收数据,默认是阻塞的。当一个数据被发送到通道时,在发送语句中被阻塞,直到另一个Goroutine从该通道读取数据。相对地,当从通道读取数据时,读取被阻塞,直到一个Goroutine将数据写入该通道。

这些通道的特性是帮助Goroutines有效地进行通信,而无需像使用其他编程语言中非常常见的显式锁或条件变量。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import "fmt"

func main() {
var ch1 chan bool //声明,没有创建
fmt.Println(ch1) //<nil>
fmt.Printf("%T\n", ch1) //chan bool
ch1 = make(chan bool) //0xc0000a4000,是引用类型的数据
fmt.Println(ch1)

go func() {
for i := 0; i < 10; i++ {
fmt.Println("子goroutine中,i:", i)
}
// 循环结束后,向通道中写数据,表示要结束了。。
ch1 <- true

fmt.Println("结束。。")

}()

data := <-ch1 // 从ch1通道中读取数据
fmt.Println("data-->", data)
fmt.Println("main。。over。。。。")
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<nil>
chan bool
0xc000086120
子goroutine中,i: 0
子goroutine中,i: 1
子goroutine中,i: 2
子goroutine中,i: 3
子goroutine中,i: 4
子goroutine中,i: 5
子goroutine中,i: 6
子goroutine中,i: 7
子goroutine中,i: 8
子goroutine中,i: 9
结束。。
data--> true
main。。over。。。。

在上面的程序中,我们先创建了一个chan bool通道。然后启动了一条子Goroutine,并循环打印10个数字。然后我们向通道ch1中写入输入true。然后在主goroutine中,我们从ch1中读取数据。这一行代码是阻塞的,这意味着在子Goroutine将数据写入到该通道之前,主goroutine将不会执行到下一行代码。因此,我们可以通过channel实现子goroutine和主goroutine之间的通信。当子goroutine执行完毕前,主goroutine会因为读取ch1中的数据而阻塞。从而保证了子goroutine会先执行完毕。这就消除了对时间的需求。在之前的程序中,我们要么让主goroutine进入睡眠,以防止主要的Goroutine退出。要么通过WaitGroup来保证子goroutine先执行完毕,主goroutine才结束。

示例代码:以下代码加入了睡眠,可以更好的理解channel的阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"fmt"
"time"
)

func main() {
ch1 := make(chan int)
done := make(chan bool) // 通道
go func() {
fmt.Println("子goroutine执行。。。")
time.Sleep(3 * time.Second)
data := <-ch1 // 从通道中读取数据
fmt.Println("data:", data)
done <- true
}()
// 向通道中写数据。。
time.Sleep(5 * time.Second)
ch1 <- 100

<-done
fmt.Println("main。。over")

}

运行结果:

1
2
3
子goroutine执行。。。
data: 100
main。。over

再举一个例子,下面这段程序将打印一个数字的各位的平方和以及立方和。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
"fmt"
)

func calcSquares(number int, squareop chan int) {
sum := 0
for number != 0 {
digit := number % 10
sum += digit * digit
number /= 10
}
squareop <- sum
}

func calcCubes(number int, cubeop chan int) {
sum := 0
for number != 0 {
digit := number % 10
sum += digit * digit * digit
number /= 10
}
cubeop <- sum
}
func main() {
number := 123
sqrch := make(chan int)
cubech := make(chan int)
go calcSquares(number, sqrch)
go calcCubes(number, cubech)
squares, cubes := <-sqrch, <-cubech
fmt.Println("Final output", squares, cubes)
}

运行结果:

1
Final output 14 36

死锁

使用通道时要考虑的一个重要因素是死锁。如果Goroutine在一个通道上发送数据,那么预计其他的Goroutine应该接收数据。如果这种情况不发生,那么程序将在运行时出现死锁。

类似地,如果Goroutine正在等待从通道接收数据,那么另一些Goroutine将会在该通道上写入数据,否则程序将会死锁。

关闭通道

发送者可以通过关闭信道,来通知接收方不会有更多的数据被发送到channel上。

1
close(ch)

接收者可以在接收来自通道的数据时使用额外的变量来检查通道是否已经关闭。

语法结构:

1
v, ok := <- ch  

类似map操作,存储key,value键值对

v,ok := map[key] //根据key从map中获取value,如果key存在, v就是对应的数据,如果key不存在,v是默认值

在上面的语句中,如果ok的值是true,表示成功的从通道中读取了一个数据value。如果ok是false,这意味着我们正在从一个封闭的通道读取数据。从闭通道读取的值将是通道类型的零值。

例如,如果通道是一个int通道,那么从封闭通道接收的值将为0。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
"fmt"
"time"
)

func main() {
ch1 := make(chan int)
go sendData(ch1)
/*
子goroutine,写出数据10个
每写一个,阻塞一次,主程序读取一次,解除阻塞

主goroutine:循环读
每次读取一个,堵塞一次,子程序,写出一个,解除阻塞

发送发,关闭通道的--->接收方,接收到的数据是该类型的零值,以及false
*/
//主程序中获取通道的数据
for {
time.Sleep(1 * time.Second)
v, ok := <-ch1 //其他goroutine,显示的调用close方法关闭通道。
if !ok {
fmt.Println("已经读取了所有的数据,", ok, v)
break
}
fmt.Println("取出数据:", v, ok)
}

fmt.Println("main...over....")
}
func sendData(ch1 chan int) {
// 发送方:10条数据
for i := 0; i < 10; i++ {
ch1 <- i //将i写入通道中
}
close(ch1) //将ch1通道关闭了。
}

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
取出数据: 0 true
取出数据: 1 true
取出数据: 2 true
取出数据: 3 true
取出数据: 4 true
取出数据: 5 true
取出数据: 6 true
取出数据: 7 true
取出数据: 8 true
取出数据: 9 true
已经读取了所有的数据, false 0
main...over....

在上面的程序中,send Goroutine将0到9写入chl通道,然后关闭通道。主函数里有一个无限循环。它检查通道是否在发送数据后,使用变量ok关闭。如果ok是假的,则意味着通道关闭,因此循环结束。还可以打印接收到的值和ok的值。

通道上的范围循环

我们可以循环从通道上获取数据,直到通道关闭。for循环的for range形式可用于从通道接收值,直到它关闭为止。

使用range循环,示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"time"
)

func main() {
ch1 := make(chan int)
go sendData(ch1)
// for循环的for range形式可用于从通道接收值,直到它关闭为止。
for v := range ch1 {
fmt.Println("读取数据:", v)
}
fmt.Println("main..over.....")
}
func sendData(ch1 chan int) {
for i := 0; i < 10; i++ {
time.Sleep(1 * time.Second)
ch1 <- i
}
close(ch1) //通知对方,通道关闭
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
读取数据: 0
读取数据: 1
读取数据: 2
读取数据: 3
读取数据: 4
读取数据: 5
读取数据: 6
读取数据: 7
读取数据: 8
读取数据: 9
main..over.....

缓冲通道

非缓冲通道

之前学习的所有通道基本上都没有缓冲。发送和接收到一个未缓冲的通道是阻塞的。

一次发送操作对应一次接收操作,对于一个goroutine来讲,它的一次发送,在另一个goroutine接收之前都是阻塞的。同样的,对于接收来讲,在另一个goroutine发送之前,它也是阻塞的。

缓冲通道

缓冲通道就是指一个通道,带有一个缓冲区。发送到一个缓冲通道只有在缓冲区满时才被阻塞。类似地,从缓冲通道接收的信息只有在缓冲区为空时才会被阻塞。

可以通过将额外的容量参数传递给make函数来创建缓冲通道,该函数指定缓冲区的大小。

语法:

1
ch := make(chan type, capacity)  

上述语法的容量应该大于0,以便通道具有缓冲区。默认情况下,无缓冲通道的容量为0,因此在之前创建通道时省略了容量参数。

示例代码

以下的代码中,chan通道,是带有缓冲区的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"fmt"
"strconv"
"time"
)

func main() {
/*
非缓存通道:make(chan T)
缓存通道:make(chan T ,size)
缓存通道,理解为是队列:

非缓存,发送还是接受,都是阻塞的
缓存通道,缓存区的数据满了,才会阻塞状态。。

*/
ch := make(chan string, 4)
go sendData3(ch)
for {
time.Sleep(time.Second / 2)
v, ok := <-ch
if !ok {
fmt.Println("读完了,,", ok)
break
}
fmt.Println("\t读取的数据是:", v)
}

fmt.Println("main...over...")
}

func sendData3(ch chan string) {
for i := 0; i < 10; i++ {
ch <- "数据" + strconv.Itoa(i)
fmt.Println("子goroutine,写出第", i, "个数据")
}
close(ch)
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
子goroutine,写出第 0 个数据
子goroutine,写出第 1 个数据
子goroutine,写出第 2 个数据
子goroutine,写出第 3 个数据
读取的数据是: 数据0
子goroutine,写出第 4 个数据
子goroutine,写出第 5 个数据
读取的数据是: 数据1
子goroutine,写出第 6 个数据
读取的数据是: 数据2
读取的数据是: 数据3
子goroutine,写出第 7 个数据
读取的数据是: 数据4
子goroutine,写出第 8 个数据
读取的数据是: 数据5
子goroutine,写出第 9 个数据
读取的数据是: 数据6
读取的数据是: 数据7
读取的数据是: 数据8
读取的数据是: 数据9
读完了,, false
main...over...

定向通道

双向通道

通道,channel,是用于实现goroutine之间的通信的。一个goroutine可以向通道中发送数据,另一条goroutine可以从该通道中获取数据。截止到现在我们所学习的通道,都是既可以发送数据,也可以读取数据,我们又把这种通道叫做双向通道。

1
2
data := <- a // read from channel a  
a <- data // write to channel a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import "fmt"

func main() {

ch1 := make(chan string) // 双向,可读,可写
done := make(chan bool)
go sendData(ch1, done)
data := <-ch1 //阻塞
fmt.Println("子goroutine传来:", data)
ch1 <- "我是main。。" // 阻塞

<-done
fmt.Println("main...over....")
}

// 子goroutine-->写数据到ch1通道中
// main goroutine-->从ch1通道中取
func sendData(ch1 chan string, done chan bool) {
ch1 <- "我是小明" // 阻塞
data := <-ch1 // 阻塞
fmt.Println("main goroutine传来:", data)

done <- true
}

运行结果:

1
2
3
子goroutine传来: 我是小明
main goroutine传来: 我是main。。
main...over....

单向通道

单向通道,也就是定向通道。

之前我们学习的通道都是双向通道,我们可以通过这些通道接收或者发送数据。我们也可以创建单向通道,这些通道只能发送或者接收数据。

创建仅能发送数据的通道,示例代码:

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import "fmt"

func main() {
/*
单向:定向
chan <- T,
只支持写,
<- chan T,
只读
*/
ch1 := make(chan int)//双向,读,写
//ch2 := make(chan <- int) // 单向,只写,不能读
//ch3 := make(<- chan int) //单向,只读,不能写
//ch1 <- 100
//data :=<-ch1
//ch2 <- 1000
//data := <- ch2
//fmt.Println(data)
// <-ch2 //invalid operation: <-ch2 (receive from send-only type chan<- int)
//ch3 <- 100
// <-ch3
// ch3 <- 100 //invalid operation: ch3 <- 100 (send to receive-only type <-chan int)

//go fun1(ch2)
go fun1(ch1)
data:= <- ch1
fmt.Println("fun1中写出的数据是:",data)

//fun2(ch3)
go fun2(ch1)
ch1 <- 200
fmt.Println("main。。over。。")
}
//该函数接收,只写的通道
func fun1(ch chan <- int){
// 函数内部,对于ch只能写数据,不能读数据
ch <- 100
fmt.Println("fun1函数结束。。")
}

func fun2(ch <-chan int){
//函数内部,对于ch只能读数据,不能写数据
data := <- ch
fmt.Println("fun2函数,从ch中读取的数据是:",data)
}

运行结果:

1
2
3
4
fun1函数结束。。
fun1中写出的数据是: 100
fun2函数,从ch中读取的数据是: 200
main。。over。。

time包中的通道相关函数

主要就是定时器,标准库中的Timer让用户可以定义自己的超时逻辑,尤其是在应对select处理多个channel的超时、单channel读写的超时等情形时尤为方便。

Timer是一次性的时间触发事件,这点与Ticker不同,Ticker是按一定时间间隔持续触发时间事件。

Timer常见的创建方式:

1
2
3
t:= time.NewTimer(d)
t:= time.AfterFunc(d, f)
c:= time.After(d)

虽然说创建方式不同,但是原理是相同的。

Timer有3个要素:

定时时间:就是那个d
触发动作:就是那个f
时间channel: 也就是t.C

time.NewTimer()

NewTimer()创建一个新的计时器,该计时器将在其通道上至少持续d之后发送当前时间。它的返回值是一个Timer。

源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// NewTimer creates a new Timer that will send
// the current time on its channel after at least duration d.
func NewTimer(d Duration) *Timer {
c := make(chan Time, 1)
t := &Timer{
C: c,
r: runtimeTimer{
when: when(d),
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}

通过源代码我们可以看出,首先创建一个channel,关联的类型为Time,然后创建了一个Timer并返回。

  • 用于在指定的Duration类型时间后调用函数或计算表达式。
  • 如果只是想指定时间之后执行,使用time.Sleep()
  • 使用NewTimer(),可以返回的Timer类型在计时器到期之前,取消该计时器
  • 直到使用<-timer.C发送一个值,该计时器才会过期

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
"fmt"
"time"
)

func main() {

/*
func NewTimer(d Duration) *Timer
创建一个计时器:d时间以后触发,go触发计时器的方法比较特别,就是在计时器的channel中发送值
*/
//新建一个计时器:timer
timer := time.NewTimer(3 * time.Second)
fmt.Printf("%T\n", timer) //*time.Timer
fmt.Println(time.Now()) //2023-07-07 16:26:45.5207225 +0800 CST m=+0.001542901

//此处在等待channel中的信号,执行此段代码时会阻塞3秒
ch2 := timer.C //<-chan time.Time
fmt.Println(<-ch2) //2023-07-07 16:26:48.5308961 +0800 CST m=+3.011716501

}

timer.Stop

计时器停止:

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"fmt"
"time"
)

func main() {

//新建计时器,一秒后触发

timer2 := time.NewTimer(3 * time.Second)

//新开启一个线程来处理触发后的事件

go func() {

//等触发时的信号

<-timer2.C

fmt.Println("Timer 2 结束。。")

}()

//由于上面的等待信号是在新线程中,所以代码会继续往下执行,停掉计时器

time.Sleep(1 * time.Second)
stop := timer2.Stop()

if stop {

fmt.Println("Timer 2 停止。。")
}
}

运行结果:

1
Timer 2 停止。。

time.After()

在等待持续时间之后,然后在返回的通道上发送当前时间。它相当于NewTimer(d).C。在计时器触发之前,垃圾收集器不会恢复底层计时器。如果效率有问题,使用NewTimer代替,并调用Timer。如果不再需要计时器,请停止。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main

import (
"fmt"
"time"
)

func main() {

/*
func After(d Duration) <-chan Time
返回一个通道:chan,存储的是d时间间隔后的当前时间。
*/
ch1 := time.After(3 * time.Second) //3s后
fmt.Printf("%T\n", ch1) // <-chan time.Time
fmt.Println(time.Now()) //2023-07-07 16:36:10.8553299 +0800 CST m=+0.001792901
time2 := <-ch1
fmt.Println(time2) //2023-07-07 16:36:13.8602885 +0800 CST m=+3.006751501

}