首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何检查ConcurrentLinkedQueue的大小()或ConcurrentLinkedQueue()

如何检查ConcurrentLinkedQueue的大小()或ConcurrentLinkedQueue()
EN

Stack Overflow用户
提问于 2016-05-28 20:34:42
回答 1查看 868关注 0票数 1

我正在尝试用Java为Web爬虫构建一个简单的结构。到目前为止,原型只是试图做以下工作:

  • 使用启动URL列表初始化队列
  • 从队列中取出一个URL并提交到一个新线程
  • 做一些工作,然后将该URL添加到一组已经访问过的URL中

对于启动URL的队列,我使用ConcurrentLinkedQueue进行同步。为了产生新的线程,我正在使用ExecutorService

但是在创建新线程时,应用程序需要检查ConcurrentLinkedQueue是否为空。我试着用:

  • .size()
  • .isEmpty()

但两者似乎都没有返回ConcurrentLinkedQueue的真实状态。

问题出现在以下几个部分:

代码语言:javascript
复制
while (!crawler.getUrl_horizon().isEmpty()) {
                workers.submitNewWorkerThread(crawler);
            }

正因为如此,ExecutorService在其限制范围内创建了所有线程,即使输入只有2个URL。

在这里实现多线程的方式有问题吗?如果没有,检查ConcurrentLinkedQueue状态的更好方法是什么?

应用程序的起始类:

代码语言:javascript
复制
public class CrawlerApp {

    private static Crawler crawler;

    public static void main(String[] args) {
        crawler = = new Crawler();
        initializeApp();
        startCrawling();

    }

    private static void startCrawling() {
        crawler.setUrl_visited(new HashSet<URL>());
        WorkerManager workers = WorkerManager.getInstance();
        while (!crawler.getUrl_horizon().isEmpty()) {
            workers.submitNewWorkerThread(crawler);
        }
        try {
            workers.getExecutor().shutdown();
            workers.getExecutor().awaitTermination(10, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void initializeApp() {

        Properties config = new Properties();
        try {
            config.load(CrawlerApp.class.getClassLoader().getResourceAsStream("url-horizon.properties"));
            String[] horizon = config.getProperty("urls").split(",");
            ConcurrentLinkedQueue<URL> url_horizon = new ConcurrentLinkedQueue<>();
            for (String link : horizon) {
                URL url = new URL();
                url.setURL(link);
                url_horizon.add(url);
            }
            crawler.setUrl_horizon(url_horizon);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}

Crawler.java,它维护URL队列和一组已经访问的URL。

代码语言:javascript
复制
public class Crawler implements Runnable {
    private ConcurrentLinkedQueue<URL> url_horizon;

    public void setUrl_horizon(ConcurrentLinkedQueue<URL> url_horizon) {
        this.url_horizon = url_horizon;
    }

    public ConcurrentLinkedQueue<URL> getUrl_horizon() {
        return url_horizon;
    }

    private Set<URL> url_visited;

    public void setUrl_visited(Set<URL> url_visited) {
        this.url_visited = url_visited;
    }

    public Set<URL> getUrl_visited() {
        return Collections.synchronizedSet(url_visited);
    }

    @Override
    public void run() {
        URL url = nextURLFromHorizon();
        scrap(url);
        addURLToVisited(url);

    }

    private URL nextURLFromHorizon() {
        if (!getUrl_horizon().isEmpty()) {
            URL url = url_horizon.poll();
            if (getUrl_visited().contains(url)) {
                return nextURLFromHorizon();
            }
            System.out.println("Horizon URL:" + url.getURL());
            return url;

        }
        return null;

    }

    private void scrap(URL url) {
        new Scrapper().scrap(url);
    }

    private void addURLToVisited(URL url) {
        System.out.println("Adding to visited set:" + url.getURL());
        getUrl_visited().add(url);
    }

}

URL.java只是一个带有private String url和overriden hashCode()equals()的类。

而且,到目前为止,Scrapper.scrap()只有虚拟实现:

代码语言:javascript
复制
public void scrap(URL url){
        System.out.println("Done scrapping:"+url.getURL());
    }

创建线程的WorkerManager

代码语言:javascript
复制
public class WorkerManager {
    private static final Integer WORKER_LIMIT = 10;
    private final ExecutorService executor = Executors.newFixedThreadPool(WORKER_LIMIT);

    public ExecutorService getExecutor() {
        return executor;
    }

    private static volatile WorkerManager instance = null;

    private WorkerManager() {
    }

    public static WorkerManager getInstance() {
        if (instance == null) {
            synchronized (WorkerManager.class) {
                if (instance == null) {
                    instance = new WorkerManager();
                }
            }
        }

        return instance;
    }

    public Future submitNewWorkerThread(Runnable run) {
        return executor.submit(run);
    }

}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-05-28 22:28:40

问题

您最终创建的线程比队列中有URL的线程更多的原因是,在您多次遍历while循环之前,执行器的线程都不可能启动(实际上也很可能)。

每当使用线程时,您都应该记住线程是独立调度的,并且以自己的速度运行,除非您显式地同步它们。在这种情况下,线程可以在submit()调用之后的任何时候启动,尽管您似乎希望每个线程在while循环中的下一个迭代之前开始并通过nextURLFromHorizon

解决方案

在将Runnable提交给执行器之前,考虑将URL从队列中删除。我还建议定义一个提交给执行器一次的CrawlerTask,而不是反复提交的Crawler。在这样的设计中,您甚至不需要一个线程安全容器来刮掉URL。

代码语言:javascript
复制
class CrawlerTask extends Runnable {
   URL url;

   CrawlerTask(URL url) { this.url = url; }

   @Override
   public void run() {
     scrape(url);
     // add url to visited?
   }
}

class Crawler {
  ExecutorService executor;
  Queue urlHorizon;

  //...

  private static void startCrawling() {
    while (!urlHorizon.isEmpty()) {
      executor.submit(new CrawlerTask(urlHorizon.poll());
    }
    // ...
  }
}
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37503526

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档