Concurrent Prime Sieve

本科刚学编程的时候经常遇到判断一个数是不是素数之类的编程题,当时基础比较差,用的方法也相当简单粗暴,直接采用枚举法判断,虽然简单但是效率很低,时间复杂度为 O(n^2) ,后来慢慢了解了神奇的埃氏筛法,只用 O(nlglgn) 的时间复杂度就可以完成,效率大幅度提高,如果对埃氏筛不了解的童鞋可以看看这篇文章 sieve of Eratosthenes

Sieve

一般的埃氏筛实现都是单线程的,并不能完全发挥现在多核 CPU 的全部性能,下面给出了 Golang 并发版本的埃氏筛实现。

1
2
3
4
5
func generate(ch chan int) {
for i := 2 ;; i++ {
ch <- i
}
}
1
2
3
4
5
6
7
8
func filter(in, out chan int, prime int) {
for {
i := <-in
if i % prime != 0 {
out <- i
}
}
}
1
2
3
4
5
6
7
8
9
10
11
func main() {
ch := make(chan int)
go generate(ch)
for {
prime := <-ch
fmt.Println(prime)
out := make(chan int)
go filter(ch, out, prime)
ch = out
}
}

详解

并发版本的埃氏筛代码行数虽然不多,然而实现还是比较 tricky 的,谁要是第一遍就看懂算我输(你要是天才的话当我没说过)。

channel

首先要起一个 goroutine ,记为 G , 用于生成2以后的整数,并放到通道中(记为 out0 )供别的 goroutine 读取。

1
2
ch := make(chan int)
go generate(ch) // ch <- i

进入第一个 for 循环,从上面的通道中取出第一个数2,毫无疑问,2为素数,所以创建一个通道 out1 ,并起一个 goroutine ,记为 filter1 ,用于过滤所有2的整数倍的数,此时通道数据流的走向为 G -> out1 ,只有通过 filter1 筛选的数才有资格进入 out1 中,此时将 ch 设为 out1ch 用于记录该链上的最后一个通道),第一个循环结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
prime := <-ch             // <-ch
fmt.Println(prime) // 输出2
out := make(chan int) // 记作out1
go filter(ch, out, prime) // ch -> out1
ch = out // ch = out1
^
|
+----+
G --->|out1|--->
+----+
%2
^
|
ch

进入第二个 for 循环,因为此时 chout1 ,尝试去 out1 中取,3由 G 产生,然后经过 filter1 后成功进入 out1 ,说明3是素数,此时我们再创建一个通道 out2 ,并且再起一个 goroutine ,记为 filter2 ,该 goroutine 会过滤所有3的整数倍的数,此时通道数据流的走向为 G -> out1 -> out2 ,只有成功经过 filter1,filter2 筛选的数才能进入通道 out2 中,将 ch 设为 out2 ,第二个循环结束。当G 中产生4,并尝试在这个链上移动时,当它经过 filter1 时,由于4是2的整数倍,说明不是素数,不符合要求,4出局。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
prime := <-ch             // <-out1
fmt.Println(prime) // 输出3
out := make(chan int) // 记作out2
go filter(ch, out, prime) // out1 -> out2
ch = out // ch = out2

^ ^
| |
+----+ +----+
G --->|out1|--->|out2|--->
+----+ +----+
%2 %3
^
|
ch

进入第三个 for 循环,同样的,此时 chout2 ,尝试去 out2 中取,5由 G 产生,经过 filter1,filter2 后成功进入 out2 ,说明5为素数,此时我们再创建一个通道 out3 ,并且再起一个 goroutine ,记为 filter3 ,该 goroutine 会过滤所有5的整数倍的数,此时通道数据流的走向为 G -> out1 -> out2 -> out3 ,只有成功经过 filter1,filter2,filter3 筛选的数才能进入通道 out3 中,将 ch 设为 out3 ,第三个循环结束。当 G 中产生6,并尝试在这个链上移动时,经过 filter1 时,由于6是2的整数倍,说明不是素数,不符合要求,同样出局。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
prime := <-ch             // <-out2
fmt.Println(prime) // 输出5
out := make(chan int) // 记作out3
go filter(ch, out, prime) // out2 -> out3
ch = out // ch=out3

^ ^ ^
| | |
+----+ +----+ +----+
G --->|out1|--->|out2|--->|out3|--->
+----+ +----+ +----+
%2 %3 %5
^
|
ch

进入第四个 for 循环,此时 chout3 ,尝试去 out3 中取,7由 G 产生,然后经过 filter1,filter2,filter3 筛选后成功进入 out3 ,说明7是素数,此时我们再创建一个通道 out4 ,并且再起一个 goroutine ,记为 filter4 ,该 goroutine 会过滤所有7的整数倍的数,此时通道数据流的走向为 G -> out1 -> out2 -> out3 -> out4 ,只有成功经过 filter1,filter2,filter2,filter4 筛选的数才能进入通道 out4 中,将 ch 设为 out4 ,第四个循环结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
prime := <-ch             // <-out3
fmt.Println(prime) // 输出7
out := make(chan int) // 记作out4
go filter(ch, out, prime) // out3 -> out4
ch = out // ch=out4

^ ^ ^ ^
| | | |
+----+ +----+ +----+ +----+
G --->|out1|--->|out2|--->|out3|--->|out4|--->
+----+ +----+ +----+ +----+
%2 %3 %5 %7
^
|
ch

以此类推,并发版本素数筛的本质就是构建 DaisyChain ,每个数经过 DaisyChain 层层筛选后,如果最终保留,说明该数为素数,作为 DaisyChain 的最后一层加入,如此往复,就可以达到快速筛选素数的目的了。

chain of gophers

上面这种将多个通道串联起来形成菊花链的模式还是比较常见,代码的关键就在于对通道指针的理解,例如下面的例子中就指定了 leftright 两个通道”指针”,最初这两个”指针”都指向 leftmost 通道,每次新建一个通道时候,都将 left 重新指向 right ,最后向 right 通道中发送0,leftmost 通道也就会很快收到最终的一个值了。

1
2
3
4
   +----+    +----+    +----+    +----+           +----+
<- | | <- | | <- | | <- | | <- ... <- | | <- 1
+----+ +----+ +----+ +----+ +----+
leftmost right
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func f(left, right chan int) {
left <- 1 + <-right
}

func main() {
start := time.Now()
const n = 10000
leftmost := make(chan int)

right := leftmost
left := leftmost

for i := 0; i < n; i++ {
right = make(chan int)
go f(left, right)
left = right
}

go func(c chan int) { c <- 0 }(right)

fmt.Println(<-leftmost, time.Since(start))
}
Pieces of Valuable Programming Knowledges