PHILIP x BLOG

心有猛虎,细嗅蔷薇


  • 首页

  • 归档

深入系列之zookeeper

发表于 2020-03-28 |

zookeeper有什么用

zk是一个分布式领域的知名中间件,可以在其中存储 key-value形式的数据,每个key-value数据我们可以称为一个znode,znode之间又可以通过树形的方式来组织,从而产生父节点和子节点的联系。除了数据存储之外,zk还有很多特性,比如watch机制、临时节点机制,这使得zk可以用来实现很多其他需求,比如命名注册、分布式锁、pubsub机制、数据同步等

zookeeper 集群架构

zk16.png

zookeeper的特点

高可用

zk的高可用是通过冗余节点来保证的,如果其中有一个节点挂掉,还是能提供服务的。如果要保证所有节点都可以用来处理读请求,则需要所有节点数据做replication复制,而zk有专门的机制来保证这一点。

可扩展性

zk可以在线添加节点,新的节点会被很快纳入集群并提供服务,很方便的进行扩容。

读写操作

由于zk集群内的数据是一致的,因此读操作可以由任一节点来完成;而写操作由唯一的leader节点来统一控制,并同步到所有节点。实际应用中,client连接到集群中时,是通过tcp长连接连到任一可用的节点上的,如果是读,直接通过当前连接的节点来完成,如果是写操作,则会统一转发到leader节点来完成,等所有的节点数据都同步后,写操作才算完成,并通知对应的client。

所有的写操作都通过唯一的leader节点完成,也保证了客户端的操作的顺序性,这样在处理并发时就不会出问题。

数据同步

除了正常写操作时,所有节点的数据需要同步之外,如果有节点新加入集群(可能是新增或者宕机之后恢复),也会通过同步指令来保证新增的节点数据和其他节点的一致性。

watch机制

由于client与集群节点之间是通过 tcp 长连接来进行通信的,且会通过心跳来做健康检查,因此任何节点的变化都可以通过tcp实时通知到所有的client。

zookeeper的数据模型

zk的数据是存在磁盘上的,以目录树的形式存储,是由很多个 dataNode节点组成的一个树形结构,这个树形结构除了存储在磁盘中,在每个zk节点的内存中也维护着,叫做 DataTree,包括了zk集群中所有dataNode的数据。

除了实际数据,zk还有一份事务日志,类似于 mysql 的 binlog,这其实可以理解成增量修改。

zk在内存中构造了个DataTree的数据结构,维护着 path 到 dataNode 的映射以及 dataNode 间的树状层级关系。为了提高读取性能,集群中每个服务节点都是将数据全量存储在内存中。可见,zk最适于读多写少且轻量级数据(默认设置下单个 dataNode 限制为1MB大小)的应用场景。数据仅存储在内存是很不安全的,zk采用事务日志文件及快照文件的方案来落盘数据,保障数据在不丢失的情况下能快速恢复。

zk在集群中实际写数据的时候,为了保证一致性,使用了两阶段提交,其中第一阶段就是写这个事务日志,第二阶段才是实际更新内存中的 DataTree,也即当前数据。

数据快照位于磁盘的 version-2/snapshot.xxx 文件,日志文件为 version-2/log.xxxxx,所有写事务操作都是需要记录到日志文件中的,可通过 dataLogDir配置文件目录,文件是以写入的第一条事务zxid为后缀,方便后续的定位查找。默认设置下,每次事务日志写入操作都会实时刷入磁盘。

unknown.png

这些数据都是二进制格式,如果要查看,需要用lib目录的一些工具jar包:

92D3EB69-8C64-4EEC-82F1-6566171BBF4C

查看实际的数据快照:

1
2
$ cd lib
$ java -cp zookeeper-3.5.6.jar:zookeeper-jute-3.5.6.jar:slf4j-log4j12-1.7.25.jar:log4j-1.2.17.jar:slf4j-api-1.7.25.jar org.apache.zookeeper.server.SnapshotFormatter /data/zookeeper1/version-2/snapshot.100012c92

unknown.png

unknown.png

查看事务日志:

1
2
$ cd lib
$ java -cp zookeeper-3.5.6.jar:zookeeper-jute-3.5.6.jar:slf4j-log4j12-1.7.25.jar:log4j-1.2.17.jar:slf4j-api-1.7.25.jar org.apache.zookeeper.server.LogFormatter /data/zookeeper1/version-2/log.100012c94 > tlogs

unknown.png

zk存储的数据是保证的最终一致性,也就是说数据最终会一致。对于zk这种分布式系统来说,最好的情况当然是所有节点数据一致,但也可能出现数据不一致的情况(比如两阶段提交时,有个别节点没有回应 ACK),这时一般是leader节点的数据比其他节点数据新,可以通过不停的同步来将所有节点的数据最终和leader节点保持一致。

根据CAP定理,P必然无法保障,因此A和C需要做权衡。zk是优先考虑高可用(A),牺牲了完全一致性,但他是最终一致性的。也就是说,由于多个节点的数据不会时刻保证一致性,因此在读取时是可能读取不到最新数据的。

zookeeper的ZAB协议

ZAB全称 Zookeeper Atomic Broadcast Protocol,意思就是 zk 的原子广播协议,这是zk的核心机制。它实际是 multi-paxos 的一种实现,类似的multi-paxos 实现的还有 raft 算法(ETCD采用)。

ZAB 简单来说是定义了一套规则,用来在各个节点之间复制数据。此外,它还定义了如何选举leader,并定义了如何处理节点恢复。其实就是分别实现了 multi-paxos 专门拆分的几个子问题。

数据复制

ZAB使用两阶段提交来同步数据。当然这个两阶段提交并不是传统意义上的两阶段提交,而是稍微改造过的,第一阶段只需要超过半数节点有 ACK 响应就实施第二阶段,即 COMMIT 数据修改。而且,在第一阶段,propose 不会被拒绝,因为这个过程就是简单的写日志,所以不存在拒绝导致回滚的处理。总之,是一个简化版本的二阶段提交。

zk1

另外需要注意的是,leader和follower之间的数据传输都是按照 FIFO 规则来控制的,所有的 propose 按照顺序依次处理,这样就能保证所有的修改操作都是顺序执行的;此外,server之间通信是采用 TCP 的,TCP的顺序性也保证了传输的顺序性。最终,我们解决了因为分布式环境下并发带来的数据不一致问题。

leader选举

数据同步的二阶段提交是需要一个协调者来发起和统筹整个过程的,在 zk 集群中就由 leader 来充当这个角色。所以如何选举 leader 也是个很重要的事情。

zk集群一启动,就需要选举一个leader,完成后进入对外服务状态;如果服务过程中,leader节点宕机,也需要在剩余节点中重新选举 leader,完成后再继续提供服务;

那什么样的人可以称为leader呢?

  • 首先,leader 节点的数据必须要是最新的
  • 其次,leader 被过半数的节点推举

选举的过程有点类似于现实生活中的投票,只要一个候选节点获得的票数超过半数,就成为 leader。稍微有点不同的是,每个投票者是可以多次修改投给哪个候选人的,只要任一时刻一个候选人获得的票数超过一半,leader 随机产生,选举过程也就结束了。

具体来讲,就是每个节点可以投票给集群中的任一节点,并将自己的投票结果通过广播 (broadcast) 通知其他节点。每个节点都会实时统计所有节点的得票情况,如果发现有节点得票过半,就完成选举。

需要说明的是,投票的标准是什么?其实就是各个节点的数据版本号,版本号约高的可以认为数据越新,在投票过程中,数据新的节点胜出,因为数据新的可以同步给数据老的,从而集群整体的数据更新;如果数据版本号一致,就比较节点的序号,约定序号大的节点胜出。

集群启动一开始,由于没有广播通信,各个节点都不知道其他节点的数据版本号,无从比较,因此都默认投给自己,并将投给自己的信息广播给其他节点。每个节点在收到广播后,将别人的数据版本号和自己的版本号比较,如果一样就比较节点序号,通过比较如果发现别人胜出,就修改自己的投票,再次广播给其他节点。通过这样一个不停广播加不停修改选票的过程,最终肯定会有一个节点的得票超过半数,并成为 leader 节点。

注意,只要有一个节点超过半数,就立即停止选举过程,后续收到的其他投票信息也不再处理了。例如有3个节点,只要有一个节点的票数大于等于2,则选举结束。有没有可能出现一种情况,就是有节点得票过半后,有一个节点的数据版本比 leader 的数据还要新,它又广播过来一个消息投票给自己?这时候是否要重新选举?

答案是否,不会重新选举。我们可以这样来理解,这个后来的节点,它的数据虽然比较新,但是他的数据并没有同步到其他过半节点(否则别人就会投票给它,而不是投给我们已经选出的数据版本更老一些的leader了),这是什么情况下才出现的?很可能是集群宕机,leader的数据没有及时同步给其他节点。这相当于一个没有完成的写操作(只修改了leader的数据,而没有来得及修改其他节点的数据),未完成的操作,其数据是可以丢弃的。实际遇到这种情况时,这个后来的节点会把自己的数据回滚到和 leader一致。

zookeeper如何防止脑裂

说人话,脑裂意思是集群中有多个leader,类似leader也就是大脑裂开一样。秘诀就是任何leader的选举都需要得票过半数。

参考

Apache ZAB—Zookeeper Atomic Broadcast Protocol)

Architecture of ZAB – ZooKeeper Atomic Broadcast protocol

浅析Zookeeper的一致性原理

ZooKeeper Internals

未命名

发表于 2019-12-08 |

vue-router中的hash和history

发表于 2019-09-21 |

引子

