我正在尝试将一个Peer.JS对等对象转换为一个可观察的dataConnections序列。但是,如下所示,接收连接请求的对等体的可观察性并不总是向其订阅者发出dataConnection,即使总是建立连接也是如此。
根据我所读到的内容,fromEvent()创建了热点观察点?我的订阅者是否因为订阅不够早而错过了dataConnection?
import Rx from 'rx';
import Peer from 'peerjs';
import config from '../config';
/**
* Converts a Peer.JS Peer into an Observable sequence of Peer.JS DataConnections
* @param {Peer} [peer] A Peer.js Peer
* @param {Observable} [connectionIds] An observable sequence of peerids that will be used to initiate dataConnections
* @returns {Observable} An observable sequence of DataConnections
*/
function fromPeer(peer, connectionIds) {
var fromEvent = Rx.Observable.fromEvent;
var throwError = Rx.Observable.throwError;
var open = fromEvent(peer, 'open');
var remoteConnections = fromEvent(peer, 'connection');
return Rx.Observable.when(
open.thenDo(function(){
var localConnections = connectionIds.map((id) => {return peer.connect(id);});
return remoteConnections.merge(localConnections);
})
).mergeAll();
}
function test0(){
var peer1 = new Peer("peer1", config.peer)
var peer2 = new Peer("peer2", config.peer)
var peer1connections = fromPeer(peer1, Rx.Observable.just("peer2"));
var peer2connections = fromPeer(peer2, Rx.Observable.empty());
peer1connections.subscribe(
(conn) => {console.log("Got a connection: " + conn.peer);},
(err) => {console.log("Error: " + err);},
() => {console.log("complete");}
);
peer2connections.subscribe(
(conn) => {console.log("Got a connection: " + conn.peer);},
(err) => {console.log("Error: " + err);},
() => {console.log("complete");}
);
peer1.on('open', ()=>{console.log("open1")})
peer2.on('open', ()=>{console.log("open2")})
peer2.on('connection', ()=>{console.log("connect2")})
};
// => Got a connection: peer2
// => open1
// => connect2
// => open2
// => Got a connection: peer1 -- This will not always be printed to the console.发布于 2015-12-21 12:57:48
我想我已经解决了这个问题。问题似乎是,如果对等体在发出“打开”之前发出“连接”,则对等体发出的“连接”事件的任何结果都不会合并到由fromPeer返回的可观察对象中。
因此,我使用ReplaySubject作为缓冲区。connectionSubject捕获任何DataConnections,并在flatMap操作符内部的回调运行时释放它们。
以下是修改后的fromPeer函数。希望这能帮助到一些人!
function fromPeer(peer, connectionIds) {
var fromEvent = Rx.Observable.fromEvent;
var throwError = Rx.Observable.throwError;
var connectionSubject = new Rx.ReplaySubject(10);
var open = fromEvent(peer, 'open');
var close = fromEvent(peer, 'close');
var disconnect = fromEvent(peer, 'disconnected');
var error = fromEvent(peer, 'error').flatMap(throwError);
var remoteConnections = fromEvent(peer, 'connection');
remoteConnections.subscribe(connectionSubject);
/*
* The Peer can fire a connection event before it receives an open event
* Therefore, remoteConnections must be buffered until the open event arrives
*/
return open.flatMap(
() => {
var localConnections = connectionIds.map((id) => {return peer.connect(id);});
return connectionSubject.merge(localConnections);
}
).merge(error).takeUntil(close.amb(disconnect));
}https://stackoverflow.com/questions/34217891
复制相似问题