博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
webmagic的源码分析-核心流程和组件
阅读量:4074 次
发布时间:2019-05-25

本文共 21481 字,大约阅读时间需要 71 分钟。

概述:WebMagic的结构分为DownloaderPageProcessorSchedulerPipeline四大组件,并由Spider将它们彼此组织起来。这四大组件对应爬虫生命周期中的下载、处理、管理和持久化等功能。

webmagic 的启动类时Spider,启动时run()方法运行时,会进行初始化各种组件,然后循环运行,从scheduler中取request,包装成runnable,进行请求处理。这个流程中,Scheduler组件负责去重和装待处理的请求,Downloader组件负责下载请求结果包装成page类, pageProcessor组件负责处理page结果,并且可以从page中添加需要采集的另外url,用page.resultItems装处理的结果数据,给Pipeline组件处理结果。

具体使用请看官方文档,或搜索相关案例,本篇主要是对webmagic的源码解析。

    一、流程分析

1、主要方法    

public void run() {    	// 检查运行状态,并且compareAndSet成运行        checkRunningStat();        // 初始化组件        initComponent();        logger.info("Spider {} started!",getUUID());        // 当前线程非中断,运行状态持续运行。        while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {            final Request request = scheduler.poll(this);            if (request == null) {                if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {                    break;                }                // wait until new url added                waitNewUrl();            } else {                threadPool.execute(new Runnable() {                    @Override                    public void run() {                        try {                        	// 处理请求,下载page,pageProcess处理page,pipeline处理page                            processRequest(request);                            onSuccess(request);                        } catch (Exception e) {                            onError(request);                            logger.error("process request " + request + " error", e);                        } finally {                            pageCount.incrementAndGet();                            signalNewUrl();                        }                    }                });            }        }        // 设置状态为停止        stat.set(STAT_STOPPED);        if (destroyWhenExit) {            close();        }        logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get());    }

      从上面可以看出,Spider就是协调各种组件执行,核心逻辑是从scheduler中取出request包装成任务交由线程池运行,几个主要的组件就是在这个流程中负责各自运行的职责。

   2)初始化组件initComponent

