我正在尝试用Java为Web爬虫构建一个简单的结构。到目前为止,原型只是试图做以下工作:
对于启动URL的队列,我使用ConcurrentLinkedQueue进行同步。为了产生新的线程,我正在使用ExecutorService。
但是在创建新线程时,应用程序需要检查ConcurrentLinkedQueue是否为空。我试着用:
.size().isEmpty()但两者似乎都没有返回ConcurrentLinkedQueue的真实状态。
问题出现在以下几个部分:
while (!crawler.getUrl_horizon().isEmpty()) {
workers.submitNewWorkerThread(crawler);
}正因为如此,ExecutorService在其限制范围内创建了所有线程,即使输入只有2个URL。
在这里实现多线程的方式有问题吗?如果没有,检查ConcurrentLinkedQueue状态的更好方法是什么?
应用程序的起始类:
public class CrawlerApp {
private static Crawler crawler;
public static void main(String[] args) {
crawler = = new Crawler();
initializeApp();
startCrawling();
}
private static void startCrawling() {
crawler.setUrl_visited(new HashSet<URL>());
WorkerManager workers = WorkerManager.getInstance();
while (!crawler.getUrl_horizon().isEmpty()) {
workers.submitNewWorkerThread(crawler);
}
try {
workers.getExecutor().shutdown();
workers.getExecutor().awaitTermination(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void initializeApp() {
Properties config = new Properties();
try {
config.load(CrawlerApp.class.getClassLoader().getResourceAsStream("url-horizon.properties"));
String[] horizon = config.getProperty("urls").split(",");
ConcurrentLinkedQueue<URL> url_horizon = new ConcurrentLinkedQueue<>();
for (String link : horizon) {
URL url = new URL();
url.setURL(link);
url_horizon.add(url);
}
crawler.setUrl_horizon(url_horizon);
} catch (IOException e) {
e.printStackTrace();
}
}
}Crawler.java,它维护URL队列和一组已经访问的URL。
public class Crawler implements Runnable {
private ConcurrentLinkedQueue<URL> url_horizon;
public void setUrl_horizon(ConcurrentLinkedQueue<URL> url_horizon) {
this.url_horizon = url_horizon;
}
public ConcurrentLinkedQueue<URL> getUrl_horizon() {
return url_horizon;
}
private Set<URL> url_visited;
public void setUrl_visited(Set<URL> url_visited) {
this.url_visited = url_visited;
}
public Set<URL> getUrl_visited() {
return Collections.synchronizedSet(url_visited);
}
@Override
public void run() {
URL url = nextURLFromHorizon();
scrap(url);
addURLToVisited(url);
}
private URL nextURLFromHorizon() {
if (!getUrl_horizon().isEmpty()) {
URL url = url_horizon.poll();
if (getUrl_visited().contains(url)) {
return nextURLFromHorizon();
}
System.out.println("Horizon URL:" + url.getURL());
return url;
}
return null;
}
private void scrap(URL url) {
new Scrapper().scrap(url);
}
private void addURLToVisited(URL url) {
System.out.println("Adding to visited set:" + url.getURL());
getUrl_visited().add(url);
}
}URL.java只是一个带有private String url和overriden hashCode()和equals()的类。
而且,到目前为止,Scrapper.scrap()只有虚拟实现:
public void scrap(URL url){
System.out.println("Done scrapping:"+url.getURL());
}创建线程的WorkerManager:
public class WorkerManager {
private static final Integer WORKER_LIMIT = 10;
private final ExecutorService executor = Executors.newFixedThreadPool(WORKER_LIMIT);
public ExecutorService getExecutor() {
return executor;
}
private static volatile WorkerManager instance = null;
private WorkerManager() {
}
public static WorkerManager getInstance() {
if (instance == null) {
synchronized (WorkerManager.class) {
if (instance == null) {
instance = new WorkerManager();
}
}
}
return instance;
}
public Future submitNewWorkerThread(Runnable run) {
return executor.submit(run);
}
}发布于 2016-05-28 22:28:40
问题
您最终创建的线程比队列中有URL的线程更多的原因是,在您多次遍历while循环之前,执行器的线程都不可能启动(实际上也很可能)。
每当使用线程时,您都应该记住线程是独立调度的,并且以自己的速度运行,除非您显式地同步它们。在这种情况下,线程可以在submit()调用之后的任何时候启动,尽管您似乎希望每个线程在while循环中的下一个迭代之前开始并通过nextURLFromHorizon。
解决方案
在将Runnable提交给执行器之前,考虑将URL从队列中删除。我还建议定义一个提交给执行器一次的CrawlerTask,而不是反复提交的Crawler。在这样的设计中,您甚至不需要一个线程安全容器来刮掉URL。
class CrawlerTask extends Runnable {
URL url;
CrawlerTask(URL url) { this.url = url; }
@Override
public void run() {
scrape(url);
// add url to visited?
}
}
class Crawler {
ExecutorService executor;
Queue urlHorizon;
//...
private static void startCrawling() {
while (!urlHorizon.isEmpty()) {
executor.submit(new CrawlerTask(urlHorizon.poll());
}
// ...
}
}https://stackoverflow.com/questions/37503526
复制相似问题