这几天同事在实现内部的页面时,用到了基于 vue 框架的 iview-admin 解决方案,在实际使用时遇到了以下一个问题:

  • 在正常操作页面时,浏览器地址栏里会显示当前页面的路由信息,比如 http://example.com/xxx/yyy; 但是在按 F5 刷新该页面的时候,会收到web服务器返回的 invalid method or path 提示。这是为什么?

针对该问题,我好好研究了一番。

问题研究

通过查看 chrome 浏览器的 检查 选项,我们发现刷新时浏览器向服务器发送了路径为 xxx/yyy 的GET 请求,但是服务器并没有对改路由注册处理方法,因此返回了错误。

按照我的理解,xxx/yyy 应该是前端的路由,发往服务器处理是太不合理了;但是浏览器里刷新之后,重新发送 GET 请求似乎也合情合理。经过一番 google,我发现似乎和 vue-router 的一个称为 history 模式的东西有关。

单页面应用SPA

定义

维基百科对SPA的定义:

单页应用(英语:single-page application,缩写SPA)是一种网络应用程序或网站的模型,它通过动态重写当前页面来与用户交互,而非传统的从服务器重新加载整个新页面。这种方法避免了页面之间切换打断用户体验,使应用程序更像一个桌面应用程序。

在单页应用中,所有必要的代码(HTML、JavaScript和CSS)都通过单个页面的加载而检索,或者根据需要(通常是为响应用户操作)动态装载适当的资源并添加到页面。页面在过程中的任何时间点都不会重新加载,也不会将控制转移到其他页面。

现在很多 Javascript 框架都是基于单页面的,如 AngularJS、React、Vue.js 等。

SPA与服务端渲染

SPA技术将逻辑从服务器转移到了客户端。这导致Web服务器发展为一个纯数据API或Web服务。这种架构的转变在一些圈子中被称为“瘦服务器架构”,以强调复杂性已从服务端转移到客户端,并认为这最终降低了系统的整体复杂性。

另外一种方式称为服务端渲染,这种设计是服务器在内存中保存必要的客户端所处状态。这种模式下,当任何请求到达服务器(通常因用户操作)时,服务器发送适当的HTML和/或JavaScript,以及具体的更改,以使客户端达到新的期望状态(如添加、删除或更新部分客户端的DOM)。与此同时更新服务器中的状态。这种设计下的大部分逻辑都在服务器上运行,HTML通常也在服务器上呈现。在某些方面,服务器是模拟Web浏览器,接收事件并执行服务器状态下的增量更改,将这些更改自动传播到客户端。

这种方法需要更多的服务器内存和处理能力,但优点是简化的开发模型,因为:1、应用程序通常完全在服务器中编写;2、服务器中的数据和UI状态在相同的内存空间中共享,不需要自定义客户端/服务器通信隧道。

SPA的问题

  • 很多搜索引擎爬虫缺乏执行 javascript 的能力,SEO(搜索引擎最优化)成为SPA的问题,由于只有一个index.html的页面,爬虫往往爬不到很多内容
  • 根据单页应用(SPA)模型的定义,它只有“单个页面”,因此这打破了浏览器为页面历史记录导航所设计的“前进/后退”功能。当用户按下后退按钮时,可能会遇到可用性障碍,页面可能返回真正的上一个页面,而非用户所期望的上一个页面
  • SPA应用需要在一开始就加载所有的页面,因此加载速度往往不快。为了加快加载速度,可以采用一些方法比如多项缓存措施、需要时再加载某些模块(懒加载)

前端路由

路由这个概念最先是后端出现的。在以前用模板引擎开发页面时,经常会看到这样 http://hometown.xxx.edu.cn/bbs/forum.php ,有时还会有带.asp或.html的路径,这就是所谓的SSR (Server Side Render),通过服务端渲染,直接返回页面。

简单来说路由就是用来跟后端服务器进行交互的一种方式,通过不同的路径,来请求不同的资源,请求不同的页面是路由的其中一种功能。

前端路由的出现要从 ajax 开始。Ajax,全称 Asynchronous JavaScript And XML,是浏览器用来实现异步加载的一种技术方案。在 90s 年代初,大多数的网页都是通过直接返回 HTML 的,用户的每次更新操作都需要重新刷新页面。及其影响交互体验,随着网络的发展,迫切需要一种方案来改善这种情况。有了 Ajax 后,用户交互就不用每次都刷新页面,体验带来了极大的提升,为后来异步交互体验方式的繁荣发展带来了根基。

而异步交互体验的更高级版本就是 SPA,单页应用不仅仅是在页面交互是无刷新的,连页面跳转都是无刷新的,为了实现单页应用,所以就有了前端路由。

vue中为了构建SPA,需要引入前端路由系统,这也就是 Vue-Router 存在的意义。前端路由的核心,就在于 —— 改变视图的同时不会向后端发出请求。

hash模式和history模式

为了达到不自动发送请求给后端,浏览器当前提供了以下两种模式支持:

hash: 即地址栏 URL 中的 # hash 符号(此 hash 不是密码学里的散列运算)2014年之前广泛应用。
比如这个 URL:http://www.abc.com/#/hello,hash 的值为 #/hello。它的特点在于:hash 虽然出现在 URL 中,但不会被包括在 HTTP 请求中,对后端完全没有影响,因此改变 hash 不会重新加载页面。每次 hash 值的变化,还会触发 hashchange 这个事件,通过这个事件我们就可以知道 hash 值发生了哪些变化。

history : 14年后,因为HTML5标准发布,多了两个 API,pushState 和 replaceState,通过这两个 API 可以改变 url 地址且不会发送请求。同时还有 onpopstate 事件。通过这些就能用另一种方式来实现前端路由了,但原理都是跟 hash 实现相同的。用了 HTML5 的实现,单页路由的 url 就不会多出一个#,变得更加美观。

但因为没有 # 号,所以当用户刷新页面之类的操作时,浏览器还是会给服务器发送请求。为了避免出现这种情况,所以这个实现需要服务器的支持,需要把所有路由都重定向到根页面。

VUE的history模式

vue中对history模式的支持可以参考 vue中HTML5 history模式,需要后端配置做一些处理,即服务端在匹配路由时,如果发现找不到,则不能返回404或者其他错误信息了,而需要特殊处理一下:直接返回/对应的index.html,这相当于重新加载了一次单页面应用,前端收到返回后再根据自己的路由导航到对应的页面。

参考资料

维基-单页应用

知乎-前端路由是什么

vue中mode hash和history的区别

Go研究之内存管理

发表于 2018-12-30 |

参考资料

Go Memory Management

Go’s Memory Allocator - Overview

技术分享:内存管理

818虚拟内存

发表于 2018-12-21 |

说明:本文大部分内容参考了 <深入理解计算机系统> 一书中关于虚拟内存的讲解,综合了自己的一些理解及其他资料,可以认为是一篇读书笔记。

基本概念

现代处理器为了更加有效的管理物理内存并减少出错,提出了 “虚拟内存” 的概念,英文是 Virual Memory (VM)。虚拟内存是对内存空间的一种抽象表示,为每个进程提供一个大的、一致的和私有的地址表示。它有以下特性:

  • 将物理内存看成一个存储在磁盘上的地址空间的高速缓存,该缓存只保存进程内的活动区域,并根据需要在磁盘和内存之间来回交换数据。
  • 每个进程有自己私有的独立的虚拟内存空间,对内存的操作都是通过虚拟内存间接进行的,保证了每个进程的内存不被其他进程破坏。
  • 虚拟内存空间和物理内存空间有特有的映射关系,访问内存时,会将虚拟地址翻译为物理地址。

虚拟内存的工作机制

物理寻址和虚拟寻址

计算机的内存被组织成一个由 M 个连续的字节大小的单元组成的数组,每个字节都有唯一的物理地址 (Physical Address, PA)。早期的 CPU 在寻址时采用直接访问物理地址的方式,称为物理寻址。现代CPU都普遍采用一种称为虚拟寻址的方式来寻址,如下图所示:

image-20181221175910154

使用虚拟寻址,CPU 通过虚拟地址 (Virtual Address, VA) 来访问内存,这个虚拟地址在被送到内存之前先转换成物理地址,这个转换过程是通过 CPU 芯片上的内存管理单元 (Memory Management Unit, MMU) 来完成的。

地址空间与虚拟内存状态

地址空间

不管是物理内存还是虚拟内存,都是有大小限制的,也因此对应了一段特定大小的地址空间。

现代系统通常支持 32 位 或 64 位 地址空间,也就是可以表示的最大地址为 1 << 32 (即4G) 或者 1 << 64 。 对于 64 位的情况,目前实际可支持的空间还没有这么大。例如 Intel Core i7 处理器,支持 48 位 (256T) 虚拟地址空间和 52 位 (4PB) 物理地址空间,还有一个兼容模式,支持 32 位 (4GB) 的虚拟和物理地址空间。

页面 Page

概念上,虚拟内存被组织为一个由存放在磁盘上的 N 个连续字节组成的数组。每个字节都有一个唯一的地址,作为数组的索引。但实际工作时,并不是直接通过这个索引来定位内存。VM 系统通过将虚拟内存分割为虚拟页 (Virtual Page, VP) ,这是一种大小固定的内存块,在 X86 上,这个页的大小通常为 4K 个字节;类似的,物理内存也被同样大小的物理页 (Physical Page, PP)。在定位一个具体的内存地址时,是通过页号加页内偏移量来进行的。如果是一个 4G 大小的内存,就可能被分为 4G / 4K = 1024 个页面。

虚拟页状态

任意时刻,虚拟内存的页面 (VP, Virtual Page) 都是下面三种状态中的一种

  • 还未被分配
  • 已分配,缓存在物理内存中
  • 已分配,未缓存,在磁盘中

