当使用BufferedReader从套接字读取时,它声明readLine()方法返回
一个字符串,包含行的内容,不包括任何行终止字符,如果到达流的末尾,则为null。
它怎么知道它已经到了溪流的尽头?它使用什么字符序列来确定这一点。
我想模拟发送相同的字符序列,以正确关闭另一个使用PipedStreams的连接。
编辑:这里是有问题的代码。从响应看,似乎没有这样的序列,对PipedOutput流调用close()应该解除对输出流上的readLine()的阻塞。它现在似乎没有这样做,这就是为什么我感到困惑,所以我认为它可能是其他地方的一个bug。
所发生的事情是,当incomingEventIn.close()被阻塞时,inputLine = incomingEventIn.readLine()线看起来是阻塞的。如果没有在另一个线程上执行inputLine = incomingEventIn.readLine(),那么incomingEventIn.close()就执行得很好。为什么会发生这种情况?
public class SocketManager {
private Socket socket = null;
private PrintWriter out = null;
private BufferedReader in = null;
private PipedOutputStream incomingEventOutStream = null;
private PrintWriter incomingEventOut = null;
private BufferedReader incomingEventIn = null;
private PipedOutputStream incomingResponsOutStream = null;
private PrintWriter incomingResponseOut = null;
private BufferedReader incomingResponseIn = null;
private ArrayList<AsteriskLiveComsEventListener> listeners = new ArrayList<AsteriskLiveComsEventListener>();
private final ExecutorService eventsDispatcherExecutor;
private String ip;
private int port;
private Object socketLock = new Object();
public SocketManager(String ip, int port) {
this.ip = ip;
this.port = port;
eventsDispatcherExecutor = Executors.newSingleThreadExecutor();
}
public void connect() throws UnableToConnectException, AlreadyConnectedException {
synchronized(socketLock) {
if (socket != null && !socket.isClosed()) {
throw (new AlreadyConnectedException());
}
try {
socket = new Socket(ip, port);
out = new PrintWriter(socket.getOutputStream(), true);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
incomingEventOutStream = new PipedOutputStream();
incomingEventIn = new BufferedReader(new InputStreamReader(new PipedInputStream(incomingEventOutStream)));
incomingEventOut = new PrintWriter(incomingEventOutStream);
incomingResponsOutStream = new PipedOutputStream();
incomingResponseIn = new BufferedReader(new InputStreamReader(new PipedInputStream(incomingResponsOutStream)));
incomingResponseOut = new PrintWriter(incomingResponsOutStream);
} catch (IOException e) {
throw (new UnableToConnectException());
}
new Thread(new IncomingEventThread()).start();
new Thread(new SocketThread()).start();
}
}
public void disconnect() throws NotConnectedException {
disconnect(false);
}
private void disconnect(boolean notRequested) throws NotConnectedException {
synchronized(socketLock) {
if (!isConnected()) {
throw (new NotConnectedException());
}
try {
incomingEventIn.close();
} catch (IOException e2) {}
// IT NEVER GETS TO HERE!
incomingEventOut.close();
try {
incomingResponseIn.close();
} catch (IOException e1) {}
System.out.println("disconnecting");
incomingResponseOut.close();
try {
socket.shutdownInput();
} catch (IOException e) {}
try {
socket.shutdownOutput();
} catch (IOException e) {}
try {
socket.close();
} catch (IOException e) {}
if (notRequested) {
System.out.println("disconnecting event");
dispatchEvent(new ConnectionLostEvent());
}
}
}
public boolean isConnected() {
synchronized(socketLock) {
return (socket != null && !socket.isClosed());
}
}
public void addEventListener(AsteriskLiveComsEventListener a) {
synchronized(listeners) {
listeners.add(a);
}
}
public void removeEventListener(AsteriskLiveComsEventListener a) {
synchronized(listeners) {
listeners.remove(a);
}
}
private void dispatchEvent(final AsteriskLiveComsEvent e) {
synchronized (listeners) {
synchronized (eventsDispatcherExecutor) {
eventsDispatcherExecutor.execute(new Runnable()
{
public void run()
{
for(int i=0; i<listeners.size(); i++) {
listeners.get(i).onAsteriskLiveComsEvent(e);
}
}
});
}
}
}
public JSONObject sendRequest(JSONObject request) throws JSONException, NotConnectedException {
synchronized(socketLock) {
System.out.println("sending request "+request.toString());
out.println(request.toString());
try {
return new JSONObject(incomingResponseIn.readLine());
} catch (IOException e) {
// lets close the connection
try {
disconnect(true);
} catch (NotConnectedException e1) {}
throw(new NotConnectedException());
}
}
}
private class SocketThread implements Runnable {
@Override
public void run() {
String inputLine = null;
try {
while((inputLine = in.readLine()) != null) {
// determine if this is a response or event and send to necessary location
JSONObject lineJSON = new JSONObject(inputLine);
if (lineJSON.getString("type").equals("response")) {
incomingResponseOut.println(inputLine);
incomingResponseOut.flush();
}
else if (lineJSON.getString("type").equals("event")) {
incomingEventOut.println(inputLine);
incomingEventOut.flush();
}
}
if (isConnected()) {
try {
disconnect(true);
} catch (NotConnectedException e) {}
}
} catch (IOException e) {
// try and disconnect (if not already disconnected) and end thread
if (isConnected()) {
try {
disconnect(true);
} catch (NotConnectedException e1) {}
}
}
}
}
private class IncomingEventThread implements Runnable {
@Override
public void run() {
String inputLine = null;
try {
while((inputLine = incomingEventIn.readLine()) != null) {
JSONObject lineJSON = new JSONObject(inputLine);
String eventType = lineJSON.getString("eventType");
// determine what type of event it is and then fire one that represents it
if (eventType.equals("channelAdded")) {
JSONObject a = lineJSON.getJSONObject("payload");
Hashtable<String,Object> data = new Hashtable<String,Object>();
Object[] keys = a.keySet().toArray();
for(int i=0; i<keys.length; i++) {
data.put((String) keys[i], a.get((String) keys[i]));
}
dispatchEvent(new ChannelAddedEvent(data));
}
else if (eventType.equals("channelRemoved")) {
dispatchEvent(new ChannelRemovedEvent(lineJSON.getJSONObject("payload").getInt("channelId")));
}
else if (eventType.equals("channelsToRoom")) {
ArrayList<Integer> data = new ArrayList<Integer>();
JSONObject a = lineJSON.getJSONObject("payload");
JSONArray ids = a.getJSONArray("channelIds");
for(int i=0; i<ids.length(); i++) {
data.add(ids.getInt(i));
}
dispatchEvent(new ChannelsToRoomEvent(data));
}
else if (eventType.equals("channelToHolding")) {
dispatchEvent(new ChannelToHoldingEvent(lineJSON.getJSONObject("payload").getInt("channelId")));
}
else if (eventType.equals("channelVerified")) {
dispatchEvent(new ChannelVerifiedEvent(lineJSON.getJSONObject("payload").getInt("channelId")));
}
else if (eventType.equals("serverResetting")) {
dispatchEvent(new ServerResettingEvent());
}
}
} catch (IOException e) {}
System.out.println("here");
}
}编辑2:,我认为这是一个死锁问题,因为如果我在调试器中在它前面放置了一些断点,它就会运行良好,inputLine = incomingEventIn.readLine()返回null。如果我试着正常运行它就会被锁上。
编辑3:通过格雷的回答解决了问题。在导致锁定的输出之前,输入流正在关闭。它必须是相反的方式。首先关闭输出流,然后通知输入流该流已关闭,并解除对readLine()方法的阻塞。
发布于 2013-07-22 21:49:08
它怎么知道它已经到了溪流的尽头?它使用什么字符序列来确定这一点。
答案是操作系统依赖,但操作系统‘我熟悉,没有读取EOF字符。操作系统向底层调用方返回表示流(文件描述符)已达到EOF的返回值。JVM看到返回值并返回适当的返回(null,-1,.)到InputStream或Reader调用者,具体取决于方法。
我想模拟发送相同的字符序列,以正确关闭另一个使用PipedStreams的连接。
如果正在从PipedReader读取数据,则关闭关联的PipedWriter。然后,Reader或InputStream将向调用方返回适当的EOF值。
编辑:
由于您的IncomingEventThread是从incomingEventIn读取的,所以disconnect()方法应该首先关闭incomingEventOut。线程本身应该关闭in侧。那你应该关闭回应。
我不会让线程调用disconnect(...)。它只应该关闭它的读者和作家,而不是所有的溪流。
发布于 2013-07-22 21:45:38
查看以下问题:文件末尾的字符是什么?
发布于 2013-07-22 21:50:18
从您的角度来看,只需在用于连接到测试的PipedOutputStream上调用PipedOutputStream即可。
套接字的实际close由客户端和服务器上的TCP堆栈执行。
应该这样做(请注意,您不能在同一个线程上读取/写入管道流,因此两个方法和一个线程创建):
void runTest ( final PipedInputStream sink ) throws Exception
{
try( final PipedOutputStream stream = new PipedOutputStream( sink ) )
{
try ( final OutputStreamWriter swriter =
new OutputStreamWriter( stream, "UTF-8" )
)
{
try ( final PrintWriter writer = new PrintWriter( swriter ) )
{
writer.println( "Hello" );
writer.println( "World!" );
}
}
}
}
void test ( final PipedInputStream sink ) throws InterruptedException
{
final Thread outputThread =
new Thread(
new Runnable ( )
{
@Override
public void run ( )
{
try
{
runTest( sink );
}
catch ( final Exception ex )
{
throw new RuntimeException( ex );
}
}
}
);
outputThread.start( );
outputThread.join( );
}https://stackoverflow.com/questions/17797969
复制相似问题