golang netpoll的实现与分析


1:io模型

计算机的io模型区分为多种,目前用的最多的也就是nio、epoll、select。
结合不同场景使用不同的io模型才是正解。
具体可以查看我之前写的io模型演进。io模型演进

2:golang中网络io

golang天然适合并发,为什么?一个是轻量级的协程,二个是将复杂的io进行了抽象化,简化了流程。
比如我们简单的访问一个http服务,几行简单的代码就能实现

tr := &recordingTransport{}
client := &Client{Transport: tr}
url := "http://dummy.faketld/"
client.Get(url) // Note: doesn't hit network

那么golang对Io做了哪些优化呢?能实现如此简单的切换呢?

3:groutinue 针对io事件的调度

我们这里假设你对groutinue调度已经有一定的了解了。
我们知道,在go中,每个process绑定一个虚拟的machine,而在machine中,是具有一个g0的,g0在本地遍历自己的队列获取g或者从全局队列获取g。
grouting-schedule.png
我们也知道了,在g运行的时候,g会把执行权交给g0进行重新调度,那么在io事件中,g是怎么把事件交还给g0的呢?这时候就牵扯到我们今天的主角—-netpoll。

4:netpoll

o语言在网络轮询器中使用 I/O 多路复用模型处理 I/O 操作,但是他没有选择最常见的系统调用 select。 select 也可以提供 I/O 多路复用的能力,但是使用它有比较多的限制:

  • 监听能力有限 — 最多只能监听 1024 个文件描述符,可以通过手动修改limit来改变,但是各方面成本比较大;
  • 内存拷贝开销大 — 需要维护一个较大的数据结构存储文件描述符,该结构需要拷贝到内核中;
  • 时间复杂度 — 返回准备就绪的事件个数后,需要遍历所有的文件描述符;

golang官方统一封装一个网络事件的poll,和平台无关,为epoll/kqueue/port/AIX/Windows 提供了特定的实现。

  • src/runtime/netpoll_epoll.go
  • src/runtime/netpoll_kqueue.go
  • src/runtime/netpoll_solaris.go
  • src/runtime/netpoll_windows.go
  • src/runtime/netpoll_aix.go
  • src/runtime/netpoll_fake.go

这些模块在不同平台上实现了相同的功能,构成了一个常见的树形结构。编译器在编译 Go 语言程序时,会根据目标平台选择树中特定的分支进行编译

必须实现的方法有:

​netpollinit 初始化网络轮询器,通过 `sync.Once` 和 `netpollInited` 变量保证函数只会调用一次
​netpollopen 监听文件描述符上的边缘触发事件,创建事件并加入监听poll_runtime_pollOpen函数,这个函数将用户态协程的pollDesc信息写入到epoll所在的单独线程,从而实现用户态和内核态的关联。
​netpoll  轮询网络并返回一组已经准备就绪的 Goroutine,传入的参数会决定它的行为:
  - 如果参数小于0,阻塞等待文件就绪
  - 如果参数等于0,非阻塞轮询
  - 如果参数大于0,阻塞定期轮询
​netpollBreak 唤醒网络轮询器,例如:计时器向前修改时间时会通过该函数中断网络轮询器
​netpollIsPollDescriptor  判断文件描述符是否被轮询器使用

netpoll中有2个重要的结构体:

//pollCache  
//pollDesc

type pollDesc struct {
    link *pollDesc // in pollcache, protected by pollcache.lock

    // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
    // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
    // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
    // proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
    // in a lock-free way by all operations.
    // NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
    // that will blow up when GC starts moving objects.
    lock    mutex // protects the following fields
    fd      uintptr
    closing bool
    everr   bool      // marks event scanning error happened
    user    uint32    // user settable cookie
    rseq    uintptr   // protects from stale read timers
    rg      uintptr   // pdReady, pdWait, G waiting for read or nil
    rt      timer     // read deadline timer (set if rt.f != nil)
    rd      int64     // read deadline
    wseq    uintptr   // protects from stale write timers
    wg      uintptr   // pdReady, pdWait, G waiting for write or nil
    wt      timer     // write deadline timer
    wd      int64     // write deadline
    self    *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}