虚拟内存在生成时,默认就是未分配的状态;当程序运行时第一次访问到该地址时,才会去实际分配内存。分配时,如果物理内存足够,则直接将地址内容缓存在物理内存中;而当物理内存不足时,则需要将物理内存中暂时不用的部分给交换出去,写在磁盘上。空出来的部分就可以被分配了。如果要访问的虚拟内存页面已经被分配,但是未缓存在物理内存上,则同样会触发内存和磁盘之间的交换,将暂时不同的内容换出到磁盘,而将要访问的内容从磁盘上换入到物理内存中。

下图是一个这三种状态的简单展示

image-20181221180023209

页表

同任何缓存系统一样,虚拟内存系统必须有某种方法来判定一个虚拟页是否缓存在物理内存的某个地方。如果是,系统还必须确定这个虚拟页存放在物理内存的哪个物理页中。如果不命中,系统必须判断这个虚拟页存放在磁盘的哪个位置,在物理内存中选择一个暂时不用的页,并将虚拟页的内容从磁盘复制到物理内存中,替换掉该暂时不用的页。

这些功能是由软硬件联合提供的,包括系统内核、CPU芯片中MMU中的地址翻译硬件和一个存放在物理内存中叫做页表 (Page Table) 的数据结构,页表将虚拟页映射到物理页。每次地址翻译硬件需要将虚拟地址转换为物理地址时,都会读取页表。系统内核负责维护页表的内容,以及在磁盘和物理内存之间交换页。下图展示了一个页表的基本组织结构:

image-20181221180104513

页表是由一个个页表条目 (Page Table Entry, PTE) 组成的数组。虚拟地址空间中的每个页通过一定的换算规则,在页表中一个固定偏移量处都对应一个 PTE。可以假设 PTE 由一个有效位和一个 n 位的地址字段组成。有效位表明了该虚拟页当前是否被缓存在物理内存中。如果设置了有效位,那地址字段就表示物理内存中相应物理页的起始位置,这个物理页中缓存了该虚拟页。如果未设置有效位,那么一个空地址表示虚拟页还未分配。否则对于非空地址,地址字段就指向虚拟页在磁盘上的起始位置。

寻址情形

下面根据上文提到的页表,我们来分析一下寻址时涉及的几种情形。

页分配

虚拟内存页的分配过程,是在磁盘上创建空间并且更新页表中的对应 PTE 条目。下图中 VP5 的分配就是创建磁盘空间后将 PTE5 的地址指向该空间地址。注意,这里并不立即在物理内存里缓存该页,而是等分配好的页被实际访问时才会实际分配物理内存。

image-20181222195635912

页命中

当 CPU 需要访问一个虚拟地址时,地址翻译硬件会将根据虚拟地址计算出一个索引,根据索引定位到页表上的某个 PTE。页命中就是指该 PTE 的有效位为1,表示要访问的内容已经缓存在物理内存中了,因此直接命中。根据 PTE 中地址字段中保存的物理内存的位置,直接去物理内存中找到内容。

例如下图中,根据虚拟地址定位出 PTE 在 PTE2 的位置,其地址字段保存的内容指向了物理内存的 PP1,直接命中。

image-20181222200529760

缺页

缺页 (Page Fault) 简单说就是未命中。当 CPU 需要访问一个虚拟地址时,地址翻译硬件会将根据虚拟地址计算出一个索引,根据索引定位到页表上的某个 PTE。缺页就是指该 PTE 的有效位为0,表示未缓存在物理内存中,此时 PTE 中地址字段中保存的是虚拟页对应的磁盘位置。缺页时会触发一个缺页异常,缺页异常会调用内核中的缺页异常处理程序,该程序会在物理内存中选择一个暂时不用的页,准备将其交换出去。如果该页已经被修改多,则会被复制到磁盘。该选中的页会被待访问的磁盘页覆盖,从而实现了换入换出。

下图描述了缺页的大致处理流程图。页表由内核维护在内存中。

image-20181223135120553

下图的例子中,由于 PTE3 对应的是磁盘中 VP3 地址,而此时物理内存不足,我们选择将 PP3 中的 VP4 换出,而将 VP3 存入其中,并更新 PTE3,最后返回给 CPU。

image-20181222202459967

交换区

前面我们说了,一个进程的虚拟空间的页面,可能缓存在物理内存中,也可能被换出到磁盘中。这部分磁盘空间就称为交换分区。在 Linux 上安装系统时往往需要指定 swap 分区,就是交换区,Windows 上也有类似概念,不过不用手动指定。

TLB

每次 CPU 访问虚拟地址,MMU就必须去内存中的页表查询一个 PTE,以便将虚拟地址翻译为物理地址。查询一次内存的开销是几十到几百个 CPU 周期。为了消除这样的开销,在 MMU 中设计了一个关于 PTE 的小的缓存,称为翻译后备缓冲器 (Translation Lookaside Buffer, TLB)。TLB 中的每一行都保存着一个由单个 PTE 组成的块。

CPU寻址流程

CPU 寻址的大致的流程如下:

  • CPU 拿到虚拟地址后,将其传给 MMU
  • MMU 查询自己维护的 TLB 缓存,看能否直接找到对应的 PTE。如果找到,则直接获取到物理地址并将其发送到内存,内存将数据返回给 CPU 从而完成寻址流程
  • TLB 未命中时,则会去查询内存中页表,找到 PTE。(注意,在查询页表前,也可能会去查询内存的高速缓存)。一般都会使用具有层次结构的多级页表,以节省页表空间大小。
  • 根据 PTE 中的有效位字段,如果为 1,则页命中,根据 PTE 地址字段中的物理地址信息构造出物理地址,然后将其发送到内存,内存将数据返回给 CPU 完成寻址流程
  • 如果 PTE 中表明未缓存,即缺页,则触发缺页异常,磁盘页和物理内存页进行交换。完成后更新 PTE 中的内容,并继续访问内存,获取数据之后完成 CPU 寻址

image-20181223153246985

TLB 查询

下图是 TLB 查询的过程。

image-20181223150834055

上图可见,查询 TLB 是通过虚拟地址的高位部分:虚拟页 VPN 来进行的。查询时,VPN 被分解为 TLB 标记和 TBL 索引。

image-20181223151736508

地址翻译

CPU 寻址时,在 MMU 中的 TLB 未命中的情况下,需要根据页表找到物理地址,这个过程称为地址翻译。一个虚拟地址,分为虚拟页号 (VPN) 和 虚拟页偏移量 (VPO) 。类似的,物理地址也可以分为物理页号 (PPN) 和 物理页偏移量 (PPO)。由于虚拟页和物理页大小一般是一致的 (Linux 中是 4K),因此在做地址翻译时,可以保持偏移量不变,只翻译页号。VPN 作为页表索引找到页表中的 PTE,如果命中,PTE 中的地址字段就是物理页号 PPN。最终的物理地址就是由 PPN 和偏移量组合而成。

下面的图很形象的表示了这个过程:

image-20181222205519279

对于多级页表,VPN 实际是被拆分成多个,以对应多级页表项。情况如下:

image-20181223150037292

下面是 Intel Core i7 CPU 的寻址示意图:

259AC393-20C2-41C0-8A27-0E2910A3503E

多进程内存管理

虚拟内存的设计,给了内存管理极大的灵活性和便利性。由于每个进程都有自己独立的虚拟地址和页表映射,这保证了各进程之间内存的独立,也有利于多进程之间的内存共享和保护。

内存隔离与共享

每个进程都有自己独立的虚拟内存,和自己的页表(由系统内核代码维护),也就是每个进程都有自己的虚拟内存空间,这保证了各进程之间的寻址互不影响。由系统维护的寻址过程能杜绝各进程之间内存的非法访问。如果进程之间想共享内存,也是可以利用进程共享机制来实现(例如 Linux 的 mmap 系统调用)。下图中进程 i 和进程 j 就共享了同一物理页面 PP7,而虚拟页面不同:进程 i 的VP2 和进程 j 的 VP1。

image-20181221162312505

独立的地址空间允许每个进程的内存映像使用相同的基本格式,而不用管代码和数据实际存放在物理内存的何处。

  • 相同的内存排布。 对于 64 位地址空间的 Linux 进程,虚拟地址总是从 0x400000 开始,分成多个段来存放不同部分,且增长方向一样。这样的一致性,极大简化了链接器的设计和实现,允许链接器生成完全链接的可执行文件,这些可执行文件独立于物理内存中代码和数据的最终位置。

  • 程序的加载。对于程序的加载,使用虚拟内存也很方便。可执行文件中的 .text 代码段和 .data 数据段,在被 load 进进程内存时,会被分配虚拟内存页,然后在页表中把他们标记为 已分配未缓存 状态,页表中的地址则指向可执行文件中的磁盘位置。只有当程序实际运行时,对应的虚拟地址被访问,才触发缺页,执行换入换出,将实际内容页面从磁盘加载到内存中并引用。

  • 物理内存共享。进程之间经常要共享代码和数据,这能显著节约内存使用。例如,进程需要调用相同的操作系统内核代码,调用相同的库函数 (例如 C 库函数 printf ),我们不用每个进程都维护一份副本,共享就可以了。
  • 简化内存分配。如果进程需要分配一段连续的内存,可能包含了 N 个虚拟页,由于虚拟内存的映射是各个页分别映射的,所以分配出来的物理内存就可以不是连续的,因此分配操作起来更灵活,这可以显著提高物理内存利用率和分配效率。

内存保护

