rocketmq-client-go-v2.0.0/producer/producer.go func (p *defaultProducer) SendAsync(ctx context.Context = nil { primitive.WithMethod(ctx, primitive.SendAsync) return p.interceptor(ctx, msg , nil, func(ctx context.Context, req, reply interface{}) error { return p.sendAsync(ctx, msg, f) }) } return p.sendAsync(ctx, msg, f) } SendAsync方法主要是执行p.sendAsync(ctx, msg, f) sendAsync rocketmq-client-go-v2.0.0/producer/producer.go func (p *defaultProducer) sendAsync(ctx context.Context
(前文接口图紫色箭头) Task<Packet> SendAsync(Packet pk); Task<IMessage> SendAsync(IMessage msg); event EventHandler <MessageEventArgs> MessageReceived; 异步发送SendAsync,可以像事件模型那样在MessageReceived里面处理,也可以 var rs = await SendAsync(pk); 把异步转为同步操作,满足同步业务需求。 更为重要的是,SendAsync支持单连接通道并行多异步请求! 也就是说,在一个网络连接上,第一个请求的响应还没有收到之前,业务逻辑可以连续发出更多的请求,不管这些请求的响应包先后顺序以后,网络库都能够准确配对,让await SendAsync得到正确的结果。
rocketmq-client-go-v2.0.0/producer/producer.go func (p *defaultProducer) SendAsync(ctx context.Context = nil { primitive.WithMethod(ctx, primitive.SendAsync) return p.interceptor(ctx, msg , nil, func(ctx context.Context, req, reply interface{}) error { return p.sendAsync(ctx, msg, f) }) } return p.sendAsync(ctx, msg, f) } SendAsync方法主要是执行p.sendAsync(ctx, msg, f) sendAsync rocketmq-client-go-v2.0.0/producer/producer.go func (p *defaultProducer) sendAsync(ctx context.Context
<string, List<string>>(); public void Send(ChatMessage body) { Clients.All.SendAsync UserController 中,定义了上面的接口 SendToUser ,客户端传入用户昵称和消息,然后服务端就会去根据 ChatHub.UserList 成员查找目标用户的连接信息,最后,通过 SendAsync ("Recv", body); // 给当前连接对象发送消息 await Clients.Caller.SendAsync("Recv", body); // 给其它所有连接的客户端发送消息,除了当前正在连接的客户端 await Clients.Others.SendAsync("Recv", body); / / 查找当前所有连接的客户端(排除自己),如果是已加入此分组,则给他们推送消息 await Clients.OthersInGroup("groupName").SendAsync
class Program { static Task Main()=> SendAsync1(); private static async Task SendAsync1() class Program { static Task Main()=> SendAsync2(); private static async Task SendAsync2() class Program { static Task Main()=> SendAsync3(); private static async Task SendAsync3() class Program { static Task Main()=> SendAsync4(); private static async Task SendAsync4()
自定义消息处理器 要编写一个消息处理器,需要从System.Net.Http.DelegatingHandler进行派生,并重写SendAsync方法。 2.调用base.SendAsync将请求发送给内部处理器。 3.内部处理器返回一条响应消息。(这一步是异步的) 4.处理响应,并把他返回给调用者。 private int _count = 0; protected override System.Threading.Tasks.Task<HttpResponseMessage> SendAsync (request, cancellationToken); } } 对base.SendAsync的调用是异步的。 stream); } protected override async System.Threading.Tasks.Task<HttpResponseMessage> SendAsync
= null) { sendAsync(pair.getLeft(), pair.getRight()); } } //...... } 不断执行drainDeadLetterQueue、queue.take()、this.push(row);drainDeadLetterQueue方法从deadLetterQueue拉取数据,然后通过sendAsync = new CallbackCompleter(inflightMessages, position, r.isTXCommit(), context, messageID); sendAsync (r, cc); } } AbstractAsyncProducer继承了AbstractProducer,其push方法主要执行inflightMessages.addMessage及sendAsync 不断执行drainDeadLetterQueue、queue.take()、this.push(row);drainDeadLetterQueue方法从deadLetterQueue拉取数据,然后通过sendAsync
magicHttpClient = new HttpClient(handlerMock.Object); 然后我花了 9.96 分钟研究了 HttpClient.GetStringAsync() 的源代码,发现它最终调用的是 SendAsync response = await base.SendAsync(request, cts.Token).ConfigureAwait(false); // ... } 源代码位置:https:/ 因此,我们的 Mock Setup 如下: handlerMock .Protected() .Setup<Task<HttpResponseMessage>>( "SendAsync .Protected() .Setup<Task<HttpResponseMessage>>( "SendAsync
(前文接口图紫色箭头) Task<Packet> SendAsync(Packet pk); Task<IMessage> SendAsync(IMessage msg); event EventHandler <MessageEventArgs> MessageReceived; 异步发送SendAsync,可以像事件模型那样在MessageReceived里面处理,也可以 var rs = await SendAsync(pk); 把异步转为同步操作,满足同步业务需求。 更为重要的是,SendAsync支持单连接通道并行多异步请求! 也就是说,在一个网络连接上,第一个请求的响应还没有收到之前,业务逻辑可以连续发出更多的请求,不管这些请求的响应包先后顺序以后,网络库都能够准确配对,让await SendAsync得到正确的结果。
cts.Token); _squares[square - 1] = 1; await hubContext.Clients.Client(_id2).SendAsync cts.Token); _squares[square - 1] = 2; await hubContext.Clients.Client(_id1).SendAsync squares[1] == _squares[2]) { await hubContext.Clients.Clients(_id1, _id2).SendAsync squares[3] == _squares[6]) { await hubContext.Clients.Clients(_id1, _id2).SendAsync squares[4] == _squares[8]) { await hubContext.Clients.Clients(_id1, _id2).SendAsync
///
<string, List<string>>(); public void Send(ChatMessage body) { Clients.All.SendAsync UserController 中,定义了上面的接口 SendToUser ,客户端传入用户昵称和消息,然后服务端就会去根据 ChatHub.UserList 成员查找目标用户的连接信息,最后,通过 SendAsync ("Recv", body); // 给当前连接对象发送消息 await Clients.Caller.SendAsync("Recv", body); // 给其它所有连接的客户端发送消息,除了当前正在连接的客户端 await Clients.Others.SendAsync("Recv", body); / / 查找当前所有连接的客户端(排除自己),如果是已加入此分组,则给他们推送消息 await Clients.OthersInGroup("groupName").SendAsync
不断执行drainDeadLetterQueue、queue.take()、this.push(row);drainDeadLetterQueue方法从deadLetterQueue拉取数据,然后通过sendAsync metrics.register(gaugeName, (Gauge<Long>) () -> (long) inflightMessages.size()); } public abstract void sendAsync cc = new CallbackCompleter(inflightMessages, position, r.isTXCommit(), context, messageID); sendAsync (r, cc); } } AbstractAsyncProducer继承了AbstractProducer,其push方法主要执行inflightMessages.addMessage及sendAsync 不断执行drainDeadLetterQueue、queue.take()、this.push(row);drainDeadLetterQueue方法从deadLetterQueue拉取数据,然后通过sendAsync
getString(MessageHeader.MESSAGE_ID), e); } } @Override public Promise<SendResult> sendAsync (final Message message) { return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout() ); } @Override public Promise<SendResult> sendAsync(final Message message, final KeyValue properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout(); return sendAsync (message, timeout); } private Promise<SendResult> sendAsync(final Message message, long timeout
req.Age, Name = req.Name, CreateTime = DateTime.Now }; await SendAsync > { public override async Task HandleAsync(MyRequest req, CancellationToken ct) { await SendAsync AllowAnonymous(); } public override async Task HandleAsync(CancellationToken ct) { await SendAsync AllowAnonymous(); } public override async Task HandleAsync(CancellationToken ct) { await SendAsync ILogger<MyEndpoint>>(); logger.LogInformation("hello service is resolved..."); await SendAsync
ethGetTransactionCount = ethClient.ethGetTransactionCount( from, DefaultBlockParameterName.LATEST).sendAsync EthSendTransaction transactionResponse = ethClient.ethSendTransaction(transaction).sendAsync ethGetTransactionCount = ethClient.ethGetTransactionCount( from, DefaultBlockParameterName.LATEST).sendAsync EthSendTransaction transactionResponse = ethClient.ethSendTransaction(transaction).sendAsync
对于这个SendAsync方法来说,request参数表示传递给当前HttpMessageHandler进行处理的请求,这是一个HttpRequestMessage对象。 针对请求消息和响应消息的处理均体现在这个SendAsync方法上。 具体来说,针对请求消息的处理直接实现在SendAsync方法中,而针对响应消息的处理则通过其返回的Task<HttpResponseMessage>对象来完成。 DelegatingHandler重写了定义在其类的抽象方法SendAsync来调用InnerHandler属性的同名方法。 在重写的SendAsync方法中,如果自身尚未被初始化,该Initialize方法会自动被调用以确保整个消息处理管道已经被成功构建。
HttpClient 的发送请求函数 :SendAsync() public Task<HttpResponseMessage> SendAsync(HttpRequestMessage request 事实上通过阅读源码发现,几乎所有继承 HttpMessageHandle 的子类都有一个 HttpMessageHandle 类型的属性 : _handle,而每个子类的 SendAsync 函数都调用 _handle 的 SendAsync()。 很容易想像,HttpClient 的 SendAsync 函数是 一个 HttpMessageHandle 调用 下一个 HttpMessageHanlde 的SendAsync,而下一个 HttpMessageHandle 的SendAsync 是调用下下一个HttpMessageHandle 的 SendAsync 函数。
\GuzzleHttp\Psr7\Request('GET', 'http://www.sopans.com/about'); $promise = $client->sendAsync echo 111; }); $promise = $client->sendAsync request2)->then(function ($response)use($client,$request3) { $client->sendAsync
Name = req.Name, CreateTime = DateTime.Now }; await SendAsync public override async Task HandleAsync(MyRequest req, CancellationToken ct) { await SendAsync hello-world"); } public override async Task HandleAsync(CancellationToken ct) { await SendAsync hello-world"); } public override async Task HandleAsync(CancellationToken ct) { await SendAsync Resolve<ILogger<MyEndpoint>>(); logger.LogInformation("hello service is resolved..."); await SendAsync