首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何了解OmniThreadLibrary中管道各阶段的状态?

如何了解OmniThreadLibrary中管道各阶段的状态?
EN

Stack Overflow用户
提问于 2015-04-14 08:12:46
回答 2查看 398关注 0票数 1

gabr's answer to another question展示了一个使用Parallel.Pipeline进行数据处理的示例。

目前,我需要知道管道何时启动、何时完成所有阶段。我读了另一个相关问题“How to monitor Pipeline stages in OmniThreadLibrary?”的答案,并尝试按如下方式实现(根据该answer修改):

代码语言:delphi
复制
unit Unit1;

interface

uses
  Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
  Dialogs, StdCtrls, superobject,
  OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls;

const
  // Message IDs that the pipeline stages send through task.Comm.Send and that
  // TForm1 receives in its "message" handlers (WMStarted / WMEnded).
  // WM_STARTED is sent by Async_Files before each folder scan.
  WM_STARTED = WM_USER;
  // WM_ENDED is sent by Async_JSON when its processed count equals FCounterTotal.
  WM_ENDED = WM_USER + 1;

type
  // Main form: Start/Stop buttons, a log list box, and the three-stage
  // pipeline (Async_Files -> Async_Parse -> Async_JSON) built in btnStartClick.
  TForm1 = class(TForm)
    btnStart: TButton;
    btnStop: TButton;
    lbLog: TListBox;
    procedure btnStartClick(Sender: TObject);
    procedure btnStopClick(Sender: TObject);
  private
    // Number of files queued by the latest scan; written by Async_Files
    // after the scan, read by Async_JSON.
    FCounterTotal: IOmniCounter;
    // Number of items Async_JSON has handled so far.
    FCounterProcessed: IOmniCounter;
    // Set to True in WMStarted and to False in WMEnded.
    FIsBusy: boolean;
    // The running pipeline; reset to nil in btnStopClick.
    FPipeline: IOmniPipeline;
    // Handlers for the messages sent by the stages via task.Comm.Send.
    procedure WMStarted(var msg: TOmniMessage); message WM_STARTED;
    procedure WMEnded(var msg: TOmniMessage); message WM_ENDED;
  strict protected
    // Pipeline stage bodies, in the order they are registered in btnStartClick.
    procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
    procedure Async_Parse(const input: TOmniValue; var output: TOmniValue);
    procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
  end;

var
  Form1: TForm1;

  // Imported from my.dll (stdcall). Async_Parse passes the text of a loaded
  // file in AData and feeds the string returned in Output to SO().
  procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll';

implementation

uses IOUtils;

{$R *.dfm}

// Stage 1 of the pipeline: repeatedly scans the application folder for *.txt
// files and queues every file name for the next stage.
//
//   input  - pipeline input; used only as a stop signal (IsCompleted).
//   output - receives one file name per file found.
//   task   - used to send WM_STARTED to the owner before every scan.
//
// FCounterTotal is assigned only after the whole scan has been queued, so
// later stages may already have consumed those items when it is written.
procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
const
  CScanPeriodSec = 60; // pause between two scans, in seconds
var
  secondsLeft, cnt: integer;
  f: string;
begin
  while not input.IsCompleted do begin

    task.Comm.Send(WM_STARTED); // sent once at the start of every scan
    cnt := 0;

    for f in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do
    begin
      // Stop queuing as soon as Stop was requested instead of finishing the scan.
      if input.IsCompleted then Break;
      output.TryAdd(f);
      Inc(cnt);
      Sleep(1000); // simulate a work
    end;
    FCounterTotal.Value := cnt;

    // Continuously check the folder for new files with a period of
    // CScanPeriodSec seconds. The wait is done in one-second slices so a Stop
    // request is noticed quickly; exactly CScanPeriodSec slices are slept
    // (the previous repeat..until i < 0 form slept one extra second).
    secondsLeft := CScanPeriodSec;
    while (secondsLeft > 0) and (not input.IsCompleted) do begin
      Sleep(1000);
      Dec(secondsLeft);
    end;
  end;
end;

// Stage 2 of the pipeline: loads the file named by input, passes its text to
// GetJSON_ (my.dll) and stores the result of SO() on the returned string in
// output.
procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue);
var
  fileLines: TStringList;
  fileText: string;
  jsonText: WideString;