利用页表中的控制选项,可以控制对特定内存的保护,杜绝非法访问。哪些访问属于非法访问呢?例如程序不能修改只读的代码指令,否则问题就大了;此外,用户进程也不能直接读或者修改内核段的代码和数据,用户空间对内核空间的操作唯一的入口应该是系统调用。

image-20181221162414432

Linux的虚拟内存

进程内存排布

基本所有使用虚拟内存的系统,会为每个进程维护了一个单独的虚拟地址空间。整个空间大致分为了内核地址空间和用户地址空间。

下图是 Linux 和 Windows 平台的空间划分示意:

image-20181226212413439

简单起见,我们主要介绍 Linux的虚拟内存。

内核虚拟内存包含内核中的代码和数据结构。内核虚拟内存中的某些区域被映射到所有共享进程的物理页面。内核虚拟内存的其他区域包含每个进程都不相同的数据。比如说,页表、内核在进程的上下文中执行代码时使用的栈,以及记录虚拟地址空间当前组织的各种数据结构。

用户地址空间被组织成一些区域 (也叫做段) 的集合。例如,代码段、数据段、堆、共享库段以及用户栈。

image-20181221163637036

下图记录了一个进程中虚拟内存区域的内核数据结构。内核为系统中的每个进程维护一个单独的结构 (task_struct) 。

image-20181223160605710

一个段的结构就如下:

  • vm_start: 指向这个段的起始处

  • vm_end: 指向这个段的结束处

  • vm_prot: 描述这个段内包含的所有页的读写许可权限

  • vm_flags: 描述这个段内的页面是与其他进程共享的,还是这个进程私有的

  • vm_next: 指向链表中下一个段结构

Linux 缺页异常处理

前面我们讲过,页表中发生缺页时,会触发缺页异常,调用系统内核的异常调度程序来处理。那 Linux 的缺页是如何调度的呢?

1)首先,检查虚拟地址是否合法?是落入某个段的区域中吗?为了回答这个问题,缺页处理程序依次搜索段结构的链表,检查其 vm_start 和 vm_end。如果地址非法,则触发段错误,并终止程序。

2)其次,试图进行的内存访问是否合法?即进程是否有读、写或者执行这个段内页面的权限?例如,试图对代码段中的只读页面进行写操作?或者是用户进程的代码试图访问内核空间的代码或数据?如果是,则触发异常,并终止程序。

3)如果是正常缺页,则选择一个牺牲页,如果这个牺牲页面被修改过,那么就将它交换出去,换入新的页面并更新页表。

image-20181223162633760

虚拟内存使用

内存映射

内核将虚拟内存和一个磁盘上的对象关联起来,以初始化这个虚拟内存的内容,这个过程称为内存映射 (memory mapping)。文件的内容直接映射到内存中,操作内存相当于直接直接操作文件。映射时文件被分成页大小的片,每一片包含一个虚拟页的初始内容。由于按需页面调度的策略,这些虚拟页面没有实际交换进入物理内存,直到CPU开始访问这些虚拟地址时才触发缺页,从而将页面内容从磁盘文件中换入物理内存。

被映射的文件可以是

  • 磁盘普通文件。程序可执行文件就是一个典型例子。程序开始执行时,系统会将可执行文件的 .text 和 .data 部分直接映射到进程虚拟内存的 Text 段和 Data 段;程序链接的动态链接库,如常用的标准 C 库 libc.so 在加载时是映射到进程的虚拟内存内,由于动态链接库没有必要每个进程都维护一份私有的,所以会将其映射到共享区域内;

  • 匿名文件。匿名文件是由内核创建,全部设置为二进制零,然后映射到虚拟内存中。进程的 BSS 段、堆、栈段就是映射到匿名文件的;

下面是上述各种映射的示意图:

image-20181227171515705

除了以上的系统的自动映射,用户程序还可以调用系统 API 来进行手动的内存映射。例如,Linux 上的 mmap() 系统调用,Windows 上的API CreateFileMapping()/MapViewOfFile() 都可以操作映射文件。类似的,可以指定映射的是普通文件还是匿名文件。如果是普通文件,就相当于通过内存来读写文件,效率很高;如果是匿名文件,相当于申请一块全部清零的内存空间。C 语言的库函数 malloc 在申请内存的时候通常会在堆区分配,当申请的内存较大 (缺省的大于128K) 时,会转而使用 mmap 在内存映射区分配内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//Linux
void *mmap(void *addr, size_t length, int prot, int flags,
int fd, off_t offset);

//Windows
HANDLE CreateFileMappingA(
HANDLE hFile,
LPSECURITY_ATTRIBUTES lpFileMappingAttributes,
DWORD flProtect,
DWORD dwMaximumSizeHigh,
DWORD dwMaximumSizeLow,
LPCSTR lpName
);
HANDLE OpenFileMappingA(
DWORD dwDesiredAccess,
BOOL bInheritHandle,
LPCSTR lpName
);

内存映射根据文件可以分为匿名和非匿名的;而根据映射方式,又可以分为私有映射和共享映射。上面系统自动映射时,我们也说了,动态链接库是通过共享方式映射的,其他都是各进程的私有映射。Linux 上使用 mmap 时,可以设置 flags 参数为 MAP_SHARED 或 MAP_PRIVATE 来指定映射是私有的还是共享的;在 Windows 上 CreateFileMappingA 可以创建映射,而 OpenFileMappingA 则根据映射的名字来实现共享。

如果一个进程将一个共享对象映射到它的虚拟地址空间的一个区域内,那么这个进程对这个区域的任何写操作,对于那些也把这个共享对象映射到它们虚拟内存的其他进程而言,也是可见的。相对的,对一个映射到私有对象的区域的任何写操作,对于其他进程来说是不可见的。一个映射到共享对象的虚拟内存区域叫做共享区域,类似地,也有私有区域。

下图显示了进程间共享内存映射一个文件的情况

image-20181228104507208

如果对于一个文件,有多个进程映射,且是私有的,在 Linux 上通常会使用一种称为写时复制 (Copy-On-Write, COW) 的技术来实现(这是一种未写时共享,有写时才分开的懒拷贝模式)。为了节约内存,私有对象开始的生命周期与共享对象基本上是一致的(在物理内存中只保存私有对象的一份副本),并使用写时复制的技术来应对多个进程的写冲突。如下图所示

image-20181229100438749

还有一个典型的例子就是fork()函数,该函数用于创建子进程。当fork()函数被当前进程调用时,内核会为新进程创建各种必要的数据结构,并分配给它一个唯一的PID。为了给新进程创建虚拟内存,它复制了当前进程的mm_struct、vm_area_struct和页表的原样副本。并将两个进程的每个页面都标为只读,两个进程中的每个区域都标记为私有区域(写时复制)。这样,父进程和子进程的虚拟内存空间完全一致,只有当这两个进程中的任一个进行写操作时,再使用写时复制来保证每个进程的虚拟地址空间私有的抽象概念。

此外,Linux 平台的 glibc 库在实现线程时,会使用 mmap (设置参数 flags 为 MAP_STACK) 匿名映射来创建线程栈,从而实现线程栈的独立。

内存分配类型

从程序角度看,我们的内存分配分为以下几种

  • 静态分配。是指程序中的全局变量和静态变量。这种内存大小往往在编译期就可以确定,大小是固定的。内存只分配一次,就是在程序开始时,在程序结束时由系统回收。
  • 自动变量分配。这是指由栈内存管理的变量,程序运行时自动分配和释放。
  • 动态内存分配。指程序运行时动态分配的内存,主要在堆上。一些程序如 C/C++ 中由程序员用 malloc 或者 new 显示的分配;在 Go 中,编译器通过逃逸分析,也可能在堆上维护内存变量。

下面几小结分别讲解这些内存管理的机制。

栈内存管理

程序开始执行,完成内存映射后,栈内存和堆内存就会使用各自独有的机制来实行管理了。我们先看栈内存的管理。

栈内存如下图 (Linux 平台下) 所示,处于用户空间的一端,向下增长

image-20181228150319032

栈空间主要用来存储程序运行过程中函数调用的信息。这些信息数据在分配和回收时,遵循 last-in-first-out(LIFO) 的后进先出方式。

当一个函数被调用时,它的状态数据就被添加到栈顶;而当函数退出时,这些状态数据就又从栈上被移除。需要存入栈空间的状态数据包括:函数参数、对象实例指针(如 C++ 中的 this 指针)、返回地址、函数内局部变量。每一次的函数调用,状态数据形成的结构叫做栈帧 (stack frame),多个栈帧通过 LIFO 的方式入栈出栈,就构成了栈空间管理的基本机制。

这个基于栈的内存分配和回收操作通常很快,往往只需要移动栈顶指针即可,效率很高。在大部分高级语言中,栈内存的分配和回收都是透明的、自动完成的,不需要程序员参与,很多处理器的指令集提供了栈操作的特殊指令。

在函数调用中,函数参数除了使用栈内存空间传输,有时候也会通过 CPU 寄存器存储。通常是在参数较少的时候。

例如,DrawSquare 调用了 DrawLine , 则其可能的栈结构 (该例子是栈顶在上,向上增长) 如下:

image-20181226111249595

注意栈顶可能在上,也可能在下,要看栈的增长方向 。从前面Linux进程段排布的图我们看到,Linux上栈空间是从高地址往低地址增长的,其他系统未必如此。但不管如何,当前正在执行的函数是位于栈顶的位置。

