Since webmagic is largely modeled on scrapy, I looked at scrapy's distributed Redis solution, scrapy-redis.
Comparing the current webmagic source with the scrapy-redis source, I found that RedisScheduler has no way to clean up the queue.
Take the scrapy-redis source as an example:
scrapy-redis/src/scrapy_redis/scheduler.py
...
class Scheduler(object):
    ...
    def open(self, spider):
        ...
        if self.flush_on_start:
            self.flush()
        # notice if there are requests already in the queue to resume the crawl
        if len(self.queue):
            spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))

    def close(self, reason):
        if not self.persist:
            self.flush()

    def flush(self):
        self.df.clear()
        self.queue.clear()
scrapy-redis/src/scrapy_redis/queue.py
class Base(object):
    ...
    def clear(self):
        """Clear queue/stack"""
        self.server.delete(self.key)
    ...
scrapy-redis/src/scrapy_redis/dupefilter.py
class RFPDupeFilter(BaseDupeFilter):
    ...
    def close(self, reason=''):
        """Delete data on close. Called by Scrapy's scheduler.

        Parameters
        ----------
        reason : str, optional

        """
        self.clear()

    def clear(self):
        """Clears fingerprints data."""
        self.server.delete(self.key)
    ...
I'm not very familiar with Python, but my rough understanding is that flush_on_start can be set so that the current instance runs flush() on startup, or the user can call flush() directly and clean up whenever it suits their situation.
The cleanup here deletes the keys completely.
By contrast, on the webmagic side:
webmagic/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisScheduler.java
public class RedisScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler, DuplicateRemover {

    private static final String QUEUE_PREFIX = "queue_";

    private static final String SET_PREFIX = "set_";

    private static final String ITEM_PREFIX = "item_";

    ...

    @Override
    public void resetDuplicateCheck(Task task) {
        Jedis jedis = pool.getResource();
        try {
            jedis.del(getSetKey(task));
        } finally {
            pool.returnResource(jedis);
        }
    }

    ...

    protected String getSetKey(Task task) {
        return SET_PREFIX + task.getUUID();
    }

    protected String getQueueKey(Task task) {
        return QUEUE_PREFIX + task.getUUID();
    }

    protected String getItemKey(Task task) {
        return ITEM_PREFIX + task.getUUID();
    }

    ...
Here, the only cleanup is the resetDuplicateCheck method defined by the DuplicateRemover interface, and it only deletes the set_ key.
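To make this concrete, here is a rough sketch of what a user can do today. It assumes RedisScheduler's String-host constructor, and MyPageProcessor is a placeholder class for illustration, not code from the webmagic repository:

// Sketch of the current situation: the only cleanup hook is resetDuplicateCheck.
RedisScheduler scheduler = new RedisScheduler("127.0.0.1");
Spider spider = Spider.create(new MyPageProcessor()).setScheduler(scheduler);
// ... the first batch of URLs is crawled ...
scheduler.resetDuplicateCheck(spider); // deletes set_<uuid> only
// queue_<uuid> and item_<uuid> are still in Redis and keep growing with every batch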
Example scenario: there are several crawler machines and one Redis server. After the crawler cluster finishes the first batch of data, the second batch is, by design, supposed to be crawled regardless of whether it overlaps with the first batch, so the previous queue needs to be cleared first.
In practice, though, all we can do right now is call resetDuplicateCheck to reset duplicate filtering, and as the data volume keeps rising, the item_TASKID queue never gets cleaned up. As shown below:
Redis memory usage (the screenshot is too old and the image is broken)
Actual situation (the screenshot is too old and the image is broken)
Of these, only set_ can be cleaned up.
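If RedisScheduler offered an equivalent of scrapy-redis's flush(), the scenario above could be handled by wiping all three keys between batches. Below is a rough sketch under that assumption; the method name clearAll is mine, not an existing webmagic API, and it only reuses the pool and key helpers already shown above:

// Hypothetical clearAll(Task) on RedisScheduler, mirroring scrapy-redis's flush():
// removes the request queue, the duplicate-check set and the item data for a task.
public void clearAll(Task task) {
    Jedis jedis = pool.getResource();
    try {
        // delete queue_<uuid>, set_<uuid> and item_<uuid> in one call
        jedis.del(getQueueKey(task), getSetKey(task), getItemKey(task));
    } finally {
        pool.returnResource(jedis);
    }
}

A caller could then invoke scheduler.clearAll(spider) after the first batch finishes and before feeding in the second batch of URLs, which would match what flush_on_start / flush() provide on the scrapy-redis side.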