begin
  fileLines := TStringList.Create;
  try
    fileLines.LoadFromFile(input.AsString);
    fileText := fileLines.Text;
    GetJSON_(PChar(fileText), jsonText); // DLL procedure; fills jsonText
    output := SO(jsonText);              // forwarded as ISuperObject
    // The processed file is intentionally not deleted while testing
    // (TFile.Delete(input.AsString) stays disabled).
  finally
    fileLines.Free;
  end;
end;

// Stage 3 of the pipeline: consumes the parsed objects and sends WM_ENDED
// when the number of handled items equals FCounterTotal.
procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
  item: TOmniValue;
  parsed: ISuperObject;
  processedSoFar: integer;
begin
  for item in input do begin
    parsed := item.AsInterface as ISuperObject;
    // do something with parsed

    processedSoFar := FCounterProcessed.Increment;
    if FCounterTotal.Value <> processedSoFar then
      Continue;

    // NOTE: the author reports that this send is never reached in practice.
    task.Comm.Send(WM_ENDED);
  end;
end;

//
// Start button: disables itself, resets both counters and launches the
// three-stage pipeline. Stages 1 and 3 get a task configuration that routes
// their messages to this form.
procedure TForm1.btnStartClick(Sender: TObject);
var
  pipe: IOmniPipeline;
begin
  btnStart.Enabled := False;

  FCounterTotal := CreateCounter(-1);
  FCounterProcessed := CreateCounter(0);

  pipe := Parallel.Pipeline;
  pipe := pipe.Stage(Async_Files, Parallel.TaskConfig.OnMessage(Self));
  pipe := pipe.Stage(Async_Parse);
  pipe := pipe.Stage(Async_JSON, Parallel.TaskConfig.OnMessage(Self));
  FPipeline := pipe.Run;
end;

// Stop button: marks the pipeline input as complete (the signal Async_Files
// polls for), drops the form's reference to the pipeline and re-enables Start.
procedure TForm1.btnStopClick(Sender: TObject);
begin
  if FPipeline <> nil then
  begin
    FPipeline.Input.CompleteAdding;
    FPipeline := nil;
  end;
  btnStart.Enabled := True;
end;

//
// WM_ENDED handler: clears the busy flag and appends a timestamped line to
// the log, selecting the new entry.
procedure TForm1.WMEnded(var msg: TOmniMessage);
var
  logLine: string;
begin
  FIsBusy := False;
  logLine := Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)]);
  lbLog.ItemIndex := lbLog.Items.Add(logLine);
end;

// WM_STARTED handler: sets the busy flag and appends a timestamped line to
// the log, selecting the new entry.
procedure TForm1.WMStarted(var msg: TOmniMessage);
var
  logLine: string;
begin
  FIsBusy := True;
  logLine := Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)]);
  lbLog.ItemIndex := lbLog.Items.Add(logLine);
end;

end.

task.Comm.Send(WM_STARTED)一切正常,但task.Comm.Send(WM_ENDED)这一行永远不会被执行。我如何才能知道最后一个阶段何时完成?正确的做法是什么?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2015-04-16 12:26:42

感谢gabr,他建议使用一个特殊的哨兵(sentinel)值,这帮助我找到了解决问题的方法。下面的代码可以按预期工作:

代码语言:delphi
复制
unit Unit1;

interface

uses
  Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
  Dialogs, StdCtrls, superobject,
  OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls;

const
  // Message IDs that the pipeline stages send through task.Comm.Send and that
  // TForm1 receives in its "message" handlers (WMStarted / WMEnded).
  // WM_STARTED is sent by Async_Files before each folder scan.
  WM_STARTED = WM_USER;
  // WM_ENDED is sent by Async_JSON when it receives the integer 0 sentinel.
  WM_ENDED = WM_USER + 1;

type
  // Main form: Start/Stop buttons, a log list box, and the three-stage
  // pipeline (Async_Files -> Async_Parse -> Async_JSON) built in btnStartClick.
  TForm1 = class(TForm)
    btnStart: TButton;
    btnStop: TButton;
    lbLog: TListBox;
    procedure btnStartClick(Sender: TObject);
    procedure btnStopClick(Sender: TObject);
  private
    // Set to True in WMStarted and to False in WMEnded.
    FIsBusy: boolean;
    // The running pipeline; reset to nil in btnStopClick.
    FPipeline: IOmniPipeline;
    // Handlers for the messages sent by the stages via task.Comm.Send.
    procedure WMStarted(var msg: TOmniMessage); message WM_STARTED;
    procedure WMEnded(var msg: TOmniMessage); message WM_ENDED;
  strict protected
    // Pipeline stage bodies, in the order they are registered in btnStartClick.
    procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
    procedure Async_Parse(const input: TOmniValue; var output: TOmniValue);
    procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
  end;