如果函数调用的层次很多,即使用的栈帧很多,栈空间就会很快被消耗。栈空间大小一般是在程序执行时候就分配好,当分配的栈空间被消耗完毕后,则经典的 stack overflow 错误将出现,往往会导致程序崩溃。常见的错误就是函数的无限递归或者分配了很大的局部变量,导致栈溢出。这里提一下在 Linux 中栈大小初始值由操作系统设定(ulimit),且有动态增长的机制,只要其大小不大于 RLIMIT_STACK,则空间不足时会自动增长,不会出现栈溢出的错误。但如果栈已经到达最大值 RLIMIT_STACK,则栈溢出就会出现。而 Windows的栈大小通常是固定的,没有动态增长的机制。其大小可以由编译器 (如 Visual Studio) 设置,并被记录在可执行文件中。

栈的结构,可以用来调试程序。如 GDB 调试的时候,就可以看到程序执行的完整栈结构,打印栈帧数据等;另外,对于很多程序性能 profile 工具,往往通过定时采样程序执行的栈帧结构,从而统计程序执行的各种信息,定位哪些函数是性能热点。

最后,

  • 程序中的每个 task 对应一个栈
  • 一个进程本身只有一个栈 (进程是一个task),称为进程栈
  • 对于在进程中开启的线程,其栈空间和进程栈是分开的,每个线程 (也是一个task) 在创建时,线程栈通过操作系统来分配
  • Golang 的每个协程拥有自己的栈

堆内存管理

堆内存,也称为动态内存。程序需要用到动态内存分配的原因,是经常直到程序实际执行时,才知道某些数据结构的大小,不能提前分配内存。堆区域紧接在未初始化的数据段后开始,并向特定方向增长 (下图还是 Linux 系统上的情况,不代表所有)。对于每个进程,内核维护着一个变量 break,它指向堆的顶部。

image-20181228151912039

那如何管理这部分的堆内存呢?很多语言维护了 动态内存分配器 来进行管理。动态分配器将堆视为一组不同大小的块 (block) 的集合来维护。每个块就是一个连续的虚拟内存片 (chunk),要么是已分配的,要么是空闲的。最开始的时候,没有空闲块,我们的内存申请是要向内核直接申请内存 (在 Linux 上是移动 brk 指针,Windows 上 调用 HeapAlloc )。释放的时候,没有必要立即将内存归还给系统,而是作为空闲块维护起来,下次分配的时候就可以优先利用被释放的空闲块了。如下图:

image-20181229102552027

维护这些分配块和空闲块的算法需要精心设计,以满足分配效率及内存利用率的最大化。在很多编程语言中,都提供了动态分配内存的方法,例如 C/C++ 中 malloc 库函数就是在堆中分配内存;Go 中 new 一个对象也可能由于逃逸分析,导致变量被分配在堆区中。

常用的分配器有两种风格。这两种风格都需要应用显示的分配内存块。它们不同的地方在于由谁来负责释放已分配的块。

  • 显式分配器,要求应用自身显式的释放任何已分配的块。例如,C 标准库提供 malloc 来分配内存块,并通过free 来显式释放该内存块。C++ 中的 new 和 delete 也是类似。C++ 类的析构函数本质上提供了内存释放的机会 (delete 操作符触发)。智能指针也是一种自动内存管理的机制。
  • 隐式分配器,要求分配器检测不被程序继续使用的内存块,并释放这些它们。这个过程常称为 垃圾收集(Garbage Collection, GC) ,高级语言如 Java、Go、JavaScript 等拥有自己的垃圾收集机制,程序员无需自己释放申请的内存。
显式分配器的机制

显式分配器的机制,典型的如如何实现 malloc 和 free 。我们考虑以下几点:

  • 使用堆内存

  • 分配时指定内存 size,返回内存地址

  • 显式释放时只需要指定内存地址,分配器本身需要记录该内存对应的大小
  • 分配的内存要做对齐,最大化访问效率

一种方式是使用块维护内存。各个内存块,不管是已分配还是空闲的,一起连接起来

image-20181228172930734

单个内存块的数据格式如下

image-20181228173029000

这里不详细展开显示分配的具体细节 (例如类似 malloc 的实现原理),有兴趣的可以查看 Implementing malloc and free 和 如何实现一个malloc

垃圾收集的机制
  • 追踪。这是最常用的垃圾收集机制,主要思想是从根节点进行追踪,所有未被引用的对象就被认为是垃圾,可以进行收集。它主要研究的是引用对象的可达性。如下图的引用关系

    image-20181224115827314

    从 root set 追踪不可达,则会被垃圾收集器认为是垃圾,可以回收。那具体的,对象的可达性是怎么样呢?主要分两种:

    • 根元素。这包括程序的所有全局变量、当前调用栈对象 (如所有栈内局部变量、当前调用的函数参数)

    • 引用元素。所有根元素引用链上的对象

    追踪的主要机制是如何维护对象之间的引用关系。对于 Java、Go 这些有标记内存变量类型的语言来说比较简单:有变量类型信息从而很简单的知道是否包含其他对象;而对于 C/C++ 语言,不会用类型信息来标记内存位置。因此,像 int、float 这类变量和对象的指针变量没有可区分的手段,很难说一个结构体的某个内存到底是引用其他结构体的指针还是一个 int/float 变量。如果 C/C++ 要设计垃圾收集器,必须保守的将所有 像指针 的内存视为可达的,尽管事实上它可能是不可达的。C/C++ 垃圾收集库的一个实现是 Boehm GC

    比较知名的追踪算法有 标记-清扫算法、三色标记算法等。详细的这里不展开,有兴趣的可以查看 这里

  • 引用计数

    引用计数机制,简单说就是为每个变量维护一个计数,有引用时累加计数,删除引用时则减计数。如果计数归零,则可以立即回收。但是因为引用计数有一些缺点,如

    • 环形引用。如果有两个变量互相引用,但是和其他变量没有引用关系,则是环形引用,因为引用计数不为0,得不到回收
    • 空间和时间消耗。每个变量维护一个计数,空间消耗不小。另外,对计数的增减,对程序执行效率也有影响
    • 要保证原子性。如果涉及并发,计数的修改需要保证原子性

    这些缺点让引用计数机制逊于追踪机制,在垃圾收集实现方面没有得到广泛应用。

Bss 段、Data 段 与 Text 段

Bss 段和 Data 都是用来存储程序的全局变量和静态变量的。不同的是,Bss 段是匿名映射的全零区域,用来存储代码中没有初始化的全局变量和静态变量 (包括文件内静态变量,函数内静态局部变量,类静态成员变量),如 static int cntActiveUsers;Data 段是通过可执行文件的 .data 映射来的,里面存储的是已经初始化 (且非0) 的 全局变量或静态变量的值,例如 static int cntWorkerBees = 10 会将将静态变量的值 10 存储到 Data 段。需要注意,Data 段的映射是私有映射,运行时对变量的修改并不会反馈到可执行文件里,否则可执行文件都被修改了,那是不合理的。

Text 段就是代码段,是从可执行文件的 .text 部分映射过来。除了代码指令,还存储了常量字符串。这部分内存是可读可执行的,但是不能写,因此无论是修改代码指令,还是修改常量字符串,都会导致程序 crash。

static const char* gonzo = "God's own prototype"; 已初始化的静态变量 gonzo 是个指针,存储在 Data 段,其内容是一个地址 0x080484f0,指向字符串的地址,而字符串在 Text 段。

image-20181229105756277

参考资料

<深入理解计算机系统V3>

GarbageCollection WiKi)

CallStack WiKI

Anatomy of a Program in Memory

虚拟内存的那点事儿

Windows Virtual Address Space

Go Memory Management

Go研究之channel

发表于 2018-12-13 |

CSP模型

CSP,全称 Communicating sequential processes 是指并发系统中的一种模式。简单来说,CSP 模型由并发执行的实体(线程或者进程或其他)所组成,实体之间通过消息进行通信。Go语言的并发特性正是基于CSP模型发展而来,具体来说,goroutine 就是并发执行的实体,而 goroutine 之前通信就是借助了 channel 来进行消息传递。Go语言有一种哲学叫做

Do not communicate by sharing memory; instead, share memory by communicating.

意思是尽量通过通信来共享内存,而不是通过共享内存来通信。由此可见 channel 在 Go 中的重要地位。

Go的channel

通常的线程模型,一个很重要的问题就是各线程之前的同步。我们通常采用条件变量或者信号量来做同步操作,如果需要传递消息,经常要自己实现一个带锁的线程安全的消息队列。

而在Go中,为了方便 goroutine 之前的通信,原生支持了一种称为 channel 的数据结构,用来做同步和消息传递。它的常用操作如下:

创建

1
2
unBufferChan := make(chan int)  // 无缓冲
bufferChan := make(chan int, N) // 带缓冲

读写

1
2
3
4
5
// 阻塞读操作
x := <- ch

// 阻塞写操作
ch <- x

关闭

1
2
// 关闭
close(ch)

select (类似IO多路复用),只要其中一个满足,则执行后续操作

1
2
3
4
5
6
select {
case e, ok := <-ch1:
...
case e, ok := <-ch2:
...
}

以上操作都是阻塞的,同时 channel 也支持非阻塞的读写操作,类似于 IO 操作中如果没有满足条件的则返回 EAGAIN 或者 EWOULDBLOCK。这需要借助 select 的 default 分支来实现

1
2
3
4
5
select {
case e := <-ch:
...
default:
}

上面的非阻塞操作,如果 ch 中没有读取到数据,也不会阻塞,而是进入default分支。类似的还有非阻塞写,这里就不具体说明了。

Channel的实现原理

下面以 Go1.8 的源码为例来研究下 channel 各个操作的实现原理。

channel 结构

hchan 结构用来表示一个channel,具体信息如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters

// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}

type waitq struct {
first *sudog
last *sudog
}

// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ↔ synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this.

g *g
selectdone *uint32 // CAS to 1 to win select race (may point to stack)
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)