type pollCache struct {
    lock  mutex
    first *pollDesc
    // PollDesc objects must be type-stable,
    // because we can get ready notification from epoll/kqueue
    // after the descriptor is closed/reused.
    // Stale notifications are detected using seq variable,
    // seq is incremented when deadlines are changed or descriptor is reused.
}
  • rseqwseq — 表示文件描述符被重用或者计时器被重置;
  • rgwg — 表示二进制的信号量,可能为 pdReady、pdWait、等待文件描述符可读或者可写的 Goroutine 以及 nil;
  • rdwd — 等待文件描述符可读或者可写的截止日期;
  • rtwt — 用于等待文件描述符的计时器;
    golang关于io时间做了很多统一的封装在runtime/netpoll之下(其实调用的是internal/poll包下的),然后通过internal包下对 runtime包进行调用,internal包下也封装了一个同名的pollDesc对象,不过是一个指针(关于internal有个细节就是这个包是不能被外部调用):
    type pollDesc struct {
      runtimeCtx uintptr
    }
    其实最终都是对runtime底下的调用,只不过封装了一些易用的方法,比如read,write,做了一些抽象化的处理。
    func runtime_pollServerInit()  //初始化
    func runtime_pollOpen(fd uintptr) (uintptr, int)  //打开
    func runtime_pollClose(ctx uintptr)   //关闭
    func runtime_pollWait(ctx uintptr, mode int) int //等待
    func runtime_pollWaitCanceled(ctx uintptr, mode int) int  //等待并(失败时)退出
    func runtime_pollReset(ctx uintptr, mode int) int  //重置状态,复用
    func runtime_pollSetDeadline(ctx uintptr, d int64, mode int) //设置读/写超时时间
    func runtime_pollUnblock(ctx uintptr)  // 解锁 
    func runtime_isPollServerDescriptor(fd uintptr) bool  
    // 这里的ctx实际上是一个io fd,不是上下文
    // mod 是 r 或者 w  ,io事件毕竟只有有这两种
    // d 意义和time.d差不多,就是关于时间的
    这些方法的具体实现都在runtime下,我们挑几个重要的看看:
//将就绪好得io事件,写入就绪的grotion对列
// netpollready is called by the platform-specific netpoll function.
// It declares that the fd associated with pd is ready for I/O.
// The toRun argument is used to build a list of goroutines to return
// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
// whether the fd is ready for reading or writing or both.
//
// This may run while the world is stopped, so write barriers are not allowed.
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    if rg != nil {
        toRun.push(rg)
    }
    if wg != nil {
        toRun.push(wg)
    }
}
//轮询时调用的方法,如果io就绪了返回ok,如果没就绪,返回flase
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    // set the gpp semaphore to pdWait
    for {
        old := *gpp
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("runtime: double wait")
        }
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }

    // need to recheck error states after setting gpp to pdWait
    // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
    // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
    if waitio || netpollcheckerr(pd, mode) == 0 {
      //gopark是很重要得一个方法,本质上是让出当前协程执行权,一般是返回到g0让g0重新调度
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // be careful to not lose concurrent pdReady notification
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}

//获取到当前io所在的协程,如果协程已关闭,直接返回nil
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    for {
        old := *gpp
        if old == pdReady {
            return nil
        }
        if old == 0 && !ioready {
            // Only set pdReady for ioready. runtime_pollWait
            // will check for timeout/cancel before waiting.
            return nil
        }
        var new uintptr
        if ioready {
            new = pdReady
        }
        if atomic.Casuintptr(gpp, old, new) {
            if old == pdWait {
                old = 0
            }
            return (*g)(unsafe.Pointer(old))
        }
    }
}

思考:

