首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在循环或递归中使用AsynchronousSocketChannel#read?

如何在循环或递归中使用AsynchronousSocketChannel#read?
EN

Stack Overflow用户
提问于 2012-11-09 19:28:43
回答 4查看 7.5K关注 0票数 3

我找到了一个相关问题,但是它并不特别有用,因为它没有提供一个完整的示例。

问题:如何使用AsynchronousSocketChannel使用固定大小的缓冲区读取未知长度的数据

第一次尝试(读一次):

代码语言:javascript
复制
final int bufferSize = 1024;
final SocketAddress address = /*ip:port*/;
final ThreadFactory threadFactory = Executors.defaultThreadFactory();
final ExecutorService executor = Executors.newCachedThreadPool(threadFactory);
final AsynchronousChannelGroup asyncChannelGroup = AsynchronousChannelGroup.withCachedThreadPool(executor, 5);
final AsynchronousSocketChannel client = AsynchronousSocketChannel.open(asyncChannelGroup);
client.connect(address).get(5, TimeUnit.SECONDS);//block until the connection is established

//write the request
Integer bytesWritten = client.write(StandardCharsets.US_ASCII.encode("a custom request in a binary format")).get();

//read the response
final ByteBuffer readTo = ByteBuffer.allocate(bufferSize);
final StringBuilder responseBuilder = new StringBuilder();
client.read(readTo, readTo, new CompletionHandler<Integer, ByteBuffer>() {
        public void completed(Integer bytesRead, ByteBuffer buffer) {
            buffer.flip();
            responseBuilder.append(StandardCharsets.US_ASCII.decode(buffer).toString());
            try {
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        
        public void failed(Throwable exc, ByteBuffer attachment) {
            try {
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
);
asyncChannelGroup.awaitTermination(5, TimeUnit.SECONDS);
asyncChannelGroup.shutdown();
System.out.println(responseBuilder.toString());

(即到达流的末尾)时,我需要对实现进行哪些更改,以实现对缓冲区的持续读取?

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2012-11-14 20:13:58

下面是我所做的事情的简化版本(使用番石榴ListenableFuture):

代码语言:javascript
复制
class SomeUtilClass {
public interface Processor<T> {
    boolean process(Integer byteCount, ByteBuffer buffer);
    T result();
}
public static <T> ListenableFuture<T> read(
    final AsynchronousSocketChannel delegate,
    final Processor<T> processor,
    ByteBuffer buffer
) {
    final SettableFuture<T> resultFuture = SettableFuture.create();
    delegate.read(buffer, buffer, new Handler<T, Integer, ByteBuffer>(resultFuture) {
        public void completed(Integer bytesRead, ByteBuffer buffer) {
            buffer.flip();
            if(processor.process(bytesRead, buffer)) {
                buffer.clear();
                delegate.read(buffer, buffer, this);
            } else {
                resultFuture.set(processor.result());
            }
        }
    });
    return resultFuture;
}
}

进一步的改进包括使用公地池 of ByteBuffer

票数 1
EN

Stack Overflow用户

发布于 2012-11-10 22:35:34

在我看来,最简单的方法是将这段代码分割成自己的方法,然后让CompletionHandler在bytesRead != -1时递归地调用该方法。这样,您就可以分离代码的职责,避免在运行异步读取时“忙于等待”或使用Thread.sleep()。当然,您也可以在bytesRead == -1对已读取的数据进行处理时添加一个案例。

票数 0
EN

Stack Overflow用户

发布于 2012-11-10 23:12:37

我不会对回调方法failedcompleted做任何长时间的操作,因为它们运行在不受您控制的线程上。

我知道您希望继续侦听套接字中的新字节,即使流已到达其末尾(bytesRead == -1)。将read方法放入while(true)循环中。在其中,侦听由failedcompleted方法设置的failed字段。我们叫它myBytesRead吧。

为了能够停止无止境的读取,将while(true)替换为其他一些synchronized条件。

代码语言:javascript
复制
private static final BYTES_READ_INIT_VALUE = Integer.MIN_VALUE;
private static final BYTES_READ_COMPLETED_VALUE = -1;
private static final BYTES_READ_FAILED_VALUE = -2;
private Integer myBytesRead = BYTES_READ_INIT_VALUE;

private void setMyBytesRead(final Integer bytesRead) {
    synchronized(myBytesRead) {
        this.myBytesRead = bytesRead;
    }
}

private Integer getMyBytesRead() {
    synchronized(myBytesRead) {
        return myBytesRead;
    }
}

...

// in your method
while (true) {
    final int lastBytesRead = getMyBytesRead();
    if (lastBytesRead == BYTES_READ_FAILED_VALUE) {
        // log failure and retry?
    } else if (lastBytesRead != BYTES_READ_COMPLETED_VALUE) {
        // Thread.sleep( a while ); to avoid exhausting CPU
        continue;
    }

    // else lastBytesRead == BYTES_READ_COMPLETED_VALUE and you can start a new read operation
    client.read(readTo, readTo, new CompletionHandler<Integer, ByteBuffer>() {
            public void completed(Integer bytesRead, ByteBuffer buffer) {
                setMyBytesRead(bytesRead);
                buffer.flip();
                responseBuilder.append(StandardCharsets.US_ASCII.decode(buffer).toString());
                try {
                    client.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            public void failed(Throwable exc, ByteBuffer attachment) {
                try {
                    setMyBytesRead(BYTES_READ_FAILED_VALUE);
                    client.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    );
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/13314650

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档