// The following fields are never accessed concurrently.
// waitlink is only accessed by g.

acquiretime int64
releasetime int64
ticket uint32
waitlink *sudog // g.waiting list
c *hchan // channel
}

可以看到,hchan主要包含

  • 环形队列,用来存放消息(只针对带缓冲channel)。涉及的字段有

    • qcount 当前队列中的消息数量

    • dataqsiz 队列大小

    • buf 队列具体数据buf

    • sendx、recvx 环形队列的发送和接收游标

  • 元素信息 elemtype 、elemsize。其中elemtype 的类型 _type 是 Go 中表示变量类型的基础结构,在反射和interface 的实现中很常用。因为 channel 可以持有各种类型的数据,所以需要维护元素信息。
  • lock,用来做并发互斥。
  • 等待接收或等待发送的 goroutine 队列。recvq 和 sendq 都是用双向链表实现的队列,如果有 goroutine 因为读写被阻塞,就会被调度器挂起在这两个队列上。我们之前分析 Go 调度模型GMP的时候有提到,一般 G (goroutine) 和 P (proc) 挂钩,处于等待执行或者正在执行状态。如果正在执行的 G 阻塞在 channel 上,就会脱离 P,转而挂起在 channel 的recvq 或者 sendq 上,完成 channel 操作后再选择合适的 P 来继续执行。
  • closed。用来表示channel是否关闭。关闭 channel 可用用来实现广播的效果,后面会讲到。
  • sudog 代表一个goroutine

创建channel

创建channel就是如何新建一个 hchan 结构的过程,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func makechan(t *chantype, size int64) *hchan {
elem := t.elem

// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) {
panic(plainError("makechan: size out of range"))
}

var c *hchan
if elem.kind&kindNoPointers != 0 || size == 0 {
// Allocate memory in one call.
// Hchan does not contain pointers interesting for GC in this case:
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
if size > 0 && elem.size != 0 {
c.buf = add(unsafe.Pointer(c), hchanSize)
} else {
// race detector uses this location for synchronization
// Also prevents us from pointing beyond the allocation (see issue 9401).
c.buf = unsafe.Pointer(c)
}
} else {
c = new(hchan)
c.buf = newarray(elem, int(size))
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)

if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
  • 元素大小不能超过64k(1<<16)

  • channel的元素个数不能为负数且不能超过一定数量

  • 如果channel里存的是非指针的具体对象,则channel和具体存放的元素buf会一起分配,GC不会扫描这部分buf数据,因为buf相当于是channel的一部分。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    if elem.kind&kindNoPointers != 0 || size == 0 {
    // Allocate memory in one call.
    // Hchan does not contain pointers interesting for GC in this case:
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
    if size > 0 && elem.size != 0 {
    c.buf = add(unsafe.Pointer(c), hchanSize)
    } else {
    // race detector uses this location for synchronization
    // Also prevents us from pointing beyond the allocation (see issue 9401).
    c.buf = unsafe.Pointer(c)
    }
    }
  • 否则 buf 分开分配内存,可能会被GC 回收到。

    1
    2
    3
    4
    } else {
    c = new(hchan)
    c.buf = newarray(elem, int(size))
    }

创建完成后的结构,利用 gdb 调试时看到 hchan 的数据

1
2
3
4
5
6
7
8
(gdb) p c
$1 = (struct runtime.hchan *) 0xc420072000
(gdb) p *c
$2 = {qcount = 0, dataqsiz = 10, buf = 0xc420072060, elemsize = 8, closed = 0, elemtype = 0x1097040 <type.*+56064>, sendx = 0, recvx = 0, recvq = {first = 0x0, last = 0x0},
sendq = {first = 0x0, last = 0x0}, lock = {key = 0}}
(gdb) p *c.elemtype
$3 = {size = 8, ptrdata = 0, hash = 4149441018, tflag = 7 '\a', align = 8 '\b', fieldalign = 8 '\b', kind = 130 '\202', alg = 0x110bbf0 <runtime.algarray+80>,
gcdata = 0x10bca3b <runtime.gcbits.*> "\001\002\003\004\005\006\a\b\n\f\r\016\017\020\022\025\026\030\031\032\033\036\037,568<ABUXr~\236\325\330\365\377\001\002\037\003%\004I\022U\001U\005U\025UUu\002y\001\224\a\230\a\230\177\330\003\340?\376\005\376!\377\377\001\016\034", str = 955, ptrToThis = 37088}

可以看到 c 的内存地址和 buf 的内存地址是连续的。

如果创建的是指针类型 channel, c 的地址就和 buf 地址是分开的

1
2
3
4
5
(gdb) p c
$1 = (struct runtime.hchan *) 0xc420068060
(gdb) p *c
$2 = {qcount = 0, dataqsiz = 10, buf = 0xc420016140, elemsize = 8, closed = 0, elemtype = 0x1092500 <type.*+37088>, sendx = 0, recvx = 0, recvq = {first = 0x0, last = 0x0},
sendq = {first = 0x0, last = 0x0}, lock = {key = 0}}

写channel

写对应的操作是 ch <- x, 编译器编译后实际调用了以下源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// entry point for c <- x from compiled code
//go:nosplit
func chansend1(t *chantype, c *hchan, elem unsafe.Pointer) {
chansend(t, c, elem, true, getcallerpc(unsafe.Pointer(&t)))
}
/*
* generic single channel send/recv
* If block is not nil,
* then the protocol will not
* sleep but return if it could
* not complete.
*
* sleep can wake up with g.param == nil
* when a channel involved in the sleep has
* been closed. it is easiest to loop and re-run
* the operation; we'll see that it's now closed.
*/
func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if raceenabled {
raceReadObjectPC(t.elem, ep, callerpc, funcPC(chansend))
}
if msanenabled {
msanread(ep, t.elem.size)
}

if c == nil {
if !block {
return false
}
gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}

if debugChan {
print("chansend: chan=", c, "\n")
}

if raceenabled {
racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
}

// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation.
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

lock(&c.lock)

if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}

if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) })
return true
}

if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}

if !block {
unlock(&c.lock)
return false
}

// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
nil channel
1
2
3
4
5
6
7
if c == nil {
if !block {
return false
}
gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}

对于nil channel, 如果是非阻塞写,则会立即返回;如果是阻塞写,则当前 goroutine 会通过 gopark调用进入等待状态。

closed channel

写已经closed的channel会panic

1
2
3
4
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
recvq

如果 recvq 不为空,则表明有 goroutine 被挂起等待数据,此时即使是带缓冲的channel,也不会将数据存储到buf里,而是直接发给队列头的 goroutine。

1
2
3
4
5
6
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) })
return true
}

数据会从发送 goroutine 的栈上(ep)直接被拷贝到挂起的 sudog 的 数据域 elem 上,然后通过 goready 唤醒这个sudog

1
2
3
4
5
6
7
8
9
10
11
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, 4)
buf not full

如果没有等待接收者,且缓冲没满,则将数据拷贝到缓冲区中,这里就是简单的环形缓冲的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}

通过 chanbuf 定位到具体数据 qp,然后将 ep 指向的数据拷贝到其中,同时更新游标 sendx 和 元素数量 qcount

buf full

如果 buf 满了,对于非阻塞发送,就立即返回

1
2
3
4
if !block {
unlock(&c.lock)
return false
}

对于阻塞发送,当前发送 goroutine 就被阻塞住挂起在等待队列里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

待发送的数据 ep 被放置在 sudog 的 elem 上,sudog 本身入队到 sendq 中,并挂起。

读channel

读对应的操作是 x = <- ch, 编译器编译后实际调用了以下源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.

if debugChan {
print("chanrecv: chan=", c, "\n")
}

if c == nil {
if !block {
return
}
gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}

// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not ready for receiving, we observe that the
// channel is not closed. Each of these observations is a single word-sized read
// (first c.sendq.first or c.qcount, and second c.closed).
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
//
// The order of operations is important here: reversing the operations can lead to
// incorrect behavior when racing with a close.
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

lock(&c.lock)

if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}

if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) })
return true, true
}

if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}

if !block {
unlock(&c.lock)
return false, false
}

// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}

ep 代表变量 x 指向的内存区,用来存储接收到的数据,如果 ep 为 nil,则读到的数据会被忽略

nil channel

对于未初始化的 channel,如果非阻塞读,则直接返回;如果是阻塞读,会通过 gopark 将当前 goroutine 转为等待状态。

1
2
3
4
5
6
7
if c == nil {
if !block {
return
}
gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}
closed channel
1
2
3
4
5
6
7
8
9
10
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}

对于已经关闭的 channel,如果 buf 中没有数据了,则返回元素类型的零值 (通过 typedmemclr 来处理)

如果 buf 中还有数据,还是会继续走后续流程读取数据 (从挂起的 sender 中或者 buf 中)。例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ch := make(chan int, 5)

ch <- 1
ch <- 2
ch <- 3
ch <- 4
ch <- 5
close(ch)

//关闭后,只要还有数据,会读到数据;也会返回标识告知channel是否被关闭
data, isClosed := <-ch
log.Printf("data:%d isClosed:%v", data, isClosed)
log.Printf("after closed data:%d %d", <-ch, <-ch)
for data := range ch {
log.Printf("range data:%d", data)
}

//数据读完后,继续读取会返回零值
data = <- ch
log.Printf("continue read: data %d", data)

输出以下内容

1
2
3
4
5
data:1 isClosed:true
after closed data:2 3
range data:4
range data:5
continue read: data 0
sendq

如果 sendq 有被挂起等待发送数据的 goroutine,则获取队头的 goroutine,调用 recv 进行处理