var
  Form1: TForm1;

  // Imported from my.dll (stdcall). Async_Parse passes the text of a loaded
  // file in AData and feeds the string returned in Output to SO().
  procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll';

implementation

uses IOUtils;

{$R *.dfm}

// Stage 1 of the pipeline: repeatedly scans the application folder for *.txt
// files, queues every file name for the next stage and then queues the
// integer 0 as a sentinel marking the end of that scan.
//
//   input  - pipeline input; used only as a stop signal (IsCompleted).
//   output - receives one file name per file found, followed by the sentinel.
//   task   - used to send WM_STARTED to the owner before every scan.
procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
const
  CScanPeriodSec = 60; // pause between two scans, in seconds
var
  secondsLeft: integer;
  f: string;
begin
  while not input.IsCompleted do begin

    task.Comm.Send(WM_STARTED); // sent once at the start of every scan

    for f in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do
    begin
      // Stop queuing as soon as Stop was requested instead of finishing the scan.
      if input.IsCompleted then Break;
      output.TryAdd(f);
      Sleep(1000); // simulate a work
    end;
    // Always close the scan with the sentinel, even when it was cut short,
    // so the last stage can still report WM_ENDED.
    output.TryAdd(0);

    // Continuously check the folder for new files with a period of
    // CScanPeriodSec seconds. The wait is done in one-second slices so a Stop
    // request is noticed quickly; exactly CScanPeriodSec slices are slept
    // (the previous repeat..until i < 0 form slept one extra second).
    secondsLeft := CScanPeriodSec;
    while (secondsLeft > 0) and (not input.IsCompleted) do begin
      Sleep(1000);
      Dec(secondsLeft);
    end;
  end;
end;

// Stage 2 of the pipeline. An integer 0 input is the end-of-scan sentinel and
// is forwarded unchanged; any other input is treated as a file name whose
// text is passed to GetJSON_ (my.dll), with SO() of the result placed in output.
procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue);
var
  fileLines: TStringList;
  fileText: string;
  jsonText: WideString;
begin
  if input.IsInteger and (input.AsInteger = 0) then
    output := 0 // pass the sentinel on to the next stage
  else begin
    fileLines := TStringList.Create;
    try
      fileLines.LoadFromFile(input.AsString);
      fileText := fileLines.Text;
      GetJSON_(PChar(fileText), jsonText); // DLL procedure; fills jsonText
      output := SO(jsonText);              // forwarded as ISuperObject
      // The processed file is intentionally not deleted while testing
      // (TFile.Delete(input.AsString) stays disabled).
    finally
      fileLines.Free;
    end;
  end;
end;

// Stage 3 of the pipeline: consumes the parsed objects; when the integer 0
// sentinel arrives it sends WM_ENDED to the owner instead.
procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
  item: TOmniValue;
  parsed: ISuperObject;
begin
  for item in input do
    if item.IsInteger and (item.AsInteger = 0) then
      task.Comm.Send(WM_ENDED) // sentinel: the current scan is fully processed
    else begin
      parsed := item.AsInterface as ISuperObject;
      // do something with parsed
    end;
end;

//
// Start button: disables itself and launches the three-stage pipeline.
// Stages 1 and 3 get a task configuration that routes their messages to
// this form.
procedure TForm1.btnStartClick(Sender: TObject);
var
  pipe: IOmniPipeline;
begin
  btnStart.Enabled := False;

  pipe := Parallel.Pipeline;
  pipe := pipe.Stage(Async_Files, Parallel.TaskConfig.OnMessage(Self));
  pipe := pipe.Stage(Async_Parse);
  pipe := pipe.Stage(Async_JSON, Parallel.TaskConfig.OnMessage(Self));
  FPipeline := pipe.Run;
end;

// Stop button: marks the pipeline input as complete (the signal Async_Files
// polls for), drops the form's reference to the pipeline and re-enables Start.
procedure TForm1.btnStopClick(Sender: TObject);
begin
  if FPipeline <> nil then
  begin
    FPipeline.Input.CompleteAdding;
    FPipeline := nil;
  end;
  btnStart.Enabled := True;