1、 a、b两个协程,b io阻塞,完成后,一直没有获取到调度权,会出现什么后果。
2、 a、b两个协程,b io阻塞,2s time out,但是a一直占用执行权,b一直没有获取到调度权,5s后才获得到,b对使用端已经超时,这时候是超时还是不超时

所以设置的timeout,不一定是真实的io waiting,可能是没有获取到执行权。

5:怎么触发读事件的?

因为写io是我们主动操作的,那么读是怎么进行操作的呢?这是一个被动的状态
首先我们了解一个结构体。golang中所有的网络事件和文件读写都用fd进行标识(位于internal包下)。

// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
    // Lock sysfd and serialize access to Read and Write methods.
    fdmu fdMutex

    // System file descriptor. Immutable until Close.
    Sysfd int

    // I/O poller.
    pd pollDesc

    // Writev cache.
    iovecs *[]syscall.Iovec

    // Semaphore signaled when file is closed.
    csema uint32

    // Non-zero if this file has been set to blocking mode.
    isBlocking uint32

    // Whether this is a streaming descriptor, as opposed to a
    // packet-based descriptor like a UDP socket. Immutable.
    IsStream bool

    // Whether a zero byte read indicates EOF. This is false for a
    // message based socket connection.
    ZeroReadIsEOF bool

    // Whether this is a file rather than a network socket.
    isFile bool
}

我们看到,fd中关联的pollDesc,通过pollDesc调用了runtime包内部的实现的各种平台的io事件。

当我们进行read操作时(下面是代码截取)

for {
    n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
    if err != nil {
        n = 0
        if err == syscall.EAGAIN && fd.pd.pollable() {
            if err = fd.pd.waitRead(fd.isFile); err == nil {
                continue
            }
        }
    }
    err = fd.eofError(n, err)
    return n, err
    }

会阻塞调用waiteRead方法,方法内部主要就是调用的runtime_pollWait。

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    errcode := netpollcheckerr(pd, int32(mode))
    if errcode != pollNoError {
        return errcode
    }
    // As for now only Solaris, illumos, and AIX use level-triggered IO.
    if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
        netpollarm(pd, mode)
    }
    for !netpollblock(pd, int32(mode), false) {
        errcode = netpollcheckerr(pd, int32(mode))
        if errcode != pollNoError {
            return errcode
        }
        // Can happen if timeout has fired and unblocked us,
        // but before we had a chance to run, timeout has been reset.
        // Pretend it has not happened and retry.
    }
    return pollNoError
}

这里主要是由netpollblock控制,netpollblock方法我们上面就说过,当io还未就绪的时候,直接释放当前的执行权,否则就是已经可读写的io事件,直接进行读取操作即可。

6:总结

整体流程
listenStream –> bind&listen&init –> pollDesc.Init -> poll_runtime_pollOpen –> runtime.netpollopen -> epollctl(EPOLL_CTL_ADD)
epoll_ctl_add.png
pollDesc是由pollCache进行维护的,并且不受GC监控

// Must be in non-GC memory because can be referenced
// only from epoll/kqueue internals.
mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
for i := uintptr(0); i < n; i++ {
    pd := (*pollDesc)(add(mem, i*pdSize))
    pd.link = c.first
    c.first = pd
}

golang中遇到io事件时,统一对其做了封装,首先建立系统事件(本文主要针对epoll),然后让出cpu(gopark),然后进行协程调度执行其他g。当g io事件完成时,会从epoll进行交互看是否就绪(epoll就绪列表),就绪则pop取出一个g往下执行,未就绪则调度其他g。(其实pop取就绪列表也有一定逻辑,时候延时处理之类的)

runtime/proc.go,

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
    _g_ := getg()

    // The conditions here and in handoffp must agree: if
    // findrunnable would return a G to run, handoffp must start
    // an M.