1
2
3
4
5
6
7
8
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) })
return true, true
}

recv 做的具体工作我们继续看下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, 4)

如果 channel 是非缓冲的 (dataqsiz == 0),则接收者直接从发送者手里接收数据

1
2
3
4
5
6
7
8
9
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
} // else 忽略数据
}

如果 channel 是带缓冲的 (dataqsiz > 0),则表明此时 buf 已经满了,此时才会有等待发送的 goroutine 被挂起。需要做的事情就是从环形队列中读取一个元素,这时会空出一个元素的位置,之前挂起等待的队头 sudog 的数据就被写入环形队列。如果还有其他阻塞等待写的 sugog,继续挂起等待后续的读。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}

处理完成之后,之前 sendq 中挂起的发送 goroutine 就被 goready 调用唤醒

1
2
3
4
5
6
7
8
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, 4)
buf not empty

如果 buf 有数据,则直接将数据读出到 ep 内存处,然后将其置为零值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
//数据从qp读取到ep中
typedmemmove(c.elemtype, ep, qp)
}
//置为零值
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
buf empty

如果 buf 中没有数据,对于非阻塞读取,立即返回

1
2
3
4
if !block {
unlock(&c.lock)
return false, false
}

对于阻塞读,则当前读 goroutine 被挂起到 recvq 中等待后续数据写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

select channel

select 用于多个channel监听并收发消息,当任何一个case满足条件则会执行,若没有可执行的case,就会执行default,如果没有default,程序就会阻塞。select 的作用很类似于 IO多路复用。

多 channel select

前面提到,典型的 select 用法是用在多个 channel 上,例如

1
2
3
4
5
6
7
8
9
10
ch := make(chan int, 5)
chs := make(chan string, 5)
select {
case msg := <- ch:
fmt.Println("received msg: ", msg)
case msgs := <- chs:
fmt.Println("receied msgs: ", msgs)
default:
fmt.Println("no message received")
}

select 的结构为 hselect ,其定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Select statement header.
// Known to compiler.
// Changes here must also be made in src/cmd/internal/gc/select.go's selecttype.
type hselect struct {
tcase uint16 // total count of scase[]
ncase uint16 // currently filled scase[]
pollorder *uint16 // case poll order
lockorder *uint16 // channel lock order
scase [1]scase // one per case (in order of appearance)
}

// Select case descriptor.
// Known to compiler.
// Changes here must also be made in src/cmd/internal/gc/select.go's selecttype.
type scase struct {
elem unsafe.Pointer // data element
c *hchan // chan
pc uintptr // return pc
kind uint16
so uint16 // vararg of selected bool
receivedp *bool // pointer to received bool (recv2)
releasetime int64
}

对其操作的具体源码实现是

1
selectgoImpl(sel *hselect) (uintptr, uint16)

select 的多个选项会被包装成多个scase 结构,然后依据 lockorder 来处获得所有锁

1
2
// lock all the channels involved in the select
sellock(scases, lockorder)

sellock 是实现为

1
2
3
4
5
6
7
8
9
10
func sellock(scases []scase, lockorder []uint16) {
var c *hchan
for _, o := range lockorder {
c0 := scases[o].c
if c0 != nil && c0 != c {
c = c0
lock(&c.lock)
}
}
}

锁会有去重判断,方式多个case 操作一个 channel 导致重复上锁问题。

然后依次查看所有的case是否有对应的事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
for i := 0; i < int(sel.ncase); i++ {
//按照pollorder顺序来遍历
cas = &scases[pollorder[i]]
c = cas.c

switch cas.kind {
case caseRecv:
sg = c.sendq.dequeue()
if sg != nil {
goto recv
}
if c.qcount > 0 {
goto bufrecv
}
if c.closed != 0 {
goto rclose
}

case caseSend:
if raceenabled {
racereadpc(unsafe.Pointer(c), cas.pc, chansendpc)
}
if c.closed != 0 {
goto sclose
}
sg = c.recvq.dequeue()
if sg != nil {
goto send
}
if c.qcount < c.dataqsiz {
goto bufsend
}

case caseDefault:
dfl = cas
}
}

注意这里遍历的时候并不是按照代码里 case 的顺序,而是按照 pollorder 来的,这个 pollorder 是随机出来的顺序,因此如果有多个满足条件的 case,则最终选中的 case 是哪一个是随机的。这样能避免一直选中写在前面的 case 而导致其他 case 饿死 的情况。pollorder 通过以下代码随机洗牌而来

1
2
3
4
5
6
7
8
// generate permuted order
pollslice := slice{unsafe.Pointer(sel.pollorder), int(sel.ncase), int(sel.ncase)}
pollorder := *(*[]uint16)(unsafe.Pointer(&pollslice))
for i := 1; i < int(sel.ncase); i++ {
j := int(fastrand()) % (i + 1)
pollorder[i] = pollorder[j]
pollorder[j] = uint16(i)
}

如果过程中遍历到的 case 有一个是非阻塞的操作,则 select 会立即返回,不会再去检查后续的 case 是否 ready;如果每个 case 对其 channel 的操作都是阻塞的且没有 default 分支,则 select 会一直阻塞,而且会挂起在涉及的所有的 channel 的 recvq 或者 sendq 上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// pass 2 - enqueue on all chans
gp = getg()
done = 0
if gp.waiting != nil {
throw("gp.waiting != nil")
}
nextp = &gp.waiting
for _, casei := range lockorder {
cas = &scases[casei]
c = cas.c
sg := acquireSudog()
sg.g = gp
// Note: selectdone is adjusted for stack copies in stack1.go:adjustsudogs
sg.selectdone = (*uint32)(noescape(unsafe.Pointer(&done)))
// No stack splits between assigning elem and enqueuing
// sg on gp.waiting where copystack can find it.
sg.elem = cas.elem
sg.releasetime = 0
if t0 != 0 {
sg.releasetime = -1
}
sg.c = c
// Construct waiting list in lock order.
*nextp = sg
nextp = &sg.waitlink

switch cas.kind {
case caseRecv:
c.recvq.enqueue(sg)

case caseSend:
c.sendq.enqueue(sg)
}
}

// wait for someone to wake us up
gp.param = nil
gopark(selparkcommit, nil, "select", traceEvGoBlockSelect, 2)
单 channel 非阻塞select

单 channel 的非阻塞 select 和前面多 channel 的select的实现机制不太一样,相对更简单,编译器会将其编译成 chanrecv 或者 chansend 调用

编译器会将以下非阻塞读取

1
2
3
4
5
6
select {
case c <- v:
... foo
default:
... bar
}

编译成

1
2
3
4
5
if selectnbsend(c, v) {
... foo
} else {
... bar
}

其中 非阻塞写的实现还是调用前面提到的 chansend

1
2
3
func selectnbsend(t *chantype, c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(t, c, elem, false, getcallerpc(unsafe.Pointer(&t)))
}

编译器会将以下非阻塞写

1
2
3
4
5
6
select {
case v = <-c:
... foo
default:
... bar
}

编译成

1
2
3
4
5
if selectnbrecv(&v, c) {
... foo
} else {
... bar
}

其中 非阻塞读的实现还是调用前面提到的 chanrecv

1
2
3
4
func selectnbrecv(t *chantype, elem unsafe.Pointer, c *hchan) (selected bool) {
selected, _ = chanrecv(t, c, elem, false)
return
}

close channel

close channel 的操作对应源码中的 closechan 调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}

lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}

if raceenabled {
callerpc := getcallerpc(unsafe.Pointer(&c))
racewritepc(unsafe.Pointer(c), callerpc, funcPC(closechan))
racerelease(unsafe.Pointer(c))
}

c.closed = 1

var glist *g

// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
gp.schedlink.set(glist)
glist = gp
}

// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
gp.schedlink.set(glist)
glist = gp
}
unlock(&c.lock)

// Ready all Gs now that we've dropped the channel lock.
for glist != nil {
gp := glist
glist = glist.schedlink.ptr()
gp.schedlink = 0
goready(gp, 3)
}
}
nil channel

如果关闭一个未初始化的 channel,会 panic

1
2
3
if c == nil {
panic(plainError("close of nil channel"))
}
closed channel
1
2
3
4
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}

关闭一个已经关闭的 channel 也会导致 panic。所以一般使用时,是由唯一的一个生产者来关闭 channel。

正常关闭

正常关闭 channel 时,首先设置 closed 标识,然后将循环遍历,将 recvq 和 sendq 中挂起等待的 goroutine 收集到 glist 链表中等待调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
c.closed = 1

var glist *g

// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
gp.schedlink.set(glist)
glist = gp
}

// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
gp.schedlink.set(glist)
glist = gp
}

然后调度 glist 中的 goroutine

1
2
3
4
5
6
7
8
// Ready all Gs now that we've dropped the channel lock.
for glist != nil {
gp := glist
glist = glist.schedlink.ptr()
gp.schedlink = 0
// 使 g 的状态切换到 Grunnable,交给调度器调度
goready(gp, 3)
}

具体的:

  • 等待读的 goroutine,会将数据区 elem 置为零值 ( typedmemclr(c.elemtype, sg.elem) ) 然后继续执行
  • 等待写的 goroutine,将会 panic

参考资料

Go Channel 源码剖析

Diving Deep Into The Golang Channels.

GUID生成

发表于 2018-11-21 |

MangoDB objectid

twitter snowflake

游戏服务器数据一致性

发表于 2018-11-19 |

游戏服务器是一种很常见的分布式系统,分布式系统最大的难题是状态同步,CAP 定理是这方面的原理。

CAP定理

image-20181119135742290

  • Consistency 一致性

  • Availability 可用性

  • Partition tolerance 分区容错性