end;

//
// WM_ENDED handler: clears the busy flag and appends a timestamped line to
// the log, selecting the new entry.
procedure TForm1.WMEnded(var msg: TOmniMessage);
var
  logLine: string;
begin
  FIsBusy := False;
  logLine := Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)]);
  lbLog.ItemIndex := lbLog.Items.Add(logLine);
end;

// WM_STARTED handler: sets the busy flag and appends a timestamped line to
// the log, selecting the new entry.
procedure TForm1.WMStarted(var msg: TOmniMessage);
var
  logLine: string;
begin
  FIsBusy := True;
  logLine := Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)]);
  lbLog.ItemIndex := lbLog.Items.Add(logLine);
end;

end.

另一种方案是使用异常作为哨兵(目前还不能正常工作,可能是我哪里做错了):

代码语言:delphi
复制
unit Unit1;

interface

uses
  Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
  Dialogs, StdCtrls, superobject,
  OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls;

const
  // Message IDs that the pipeline stages send through task.Comm.Send and that
  // TForm1 receives in its "message" handlers (WMStarted / WMEnded).
  // WM_STARTED is sent by Async_Files before the folder scan.
  WM_STARTED = WM_USER;
  // WM_ENDED is sent by Async_JSON when it receives a value carrying an
  // ESentinelException.
  WM_ENDED = WM_USER + 1;

type
  // Exception class raised by Async_Files after a scan and recognised by
  // Async_JSON as the end-of-scan marker.
  ESentinelException = class(Exception);

  // Main form: Start/Stop buttons, a log list box, and the three-stage
  // pipeline (Async_Files -> Async_Parse -> Async_JSON) built in btnStartClick.
  TForm1 = class(TForm)
    btnStart: TButton;
    btnStop: TButton;
    lbLog: TListBox;
    procedure btnStartClick(Sender: TObject);
    procedure btnStopClick(Sender: TObject);
  private
    // Set to True in WMStarted and to False in WMEnded.
    FIsBusy: boolean;
    // The running pipeline; reset to nil in btnStopClick.
    FPipeline: IOmniPipeline;
    // Handlers for the messages sent by the stages via task.Comm.Send.
    procedure WMStarted(var msg: TOmniMessage); message WM_STARTED;
    procedure WMEnded(var msg: TOmniMessage); message WM_ENDED;
  strict protected
    // Pipeline stage bodies, in the order they are registered in btnStartClick.
    procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
    procedure Async_Parse(const input: TOmniValue; var output: TOmniValue);
    procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
  end;

var
  Form1: TForm1;

  // Imported from my.dll (stdcall). Async_Parse passes the text of a loaded
  // file in AData and feeds the string returned in Output to SO().
  procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll';

implementation

uses IOUtils;

{$R *.dfm}

// Stage 1 of the pipeline: scans the application folder for *.txt files,
// queues every file name for the next stage and then raises
// ESentinelException as an end-of-scan marker.
//
//   input  - pipeline input; used only as a stop signal (IsCompleted).
//   output - receives one file name per file found.
//   task   - used to send WM_STARTED to the owner before the scan.
//
// NOTE(review): the raise below is unconditional, so the wait loop after it
// never executes and the outer while loop cannot reach a second iteration.
// This contradicts the stated goal of re-scanning every minute. How the
// pipeline treats an exception that escapes a stage is decided inside
// OtlParallel - confirm before relying on this.
procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
  i: integer;
  f: string;
begin
  while not input.IsCompleted do begin

    task.Comm.Send(WM_STARTED); // notify the owner that a scan is starting

    for f in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do
    begin
      output.TryAdd(f);
      Sleep(1000); // simulate a work
    end;

    // Leaves the procedure immediately; everything below is unreachable.
    raise ESentinelException.Create('sentinel');

    // Intended behaviour: continuously check the folder for new files, with
    // a period of 1 minute (60 sec), for an unlimited period of time.
    // NOTE(review): as written this loop would sleep 61 times (i runs 60..0).
    i := 60;
    repeat
      Sleep(1000); // Check if we should stop every second (if Stop button is pushed)
      if input.IsCompleted then Break;
      dec(i);
    until i < 0;
  end;
end;

// Stage 2 of the pipeline: loads the file named by input, passes its text to
// GetJSON_ (my.dll) and stores the result of SO() on the returned string in
// output.
procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue);
var
  fileLines: TStringList;
  fileText: string;
  jsonText: WideString;