top:
    _p_ := _g_.m.p.ptr()
    //......
    // Poll network.
    // This netpoll is only an optimization before we resort to stealing.
    // We can safely skip it if there are no waiters or a thread is blocked
    // in netpoll already. If there is any kind of logical race with that
    // blocked thread (e.g. it has already returned from netpoll, but does
    // not set lastpoll yet), this thread will do blocking netpoll below
    // anyway.
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        if list := netpoll(0); !list.empty() { // non-blocking
            gp := list.pop()
            injectglist(&list)
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false
        }
    }
    //......
}

另外在sysmon中,也对netpoll进行了调度。

// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
    lock(&sched.lock)
    sched.nmsys++
    checkdead()
    unlock(&sched.lock)
    //......
    // poll network if not polled for more than 10ms
    lastpoll := int64(atomic.Load64(&sched.lastpoll))
    if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
        atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
        list := netpoll(0) // non-blocking - returns list of goroutines
        if !list.empty() {
            // Need to decrement number of idle locked M's
            // (pretending that one more is running) before injectglist.
            // Otherwise it can lead to the following situation:
            // injectglist grabs all P's but before it starts M's to run the P's,
            // another M returns from syscall, finishes running its G,
            // observes that there is no work to do and no other running M's
            // and reports deadlock.
            incidlelocked(-1)
            injectglist(&list)
            incidlelocked(1)
        }
    }
    //......
}

7:备注

epoll

  • epoll是由系统内核单独维护的一个线程,不由go本身维护

常量

  • FD_CLOEXEC用来设置文件的close-on-exec状态标准。 这,emm 就挺难理解得。

gc

  • pollDesc是由pollCache进行维护的,并且不受GC监控(persistentalloc方法分配),所以,在正常情况关于io的操作,我们一定要进行手动关闭,对epoll中的引用对象进行清理(具体实现在poll_runtime_Semrelease)。
// Must be in non-GC memory because can be referenced
// only from epoll/kqueue internals.
mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
for i := uintptr(0); i < n; i++ {
    pd := (*pollDesc)(add(mem, i*pdSize))
    pd.link = c.first
    c.first = pd
}

sysmon

  • Go 的标准库提供了一种监测应用程序的线程,并帮你 (找寻) 程序可能遇到的瓶颈. 该线程称为sysmon,即系统监视器 (system monitor).在GMP 模型中,这个 (特殊) 线程未链接任何的 P, 这意味着调度器 (scheduler) 没有将其考虑在内, 因此始终处于运行状态.

netpoll-sysmon-running-status.png
sysmon线程的作用很广, 主要涉及以下方面:

  • 由应用程序创建的计时器 (timers). sysmon线程查看应该在运行却仍在等待执行时间的计时器. 在这种情况下, Go 将查看空闲的 M 和 P 列表, 以便尽可能快地运行它们.
  • 网络轮询器和系统调用. 它将运行在网络操作中被阻塞的 goroutine.
  • 垃圾回收器(如果已经很长时间没有运行). 如果垃圾回收器已经两分钟没有运行,则 sysmon 将强制执行一轮垃圾回收 (GC).
  • 长时间运行的 goroutine 的抢占. 任何运行时间超过10 毫秒的 goroutine 都会被抢占, 将运行时间 (running time) 留给其他 goroutine.

文章作者: 小张哥
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 小张哥 !
评论
 上一篇
golang从内核到epoll golang从内核到epoll
引子:在之前的文章里 golang netpoll的实现与分析 讲了一些,对于golang netpoll的实现,但是,数据是怎么通过硬件到达golang的这块不是太明确,今天就主要分析下这一块。 linux的网络的基本实现在 TCP/
下一篇 
io模型的演进 io模型的演进
学习三大要素 what、why、how,每天给自己强调一遍 1:什么是IO程序是由数据+指令构成的,运行程序的过程可以分成下面这几步:将代码加载到内存中,逐条运行内存中的代码在运行代码的过程中,可能需要对文件的读写,即将文件输入(Inp
  目录