CAP 这三个指标不可能同时做到,最多只能满足其二。这个结论就叫做 CAP 定理。

Partition tolerance 分区容错,意思是分布式的各节点(也叫区)之间通信是不可靠的。一般来说,分区容错无法避免,因此可以认为 CAP 的 P 总是成立。CAP 定理告诉我们,剩下的 C 和 A 无法同时做到。

Consistency 一致性,意思是从各个节点访问同一数据,其值必须都是最新的,即一致的。节点的多份数据必须通过一定的算法做同步。

举例来说,某条记录是 v0,用户向 G1 发起一个写操作,将其改为 v1。

image-20181119140749199

image-20181119140847181

此时如果有用户向 G2 发起读操作,由于 G2 的值没有发生变化,因此返回的是 v0。G1 和 G2 读操作的结果不一致,这就不满足一致性了。

image-20181119141002062

为了让 G2 也能变为 v1,就要在 G1 写操作的时候,让 G1 向 G2 发送一条消息,要求 G2 也改成 v1。

image-20181119141107638

注意这里说的其实是强一致性。相对的还有弱一致性,即最终一致性。

Availability 可用性, 意思是通过任意节点都可以拿到数据,强调服务的可用性。一般可以用 服务可用时间/总服务时间 去度量。比如4个9的可用性,即99.99%,换算到一年的时间内,可以反推出最长故障时间:

365 × 86400 ×(1-99.99%)秒 = 3153.6 秒 = 52.56 分钟

保证系统各个环节无单点、大部分故障可自愈是保障高可用性的关键。

以下为一些案例:

  1. CDN 缓存。最大可用性 + 最终一致性。各CDN节点的数据可以在运行一段时间后才趋于一致。
  2. zookeeper。强一致性 + 不错的可用性。paxos/raft 一致性算法保证一致性。当出现分区(P故障)的时候,并非是完全不可用的,它提供了在大多数节点连通的情况下的可用性保证。
  3. 两阶段提交协议:强一致性 + 糟糕的可用性。两阶段提交协议中任意一节点与协调节点之间发生了分区,则服务完全终止。
  4. Git。优先保证可用性,一般就是提交在本地。远程合并时,有一个专门的合并算法处理一致性问题,遇到无法处理的冲突(小部分情况下),把选择权交给了用户。

游戏中的数据一致性

游戏是一种逻辑极其复杂,数据结构繁杂的系统,需要处理的问题千差万别;不同的游戏类型,其要求也是不同的,不能简单的套用理论上的CAP定理,去过分强调系统的一致性或者可用性。对于游戏中产生的数据,我们可以分为以下几大类:

  • 玩家的存档数据。这部分数据可以说是游戏中最重要的数据了,通常我们的考虑是强一致性。
  • 全局重要数据,如工会数据、SLG大地图城池数据等。这也要求强一致性。
  • 旁路数据,如好友列表数据。因为好友信息,如名字、等级、头像等,这些数据的更新有一定的延迟容忍度,可以做成弱一致性,保证最终的数据一致性即可。

多方修改

对于玩家存档数据和全局重要数据,很可能有多方同时修改,如何保证强一致性呢?

方法一,某一玩家的数据读取或者修改统一指定到特定逻辑进程来操作。例如A在进程1,玩家B在进程2,如果A想读取或者修改B的数据,将修改请求统一发放到进程2处理,完成后返回进程1。这样处理有个问题,如果某一个进程挂掉,该进程上的玩家都无法得到服务。好处是简单,没有同时读写的问题。

方法二,所有进程对等,玩家操作不一定落在某一固定进程处理,所有进程的需要做的就是取玩家的数据,修改,然后存入数据层。一般在逻辑层做乐观锁机制,保证多进程同时修改一个玩家数据。简单的可以设置一个版本号,更新的时候去检查版本号,不一致则失败,需要做回滚。这种情况一般要求业务需求可以重试,如很多游戏里的偷菜等玩法就采用这种方式。复杂的地方在回滚怎么做。这种方式的并发性能比较高。

方法三,对于同时操作概率比较大的数据,如公会数据、大地图数据等,可以添加独占锁,严格保证一致性。这种方式性能会比较低,并发不高。可以做的改进是用读写锁。

方法二和方法三的情况,无论是操作玩家数据或者全局数据,本质上都是用的是锁(乐观锁和悲观锁),所以要注意尽量减小锁的粒度,做垂直划分:将关联度低的数据拆分成多块,分别加锁控制,减小锁冲突。

数据软拷贝

所谓数据软拷贝,意思是对核心数据的冗余备份,用以提高性能和逻辑复杂度。

举个例子,上文提到的好友列表中的某一个玩家的数据,如名字、等级等,在该玩家存档数据中一定是准确的,这个由强一致性保证。这是不是意味着拥有该玩家为好友的所有其他玩家,其好友列表中一定要实时更新?答案是否。我们可以为所有玩家维护一份冗余的简要数据放到全局服务中;各逻辑服如果有玩家存档数据更新,只需要同时通知简要数据修改,但可能更新失败,这可以容忍。下次找个时机,如玩家登录时再使用存档数据同步一下简要数据。拉取玩家好友列表,直接取简要数据里的数据即可。

另一个例子,是公会相关数据。全局的公会数据需要维护公会的基本信息,以及成员的信息;同时玩家存档数据需要记录自己属于哪个公会。这里我们就需要区分数据准确性,通常认为公会数据是准确的,玩家存档里的是软拷贝。如果有一个玩家加入公会,永远以公会数据做基本判断,加入成功后再同步玩家存档数据。如果同步出错怎么办?其实问题不大,只是其他玩家看到的软拷贝是脏数据。找个时机,如玩家登录时从公会服拉取同步一下,或者公会服控制定时 push 到游戏逻辑服。

事务数据

分布式事务经常出现在互联网产品中,如电商后台的订单系统,就会涉及一个操作修改多份数据:下单会修改商品库存、订单、支付信息等,要求事务性。

游戏中类似的系统有建角,双向好友,拍卖行,特定玩法如俘虏与被俘虏等。

一种做法是用状态机,将事务的各个操作拆分N个原子操作,然后每一步都有其对应的回滚逻辑,如果中间失败,依次回滚。

另一种做法是互联网产品中经常使用的方式,利用消息队列的方式保存消息,利用消息队列的重传特性,如果中间失败,则会重传继续重试。这里要注意,对于操作要做到幂等性,即使重试多次调用接口不会多次运行逻辑。

不使用两阶段提交是因为,有可能多个原子步骤之间有依赖关系;并且两阶段提交整个过程是阻塞的,第一阶段prepare 会锁住资源,这是不可接受的。

参考资料

CAP 定理的含义

有一千个程序员,就有一千种对CAP的解读

深度探索C++对象模型总结1

发表于 2018-11-18 |

C++ class 相对于 C struct,支持了更多特性,如添加了类的方法,构造析构函数,继承等。一般来说 class 和 struct 效率大部分情况下是相同的,引起效率差异的地方主要在两点:

  • virutal function。class需要维护一个虚函数表,并通过虚指针指向它,中间多了一道访问转换。

  • virutal base class。虚基类,主要是为了解决菱形继承关系,防止基类对象在子类对象布局中存在多份,因而引入了指针,这中间也多了一道访问转换。

C++对象模型

非静态成员变量

非静态成员变量存放在每个object上

静态成员变量

静态成员变量存放在静态数据区,需要在类外单独定义。例如

1
2
3
4
5
class A {
public:
static int a; //声明
};
int A::a = 1; //定义
成员函数

成员函数,包括非静态和静态成员函数,存放于代码区,也是独立于每个object之外。

虚函数

所有的虚函数被放在类的虚函数表里,虚函数表由编译器生成;每个object有个指向虚函数表的指针,在运行时由构造函数、析构函数、赋值操作符来设置或者重置。

TypeInfo

类的类型信息,用来在运行时支持类型识别 (RTTI),也存放在虚函数表里,通常放在虚表的第一个slot。

总览

未涉及继承的class对象布局,总体的示意图就类似下面这样

image-20181118203839822

C++多态语义

c++多态从语法上说,需要的形式是基类指针或者引用指向子类对象,如

1
2
3
4
5
6
Derived d;
Base *bp = &d;
bp->DoSomething();
//或者
Base &bi = d;
bi.DoSomething();

如果直接将子类对象直接赋值给基类对象,则子类对象会被截断:

1
2
3
4
Base b;
Derived d;
b = d; //截断
b.DoSomething(); //调用的是基类的方法

因此,如果有一个基类的指针*bp 或者引用 bi,我们必然不能确定该指针或引用指向的是什么,可能是基类的对象,也可能是子类的对象,因此其调用的方法必须在运行时确定,这也是多态的具体表现。

但是也要注意,并不是任意一个指针或者引用就支持指针,比如下面

1
2
3
4
5
6
// no polymorphism
int *pi;
// no language supported polymorphism
void *pvi;
// ok: class x serves as a base class
x *px;

以下是多态语法形式的内存布局:

image-20181118212121033

更复杂的形式:

image-20181118212454317

深度探索C++对象模型总结2

发表于 2018-11-16 |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <stdio.h>

class A {
public:
int x;
int y;
int z;
};

int main() {
printf("offset of z: %p\n", &A::z); //int A::*
A a;
printf("&a: %p, &a.z: %p\n", &a, &a.z);
return 0;
}

输出如下

1
2
offset of z: 0x8
&a: 0x7ffeeea7fa20, &a.z: 0x7ffeeea7fa28
12…4

philipyao

35 日志
20 标签
GitHub E-Mail
© 2020 philipyao
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
备案号:沪ICP备17048801号