// works receives values from c forever and prints each one as a character,
// prefixed with the worker id.
//
// The loop never checks whether c has been closed. A receive on a closed
// channel returns the zero value immediately, so callers must not close c
// while this worker is running, otherwise it would spin printing zero values.
func works(id int, c chan int) {
	for {
		fmt.Printf("channel %d receive %c\n", id, <-c)
	}
}
// workIfNotClose receives values from c and prints each one as a character,
// prefixed with the worker id. It returns once c has been closed and every
// buffered value has been consumed.
func workIfNotClose(id int, c chan int) {
	for {
		n, ok := <-c
		if !ok {
			// ok is false only when c is closed and drained.
			return
		}
		fmt.Printf("channel %d receive %c\n", id, n)
	}
}
// workIfNotCloseSimple receives values from c and prints each one as a
// character, prefixed with the worker id. Ranging over the channel ends the
// loop automatically once c is closed and drained, so the function returns
// at that point.
func workIfNotCloseSimple(id int, c chan int) {
	for n := range c {
		fmt.Printf("channel %d receive %c\n", id, n)
	}
}
funccreateWorks(id int)chan<- int { c := make(chanint) go works(id, c) return c }
funcfirst() { // 创建channel用例 var chans [10]chanint for i := 0; i < 10; i++ { chans[i] = make(chanint) go works(i, chans[i]) } // 往channel放 for i := 0; i < 10; i++ { chans[i] <- 'a' + i } // 往channel放 for i := 0; i < 10; i++ { chans[i] <- 'A' + i } }
funcsecond() { // 只能发数据 chan<- int 将channel 作为函数返回参数 即 channel也是一等公民 var chann [10] chan<- int for i := 0; i < 10; i++ { chann[i] = createWorks(i) }
for i := 0; i < 10; i++ { chann[i] <- 'k' + i }
for i := 0; i < 10; i++ { chann[i] <- 'K' + i } }
functhirdBufferedChannel() { // channel缓冲区 可以放入不大于缓冲区的大小时,可以不用取 c := make(chanint, 4) go works(0, c) c <- 'u' c <- 'v' c <- 'w' c <- 'x' c <- 'y' }
funcchannelClose() { c := make(chanint, 4) go workIfNotCloseSimple(0, c) c <- 'U' c <- 'V' c <- 'W' close(c) }
// 输出 channel 7 receive h channel 9 receive j channel 8 receive i channel 5 receive f channel 3 receive d channel 6 receive g channel 4 receive e channel 2 receive c channel 1 receive b channel 0 receive a channel 0 receive A ----------------- channel 4 receive E channel 3 receive D channel 1 receive B channel 7 receive H channel 2 receive C channel 5 receive F channel 6 receive G channel 9 receive J channel 8 receive I channel 0 receive k channel 1 receive l channel 2 receive m channel 3 receive n channel 4 receive o channel 5 receive p channel 6 receive q channel 7 receive r channel 7 receive R channel 8 receive s channel 8 receive S channel 3 receive N channel 5 receive P channel 0 receive K channel 6 receive Q channel 9 receive t channel 9 receive T channel 2 receive M channel 4 receive O channel 1 receive L ----------------- channel 0 receive u channel 0 receive v channel 0 receive w channel 0 receive x channel 0 receive y channel 0 receive U channel 0 receive V channel 0 receive W Exiting.
// 输出 channel 7 receive h channel 5 receive f channel 8 receive i channel 9 receive j channel 2 receive c channel 3 receive d channel 1 receive b channel 0 receive a channel 4 receive e channel 6 receive g channel 6 receive G channel 7 receive H channel 8 receive I channel 9 receive J channel 0 receive A channel 1 receive B channel 2 receive C channel 3 receive D channel 5 receive F channel 4 receive E ---------------- channel 0 receive j channel 1 receive k channel 4 receive n channel 2 receive l channel 3 receive m channel 2 receive L channel 8 receive r channel 0 receive J channel 1 receive K channel 7 receive q channel 5 receive o channel 9 receive s channel 3 receive M channel 5 receive O channel 6 receive p channel 4 receive N channel 6 receive P channel 9 receive S channel 7 receive Q channel 8 receive R
// 树中最大值 c := t.travelWithChannel() maxNode := 0 // 从channel中取 for n := range c { if maxNode < n.Value { maxNode = n.Value } } fmt.Println("Max node:", maxNode) }
/** * Author: Wang P * Version: 1.0.0 * Date: 2021/2/5 上午10:37 * Description: select **/
package goroutines
import ( "fmt" "math/rand" "time" )
// SelectWorker pairs a string identifier with an int channel. It is used by
// the select examples both as a producer (values are read from c) and as a
// consumer (values are written to c).
type SelectWorker struct {
	id string
	c  chan int
}

// setValue replaces the worker's id.
func (w *SelectWorker) setValue(id string) {
	w.id = id
}
// selectNotBlock polls c1 and c2 forever without blocking. On every iteration
// it prints the value received from whichever channel is ready, or a
// "not receive anything" line when neither is.
//
// Because of the default case the loop never waits, so it spins continuously
// while both channels are idle. The function never returns.
func selectNotBlock(c1, c2 chan int) {
	for {
		select {
		case n := <-c1:
			fmt.Printf("receice %d from c1\n", n)
		case n := <-c2:
			fmt.Printf("receice %d from c2\n", n)
		default:
			fmt.Println("not receive anything")
		}
	}
}
// selectBlock waits forever on c1 and c2, printing each value together with
// the channel it arrived on. Without a default case the select blocks until
// one of the channels has a value. The function never returns.
func selectBlock(c1, c2 chan int) {
	for {
		select {
		case n := <-c1:
			fmt.Printf("receice %d from c1\n", n)
		case n := <-c2:
			fmt.Printf("receice %d from c2\n", n)
		}
	}
}
funcselectWorkConditionBlock(rw, sw1, sw2 SelectWorker) { var values []int id := "" endTime := time.After(time.Second * 10) tick := time.Tick(time.Second) for { activeWorker := SelectWorker{ c: nil, } var activeValue int iflen(values) >0 { rw.setValue(id) activeWorker = rw activeValue = values[0] } select { case n := <-sw1.c: id = sw1.id values = append(values, n) case n := <-sw2.c: id = sw2.id values = append(values, n) case activeWorker.c <- activeValue: values = values[1:] case <-time.After(time.Millisecond * 600): // 500毫秒未产生数据 fmt.Println("程序超时") case <- tick: fmt.Println("queue len = ", len(values)) case <- endTime: // 程序执行到endTime时结束 fmt.Println("程序执行结束") return } } }
// generateChannel returns an unbuffered int channel fed by a background
// goroutine. The goroutine sends 0, 1, 2, ... forever, sleeping a random
// duration below one second before each send.
//
// The goroutine has no stop mechanism; it blocks on the send whenever nobody
// is receiving.
func generateChannel() chan int {
	c := make(chan int)
	go func() {
		for i := 0; ; i++ {
			time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
			c <- i
		}
	}()
	return c
}
funcgenerateWorkChannel(id string)SelectWorker { w := SelectWorker{ id:id, c: make(chanint), } gofunc() { i := 0 for { time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) w.c <- i i++ } }() return w }
/** * Author: Wang P * Version: 1.0.0 * Date: 2021/2/5 下午5:37 * Description: 基于WaitGroup实现通信来共享内存 **/
package goroutines
import ( "fmt" "sync" )
// WorkerWG bundles a worker's input channel with the WaitGroup on which the
// worker reports each processed value.
type WorkerWG struct {
	in chan int        // values for the worker to process
	wg *sync.WaitGroup // shared with the sender; Done is called once per value
}
funcdoWorkWaitDone(id int, w WorkerWG) { // 接收方判断channel中有数据就不断的从channel取 for n := range w.in{ fmt.Printf("channel %d receive %c\n", id, n) w.wg.Done() } }
funccreateWaitDoneWorkers(id int, wg *sync.WaitGroup)WorkerWG { w := WorkerWG{ in: make(chanint), wg: wg, } go doWorkWaitDone(id, w) return w }
funccommunicateWaitDone() { // 只能发数据 chan<- int 将channel 作为函数返回参数 即 channel也是一等公民 var works [10]WorkerWG var wg sync.WaitGroup for i := 0; i < 10; i++ { works[i] = createWaitDoneWorkers(i, &wg) }
//wg.Add(20)
for i, worker := range works{ worker.in <- 'a' + i wg.Add(1) }
for i, worker := range works{ worker.in <- 'A' + i wg.Add(1) }
wg.Wait() }
// WorkerWG2 is a variant of WorkerWG that hides the WaitGroup behind a
// callback: the worker only knows it must call done after each value.
type WorkerWG2 struct {
	in   chan int // values for the worker to process
	done func()   // invoked once per processed value
}
// WorkerWG封装 funcdoWorkWaitDone2(id int, w WorkerWG2) { // 接收方判断channel中有数据就不断的从channel取 for n := range w.in{ fmt.Printf("channel %d receive %c\n", id, n) w.done() } }
funccreateWaitDoneWorkers2(id int, wg *sync.WaitGroup)WorkerWG2 { w := WorkerWG2{ in: make(chanint), done: func() { wg.Done() }, } go doWorkWaitDone2(id, w) return w }
funccommunicateWaitDone2() { // 只能发数据 chan<- int 将channel 作为函数返回参数 即 channel也是一等公民 var works [10]WorkerWG2 var wg sync.WaitGroup for i := 0; i < 10; i++ { works[i] = createWaitDoneWorkers2(i, &wg) }
wg.Add(20) for i, worker := range works{ worker.in <- 'a' + i //wg.Add(1) } for i, worker := range works{ worker.in <- 'A' + i //wg.Add(1) } wg.Wait() }
// 输出 channel 7 receive h channel 5 receive f channel 6 receive g channel 0 receive a channel 8 receive i channel 1 receive b channel 3 receive d channel 2 receive c channel 4 receive e channel 9 receive j channel 9 receive J channel 3 receive D channel 0 receive A channel 5 receive F channel 2 receive C channel 4 receive E channel 6 receive G channel 1 receive B channel 7 receive H channel 8 receive I channel 9 receive j channel 6 receive g channel 8 receive i channel 4 receive e channel 0 receive a channel 0 receive A channel 2 receive c channel 3 receive d channel 5 receive f channel 1 receive b channel 1 receive B channel 7 receive h channel 7 receive H channel 3 receive D channel 5 receive F channel 4 receive E channel 6 receive G channel 9 receive J channel 2 receive C channel 8 receive I
// 输出 xxx@xxxdeMacBook-Pro ~/Projects/golang/src/offer/note master ±✚ go run -race main.go safe Increase safe Increase ================== WARNING: DATA RACE Read at 0x00c000134010 by main goroutine: offer/note/goroutines.(*AtomicInt).get() /Users/wangpeng/Projects/golang/src/offer/note/goroutines/atomic.go:33 +0xab offer/note/goroutines.AtomicDemo() /Users/wangpeng/Projects/golang/src/offer/note/goroutines/atomic.go:43 +0xb4 main.main() /Users/wangpeng/Projects/golang/src/offer/note/main.go:75 +0x2f
Previous write at 0x00c000134010 by goroutine 7: offer/note/goroutines.(*AtomicInt).increase.func1() /Users/wangpeng/Projects/golang/src/offer/note/goroutines/atomic.go:26 +0xbd offer/note/goroutines.(*AtomicInt).increase() /Users/wangpeng/Projects/golang/src/offer/note/goroutines/atomic.go:27 +0x9e offer/note/goroutines.AtomicDemo.func1() /Users/wangpeng/Projects/golang/src/offer/note/goroutines/atomic.go:40 +0x38
Goroutine 7 (finished) created at: offer/note/goroutines.AtomicDemo() /Users/wangpeng/Projects/golang/src/offer/note/goroutines/atomic.go:39 +0x90 main.main() /Users/wangpeng/Projects/golang/src/offer/note/main.go:75 +0x2f ================== 2 Found 1 data race(s) exit status 66
go test atomic_test.go -race ================== WARNING: DATA RACE Read at 0x00c00008e008 by goroutine 10: runtime.growslice() /usr/local/Cellar/go/1.17.2/libexec/src/runtime/slice.go:162 +0x0 command-line-arguments.raceCondition.func1() /Users/wangpeng/Projects/golang/src/offer/notes/awesomego/atomic_test.go:33 +0xec command-line-arguments.raceCondition·dwrap·1() /Users/wangpeng/Projects/golang/src/offer/notes/awesomego/atomic_test.go:34 +0x47
Previous write at 0x00c00008e008 by goroutine 8: command-line-arguments.raceCondition.func1() /Users/wangpeng/Projects/golang/src/offer/notes/awesomego/atomic_test.go:33 +0x110 command-line-arguments.raceCondition·dwrap·1() /Users/wangpeng/Projects/golang/src/offer/notes/awesomego/atomic_test.go:34 +0x47
虽然s.Load与s.Store分别为原子操作,但整个过程并非原子,因此还会存在竞态条件。但是当涉及到可以使用Read-copy-update[1]模式管理的共享资源时,它非常出色。在这种技术中,我们通过引用获取当前值,当我们想要更新它时,我们不修改原始值,而是替换指针(因此没有人访问另一个线程可能访问的相同资源)。另外,atomic原子操作很快,因为它们依赖于原子 CPU 指令而不是依赖外部锁。使用互斥锁时,每次获得锁时,goroutine 都会短暂暂停或中断,这种阻塞占使用互斥锁所花费时间的很大一部分。