protected void initComponent() {        if (downloader == null) { // 默认Downloader为HttpClientDownloader            this.downloader = new HttpClientDownloader();        }        if (pipelines.isEmpty()) { // 默认Pipeline为ConsolePipeline            pipelines.add(new ConsolePipeline());        }        downloader.setThread(threadNum);        if (threadPool == null || threadPool.isShutdown()) {            if (executorService != null && !executorService.isShutdown()) {                threadPool = new CountableThreadPool(threadNum, executorService);            } else {                threadPool = new CountableThreadPool(threadNum); // 线程池为CountableThreadPool包装            }        }        if (startRequests != null) { // 初始添加请求到scheduler, 默认scheduler为QueueScheduler            for (Request request : startRequests) {                addRequest(request);            }            startRequests.clear();        }        startTime = new Date();    }

 几个核心组件的的初始实现downloader-HttpClientDownloader、pipelines-ConsolePipeline、scheduler-QueueScheduler pageProcessor-手动制定。

 3、处理请求方法processRequest

private void processRequest(Request request) {        Page page = downloader.download(request, this); // 下载-即通过HttpClient下载页面,并且构建Page对象。        if (page.isDownloadSuccess()){ // 下载成功            onDownloadSuccess(request, page);        } else { // 失败循环            onDownloaderFail(request);        }    }

 // 下载成功处理

    private void onDownloadSuccess(Request request, Page page) {        if (site.getAcceptStatCode().contains(page.getStatusCode())){            pageProcessor.process(page); // 先用pageProcessor处理page            extractAndAddRequests(page, spawnUrl); // 提取添加的request            if (!page.getResultItems().isSkip()) { // 如果没有设置isSkip                for (Pipeline pipeline : pipelines) {                     pipeline.process(page.getResultItems(), this); // 多个pipelines处理结果                }            }        }        sleep(site.getSleepTime());        return;    }

  下载成功的处理逻辑,就是进过pageProcessor进行处理page,然后提取addTragetRequest添加到scheduler中,然后在判断是否设置了isSkip,如果没有,那么就用多个pipelines进行结果的处理。

///下载失败的处理

private void onDownloaderFail(Request request) {        if (site.getCycleRetryTimes() == 0) { // 如果错误循环重试次数为0,那么只停顿            sleep(site.getSleepTime());        } else {             doCycleRetry(request); // 循环重试---详见下一代码片段        }    }

// 循环重试

private void doCycleRetry(Request request) {        Object cycleTriedTimesObject = request.getExtra(Request.CYCLE_TRIED_TIMES);        if (cycleTriedTimesObject == null) { // 循环重试次数0时,clone一个新请求添加设置次数1            addRequest(SerializationUtils.clone(request).setPriority(0).putExtra(Request.CYCLE_TRIED_TIMES, 1));        } else {            int cycleTriedTimes = (Integer) cycleTriedTimesObject;            cycleTriedTimes++; // 重试1次是,clone一个新请求,将附加参数递增。            if (cycleTriedTimes < site.getCycleRetryTimes()) {                 addRequest(SerializationUtils.clone(request).setPriority(0).putExtra(Request.CYCLE_TRIED_TIMES, cycleTriedTimes));            }        }        sleep(site.getRetrySleepTime());    }

 

 

对上面的流程,画了2个较官方的版本更为详细的一个流程图

 

二、组件分析-scheduler (调度)

SchedulerScheduler负责管理调度待抓取的URL,以及一些去重的工作。WebMagic默认提供了LinkedBlockingQueue的内存队列来管理URL,并用集合来进行去重。也支持使用Redis进行分布式管理。主要的scheduler实现有下面几种。

DuplicateRemovedScheduler: Scheduler基类,内置一个DuplicatedRemover实现为HashSetDuplicateRemover。

QueueScheduler:队列Scheduler, 就是通过一个LinkedBlockingQueue存储待采集连接。
FileCacheQueueScheduler:文件缓存队列Scheduler,在队列的基础上,通过文件记录所有urls,以及poll的位置, 如果临时停止,那么再次采集可以通过文件恢复。
RedisScheduler:使用Redis保存抓取队列,其中list保存queue队列,set负责去重,hash,存储请求附加数据。

2.1 DuplicateRemovedScheduler

public abstract class DuplicateRemovedScheduler implements Scheduler {			// 默认重复器		    private DuplicateRemover duplicatedRemover = new HashSetDuplicateRemover();					    public DuplicateRemover getDuplicateRemover() {		        return duplicatedRemover;		    }					    public DuplicateRemovedScheduler setDuplicateRemover(DuplicateRemover duplicatedRemover) {		        this.duplicatedRemover = duplicatedRemover;		        return this;		    }						// 是post请求或循环重试或不重复才添加。		    @Override		    public void push(Request request, Task task) {		        logger.trace("get a candidate url {}", request.getUrl());		        if (shouldReserved(request) || noNeedToRemoveDuplicate(request) || !duplicatedRemover.isDuplicate(request, task)) {		            logger.debug("push to queue {}", request.getUrl());		            pushWhenNoDuplicate(request, task);		        }		    }			// 循环重试的请求不进行判断		    protected boolean shouldReserved(Request request) {		        return request.getExtra(Request.CYCLE_TRIED_TIMES) != null;		    }			// POST请求不进行判断		    protected boolean noNeedToRemoveDuplicate(Request request) {		        return HttpConstant.Method.POST.equalsIgnoreCase(request.getMethod());		    }			// 子类负责实现		    protected void pushWhenNoDuplicate(Request request, Task task) {				    }		}

 2.2 QueueScheduler  内存队列url调度

QueueScheduler实现就是由内部的一个LinkedBlockingQueue负责get或者poll获取url。

2.3 RedisScheduler  Redis分布式url调度

1)成员变量

protected JedisPool pool;private static final String QUEUE_PREFIX = "queue_"; // 构建list队列的key的前缀private static final String SET_PREFIX = "set_"; // 构建set的key的前缀private static final String ITEM_PREFIX = "item_"; // 构建hash的key的前缀public RedisScheduler(JedisPool pool) {	this.pool = pool;	setDuplicateRemover(this); // 设置重复的实现为自身。}	protected String getSetKey(Task task) {		return SET_PREFIX + task.getUUID();	}	protected String getQueueKey(Task task) {		return QUEUE_PREFIX + task.getUUID();	}	protected String getItemKey(Task task) {		return ITEM_PREFIX + task.getUUID();	}

上面是成员变量,通过queue保存url队列,通过set负责去重,通过hash负责存储附属数据。

另外构造函数,传入了或自己构建jedisPool,并且自身实现了DuplicateRemover, 且设置到此对象的duplicateRemover字段中。

2) 去重实现

public boolean isDuplicate(Request request, Task task) {				Jedis jedis = pool.getResource();				try { 					return jedis.sadd(getSetKey(task), request.getUrl()) == 0;				} finally {					pool.returnResource(jedis);				}			}

3) push 

      public void pushWhenNoDuplicate(Request request, Task task) {                Jedis jedis = pool.getResource();                try {                    jedis.rpush(getQueueKey(task), request.getUrl());                    if (request.getExtras() != null) {                        String field = DigestUtils.md5Hex(request.getUrl());                        String value = JSON.toJSONString(request);                        jedis.hset(getItemKey(task), field, value);                    }                } finally {                    pool.returnResource(jedis);                }            }        

4) pool

 public Request poll(Task task) {            Jedis jedis = pool.getResource();                try {                    String url = jedis.lpop(getQueueKey(task));                    if (url == null) {                        return null;                    }                    String key = getItemKey(task);                    String field = DigestUtils.md5Hex(url);                    byte[] bytes = jedis.hget(key.getBytes(), field.getBytes());                    if (bytes != null) {                        Request o = JSON.parseObject(new String(bytes), Request.class);                        return o;                    }                    return new Request(url);                } finally {                    pool.returnResource(jedis);                }            }

    2.3 FileCacheQueueScheduler 

FileCacheQueueScheduler是文件缓存队列Scheduler,其主要是利用2个文件分别记录所有urls和已经使用过的个数cursor,在push或poll方式执行时进行init方法的初始化,初始化时会读取2个文件,其中文件存储的所有urls添加到urls中,大于cursor数量的url包装成request添加到queue中。

 并且FileCacheQueueScheduler有2个printWriter初始化时创建,负责将push时,增加的url同时写入到此writer(增量), 另poll时,通过cursor记录poll的个数,这样恢复的时候,就可以通过cursor判断,大于cursor的url才是没有poll使用过的。

@Override			public Request poll(Task task) {				if (!inited.get()) {					init(task);				} 				// poll时候,需要文件记录cursor角标  (poll过的个数)				fileCursorWriter.println(cursor.incrementAndGet());				return queue.poll();			}						@Override			protected void pushWhenNoDuplicate(Request request, Task task) {				if (!inited.get()) {					init(task);				}				queue.add(request);				// push时候,需要文件记录url				fileUrlWriter.println(request.getUrl());			}						push和poll添加url和cursor到队列和urls时,同步写入到文件中。					private void init(Task task) {		        this.task = task;		        File file = new File(filePath);		        if (!file.exists()) {		            file.mkdirs();		        }		        readFile();		        initWriter();		        initFlushThread();		        inited.set(true);		        logger.info("init cache scheduler success");		    }

初始化方法主要是用来恢复urls和request的, 从file文件读取cursor值,从url文件读取urls。然后queue的恢复就是根据大于cursor的url进行构建的request(url)。

     三、组件分析-Downloader

Downloader负责从互联网上下载页面,以便后续处理。WebMagic默认使用了作为下载工具。

默认的Download实现是HttpClientDownloader。核心处理就是通过httpClient下载页面数据组装成Page对象。

public Page download(Request request, Task task) {        if (task == null || task.getSite() == null) {            throw new NullPointerException("task or site can not be null");        }        CloseableHttpResponse httpResponse = null;        CloseableHttpClient httpClient = getHttpClient(task.getSite());        Proxy proxy = proxyProvider != null ? proxyProvider.getProxy(task) : null;        HttpClientRequestContext requestContext = httpUriRequestConverter.convert(request, task.getSite(), proxy);        Page page = Page.fail();        try {            httpResponse = httpClient.execute(requestContext.getHttpUriRequest(), requestContext.getHttpClientContext());            page = handleResponse(request, request.getCharset() != null ? request.getCharset() : task.getSite().getCharset(), httpResponse, task);            onSuccess(request);            logger.info("downloading page success {}", request.getUrl());            return page;        } catch (IOException e) {            logger.warn("download page {} error", request.getUrl(), e);            onError(request);             return page;        } finally {            if (httpResponse != null) {                //ensure the connection is released back to pool                EntityUtils.consumeQuietly(httpResponse.getEntity());            }            if (proxyProvider != null && proxy != null) {                proxyProvider.returnProxy(proxy, page, task);            }        }    }

     四、组件分析-Pipeline

    Pipeline负责抽取结果的处理,包括计算、持久化到文件、数据库等。WebMagic默认提供了“输出到控制台”和“保存到文件”两种结果处理方案。

    1、控制台Pipeline

public class ConsolePipeline implements Pipeline {		    @Override	    public void process(ResultItems resultItems, Task task) {	        System.out.println("get page: " + resultItems.getRequest().getUrl());	        for (Map.Entry
entry : resultItems.getAll().entrySet()) { System.out.println(entry.getKey() + ":\t" + entry.getValue()); } } }

 2、文件Pipeline

public class FilePipeline extends FilePersistentBase implements Pipeline {    public FilePipeline() {        setPath("/data/webmagic/");    }    public FilePipeline(String path) {        setPath(path);    }    @Override    public void process(ResultItems resultItems, Task task) {        String path = this.path + PATH_SEPERATOR + task.getUUID() + PATH_SEPERATOR;        try {            PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(getFile(path + DigestUtils.md5Hex(resultItems.getRequest().getUrl()) + ".html")),"UTF-8"));            printWriter.println("url:\t" + resultItems.getRequest().getUrl());            for (Map.Entry
entry : resultItems.getAll().entrySet()) { if (entry.getValue() instanceof Iterable) { Iterable value = (Iterable) entry.getValue(); printWriter.println(entry.getKey() + ":"); for (Object o : value) { printWriter.println(o); } } else { printWriter.println(entry.getKey() + ":\t" + entry.getValue()); } } printWriter.close(); } catch (IOException e) { logger.warn("write file error", e); } }}

 

第二部分  webmagic解析工作

      webmagic的页面抽取解析工作,webmagic解析页面元素可以通过xpath、css、json、正则。

      抽取解析工作主要由2个顶级接口实现。
        Selectable:包装页面元素内容的一个链式解析操作的顶级接口。 (其中的一些api会调用不同的selector实现不同的解析方式)
        Selector:解析器的顶级接口

一、先从解析实际操作的Selector接口讲起。

顶级接口Selector常见几个实现类 

        BaseElementSelector: 是通过jsoup实现的一个基类,做了一个jsoup.parse(text),以及子类实现的模板方法
                CssSelector: css选择器的实现
                LinksSelector:css选择器的实现link提取   实现原理 element.select('a') element.attr("abs:href") or element.attr("href")
                XpathSelector:基于Xsoup(扩展jsoup)实现的xpath选择器的实现
        JsonPathSelector:基于jsonPath的一个json解析实现

1、顶级接口Selector		public interface Selector {					// 提取一个结果		    public String select(String text);				     // 提取所有的结果		    public List
selectList(String text); }
2、BaseElementSelector基类				扩展实现了ElementSelector接口,根据返回值是是Element和结果值是String,以及结果List还是single一共有4个未实现		的抽象方法,并且有四个已经实现的方法(通过Jsoup.parse(text) 将参数变成Element)				 public abstract Element selectElement(Element element);   		 public abstract List
selectElements(Element element); public String select(Element element); public List
selectList(Element element);

 

3、CssSelector类   		    		 元素返回值的实现很简单,就是调用Jsoup本身的节点api操作。   		    		    @Override		    public Element selectElement(Element element) {		        Elements elements = element.select(selectorText);		        if (CollectionUtils.isNotEmpty(elements)) {		            return elements.get(0);		        }		        return null;		    }				    @Override		    public List
selectElements(Element element) { return element.select(selectorText); } String返回值的实现,就要提取元素中的text字符串 public List
selectList(Element doc) { List
strings = new ArrayList
(); List
elements = selectElements(doc); if (CollectionUtils.isNotEmpty(elements)) { for (Element element : elements) { String value = getValue(element); // getValue获取每个元素的值,默认是提取outerHtml if (value != null) { strings.add(value); } } } return strings; }

4、XpathSelector类 --- xpath解析

        
            主要是调用基于Xsoup扩展了css选择器按照xpath规则选择的选择器,此扩展另行单独分析。

 

5、RegexSelector类 --- regex正则解析					  private void compileRegex(String regexStr) {		    // 大小写不分,点可以代表全部(默认不匹配换行)            this.regex = Pattern.compile(regexStr, Pattern.DOTALL | Pattern.CASE_INSENSITIVE);            this.regexStr = regexStr;		  }				 // 构造方法		  public RegexSelector(String regexStr) {	         this.compileRegex(regexStr);	         if (regex.matcher("").groupCount() == 0) {	            this.group = 0; // 没有捕捉组是全部	         }  else {	            this.group = 1; // 有捕捉组是第一个捕捉组内容	         }		  }							// select实现	    @Override	    public String select(String text) {	        return selectGroup(text).get(group);	    }				// selectGroup所有的		public RegexResult selectGroup(String text) {	        Matcher matcher = regex.matcher(text);	        if (matcher.find()) {	            String[] groups = new String[matcher.groupCount() + 1]; // +1是因为全部结果计算为0.	            for (int i = 0; i < groups.length; i++) {	                groups[i] = matcher.group(i);	            }	            return new RegexResult(groups);	        }	        return RegexResult.EMPTY_RESULT;	    }

        二、Selectable 通用包装页面元素,并且提供多种解析方式的api接口,即可以通过调用不同的Selector实现不同的功能。

1、Selectable 顶级接口

        
        主要实现: AbstractSelectable
            HtmlNode : HTML元素
            PlainText : 纯文本

public interface Selectable {		    public Selectable xpath(String xpath);				    public Selectable $(String selector);				    public Selectable $(String selector, String attrName);				    public Selectable css(String selector);				    public Selectable css(String selector, String attrName);				    public Selectable smartContent();				    public Selectable links();				    public Selectable regex(String regex);				    public Selectable regex(String regex, int group);				    public Selectable replace(String regex, String replacement);				    public String toString();		    		    public String get();				    public boolean match();				    public List
all(); public Selectable jsonPath(String jsonPath); public Selectable select(Selector selector); public Selectable selectList(Selector selector); public List
nodes(); }

2、功能实现,由于接口方法比较多,选几个常用进行分析。

1) css方法 ($()方法) 			 由HtmlNode实现,HtmlNode的属性有List
,在创建节点时就会进行赋值。 public Selectable $(String selector, String attrName) { CssSelector cssSelector = Selectors.$(selector, attrName); // 创建了一个css选择器 return selectElements(cssSelector); // 具体解析方法 } protected Selectable selectElements(BaseElementSelector elementSelector) { ListIterator
elementIterator = getElements().listIterator(); if (!elementSelector.hasAttribute()) { // 无设置属性 List
resultElements = new ArrayList
(); while (elementIterator.hasNext()) { Element element = checkElementAndConvert(elementIterator); // 设置第一个节点为Document节点 List
selectElements = elementSelector.selectElements(element); resultElements.addAll(selectElements); } return new HtmlNode(resultElements); // 默认htmlNode最后返回也是包装成htmlNode } else { // 有设置属性 // has attribute, consider as plaintext List
resultStrings = new ArrayList
(); while (elementIterator.hasNext()) { Element element = checkElementAndConvert(elementIterator); List
selectList = elementSelector.selectList(element); resultStrings.addAll(selectList); } return new PlainText(resultStrings); // 包装成纯文本节点返回 } }
2)、regex 正则实现		 		  public Selectable regex(String regex, int group) {	        RegexSelector regexSelector = Selectors.regex(regex, group); // 正则选择器	        return selectList(regexSelector, getSourceTexts()); // getSourceTexts获取文本--由子类实现	    }	    	     protected Selectable selectList(Selector selector, List
strings) { List
results = new ArrayList
(); for (String string : strings) { List
result = selector.selectList(string); results.addAll(result); } return new PlainText(results); } //getSourceTexts的子类实现---HtmlNode protected List
getSourceTexts() { List
sourceTexts = new ArrayList
(getElements().size()); for (Element element : getElements()) { // 所有的element进行toString操作。 sourceTexts.add(element.toString()); } return sourceTexts; } //getSourceTexts的子类实现---PlainText protected List
getSourceTexts() { return sourceTexts; }

3、selectable的使用

3、selectable的使用    	    	这些不同的selectable都挂载在Page对象 的属性下。    		    private Html html;	    private Json json;	    private String rawText;	    private Selectable url;	    	    	public Html getHtml() {	        if (html == null) {	            html = new Html(rawText, request.getUrl());	        }	        return html;	    }	    		    public Json getJson() {	        if (json == null) {	            json = new Json(rawText);	        }	        return json;	    }	    	    // 下载阶段设置的url属性包装的Selectable	    page.setUrl(new PlainText(request.getUrl()));	    	    // 可自由使用的rawText	    page.setRawText(new String(bytes, charset));

   第二部分总结:   webmagic讲待解析的元素包装成通用的接口selectable,将解析的各种操作包装成selector, selectable的继承链下主要分为文本节点和html节点,接口的api方法,都会返回selectable对象,而文本节点html节点可能存在不能通用操作的部分采用方法不支持进行实现,能够实现的方法通过调用不同的选择器进行实现。

 

end!

 

 

 

 

 

 

 

 

 

 

转载地址:http://vruni.baihongyu.com/

你可能感兴趣的文章
webServer kzserver/1.0.0
查看>>
OS + Unix IBM Aix basic / topas / nmon / filemon / vmstat / iostat / sysstat/sar
查看>>
my ReadMap subway / metro / map / ditie / gaotie / traffic / jiaotong
查看>>
OS + Linux DNS Server Bind
查看>>
linux下安装django
查看>>
Android 解决TextView设置文本和富文本SpannableString自动换行留空白问题
查看>>
Android开发中Button按钮绑定监听器的方式完全解析
查看>>
Android自定义View实现商品评价星星评分控件
查看>>
postgresql监控工具pgstatspack的安装及使用
查看>>
postgresql查看表的和索引的情况,判断是否膨胀
查看>>
postgresql中根据oid和filenode去找表的物理文件的位置
查看>>
postgresql减少wal日志生成量的方法
查看>>
swift中单例的创建及销毁
查看>>
获取App Store中App的ipa包
查看>>
iOS 关于pods-frameworks.sh:permission denied报错的解决
查看>>
设置RGBColor
查看>>
设置tabbaritem的title的颜色及按钮图片
查看>>
动态设置label的高度
查看>>
获取 一个文件 在沙盒Library/Caches/ 目录下的路径
查看>>
图片压缩
查看>>