概述
现在互联网中有不少便捷工具可以自动下载(或者说爬取)小说网站的小说、图片网站的图片或视频网站的视频,这些工具有的以命令方式提供,有的有自己的图形用户界面。下面就带领大家编写一个这样的简单工具,以起到抛砖引玉的作用。
把这个可以爬取图片的小程序命名为 finder,并把它的代码放置在示例项目的 gopcp.v2/chapter6/webcrawler/examples/finder 代码包及其子包中。它以命令的方式为使用者提供功能。大家可以从我的网盘中下载该代码包(链接:https://pan.baidu.com/s/1yzWHnK1t2jLDIcTPFMLPCA 提取码:slm5)。
命令参数
命令参数是指使用者在使用 finder 命令时可以提供的参数。在 Go语言中,这类参数称为 flag。通过 Go 标准库中的 flag 代码包,可以读取和解析这类参数。
finder 可以自动完成很多事情。但是,使用者还需要告知它一些必备的参数,包括:首次请求的 URL、目标 URL 的范围限定(广度和深度),以及爬取来的图片文件存放的目录。这些必备参数的给定就需要通过 flag 来实现。
为了让 finder 成为一个开箱即用的命令,这里为每一个命令参数都提供了默认值。请看下面的代码:
//命令参数 var ( firstURL string domains string depth uint dirPath string ), func init() { flag.StringVar(&firstURL, "first", "http://zhihu.sogou.com/zhihu?query=golang+logo", "The first URL which you want to access.") flag.StringVar(&domains, "domains", "zhihu.com", "The primary domains which you accepted. "+ "Please using comma-separated multiple domains.") flag.UintVar(&depth, "depth", 3, "The depth for crawling.") flag.StringVar(&dirPath, "dir", "./pictures", "The path which you want to save the image files.") } func Usage() { fmt.Fprintf(os.Stderr, "Usage of %s:/n", os.Args[0]) fmt.Fprintf(os.Stderr, "/tfinder [flags] /n") fmt.Fprintf(os.Stderr, "Flags:/n") flag.PrintDefaults() }
这些代码以及实现主流程的代码都包含在 finder 的主文件 finder.go 中。
flag 包为了让我们方便地定义命令参数,提供了很多函数,诸如上面的代码调用的函数 flag.StringVar 和 flag.UintVar。针对不同基本类型的参数值,flag 几乎都有一个函数与之相对应。以对 flag.StringVar 函数的第一次调用为例,调用参数的值依次是存放首次请求 URL 的变量 firstURL 的指针、命令参数的名称 first、first 的默认值以及 first 的文字说明。
这里声明一下,命令参数 first 的默认值并不要针对某些网站,而只是我发现的一个比较容易访问的 URL。该默认 URL 所属的网站不需要登录,而且在其链接的页面的源码中也比较容易找到图片。也许它并不是你所知道的最适合的首次请求 URL,但这仅仅是个默认值而已。
我们需要在 main 函数的开始处调用 flag.Parse 函数。只有这样,才能让 firstURL 等变量与 finder 使用者给定的命令参数值绑定。另外,为了让 finder 命令更加友好,需要把上面的 Usage 函数赋给 flag.Usage 变量,以便使用者在敲入 finder –help 和回车之后能看到命令使用提示信息。这个提示信息会包含上面那个 init 函数中声明的内容。
初始化调度器
我们通过操控调度器来使用网络爬虫框架提供的各种功能。在初始化调度器之前,我们需要先准备好 3 类参数:请求相关参数、数据相关参数和组件相关参数。其中,请求相关参数可以由命令参数直接给定,数据相关参数也可以由我们评估后给出。至于组件相关参数,我们会利用网络爬虫框架提供的各类组件的默认实现去创建。
很显然,分析器处理数据的速度肯定是最快的。其次是条目处理管道和下载器。分析器只会利用 CPU 和内存资源做一些计算,条目处理管道会把图片文件存储到计算机的文件系统中,而下载器则需要通过网络访问和下载内容。
虽然网络 I/O 的速度可能会超过磁盘 I/O 的速度,但是条目处理管道需要处理的数据总是最少的,所以我们可以把条目缓冲池的容量设置得更小。
为了简单直观,我们暂时认为,分析器处理数据的总耗时是条目处理管道的 10%,同时是下载器的 1%。注意,这只是一个粗略估计的结果。可以在使用 finder 的时候根据实际情况进行调整。至此,我们可以像下面这样组装一个 DataArgs 类型的值,其中的标识符 sched 代表代码包 gopcp.v2/chapter6/webcrawler/scheduler:
dataArgs := sched.DataArgs{ ReqBufferCap: 50, ReqMaxBufferNumber: 1000, RespBufferCap: 50, RespMaxBufferNumber: 10, ItemBufferCap: 50, ItemMaxBufferNumber: 100, ErrorBufferCap: 50, ErrorMaxBufferNumber: 1, }
或许可以依据这些调度器参数为 finder 添加一些命令参数,以便让该程序更加灵活。
相比之下,组件相关参数的创建是最烦琐的。为了给出下载器、分析器和条目处理管道的实例列表,我们需要分别自定义 HTTP 客户端、HTTP 响应解析函数和条目处理函数。这部分代码在 gopcp.v2/chapter6/webcrawler/examples/finder/internal 代码包中,并在 finder 的主程序中为该包起个别名——lib。
之前说过,net/http 包中的 Client 类型是做 HTTP 客户端程序的必选,并且有很多可定制的地方。它有一个公开的字段 Transport,是 http.RoundTripper 接口类型的,用于实施对单个 HTTP 请求的处理并输出 HTTP 响应。我们可以不对它进行设置,而让程序自动使用由变量 http.DefaultTransport 代表的默认值。实际上,你可以从 http.Default-Transport 的声明中看到自定义 Transport 字段的方法。
下面是为生成 HTTP 客户端而声明的函数:
//用于生成HTTP客户端 fund genHTTPClient() *http.Client { return &http.Client{ Transport: &http.Transport{ Proxy: http.ProxyFromEnvironment, DialContext: (&net・Dialer{ Timeout: 30 * time.Second, KeepAlive: 30 * time.Second, DualStack: true, }).DialContext, MaxIdleConns: 100, MaxIdleConnsPerHost: 5, IdleConnTimeout: 60 * time.Second, TLSHandshakeTimeout: 10 * time.Second, ExpectContinueTimeout: 1 * time.Second, }, } }
http.Transport 结构体类型实现了 http.RoundTripper 接口,其中也有很多公开的字段供我们设置。我想在这里解释的是 MaxIdleConns、MaxIdleConnsPerHost 和 IdleConnTimeout,它们与我们要爬取的目标网站以及对目标 URL 的范围限定有很大关系。MaxIdleConns 是对空闲链接的最大数量进行设置。空闲的链接可以理解为已经没有数据在传输但是还未断开的链接。
MaxIdleConns 限制的是通过该 HTTP 客户端访问的所有域名和 IP 地址的空闲链接总量。而 MaxIdleConnsPerHost 就不同了,它限制的是针对某一个域名或 IP 地址的空闲链接最大数量。
对于这两个字段的值,一般需要联动地设置。当然,可能无法预知目标网站的二、三级域名有多少个,以及在爬取过程中会以怎样的频率访问到哪些域名。这显然也需要一个调优的过程。
IdleConnTimeout 字段的含义是指定空闲链接的生存时间。如果说 MaxIdleConns 和 MaxIdleConnsPerHost 设置的是什么情况下应该关闭更多的空闲链接的话,那么 IdleConnTimeout 设置的就是什么时候应该进一步减少现有的空闲链接。
HTTP 响应解析函数用于解析 HTTP 响应并试图找到出新的请求和条目。在 finder 中,我们需要两个这样的函数,一个用于查找新请求,另一个用于查找新条目,这里所说的条目即图片。
为了解析 HTML 格式的 HTTP 响应体,我们需要引入一个第三方代码包:github.com/PuerkitoBio/goquery,后面将其简称为 goquery。
类似地,声明了一个 genResponseParsers 函数用于返回 HTTP 响应解析函数的列表:
//用于生成响应解析器 func genResponseParsers() [[module.PaiseResponse { parseLink := func(httpResp *http.Response, respDepth uint32) ([]module.Data, []error) { //省略部分代码 } parselmg := func(httpResp *http.Response, respDepth uint32) ([[module.Data, []error) { //省略部分代码 //生成条目 item := make(map[string]interface{}) item["reader"] = httpRespBody item["name"] = path.Base(reqURL.Path) item["ext"] = pictureFomat dataList = append(dataList, module.Item(item)) return dataList, nil } return []module.ParseResponse{parseLink, parselmg} }
可以看到, genResponseParsers 函数返回的列表中有两个 HTTP 响应解析函数 parseLink 和 parseImg。parseLink 函数会利用 goquery 在 HTML 格式的 HTTP 响应体中查找新的请求。具体流程如下。
1) 检查响应。检查响应本身以及后面会用到的各个部分的有效性。如果无效,就忽略后面的步骤,并直接返回空的数据列表和包含相应错误值的错误列表。
2) 检查 HTTP 响应头中的内容类型。检查 HTTP 响应的 Header 部分中的 Content-Type 的值。如果它不以 text/html 开头,就忽略后面的步骤,并直接返回空的数据列表和 nil。
3) 解析 HTTP 响应体。在响应体中查找 a 标签,并提取它的 href 属性的值。如果该值是一个 URL,就将其封装成请求并追加到数据列表。再在响应体中查找 img 标签,并提取它的 src 属性的值。如果该值是一个 URL,就将其封装成请求并追加到数据列表。如果在解析过程中发生错误,就把错误值追加到错误列表,最后返回数据列表和错误列表。
parseImg 函数也会先检查响应和 HTTP 响应头中的内容类型。不过,它只会继续处理内容类型以 image 开头的 HTTP 响应。一旦可以继续处理,就说明 HTTP 响应体的内容是一个图片的字节序列,这时就可以生成条目了。
如前面的代码所示,在创建一个条目(实际上是一个字典)之后,会分别依据 HTTP 响应体、图片主文件名以及图片扩展名生成键 – 元素对并放入条目值。然后,把该条目追加到数据列表并返回。
相应地,在条目处理函数中,会根据条目中的这 3 个键 – 元素对在指定的目录中创建一个图片文件。用于生成条目处理函数的 genltemProcessors 函数的声明如下:
//用于生成条目处理器 func genItemProcessors(dirPath string) []module.ProcessItem { savePicture := func(item module.Item) (resuIt module.Item, err error) { //省略部分代码 //生成新的条目 result = make(map[string]interface{}) for k, v := range item { result[k] = v } result["file_path"] = filePath fileInfo, err := file・Stat() if err != nil { return nil, err } result["file_size"] = fileInfo.Size() return result, nil } recordPicture := func(item module.Item) (result module.Item, err error) { //省略部分代码 } return []module.ProcessItem{savePicture, recordPicture} }
该函数返回的列表中有两个条目处理函数 savePicture 和 recordPicture。savePicture 函数用于保存图片文件。值得一提的是,在成功保存文件之后,savePicture 函数会生成新的条目。该条目中除了包含 savePicture 接受的那个参数条目中的所有键 – 元素对外,还放入了用于说明图片文件绝对路径的尺寸的键 – 元素对。这样,recordPicture 函数就可以把已存图片文件的信息完整地记录在日志中了。
有了 genHTTPClient,genResponseParsers 和 genItemProcessors 这 3 个函数,我们就可以着手准备各类组件实例的列表了。首先,来看看下载器列表的生成。
要创建一个下载器,光有 HTTP 客户端是不够的,还需要设置它的组件 ID 和组件评分计算器。不过,对于其余两个参数,gopcp.v2/chapter6/webcrawler/module 代码包已经给予了支持。
组件 ID 的生成需要组件类型、序列号和组件网络地址,这里的组件 ID 不需要包含网络地址,因为我们要生成的组件实例与调度器处于同一进程内。为了统一生成序列号,这里声明了一个包级私有的全局变量:
//组件序列号生成器
var snGen = module.NewSNGenertor(l, 0)
至于下载器的组件类型,我们可以直接用 module.TYPE_DOWNLOADERo
在 module 包中有一个极其简单的组件评分计算函数 CalculateScoreSimple,这个函数原本是提供给测试函数使用的。不过对于 finder,我们可以直接使用这个函数,其代码如下:
//简易的组件评分计算函数 func CalculateScoreSimple(counts Counts) uint64 { return counts.CalledCount + counts.AcceptedCount<<1 + counts.CompletedCount<<2 + counts.HandlingNumber<<4 }
有了上述准备,我们可以编写一个用于生成下载器列表的函数。把这个函数命名为 GetDownloaders,并把它存在 gopcp.v2/chapter6/webcrawler/examples/finder/internal 中。注意,与前面描述的那些 internal 包的函数不同,它是公开的。该函数的代码如下:
//用于获取下载器列表 func GetDownloaders(number uint8) ([]module.Downloader, error) { downloaders := []module.Downloader{} if number == 0 { return downloaders, nil } for i := uint8(0); i < number; i++ { mid, err := module.GenMID( module.TYPE_DOWNLOADER, snGen.Get(), nil) if err != nil{ return downloaders, err } d, err := downloader.New( mid, genHTTPClient(), module.CalculateScoreSimple) if err != nil { return downloaders, err } downloaders = append(downloaders, d) } return downloaders, nil }
该函数的功能其实就是根据参数值生成一定数量的下载器并返回给调用方,其中的标识符 downloader 代表 gopcp.v2/chapter6/webcrawler/module/local/downloader 代码包。
用于生成并返回分析器列表的 GetAnalyzers 函数在流程上与 GetDownloaders 极其相似。在生成分析器的时候,它用到了 gopcp.v2/chapter6/webcrawler/module/local/analyzer 包的 New 函数和本包的 genResponseParsers 函数。
GetPipelines 函数的写法也是类似的。不过要注意,由于我们前面编写的条目处理函数需要把图片存到指定目录中,所以 GetPipelines 函数的参数除了 number 外,还有一个 string 类型的 dirPath。dirPath 指定图片存放目录。在调用 genItemProcessors 函数时,GetPipelines 函数会把 dirPath 直接传入。
另一个需要注意的地方是,我们在生成一个条目处理管道后,还要决定它是否是快速失败的。在这里,如果 savePicture 函数没能成功存储图片,那么我们就没必要再让 recordPicture 函数去记录日志了。因此,我们要通过调用条目处理管道的 SetFailFast 函数把它设置为快速失败的。
好了,我们现在已经准备好了初始化调度器所需的所有参数。至此,对于 finder.go 中的 main 函数,我们已经可以完成大半了:
func main() { flag.Usage = Usage flag.Parse() //创建调度器 scheduler := sched.NewScheduler() //准备调度器的初始化参数 domainParts := strings.Split(domains,",") acceptedDomains := []string{} for _, domain := range domainParts { domain = strings.TrimSpace(domain) if domain != "" { acceptedDomains = append(acceptedDomains, domain) } } requestArgs := sched.RequestArgs{ AcceptedDomains: acceptedDomains, MaxDepth: uint32(depth), } dataArgs := sched.DataArgs{ ReqBufferCap: 50, ReqMaxBufferNumber: 1000, RespBufferCap: 50, RespMaxBufferNumber:10, ItemBufferCap: 50, ItemMaxBufferNumber:100, ErrorBufferCap: 50, ErrorMaxBufferNumber:1, } downloaders, err := lib.GetDownloaders(1) if err != nil { logger.Fatalf("An error occurs when creating downloaders: %s", err) } analyzers, err := lib.GetAnalyzers(1) if err != nil { logger.Fatalf("An error occurs when creating analyzers: %s", err) } pipelines, err := lib.GetPipelines(1, dirPath) if err != nil { logger.Fatalf("An error occurs when creating pipelines: %s", err) } moduleArgs := sched.ModuleArgs{ Downloaders: downloaders, Analyzers: analyzers, Pipelines: pipelines, } //初始化调度器 err = scheduler.init( requestArgs, dataArgs, moduleArgs) if err != nil { logger.Fatalf("An error occurs when initializing scheduler: %s", err) } //省略部分代码 }
重申一下,其中的标识符 lib 代表 gopcp.v2/chapter6/webcrawler/examples/finder/ internal 代码包。另外注意,logger.Fatalf 总会在打印日志之后使当前进程非正常终止。所以,一旦调度器初始化失败,finder 就不会再做任何事了。
监控调度器
现在,我们已经可以启动调度器了。不过,或许我们可以准备得更充分一些。任何持续运行的软件服务都应该被监控,不论用什么方式监控。这几乎已经是互联网公司的一条铁律了。虽然 finder 只是一个可以查找并保存图片的工具,但是由于它可能会为了搜遍目标网站而运行很长一段时间,所以我们还是应该提供一种监控方法。
我会把用于监控调度器的代码封装在一个名为 Monitor 的函数里,并把它置于代码包 gopcp.v2/chapter6/webcrawler/examples/finder/monitor 中。
Monitor 函数的功能主要有如下 3 个。
- 在适当的时候停止自身和调度器。
- 实时监控调度器及其中的各个模块的运行状况。
- 一旦调度器及其模块在运行过程中发生错误,及时予以报告。
和其他部分一样,这些功能可以定制。
1) 确定参数
对于第一个功能,我们需要明确一点:只有在调度器空闲一段时间之后,才关闭它。所以,我们应该定时循环地去调用调度器的 Idle 方法,以检查它是否空闲。如果连续若干次的检查结果均为 true,那么就可以断定再没有新的数据需要处理了。
这时,关闭调度器就是安全的。这里有两个可以灵活掌握的环节:一个是检查的间隔时间,另一个是 检查结果连续为 true 的次数。只要给定了这两个可变量,自动关闭调度器的策略就完全 确定了。我们把检查的间隔时间与检査结果连续为 true 的最大次数的乘积称为最长持续 空闲时间,即:
最长持续空闲时间 = 检查间隔时间 x 检查结果连续为 true 的最大次数
一旦调度器空闲的时间达到了最长持续空闲时间,就可以关闭调度器了,不过这个决定应该由监控函数的使用方来做。
监控函数的第二个功能是对调度器的运行状况进行监控。我们在前面编写调度器以及相关模块的时候都留有摘要或统计接口,所以它实现起来并不难。这里也存在两个可变量:摘要获取间隔时间和摘要记录的方式。该函数的第三个功能也需要用到第二个可变量。
经过上述分析,我们已经可以确定调度器监控函数的签名了:
// Monitor用于监控调度器。 //参数 scheduler 代表作为监控目标的调度器。 //参数 checkInterval 代表检查间隔时间,单位:纳秒。 //泰数 summarizeInterval 代表摘要获取间隔时间,单位:纳秒。 //参数 maxIdleCount 代表最大空闲计数。 //参数 autoStop 用来指示该方法是否在调度器空闲足够长的时间之后自行停止调度器。 //参数 record 代表日志记录函数。 //当监控结束之后,该方法会向作为唯一结果值的通道发送一个代表空闲状态检查次数的数值 func Monitor( scheduler sched.Scheduler, checkInterval time.Duration, summarizeInterval time.Duration, maxIdleCount uint, autoStop bool, record Record) <-chan uint64
在 Monitor 函数的参数声明列表中,record 就是使用方需要定制的摘要的记录方式。Record 类型的声明如下:
// Record 代表日志记录函数的类型。
//参数 level 代表日志级别。级别设定:0-普通;1-警告;2-错误
type Record func(level uint8, content string)
这个函数类型表达的含义是根据指定的日志级别对内容进行记录。
另一方面,Monitor 函数会返回一个接收通道。在它执行结束之时,它还会向该通道发送一个数值,这个数值表示它检査调度器空闲状态的实际次数。使用方可以通过这个实际次数计算出当次爬取流程的总执行时间。不过,这个通道更重要的作用是作为使用方安全关闭调度器的依据。
2) 制定监控流程
大家可能会发现,Monitor 函数的 3 个功能之间实际上并没有交集。因此,我们可以在实现该函数的时候保持这 3 个功能的独立性,以避免它们彼此干扰。Monitor 函数的完整声明是这样的:
func Monitor( scheduler sched.Scheduler, checkInterval time.Duration, summarizeInterval time.Duration, maxIdleCount uint, autoStop bool, record Record) <-chan uint64 { //防止调度器不可用 if scheduler == nil { panic(errors.New("The scheduler is invalid!")) } //防止过小的检查间隔时间对爬取流程造成不良影响 if checkinterval < time.Millisecond*100 { checkinterval = time.Millisecond * 100 } //防止过小的摘要获取间隔时间对爬取流程造成不良影响 if summarizeInterval < time.Second { summarizeInterval = time.Second } //防止过小的最大空闲计数造成调度器的过早停止 if maxIdleCount < 10 { maxIdleCount = 10 } logger.Infof("Monitor parameters: checkInterval: %s, summarizeInterval: %s,"+ "maxIdleCount: %d, autoStop: %v", checkInterval, summarizeInterval, maxIdleCount, autoStop) //生成监控停止通知器 stopNotifier, stopFunc := context.WithCancel(context.Background()) //接收和报告错误 reportError(scheduler, record, stopNotifier) //记录摘要信息 recordsummary(scheduler, summarizeInterval, record, stopNotifier) //检查计数通道 checkCountChan := make(chan uint64, 2) //检查空闲状态 checkStatus(scheduler, checkInterval, maxIdleCount, autoStop, checkCountChan, record, stopFunc) return checkCountChan }
这里简要解释一下它体现的监控流程。首先,监控函数必须对传入函数的参数值进行检査,其中最重要的是代表调度器实例的 scheduler。如果它为 nil,那么这个监控流程就完全没有执行的必要了。监控函数在发现此情况时,会把它视为一个致命的错误并引发一个运行时恐慌。
此外,监控函数还需要对检查间隔时间、摘要获取间隔时间和最大空闲计数的值进行检查。这些值既不能是负数,也不能是过小的正数,因为过小的正数会影响到爬取流程的正常执行。所以,在这里分别为它们设定了最小值。
让实现那 3 个功能的代码并发执行。与调度器的实现类似,需要让这些代码知道什么时候需要停止。同样,这里使用一个可取消的 context.Context 类型值来传递停止信号,并由 stopNotifier 变量代表。同时,stopFunc 变量代表触发停止信号的函数。
函数 reportError、recordSummary 和 checkStatus 分别表示 Monitor 函数需要实现的那 3 个功能,它们都会启用一个 goroutine 来执行其中的代码。稍后会分别描述它们的实现细节。
对于 Monitor 函数的函数体,最后要说明的是变量 checkCountChan,它代表的就是用来传递检查调度器空闲状态的实际次数的通道。它的值会被传入 checkStatus 函数,然后 由 Monitor 函数返回给它的调用方。
3) 报告错误
报告错误的功能由 reportError 函数负责实现。一旦调度器启动,就应该通过调用它的 ErrorChan 方法获取错误通道,并不断地尝试从中接收错误值。我们已经在前面详述过这样做的原因。
函数 reportError 接受 3 个参数。除了代表调度器的 scheduler 之外,还有代表日志记录方式的 record,以及代表监控停止通知器的 stopNotifier。下面是它的完整声明:
//用于接收和报告错误 func reportError( scheduler sched.Scheduler, record Record, stopNotifier context.Context) { go func() { //等待调度器开启 waitForSchedulerStart(scheduler) errorChan := scheduler.ErrorChan() for { //查看监控停止通知器 select { case <- stopNotifier.Done(): return default: } err, ok := <- errorChan if ok { errMsg := fmt.Sprintf("Received an error from error channel: %s", err) record(2, errMsg) } time.Sleep(time.Microsecond) } }() }
这个函数启用了一个 goroutine 来执行其中的代码。在 go 函数中,它首先调用了 waitForSchedulerStart 函数。我们都知道,调度器有一个公开的方法 Status,该方法会返回一个可以表示调度器当前状态的值。因此,该函数要做的就是不断调用调度器的 Status 方法,直到该方法的结果值等于 sched.SCHED_STATUS_STARTED 为止。
当然,在对 Status 方法的多次调用之间,都需要有一个小小的停顿,这个停顿是通过 time.Sleep 函数实现的。这样做是为了避免因 for 循环迭代得太过频繁而可能带来的一些问题,比如挤掉其他 goroutine 运行的机会、致使 CPU 的使用率过高,等等。这是一种保护措施,虽然那些问题不一定会发生。
go 函数中的第 2 条语句,是调用调度器的 ErrorChan 方法并获取到错误通道。还记得 ErrorChan 方法是怎样实现的吗?它在每次调用时,都会创建一个错误通道,并持续从当前调度器持有的错误缓冲池向这个错误通道搬运错误值,直到错误缓冲池被关闭。正因为如此,我们不应该在 for 语句中调用它。
在 for 语句每次迭代开始,go 函数都会尝试用 select 语句从 stopNotifier 获取停止信号。一旦获取到停止信号,它就马上返回以结束当前流程的执行。select 语句中的 default case 意味着这个获取操作只是尝试一下而已。即使没获取到停止信号,select 语句的执行也会立即结束。
之后,go 函数就会试图从错误通道那里接收错误值。一旦接收到一个有效的错误值, 它就调用 record 函数记录下这个错误值。注意,与尝试获取停止信号的方式不同,这里的接收操作是阻塞式的。
在每次迭代的最后,go 函数也会通过调用 time.Sleep 函数实现一个小停顿。
4) 记录摘要信息
recordSummary 函数负责记录摘要信息,它的签名如下:
//用于记录摘要信息 func recordSummary( scheduler sched.Scheduler, summarizeInterval time.Duration, record Record, stopNotifier context.Context)
可以看到,它接受 4 个参数,其中的 3 个参数也是 reportError 函数所接受的。多出的那个参数是 summarizeInterval,即摘要获取间隔时间。
这个函数同样启用了一个 goroutine 来进行相关操作。与 reportError 函数相同,在一开始依然要先调用 waitForSchedulerStart 函数,以等待调度器完全启动。一旦调度器已启动,go 函数就要开始为摘要信息的获取、比对、组装和记录做准备了。这里需要先声明如下几个变量:
var prevSchedSummaryStruct sched.SummaryStruct
var prevNumGoroutine int
var recordCount uint64 = 1
startTime := time.Now()
其中,变量 recordCount 和 startTime 的值会参与到最终的摘要信息的组装过程中去。前者代表了记录的次数,而后者则代表开始准备记录时的时间。在它们前面声明的两个变量 prevSchedSummaryStruct 和 prevNumGoroutine 的含义分别是前一次获得的调度器摘要信息和 goroutine 数量,它们是是否需要真正记录当次摘要信息的决定因素。
go 函数每次都会把当前获取到的摘要信息与前一次的做比对。只有确定它们不同,才会对当前的摘要信息予以记录,这主要是为了减少摘要信息对其他日志的干扰。
go 函数应该在停下来之前定时且循环地获取和比对摘要信息。因此,我把后面的代码都放到了一个 for 代码块中。在每次迭代开始时,仍然需要通过 stopNotifier 检查停止信号。如果停止信号还没有发出,那么就开始着手获取摘要信息的各个部分,即:goroutine 数量和调度器摘要信息。goroutine 数量代表的是当前的 Go 运行时系统中活跃的 goroutine 的数量,而调度器摘要信息则体现了调度器当前的状态。获取它们的方式如下:
//获取 goroutine 数量和调度器摘要信息
currNumGoroutine := runtime.NumGoroutine()
currSchedSummaryStruct := scheduler.Summary().Struct()
一旦得到它们,就分别把它们与变量 prevNumGoroutine 和 prevSchedSummaryStruct 的值进行比较。这里的比较操作很简单。变量 currNumGoroutine 及 prevNumGoroutine 都是 int 类型的,可以直接比较,而调度器摘要信息的类型 sched.SummaryStruct 也提供了可判断相同性的 Same 方法。如果它们两两相同,就不再进行后面的组装和记录操作了。否则,就开始组装摘要信息:
//比对前后两份摘要信息的一致性,只有不一致时才会记录 if currNumGoroutine != prevNumGoroutine || !currSchedSummaryStruct.Same(prevSchedSummaryStruct) { //记录摘要信息 summay := summary{ NumGoroutine: runtime.NumGoroutine(), SchedSummary: currSchedSummaryStruct, EscapedTime: time.Since(startTime).String(), } b, err := json.MarshalIndent(summay,""," ") if err != nil { logger.Errorf("Occur error when generate scheduler summary: %s/n", err) continue } msg := fmt.Sprintf("Monitor summary[%d]:/n%s", recordCount, b) record(0, msg) prevNumGoroutine = currNumGoroutine prevSchedSummaryStruct = currSchedSummaryStruct recordCount++ }
组装摘要信息用到了当前包中声明的结构体类型 summary,其定义如下:
//代表监控结果摘要的结构 type summary struct { // goroutine 的数量 NumGoroutine int 'json:"goroutine_number"' //调度器的摘要信息 SchedSummary sched.SummaryStruct 'json:"sched_summary"' //从开始监控至今流逝的时间 EscapedTime string 'json:"escaped_time"' }
可以看到,该类型也为 JSON 格式的序列化做好了准备。使用 encoding/json 包中的 MarshalIndent 函数把该类型的值序列化为易读的 JSON 格式字符串,然后通过调用 record 函数记录它们。
紧接着,对 prevNumGoroutine 和 pievSchedSummaryStruct 进行赋值,以便进行后续的比较操作。最后,递增 recordCount 的值也非常必要,因为它是摘要信息的重要组成部分。
函数 recordSummary 中的 go 函数所包含的 for 代码块基本上就是如此。此外,为了让这个 for 循环在迭代之间能有一个小小的停顿,把下面这条语句放在了 for 代码块的最后:
time.Sleep(time.Microsecond)
实际上,与 reportError 函数相比,recordSummary 函数更有必要加上这条语句。
与错误的接收和报告一样,对摘要信息的获取、比对、组装和记录也是独立进行的。它由 Monitor 函数启动,并会在接收到停止信号之后结束。发送停止信号的代码存在于 checkstatus 函数中,因为只有它才知道什么时候停止监控。
5) 检查状态
函数 checkstatus 的主要功能是定时检查调度器是否空闲,并在它空闲持续一段时间之后停止监控和调度器。为此,该函数需要适时检查各种计数值,并在必要时发出停止信号。checkstatus 是一个比较重要的辅助函数,职责也较多,它的签名如下:
//用于检查狀态,并在满足持续空闲叶间的条件时采取必要措施 func checkstatus( scheduler sched.Scheduler, checkInterval time.Duration, maxIdleCount uint, autoStop bool, checkCountChan chan <- uint64, record Record, stopFunc context.CancelFunc)
其中的参数前面都介绍过。注意,参数 checkCountChan 的类型是一个发送通道,这是为了限制 checkstatus 函数对它的操作。
checkstatus 函数也把所有代码都放入了 go 函数。go 函数中的第一条语句是:
var checkCount uint64
该变量代表的是检查计数值。紧接着是一条 defer 语句:
defer func() { stopFunc() checkCountChan <- checkCount }()
它的作用是保证在go函数执行即将结束时发出停止信号和发送检查计数值,这个时 机非常关键。这个go函数总会在调度器空闲的时间达到最长持续空闲时间时结束执行。
在等待调度器开启之后,go函数首先要做的就是准确判定最长持续空闲时间是否到 达。为了让这一判定有据可依,下面这个变量是必需的:
var idleCount uint
它的值将会代表监控函数连续发现调度器空闲的计数值。另外,为了记录调度器持续空闲的时间,还需要声明一个这样的变量:
var firstIdleTime time.Time
注意,真实的持续空闲时间与理想的持续空闲时间(由参数 checkInterval 和 maxIdleCount 的值相乘得出的那个时间)之间肯定是有偏差的。并且,前者肯定会大于后者,因为执行判定的代码也是需要耗时的。
可以肯定的是,我们需要在一个 for 循环中进行与持续空闲时间判定有关的那些操作。由于这条 for 语句中的条件判断比较多且复杂,所以我先贴出它们然后再进行解释:
for { //检查调度器的空闲状态 if scheduler.Idle() { idleCount++ if idleCount == 1 { firstIdleTime = time.Now() } if idleCount >= maxIdleCount { msg := fmt.Sprintf(msgReachMaxIdleCount, time.Since(firstIdleTime).String()) record(0, msg) //再次检查调度器的空闲状态,确保它已经可以停止 if scheduler.Idle() { if autoStop { var result string if err := scheduler.Stop(); err == nil { result = "success" ' } else { result = fmt.Sprintf("failing(%s)", err) } msg = fmt.Sprintf(msgStopScheduler, result) record(0, msg) } break } else { if idleCount > 0 { idleCount = 0 } } } } else { if idleCount > 0 { idleCount = 0 } } checkCount++ time.Sleep(checkInterval) }
可以看到,总是让 checkCount 的值随着迭代的进行而递增,同时也会依据 checkInterval 让每次迭代之间存在一定的时间间隔。
在此 for 代码块的最开始,通过调用调度器的 Idle 方法来判断它是否已经空闲。如果不是,就及时清零 idleCount 的值。因为一旦发现调度器未空闲,就要重新进行计数。
反过来讲,如果发现调度器已空闲,就需要递增 idleCount 的值。同时,如果发现重新计数刚刚开始,就会把 firstIdleTime 的值设置为当前时间。只有这样才能在 idleCount 的值达到最大空闲计数时,根据 firstIdleTime 的值准确计算出真实的最长持续空闲时间。
在做好计数和起始时间的检查和校正工作之后,会马上把 idleCount 的值与最大空闲计数相比较。如果前者大于或等于后者,就可以初步判定调度器已经空闲了足够长的时间,这时,会立刻记下一条基于模板 msgReachMaxIdleCount 生成的消息。该模板的声明如下:
//已达到最大空闲计数的消息模板
var msgReachMaxIdleCount = "The scheduler has been idle for a period of time" +
"(about %s)." + " Consider to stop it now."
这条消息建议网络爬虫框架的使用方关闭调度器。不过,使用方可以通过把参数 autoStop 的值设置为 true,让调度器监控函数自动关闭调度器,这也是后面再次调用调度器的 Idle 方法的原因之一。
如果这里的调用结果值和 autoStop 参数的值均为 true,那么函数就帮助使用方停止调度器。如果调用结果值为 false,那么 idleCount 变量的值也会被及时清零,对调度器空闲的计数将重新开始。这显然是一种比较保守的做法,但却可以有效地避免过早地停止调度器。
实际上,只要对 Idle 方法第二次调用的结果值为 true,不管 autoStop 参数的值是怎样的,都会退出当前的 for 代码块。for 代码块执行结束就意味着 checkstatus 函数异步执行结束。还记得吗?在 checkstatus 中的 go 函数执行结束之际,它会发出停止信号,同时向通道 checkCountChan 发送检查计数值。
至此,已经展示和说明了 checkstatus 函数以及 Monitor 函数涉及的绝大多数代码。
6) 使用监控函数
有了 Monitor 函数,就可以在 finder 的 main 函数中这样使用它来启动对调度器的监控了:
//准备监控参数 checkInterval := time.Second summarizeInterval := 100 * time.Millisecond maxIdleCount := uint(5) //开始监控 checkCountChan := monitor.Monitor( scheduler, checkInterval, summarizeInterval, maxIdleCount, tiue, lib.Record) //省略部分代码 //等待监控结束 <-checkCountChan
把检查间隔时间设置为 10 毫秒,并把最大空闲计数设置为 50 同时,让 Monitor 函数在调度器的持续空闲时间达到最长持续空闲时间后自动关闭调度器。
调用 Monitor 函数的时机是在初始化调度器之后,以及启动调度器之前。因为只有调度器被初始化过,它的大多数方法才能正常执行。另外,只有在调度器启动之前开始监控,才能记录下它启动时的状况。
给予 Monitor 函数的参数值 lib.Record 代表前面所说的日志记录函数,它的声明是这样的:
//记录日志 func Record(level byte, content string) { if content == "" { return } switch level { case 0: logger.Infoln(content) case 1: logger.Warnln(content) case 2: logger.Infoln(content) } }
其中 logger 代表 gopcp.v2/helper/log/base 包下 MyLogger 类型的日志记录器。
启动调度器
现在,我们真的可以启动调度器了。做了这么多准备工作,只需区区几行代码就可以启动调度器了,这些代码在 finder 的 main 函数的最后面:
//准备调度器的启动参数 firstHTTPReq, err := http.NewRequest("GET", firstURL, nil) if err != nil { logger.Fatalln(err) return } //开启调度器 err = scheduler.Start(firstHTTPReq) if err != nil { logger.Fatalf("An error occurs when starting scheduler: %s", err) } //等待监控结束 <-checkCountChan
基于命令参数 firstURL,我们可以很容易地创建出首次请求。如果启动调度器不成功,就记下一条严重错误级别的日志。还记得吗?这会使当前进程非正常终止。纵观 main 函数的代码你就会发现,它遇到任何错误都会这样做。这是因为一旦主流程出错,finder 就真的无法再运行下去了。
最后,为了让主 goroutine 等待监控和调度器的停止,我们还加入了对检查计数通道 checkCountChan 的接收操作。
到这里,我们讲述了图片爬虫程序 finder 涉及的几乎所有流程的代码。强烈建议大家在自己的计算机上运行 finder,然后试着改变各种参数的值,再去运行它,并且多多试几次。当然,也可以修改 finder 甚至网络爬虫框架的代码。总之,不论在哪个阶段,阅读、理解、修改、试验是学习编程的必经之路。
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/22622.html