首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Java并发任务实现

Java并发任务实现
EN

Stack Overflow用户
提问于 2014-04-14 21:44:10
回答 1查看 980关注 0票数 0

我的问题是:我最多可以运行三个并发任务。这些任务可以同时处理1到100个作业。我有很多线程不断地提交单个作业,我想尽快回复它们。在一个任务中处理100个作业所需的时间与在一个任务中处理一个作业所需的时间相同。每隔一段时间就会有工作出现。提交作业的线程需要阻塞,直到作业完成或超时。快速响应提交作业的线程是这里的驱动程序。

因此,我当前的逻辑是:如果有<3项任务正在运行,并且一个作业到达,则创建一个新任务来单独处理该任务。如果有3个任务正在运行,则将作业放在一个队列中,等待另一个任务完成,然后从队列中提取所有作业(限制为100),并创建一个任务来处理所有这些任务。

我只是不太确定在Java中设置这个程序的最佳方法。我创建了一个简单的信号量版本,它工作良好,但没有利用同时提交作业的能力。我应该如何最好地扩展它以完全满足我的要求?(没有使用信号量的要求,这正是我目前所掌握的)。

代码语言:javascript
复制
private static final Semaphore semaphore = new Semaphore(3);

public static Response doJob(Job job) throws Exception
{ 
    final boolean tryAcquire = this.semaphore.tryAcquire(this.maxWaitTime, TimeUnit.MILLISECONDS);

    if (tryAcquire)
    {
        try
        {
            return doJobInNewTask(job); // we'd actually like to do all the jobs which are queued up waiting for the semaphore (if there are any)
        }
        finally
        {
            this.semaphore.release()
        }       
    }
}
EN

回答 1

Stack Overflow用户

发布于 2014-04-14 22:01:21

您可以使用具有固定大小线程池的Executor服务:

代码语言:javascript
复制
class ExecutorExample {
    private final static ExecutorService executorService;
    private final static long maxWaitTime = 5000;

    static {
        executorService = Executors.newFixedThreadPool(3);
    }

    private static class Response {}
    private static class Job {}

    public static Response doJob(final Job job) throws Exception {
        final Future<Response> future = executorService.submit(
            new Callable<Response>() {
                @Override
                public Response call() throws Exception {
                    return doJobInNewTask(job);
                }
            }
        );
        try {
            // get() blocks until the task finishes.
            return future.get(maxWaitTime, TimeUnit.MILLISECONDS);
        }
        catch (final TimeoutException e) {
            // we timed out, so *try* to cancel the task (may be too late)
            future.cancel(/*mayInterruptIfRunning:*/false);
            throw e;
        }
    }

    private static Response doJobInNewTask(final Job job) {
        try { Thread.sleep(maxWaitTime / 2); }
        catch (final InterruptedException ignored) {}
        return new Response();
    }

    public static void main(final String[] args) {
        final List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            final Thread t = new Thread() {
                @Override
                public void run() {
                    try {
                        System.out.println(doJob(new Job()));
                    }
                    catch (final Exception e) {
                        System.out.println(e.getClass().getSimpleName());
                    }
                }
            };
            threads.add(t);
            t.start();
        }

        for (final Thread thread : threads) {
            try { thread.join(); }
            catch (final InterruptedException ignored) {}
        }

        System.out.println("Done!");
    }
}

输出:

代码语言:javascript
复制
ExecutorExample$Response@1fe4169
ExecutorExample$Response@9fdee
ExecutorExample$Response@15b123b
ExecutorExample$Response@bbfa5c
ExecutorExample$Response@10d95cd
ExecutorExample$Response@131de9b
TimeoutException
TimeoutException
TimeoutException
TimeoutException
Done!

这方面的一个潜在问题是取消。由于调度超出了您的控制范围,所以在等待任务超时之后,但在cancel()有机会完成任务之前,可能会启动任务。结果不会被传播,但是如果任务有有意义的副作用,这种方法可能会产生问题。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/23070997

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档