本文共 21481 字,大约阅读时间需要 71 分钟。
概述:WebMagic的结构分为Downloader
、PageProcessor
、Scheduler
、Pipeline
四大组件,并由Spider将它们彼此组织起来。这四大组件对应爬虫生命周期中的下载、处理、管理和持久化等功能。
webmagic 的启动类时Spider,启动时run()方法运行时,会进行初始化各种组件,然后循环运行,从scheduler中取request,包装成runnable,进行请求处理。这个流程中,Scheduler组件负责去重和装待处理的请求,Downloader组件负责下载请求结果包装成page类, pageProcessor组件负责处理page结果,并且可以从page中添加需要采集的另外url,用page.resultItems装处理的结果数据,给Pipeline组件处理结果。
具体使用请看官方文档,或搜索相关案例,本篇主要是对webmagic的源码解析。
一、流程分析
1、主要方法
public void run() { // 检查运行状态,并且compareAndSet成运行 checkRunningStat(); // 初始化组件 initComponent(); logger.info("Spider {} started!",getUUID()); // 当前线程非中断,运行状态持续运行。 while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) { final Request request = scheduler.poll(this); if (request == null) { if (threadPool.getThreadAlive() == 0 && exitWhenComplete) { break; } // wait until new url added waitNewUrl(); } else { threadPool.execute(new Runnable() { @Override public void run() { try { // 处理请求,下载page,pageProcess处理page,pipeline处理page processRequest(request); onSuccess(request); } catch (Exception e) { onError(request); logger.error("process request " + request + " error", e); } finally { pageCount.incrementAndGet(); signalNewUrl(); } } }); } } // 设置状态为停止 stat.set(STAT_STOPPED); if (destroyWhenExit) { close(); } logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get()); }
从上面可以看出,Spider就是协调各种组件执行,核心逻辑是从scheduler中取出request包装成任务交由线程池运行,几个主要的组件就是在这个流程中负责各自运行的职责。
2)初始化组件initComponent
protected void initComponent() { if (downloader == null) { // 默认Downloader为HttpClientDownloader this.downloader = new HttpClientDownloader(); } if (pipelines.isEmpty()) { // 默认Pipeline为ConsolePipeline pipelines.add(new ConsolePipeline()); } downloader.setThread(threadNum); if (threadPool == null || threadPool.isShutdown()) { if (executorService != null && !executorService.isShutdown()) { threadPool = new CountableThreadPool(threadNum, executorService); } else { threadPool = new CountableThreadPool(threadNum); // 线程池为CountableThreadPool包装 } } if (startRequests != null) { // 初始添加请求到scheduler, 默认scheduler为QueueScheduler for (Request request : startRequests) { addRequest(request); } startRequests.clear(); } startTime = new Date(); }
几个核心组件的的初始实现downloader-HttpClientDownloader、pipelines-ConsolePipeline、scheduler-QueueScheduler pageProcessor-手动制定。
3、处理请求方法processRequest
private void processRequest(Request request) { Page page = downloader.download(request, this); // 下载-即通过HttpClient下载页面,并且构建Page对象。 if (page.isDownloadSuccess()){ // 下载成功 onDownloadSuccess(request, page); } else { // 失败循环 onDownloaderFail(request); } }
// 下载成功处理
private void onDownloadSuccess(Request request, Page page) { if (site.getAcceptStatCode().contains(page.getStatusCode())){ pageProcessor.process(page); // 先用pageProcessor处理page extractAndAddRequests(page, spawnUrl); // 提取添加的request if (!page.getResultItems().isSkip()) { // 如果没有设置isSkip for (Pipeline pipeline : pipelines) { pipeline.process(page.getResultItems(), this); // 多个pipelines处理结果 } } } sleep(site.getSleepTime()); return; }
下载成功的处理逻辑,就是进过pageProcessor进行处理page,然后提取addTragetRequest添加到scheduler中,然后在判断是否设置了isSkip,如果没有,那么就用多个pipelines进行结果的处理。
///下载失败的处理
private void onDownloaderFail(Request request) { if (site.getCycleRetryTimes() == 0) { // 如果错误循环重试次数为0,那么只停顿 sleep(site.getSleepTime()); } else { doCycleRetry(request); // 循环重试---详见下一代码片段 } }
// 循环重试
private void doCycleRetry(Request request) { Object cycleTriedTimesObject = request.getExtra(Request.CYCLE_TRIED_TIMES); if (cycleTriedTimesObject == null) { // 循环重试次数0时,clone一个新请求添加设置次数1 addRequest(SerializationUtils.clone(request).setPriority(0).putExtra(Request.CYCLE_TRIED_TIMES, 1)); } else { int cycleTriedTimes = (Integer) cycleTriedTimesObject; cycleTriedTimes++; // 重试1次是,clone一个新请求,将附加参数递增。 if (cycleTriedTimes < site.getCycleRetryTimes()) { addRequest(SerializationUtils.clone(request).setPriority(0).putExtra(Request.CYCLE_TRIED_TIMES, cycleTriedTimes)); } } sleep(site.getRetrySleepTime()); }
对上面的流程,画了2个较官方的版本更为详细的一个流程图
二、组件分析-scheduler (调度)
SchedulerScheduler负责管理调度待抓取的URL,以及一些去重的工作。WebMagic默认提供了LinkedBlockingQueue的内存队列来管理URL,并用集合来进行去重。也支持使用Redis进行分布式管理。主要的scheduler实现有下面几种。
DuplicateRemovedScheduler: Scheduler基类,内置一个DuplicatedRemover实现为HashSetDuplicateRemover。
QueueScheduler:队列Scheduler, 就是通过一个LinkedBlockingQueue存储待采集连接。FileCacheQueueScheduler:文件缓存队列Scheduler,在队列的基础上,通过文件记录所有urls,以及poll的位置, 如果临时停止,那么再次采集可以通过文件恢复。RedisScheduler:使用Redis保存抓取队列,其中list保存queue队列,set负责去重,hash,存储请求附加数据。2.1 DuplicateRemovedScheduler
public abstract class DuplicateRemovedScheduler implements Scheduler { // 默认重复器 private DuplicateRemover duplicatedRemover = new HashSetDuplicateRemover(); public DuplicateRemover getDuplicateRemover() { return duplicatedRemover; } public DuplicateRemovedScheduler setDuplicateRemover(DuplicateRemover duplicatedRemover) { this.duplicatedRemover = duplicatedRemover; return this; } // 是post请求或循环重试或不重复才添加。 @Override public void push(Request request, Task task) { logger.trace("get a candidate url {}", request.getUrl()); if (shouldReserved(request) || noNeedToRemoveDuplicate(request) || !duplicatedRemover.isDuplicate(request, task)) { logger.debug("push to queue {}", request.getUrl()); pushWhenNoDuplicate(request, task); } } // 循环重试的请求不进行判断 protected boolean shouldReserved(Request request) { return request.getExtra(Request.CYCLE_TRIED_TIMES) != null; } // POST请求不进行判断 protected boolean noNeedToRemoveDuplicate(Request request) { return HttpConstant.Method.POST.equalsIgnoreCase(request.getMethod()); } // 子类负责实现 protected void pushWhenNoDuplicate(Request request, Task task) { } }
2.2 QueueScheduler 内存队列url调度
QueueScheduler实现就是由内部的一个LinkedBlockingQueue负责get或者poll获取url。
2.3 RedisScheduler Redis分布式url调度
1)成员变量
protected JedisPool pool;private static final String QUEUE_PREFIX = "queue_"; // 构建list队列的key的前缀private static final String SET_PREFIX = "set_"; // 构建set的key的前缀private static final String ITEM_PREFIX = "item_"; // 构建hash的key的前缀public RedisScheduler(JedisPool pool) { this.pool = pool; setDuplicateRemover(this); // 设置重复的实现为自身。} protected String getSetKey(Task task) { return SET_PREFIX + task.getUUID(); } protected String getQueueKey(Task task) { return QUEUE_PREFIX + task.getUUID(); } protected String getItemKey(Task task) { return ITEM_PREFIX + task.getUUID(); }
上面是成员变量,通过queue保存url队列,通过set负责去重,通过hash负责存储附属数据。
另外构造函数,传入了或自己构建jedisPool,并且自身实现了DuplicateRemover, 且设置到此对象的duplicateRemover字段中。
2) 去重实现
public boolean isDuplicate(Request request, Task task) { Jedis jedis = pool.getResource(); try { return jedis.sadd(getSetKey(task), request.getUrl()) == 0; } finally { pool.returnResource(jedis); } }
3) push
public void pushWhenNoDuplicate(Request request, Task task) { Jedis jedis = pool.getResource(); try { jedis.rpush(getQueueKey(task), request.getUrl()); if (request.getExtras() != null) { String field = DigestUtils.md5Hex(request.getUrl()); String value = JSON.toJSONString(request); jedis.hset(getItemKey(task), field, value); } } finally { pool.returnResource(jedis); } }
4) pool
public Request poll(Task task) { Jedis jedis = pool.getResource(); try { String url = jedis.lpop(getQueueKey(task)); if (url == null) { return null; } String key = getItemKey(task); String field = DigestUtils.md5Hex(url); byte[] bytes = jedis.hget(key.getBytes(), field.getBytes()); if (bytes != null) { Request o = JSON.parseObject(new String(bytes), Request.class); return o; } return new Request(url); } finally { pool.returnResource(jedis); } }
2.3 FileCacheQueueScheduler
FileCacheQueueScheduler是文件缓存队列Scheduler,其主要是利用2个文件分别记录所有urls和已经使用过的个数cursor,在push或poll方式执行时进行init方法的初始化,初始化时会读取2个文件,其中文件存储的所有urls添加到urls中,大于cursor数量的url包装成request添加到queue中。
并且FileCacheQueueScheduler有2个printWriter初始化时创建,负责将push时,增加的url同时写入到此writer(增量), 另poll时,通过cursor记录poll的个数,这样恢复的时候,就可以通过cursor判断,大于cursor的url才是没有poll使用过的。
@Override public Request poll(Task task) { if (!inited.get()) { init(task); } // poll时候,需要文件记录cursor角标 (poll过的个数) fileCursorWriter.println(cursor.incrementAndGet()); return queue.poll(); } @Override protected void pushWhenNoDuplicate(Request request, Task task) { if (!inited.get()) { init(task); } queue.add(request); // push时候,需要文件记录url fileUrlWriter.println(request.getUrl()); } push和poll添加url和cursor到队列和urls时,同步写入到文件中。 private void init(Task task) { this.task = task; File file = new File(filePath); if (!file.exists()) { file.mkdirs(); } readFile(); initWriter(); initFlushThread(); inited.set(true); logger.info("init cache scheduler success"); }
初始化方法主要是用来恢复urls和request的, 从file文件读取cursor值,从url文件读取urls。然后queue的恢复就是根据大于cursor的url进行构建的request(url)。
三、组件分析-Downloader
Downloader负责从互联网上下载页面,以便后续处理。WebMagic默认使用了作为下载工具。
默认的Download实现是HttpClientDownloader。核心处理就是通过httpClient下载页面数据组装成Page对象。
public Page download(Request request, Task task) { if (task == null || task.getSite() == null) { throw new NullPointerException("task or site can not be null"); } CloseableHttpResponse httpResponse = null; CloseableHttpClient httpClient = getHttpClient(task.getSite()); Proxy proxy = proxyProvider != null ? proxyProvider.getProxy(task) : null; HttpClientRequestContext requestContext = httpUriRequestConverter.convert(request, task.getSite(), proxy); Page page = Page.fail(); try { httpResponse = httpClient.execute(requestContext.getHttpUriRequest(), requestContext.getHttpClientContext()); page = handleResponse(request, request.getCharset() != null ? request.getCharset() : task.getSite().getCharset(), httpResponse, task); onSuccess(request); logger.info("downloading page success {}", request.getUrl()); return page; } catch (IOException e) { logger.warn("download page {} error", request.getUrl(), e); onError(request); return page; } finally { if (httpResponse != null) { //ensure the connection is released back to pool EntityUtils.consumeQuietly(httpResponse.getEntity()); } if (proxyProvider != null && proxy != null) { proxyProvider.returnProxy(proxy, page, task); } } }
四、组件分析-Pipeline
Pipeline负责抽取结果的处理,包括计算、持久化到文件、数据库等。WebMagic默认提供了“输出到控制台”和“保存到文件”两种结果处理方案。
1、控制台Pipelinepublic class ConsolePipeline implements Pipeline { @Override public void process(ResultItems resultItems, Task task) { System.out.println("get page: " + resultItems.getRequest().getUrl()); for (Map.Entryentry : resultItems.getAll().entrySet()) { System.out.println(entry.getKey() + ":\t" + entry.getValue()); } } }
2、文件Pipeline
public class FilePipeline extends FilePersistentBase implements Pipeline { public FilePipeline() { setPath("/data/webmagic/"); } public FilePipeline(String path) { setPath(path); } @Override public void process(ResultItems resultItems, Task task) { String path = this.path + PATH_SEPERATOR + task.getUUID() + PATH_SEPERATOR; try { PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(getFile(path + DigestUtils.md5Hex(resultItems.getRequest().getUrl()) + ".html")),"UTF-8")); printWriter.println("url:\t" + resultItems.getRequest().getUrl()); for (Map.Entryentry : resultItems.getAll().entrySet()) { if (entry.getValue() instanceof Iterable) { Iterable value = (Iterable) entry.getValue(); printWriter.println(entry.getKey() + ":"); for (Object o : value) { printWriter.println(o); } } else { printWriter.println(entry.getKey() + ":\t" + entry.getValue()); } } printWriter.close(); } catch (IOException e) { logger.warn("write file error", e); } }}
第二部分 webmagic解析工作
webmagic的页面抽取解析工作,webmagic解析页面元素可以通过xpath、css、json、正则。
抽取解析工作主要由2个顶级接口实现。 Selectable:包装页面元素内容的一个链式解析操作的顶级接口。 (其中的一些api会调用不同的selector实现不同的解析方式) Selector:解析器的顶级接口一、先从解析实际操作的Selector接口讲起。
顶级接口Selector常见几个实现类
BaseElementSelector: 是通过jsoup实现的一个基类,做了一个jsoup.parse(text),以及子类实现的模板方法 CssSelector: css选择器的实现 LinksSelector:css选择器的实现link提取 实现原理 element.select('a') element.attr("abs:href") or element.attr("href") XpathSelector:基于Xsoup(扩展jsoup)实现的xpath选择器的实现 JsonPathSelector:基于jsonPath的一个json解析实现1、顶级接口Selector public interface Selector { // 提取一个结果 public String select(String text); // 提取所有的结果 public ListselectList(String text); }
2、BaseElementSelector基类 扩展实现了ElementSelector接口,根据返回值是是Element和结果值是String,以及结果List还是single一共有4个未实现 的抽象方法,并且有四个已经实现的方法(通过Jsoup.parse(text) 将参数变成Element) public abstract Element selectElement(Element element); public abstract ListselectElements(Element element); public String select(Element element); public List selectList(Element element);
3、CssSelector类 元素返回值的实现很简单,就是调用Jsoup本身的节点api操作。 @Override public Element selectElement(Element element) { Elements elements = element.select(selectorText); if (CollectionUtils.isNotEmpty(elements)) { return elements.get(0); } return null; } @Override public ListselectElements(Element element) { return element.select(selectorText); } String返回值的实现,就要提取元素中的text字符串 public List selectList(Element doc) { List strings = new ArrayList (); List elements = selectElements(doc); if (CollectionUtils.isNotEmpty(elements)) { for (Element element : elements) { String value = getValue(element); // getValue获取每个元素的值,默认是提取outerHtml if (value != null) { strings.add(value); } } } return strings; }
4、XpathSelector类 --- xpath解析
主要是调用基于Xsoup扩展了css选择器按照xpath规则选择的选择器,此扩展另行单独分析。
5、RegexSelector类 --- regex正则解析 private void compileRegex(String regexStr) { // 大小写不分,点可以代表全部(默认不匹配换行) this.regex = Pattern.compile(regexStr, Pattern.DOTALL | Pattern.CASE_INSENSITIVE); this.regexStr = regexStr; } // 构造方法 public RegexSelector(String regexStr) { this.compileRegex(regexStr); if (regex.matcher("").groupCount() == 0) { this.group = 0; // 没有捕捉组是全部 } else { this.group = 1; // 有捕捉组是第一个捕捉组内容 } } // select实现 @Override public String select(String text) { return selectGroup(text).get(group); } // selectGroup所有的 public RegexResult selectGroup(String text) { Matcher matcher = regex.matcher(text); if (matcher.find()) { String[] groups = new String[matcher.groupCount() + 1]; // +1是因为全部结果计算为0. for (int i = 0; i < groups.length; i++) { groups[i] = matcher.group(i); } return new RegexResult(groups); } return RegexResult.EMPTY_RESULT; }
二、Selectable 通用包装页面元素,并且提供多种解析方式的api接口,即可以通过调用不同的Selector实现不同的功能。
1、Selectable 顶级接口 主要实现: AbstractSelectable HtmlNode : HTML元素 PlainText : 纯文本
public interface Selectable { public Selectable xpath(String xpath); public Selectable $(String selector); public Selectable $(String selector, String attrName); public Selectable css(String selector); public Selectable css(String selector, String attrName); public Selectable smartContent(); public Selectable links(); public Selectable regex(String regex); public Selectable regex(String regex, int group); public Selectable replace(String regex, String replacement); public String toString(); public String get(); public boolean match(); public Listall(); public Selectable jsonPath(String jsonPath); public Selectable select(Selector selector); public Selectable selectList(Selector selector); public List nodes(); }
2、功能实现,由于接口方法比较多,选几个常用进行分析。
1) css方法 ($()方法) 由HtmlNode实现,HtmlNode的属性有List,在创建节点时就会进行赋值。 public Selectable $(String selector, String attrName) { CssSelector cssSelector = Selectors.$(selector, attrName); // 创建了一个css选择器 return selectElements(cssSelector); // 具体解析方法 } protected Selectable selectElements(BaseElementSelector elementSelector) { ListIterator elementIterator = getElements().listIterator(); if (!elementSelector.hasAttribute()) { // 无设置属性 List resultElements = new ArrayList (); while (elementIterator.hasNext()) { Element element = checkElementAndConvert(elementIterator); // 设置第一个节点为Document节点 List selectElements = elementSelector.selectElements(element); resultElements.addAll(selectElements); } return new HtmlNode(resultElements); // 默认htmlNode最后返回也是包装成htmlNode } else { // 有设置属性 // has attribute, consider as plaintext List resultStrings = new ArrayList (); while (elementIterator.hasNext()) { Element element = checkElementAndConvert(elementIterator); List selectList = elementSelector.selectList(element); resultStrings.addAll(selectList); } return new PlainText(resultStrings); // 包装成纯文本节点返回 } }
2)、regex 正则实现 public Selectable regex(String regex, int group) { RegexSelector regexSelector = Selectors.regex(regex, group); // 正则选择器 return selectList(regexSelector, getSourceTexts()); // getSourceTexts获取文本--由子类实现 } protected Selectable selectList(Selector selector, List3、selectable的使用strings) { List results = new ArrayList (); for (String string : strings) { List result = selector.selectList(string); results.addAll(result); } return new PlainText(results); } //getSourceTexts的子类实现---HtmlNode protected List getSourceTexts() { List sourceTexts = new ArrayList (getElements().size()); for (Element element : getElements()) { // 所有的element进行toString操作。 sourceTexts.add(element.toString()); } return sourceTexts; } //getSourceTexts的子类实现---PlainText protected List getSourceTexts() { return sourceTexts; }
3、selectable的使用 这些不同的selectable都挂载在Page对象 的属性下。 private Html html; private Json json; private String rawText; private Selectable url; public Html getHtml() { if (html == null) { html = new Html(rawText, request.getUrl()); } return html; } public Json getJson() { if (json == null) { json = new Json(rawText); } return json; } // 下载阶段设置的url属性包装的Selectable page.setUrl(new PlainText(request.getUrl())); // 可自由使用的rawText page.setRawText(new String(bytes, charset));
第二部分总结: webmagic讲待解析的元素包装成通用的接口selectable,将解析的各种操作包装成selector, selectable的继承链下主要分为文本节点和html节点,接口的api方法,都会返回selectable对象,而文本节点html节点可能存在不能通用操作的部分采用方法不支持进行实现,能够实现的方法通过调用不同的选择器进行实现。
end!
转载地址:http://vruni.baihongyu.com/