//集冲器接口的实现类型 type myBuffer struet { //存放数据的通道 ch chan interface{} //缓冲器的关闭状态:0-未关闭;2-已关闭 closed uint32 //为了消除因关闭缓冲器而产生的竞态条件的读写锁 closingLock sync.RWMutex }
显然,缓冲器的实现就是对通道类型的简单封装,只不过增加了两个字段用于解决前面所说的那些问题。字段 closed 用于标识缓冲器的状态。缓冲器自创建之后只有两种状态:未关闭和已关闭。注意,我们需要用原子操作访问该字段的值。
closingLock 字段代表了读写锁。如果你在程序中并发地进行向通道发送值和关闭该通道的操作的话,会产生竞态条件。通过在使用 go 命令(比如 go test)时加入标记 -race,可以检测到这种竞态条件。后面你会看到使用读写锁消除它的正确方法。
NewBuffer 函数用于创建一个缓冲器:
// NewBuffer 用于创建一个缓.冲器。 // 参数size代表缓冲器的容量 func NewBuffer(size uint32) (Buffer, error) { if size == 0 { errMsg := fmt.Sprintf("illegal size for buffer: %d", size) return nil, errors.NewIllegalParameterError(errMsg) } return &myBuffer{ ch: make(chan interFace{}, size), }, nil }
它先检验参数值,然后构造一个 *myBuffer 类型的值并返回。显然,在实现接口方法时,接收者的类型都是 *myBuffer。
注意,errors.NewIllegalParameterError 用于生成一个代表参数错误的错误值,其中 errors 代表的并不是标准库中的 errors 包,而是代码包中的 gopcp.v2/chapter6/webcrawler/
errors 包。大家可以在我的网盘中下载相关代码包(链接:https://pan.baidu.com/s/1yzWHnK1t2jLDIcTPFMLPCA 提取码:slm5)。
Buffer 接口的 Cap 方法和 Len 方法实现起来都相当简单,只需把内建函数 cap 或 len 应用在字段 ch 上就好了。这里也无需使用额外的保证并发安全的措施。
对于 Put 方法,需要注意的是对读写锁的运用和对缓冲器状态的判断。在 Put 方法中,我们应该使用读锁。因为“向通道发送值”的操作会受到“关闭通道”操作的影响。如果不关闭通道,根本无需在进行发送操作时使用锁。另外,如果在进行发送操作前就已经发现通道关闭,就不应该再去尝试发送值了。下面来看 Put 方法的实现:
func (buf *myBuffer) Put(datum interface{}) (ok bool, err error) { buf.closingLock.RLock() defer buf.closingLock.RUnlock() if buf.Closed() { return false, ErrClosedBuffer } select { case buf.ch <- datum: ok = true default: ok = false } return }
在写锁的保护下关闭通道。对应地,在 Put 方法的起始处锁定读锁,然后再去做状态判断。如果反过来,那么通道就有可能在状态判断之后且锁定读锁之前关闭。这时,Put 方法会以为通道未关闭,然后在读锁的所谓保护下向通道发送值,引发运行时恐慌。
接下来的 select 语句主要是为了让 Put 方法永远不会阻塞在发送操作上。在 default 分支中把结果变量 ok 的值设置为 false,加之这时的结果变量 err 必为 nil,就可以告知调用方放入数据的操作未成功,且原因并不是缓冲器已关闭,而是缓冲器已满。
Get 方法的实现要简单一些。因为从通道接收值的操作可以丝毫不受到通道关闭的影响,所以无需加锁。其实现如下:
func (buf *myBuffer) Get() (interface{}, error) { select { case datum, ok := <-buf.ch: if !ok { return nil, ErrClosedBuffer } retum datum, nil default: return nil, nil } }
这里同样使用 select 语句让它变成非阻塞的。顺便提一句,ErrClosedBuffer 是一个变量,表示缓冲器已关闭的错误,它的声明是这样的:
//表示缓冲器已关闭的错误
var ErrClosedBuffer = errors.New("closed buffer")
这遵从了 Go语言程序中的惯用法。标准库中的类似变量有 io.EOF、bufio.ErrBufferFull 等。
再来说 Close 方法。在关闭通道之前,先要避免重复操作。因为重复关闭一个通道也会引发运行时恐慌。避免措施就是先检查 closed 字段的值。当然,必须使用原子操作。下面是它的实现:
func (buf *myBuffer) Close() bool { if atomic.CompareAndSwapUint32(&buf.closed, 0, 1) { buf.closingLock.Lock() close(buf.ch) buf.closingLock.Unlock() return true } return false }
最后,在 Closed 方法中读取 closed 字段的值时,也一定要使用原子操作:
func (buf *myBuffer) Closed() bool { if atomic.LoadUint32(&buf.closed) == 0 { return false } return true }
千万不要假设读取共享资源就是并发安全的,除非资源本身做出了这种保证。
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/22613.html