begin
  fileLines := TStringList.Create;
  try
    fileLines.LoadFromFile(input.AsString);
    fileText := fileLines.Text;
    GetJSON_(PChar(fileText), jsonText); // DLL procedure; fills jsonText
    output := SO(jsonText);              // forwarded as ISuperObject
    // The processed file is intentionally not deleted while testing
    // (TFile.Delete(input.AsString) stays disabled).
  finally
    fileLines.Free;
  end;
end;

// Stage 3 of the pipeline: consumes the values arriving from the previous
// stage. A value carrying an ESentinelException is treated as the
// end-of-scan marker and reported with WM_ENDED; every other value is cast
// to ISuperObject for processing.
//
//   input  - values produced by Async_Parse.
//   output - not used by this stage.
//   task   - used to send WM_ENDED to the owner.
//
// NOTE(review): whether "for value in input" yields exception values or
// re-raises them when the pipeline is built with HandleExceptions depends on
// OtlCollections/OtlParallel - confirm; the author reports that this variant
// does not work yet.
procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
  value: TOmniValue;
  JSON: ISuperObject;
begin
  for value in input do begin

    if value.IsException and (value.AsException is ESentinelException) then begin
      task.Comm.Send(WM_ENDED); // if we got 'sentinel' Exception
      // Releases the exception object carried by the value.
      // NOTE(review): presumably the value does not own it - verify against OtlCommon.
      value.AsException.Free;
    end
    else begin
      JSON := value.AsInterface as ISuperObject;
      // do something with JSON
    end;
  end;
end;

//
// Start button: disables itself and launches the three-stage pipeline with
// HandleExceptions enabled. Stages 1 and 3 get a task configuration that
// routes their messages to this form.
procedure TForm1.btnStartClick(Sender: TObject);
var
  pipe: IOmniPipeline;
begin
  btnStart.Enabled := False;

  pipe := Parallel.Pipeline;
  pipe := pipe.Stage(Async_Files, Parallel.TaskConfig.OnMessage(Self));
  pipe := pipe.Stage(Async_Parse);
  pipe := pipe.Stage(Async_JSON, Parallel.TaskConfig.OnMessage(Self));
  FPipeline := pipe.HandleExceptions.Run;
end;

// Stop button: marks the pipeline input as complete (the signal Async_Files
// polls for), drops the form's reference to the pipeline and re-enables Start.
procedure TForm1.btnStopClick(Sender: TObject);
begin
  if FPipeline <> nil then
  begin
    FPipeline.Input.CompleteAdding;
    FPipeline := nil;
  end;
  btnStart.Enabled := True;
end;

//
// WM_ENDED handler: clears the busy flag and appends a timestamped line to
// the log, selecting the new entry.
procedure TForm1.WMEnded(var msg: TOmniMessage);
var
  logLine: string;
begin
  FIsBusy := False;
  logLine := Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)]);
  lbLog.ItemIndex := lbLog.Items.Add(logLine);
end;

// WM_STARTED handler: sets the busy flag and appends a timestamped line to
// the log, selecting the new entry.
procedure TForm1.WMStarted(var msg: TOmniMessage);
var
  logLine: string;
begin
  FIsBusy := True;
  logLine := Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)]);
  lbLog.ItemIndex := lbLog.Items.Add(logLine);
end;

end.
票数 2
EN

Stack Overflow用户

发布于 2015-04-15 12:58:35

您的方法(最初是我建议的)存在竞态条件(race condition),这导致它无法正常工作。(抱歉,这是我最初设计中的一个缺陷。)

基本上,所发生的是:

  • Async_Files将最后一个文件发送到管道。
  • Async_Files阻塞(Sleep,用于模拟一些工作负载)。
  • Async_JSON接收并处理最后一个文件。
  • Async_Files现在设置FCounterTotal计数器。

此时,Async_JSON已经在等待下一个永远不会到来的数据项,不会再去检查FCounterTotal的值了。

替代方法是将一个特殊的sentinel值作为最后一项发送到管道中。

异常也可以用作哨兵。如果在第一阶段引发异常,它会沿着管道“流”到末尾,并可以在那里被处理。中间的各个阶段不需要做任何特殊处理——默认情况下,一个阶段只会重新抛出(re-raise)它收到的异常。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/29622261

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档