首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在.NET 4.5中完成并行任务

如何在.NET 4.5中完成并行任务
EN

Stack Overflow用户
提问于 2013-02-11 13:12:11
回答 3查看 4.5K关注 0票数 9

我想将.NET迭代器与并行任务/等待?一起使用。如下所示:

代码语言:javascript
复制
IEnumerable<TDst> Foo<TSrc, TDest>(IEnumerable<TSrc> source)
{
    Parallel.ForEach(
        source,
        s=>
        {
            // Ordering is NOT important
            // items can be yielded as soon as they are done                
            yield return ExecuteOrDownloadSomething(s);
        }
}

不幸的是,.NET本身不能处理这个问题。到目前为止,最好的答案是@svick -使用AsParallel()。

奖励:有没有简单的异步/等待代码,可以实现多个发布者和一个订阅者?订阅者会让步,酒吧会处理。(仅限核心库)

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2013-02-11 18:11:34

这似乎是PLINQ的工作:

代码语言:javascript
复制
return source.AsParallel().Select(s => ExecuteOrDownloadSomething(s));

这将使用有限数量的线程并行执行委托,并在完成后立即返回每个结果。

如果ExecuteOrDownloadSomething()方法是IO绑定的(例如,它实际上下载了一些东西),并且您不想浪费线程,那么使用async-await可能是有意义的,但它会更复杂。

如果你想充分利用async,你不应该返回IEnumerable,因为它是同步的(即,如果没有可用的项目,它就会阻塞)。您需要的是某种异步收集,您可以使用TPL Dataflow中的ISourceBlock (具体地说,TransformBlock)来实现:

代码语言:javascript
复制
ISourceBlock<TDst> Foo<TSrc, TDest>(IEnumerable<TSrc> source)
{
    var block = new TransformBlock<TSrc, TDest>(
        async s => await ExecuteOrDownloadSomethingAsync(s),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    foreach (var item in source)
        block.Post(item);

    block.Complete();

    return block;
}

如果源代码“很慢”(即您希望在完成source迭代之前开始处理来自Foo()的结果),您可能希望将foreachComplete()调用移动到单独的Task。更好的解决方案是把source也变成一个ISourceBlock<TSrc>

票数 11
EN

Stack Overflow用户

发布于 2013-02-11 23:16:25

因此,看起来您真正想要做的是根据任务的完成时间对一系列任务进行排序。这并不是非常复杂:

代码语言:javascript
复制
public static IEnumerable<Task<T>> Order<T>(this IEnumerable<Task<T>> tasks)
{
    var input = tasks.ToList();

    var output = input.Select(task => new TaskCompletionSource<T>());
    var collection = new BlockingCollection<TaskCompletionSource<T>>();
    foreach (var tcs in output)
        collection.Add(tcs);

    foreach (var task in input)
    {
        task.ContinueWith(t =>
        {
            var tcs = collection.Take();
            switch (task.Status)
            {
                case TaskStatus.Canceled:
                    tcs.TrySetCanceled();
                    break;
                case TaskStatus.Faulted:
                    tcs.TrySetException(task.Exception.InnerExceptions);
                    break;
                case TaskStatus.RanToCompletion:
                    tcs.TrySetResult(task.Result);
                    break;
            }
        }
        , CancellationToken.None
        , TaskContinuationOptions.ExecuteSynchronously
        , TaskScheduler.Default);
    }

    return output.Select(tcs => tcs.Task);
}

因此,我们在这里为每个输入任务创建一个TaskCompletionSource,然后遍历每个任务,并设置一个从BlockingCollection获取下一个完成源并设置其结果的延续。完成的第一个任务获取返回的第一个tcs,完成的第二个任务获取返回的第二个tcs,依此类推。

现在你的代码变得非常简单了:

代码语言:javascript
复制
var tasks = collection.Select(item => LongRunningOperationThatReturnsTask(item))
    .Order();
foreach(var task in tasks)
{
    var result = task.Result;//or you could `await` each result
    //....
}
票数 1
EN

Stack Overflow用户

发布于 2013-02-11 16:52:02

在MS robotics团队制作的异步库中,他们有并发原语,允许使用迭代器生成异步代码。

这个库(CCR)是免费的(它过去不是免费的)。可以在这里找到一篇很好的介绍性文章:Concurrent affairs

也许你可以将这个库与.Net任务库一起使用,或者它会激励你“使用自己的库”。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/14806240

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档