引言
主要目的是为了搞清楚众多的socket
正文
1、RTSPClient::sendDescribeCommand
unsigned RTSPClient::sendDescribeCommand(responseHandler* responseHandler, Authenticator* authenticator) {
if (fCurrentAuthenticator < authenticator) fCurrentAuthenticator = *authenticator;
return sendRequest(new RequestRecord(++fCSeq, "DESCRIBE", responseHandler));
}
unsigned RTSPClient::sendRequest(RequestRecord* request) {
char* cmd = NULL;
do {
Boolean connectionIsPending = False;
if (!fRequestsAwaitingConnection.isEmpty()) {
// A connection is currently pending (with at least one enqueued request). Enqueue this request also:
connectionIsPending = True;
} else if (fInputSocketNum < 0) { // we need to open a connection
int connectResult = openConnection();
if (connectResult < 0) break; // an error occurred
else if (connectResult == 0) {
// A connection is pending
connectionIsPending = True;
} // else the connection succeeded. Continue sending the command.
}
if (connectionIsPending) {
fRequestsAwaitingConnection.enqueue(request);
return request->cseq();
}........
if (send(fOutputSocketNum, cmd, strlen(cmd), 0) < 0) {
char const* errFmt = "%s send() failed: ";
unsigned const errLength = strlen(errFmt) + strlen(request->commandName());
char* err = new char[errLength];
sprintf(err, errFmt, request->commandName());
envir().setResultErrMsg(err);
delete[] err;
break;
}
// The command send succeeded, so enqueue the request record, so that its response (when it comes) can be handled.
// However, note that we do not expect a response to a POST command with RTSP-over-HTTP, so don't enqueue that.
int cseq = request->cseq();
if (fTunnelOverHTTPPortNum == 0 || strcmp(request->commandName(), "POST") != 0) {
fRequestsAwaitingResponse.enqueue(request); //加入fRequestsAwaitingResponse队列
} else {
delete request;
}
delete[] cmd;
return cseq;
} while (0);
RTSP交互用的TCP socket,就是在openConnection里面创建的:
int RTSPClient::openConnection() {
do {
// Set up a connection to the server. Begin by parsing the URL:
char* username;
char* password;
NetAddress destAddress;
portNumBits urlPortNum;
char const* urlSuffix;
if (!parseRTSPURL(envir(), fBaseURL, username, password, destAddress, urlPortNum, &urlSuffix)) break;
portNumBits destPortNum = fTunnelOverHTTPPortNum == 0 ? urlPortNum : fTunnelOverHTTPPortNum;
if (username != NULL || password != NULL) {
fCurrentAuthenticator.setUsernameAndPassword(username, password);
delete[] username;
delete[] password;
}
// We don't yet have a TCP socket (or we used to have one, but it got closed). Set it up now.
fInputSocketNum = setupStreamSocket(envir(), 0); //RTSP交互TCP socket
if (fInputSocketNum < 0) break;
ignoreSigPipeOnSocket(fInputSocketNum); // so that servers on the same host that get killed don't also kill us
if (fOutputSocketNum < 0) fOutputSocketNum = fInputSocketNum;
// Connect to the remote endpoint:
fServerAddress = *(netAddressBits*)(destAddress.data());
int connectResult = connectToServer(fInputSocketNum, destPortNum); //建立tcp连接
if (connectResult < 0) break;
else if (connectResult > 0) {
// The connection succeeded. Arrange to handle responses to requests sent on it:
envir().taskScheduler().setBackgroundHandling(fInputSocketNum, SOCKET_READABLE|SOCKET_EXCEPTION,
(TaskScheduler::BackgroundHandlerProc*)&incomingDataHandler, this); //在建立连接时马上向任务计划中加入处理从这个TCP接收数据的socket
handler:RTSPClient::incomingDataHandler()
}
}
return connectResult;while (0);
resetTCPSockets();
return -1;
}
收到Response后,inCommingDataHandler被调用,辗转调用handleResponseBytes:
void RTSPClient::incomingDataHandler(void* instance, int /*mask*/) {
RTSPClient* client = (RTSPClient*)instance;
client->incomingDataHandler1();
}
void RTSPClient::incomingDataHandler1() {
struct sockaddr_in dummy; // 'from' address - not used
int bytesRead = readSocket(envir(), fInputSocketNum, (unsigned char*)&fResponseBuffer[fResponseBytesAlreadySeen], fResponseBufferBytesLeft, dummy);
handleResponseBytes(bytesRead);
}void RTSPClient::handleResponseBytes(int newBytesRead) {
// Find the handler function for "cseq":
RequestRecord* request;
while ((request = fRequestsAwaitingResponse.dequeue()) != NULL) {
if (request->cseq() < cseq) { // assumes that the CSeq counter will never wrap around
// We never received (and will never receive) a response for this handler, so delete it:
if (fVerbosityLevel >= 1 && strcmp(request->commandName(), "POST") != 0) {
envir() << "WARNING: The server did not respond to our \"" << request->commandName() << "\" request (CSeq: "
<< request->cseq() << "). The server appears to be buggy (perhaps not handling pipelined requests properly).\n";
}
delete request;
} else if (request->cseq() == cseq) {
// This is the handler that we want. Remove its record, but remember it, so that we can later call its handler:
foundRequest = request;
break;
else { // request->cseq() > cseq
// No handler was registered for this response, so ignore it.
break;
}
}
if (foundRequest == NULL) {
// Hack: The response didn't have a "CSeq:" header; assume it's for our most recent request:
foundRequest = fRequestsAwaitingResponse.dequeue();
}
if (foundRequest != NULL) {
Boolean needToResendCommand = False; // by default
if (responseCode == 200) {
// Do special-case response handling for some commands:
if (strcmp(foundRequest->commandName(), "SETUP") == 0) {
if (!handleSETUPResponse(*foundRequest->subsession(), sessionParamsStr, transportParamsStr, foundRequest->booleanFlags()&0x1)) break;
} else if (strcmp(foundRequest->commandName(), "PLAY") == 0) {
if (!handlePLAYResponse(*foundRequest->session(), *foundRequest->subsession(), scaleParamsStr, speedParamsStr, rangeParamsStr, rtpInfoParamsStr)) break;
} else if (strcmp(foundRequest->commandName(), "TEARDOWN") == 0) {
if (!handleTEARDOWNResponse(*foundRequest->session(), *foundRequest->subsession())) break;
} else if (strcmp(foundRequest->commandName(), "GET_PARAMETER") == 0) {
if (!handleGET_PARAMETERResponse(foundRequest->contentStr(), bodyStart, responseEnd)) break;
}
}
if (foundRequest != NULL && foundRequest->handler() != NULL) {
int resultCode;
char* resultString;
if (responseSuccess) {
if (responseCode == 200) {
resultCode = 0;
resultString = numBodyBytes > 0 ? strDup(bodyStart) : strDup(publicParamsStr);
// Note: The "strDup(bodyStart)" call assumes that the body is encoded without interior '\0' bytes
} else {
resultCode = responseCode;
resultString = strDup(responseStr);
envir().setResultMsg(responseStr);
}
(*foundRequest->handler())(this, resultCode, resultString); //调用Request的回调函数(handler)
} else {
// An error occurred parsing the response, so call the handler, indicating an error:
handleRequestError(foundRequest);
}
}
DESCRIBE Request收到回复后,根据回复的sdp创建MediaSession(内部再创建MediaSubsession):
void continueAfterDESCRIBE(RTSPClient* rtspClient, int resultCode, char* resultString) {
do {
UsageEnvironment& env = rtspClient->envir(); // alias
StreamClientState& scs = ((ourRTSPClient*)rtspClient)->scs; // alias
if (resultCode != 0) {
env << *rtspClient << "Failed to get a SDP description: " << resultString << "\n";
delete[] resultString;
break;
}
char* const sdpDescription = resultString;
env << *rtspClient << "Got a SDP description:\n" << sdpDescription << "\n";
// Create a media session object from this SDP description:
scs.session = MediaSession::createNew(env, sdpDescription);
delete[] sdpDescription; // because we don't need it anymore
if (scs.session == NULL) {
env << *rtspClient << "Failed to create a MediaSession object from the SDP description: " << env.getResultMsg() << "\n";
break;
} else if (!scs.session->hasSubsessions()) {
env << *rtspClient << "This session has no media subsessions (i.e., no \"m=\" lines)\n";
break;
}
// Then, create and set up our data source objects for the session. We do this by iterating over the session's 'subsessions',
// calling "MediaSubsession::initiate()", and then sending a RTSP "SETUP" command, on each one.
// (Each 'subsession' will have its own data source.)
scs.iter = new MediaSubsessionIterator(*scs.session);
setupNextSubsession(rtspClient); }
return;
while (0);
setupNextSubsession会遍历MediaSession里面的MediaSubsession,分别调用initiate进行初始化,并调用SETUP Request:
void setupNextSubsession(RTSPClient* rtspClient) {
UsageEnvironment& env = rtspClient->envir(); // alias
StreamClientState& scs = ((ourRTSPClient*)rtspClient)->scs; // alias
scs.subsession = scs.iter->next();
if (scs.subsession != NULL) {
if (!scs.subsession->initiate()) {
env << *rtspClient << "Failed to initiate the \"" << *scs.subsession << "\" subsession: " << env.getResultMsg() << "\n";
setupNextSubsession(rtspClient); // give up on this subsession; go to the next one
} else {
env << *rtspClient << "Initiated the \"" << *scs.subsession << "\" subsession (";
if (scs.subsession->rtcpIsMuxed()) {
env << "client port " << scs.subsession->clientPortNum();
} else {
env << "client ports " << scs.subsession->clientPortNum() << "-" << scs.subsession->clientPortNum()+1;
}
env << ")\n";
// Continue setting up this subsession, by sending a RTSP "SETUP" command:
rtspClient->sendSetupCommand(*scs.subsession, continueAfterSETUP, False, REQUEST_STREAMING_OVER_TCP);
}
return;
}
// We've finished setting up all of the subsessions. Now, send a RTSP "PLAY" command to start the streaming:
if (scs.session->absStartTime() != NULL) {
// Special case: The stream is indexed by 'absolute' time, so send an appropriate "PLAY" command:
rtspClient->sendPlayCommand(*scs.session, continueAfterPLAY, scs.session->absStartTime(), scs.session->absEndTime());
} else {
scs.duration = scs.session->playEndTime() - scs.session->playStartTime();
rtspClient->sendPlayCommand(*scs.session, continueAfterPLAY);
}
}
Boolean MediaSubsession::initiate(int useSpecialRTPoffset) {
// Create a new socket:
if (isSSM()) {
fRTPSocket = new Groupsock(env(), tempAddr, fSourceFilterAddr, 0);
} else {
fRTPSocket = new Groupsock(env(), tempAddr, 0, 255); //创建RTP传输用socket,端口后会根据sdp来决定是server端指定还是client自动分配(setup的时候反馈给server端)
}
if (fRTPSocket == NULL) {
env().setResultMsg("MediaSession::initiate(): unable to create RTP and RTCP sockets");
break;
}
// Get the client port number:
Port clientPort(0);
if (!getSourcePort(env(), fRTPSocket->socketNum(), clientPort)) {
break;
}
fClientPortNum = ntohs(clientPort.num());
if (fMultiplexRTCPWithRTP) {
// Use this RTP 'groupsock' object for RTCP as well:
fRTCPSocket = fRTPSocket;
success = True;
break;
}
// Make sure we can use the next (i.e., odd) port number, for RTCP:
portNumBits rtcpPortNum = fClientPortNum|1;
if (isSSM()) {
fRTCPSocket = new Groupsock(env(), tempAddr, fSourceFilterAddr, rtcpPortNum);
} else {
fRTCPSocket = new Groupsock(env(), tempAddr, rtcpPortNum, 255);
}
if (fRTCPSocket != NULL && fRTCPSocket->socketNum() >= 0) {
// Success! Use these two sockets.
success = True;
break;
}
// Try to use a big receive buffer for RTP - at least 0.1 second of
// specified bandwidth and at least 50 KB
unsigned rtpBufSize = fBandwidth * 25 / 2; // 1 kbps * 0.1 s = 12.5 bytes
if (rtpBufSize < 50 * 1024)
rtpBufSize = 50 * 1024;
increaseReceiveBufferTo(env(), fRTPSocket->socketNum(), rtpBufSize);
if (isSSM() && fRTCPSocket != NULL) {
// Special case for RTCP SSM: Send RTCP packets back to the source via unicast:
fRTCPSocket->changeDestinationParameters(fSourceFilterAddr,0,~0);
}
// Create "fRTPSource" and "fReadSource":
if (!createSourceObjects(useSpecialRTPoffset)) break;
if (fReadSource == NULL) {
env().setResultMsg("Failed to create read source");
break;
}
// Finally, create our RTCP instance. (It starts running automatically)
if (fRTPSource != NULL && fRTCPSocket != NULL) {
// If bandwidth is specified, use it and add 5% for RTCP overhead.
// Otherwise make a guess at 500 kbps.
unsigned totSessionBandwidth = fBandwidth ? fBandwidth + fBandwidth / 20 : 500;
fRTCPInstance = RTCPInstance::createNew(env(), fRTCPSocket,
totSessionBandwidth,
(unsigned char const*)
fParent.CNAME(),
NULL /* we're a client */,
fRTPSource);
if (fRTCPInstance == NULL) {
env().setResultMsg("Failed to create RTCP instance");
break;
}
}
Boolean MediaSubsession::createSourceObjects(int useSpecialRTPoffset) {
// Check "fCodecName" against the set of codecs that we support,
// and create our RTP source accordingly
// (Later make this code more efficient, as this set grows #####)
// (Also, add more fmts that can be implemented by SimpleRTPSource#####)
} else if (strcmp(fCodecName, "H264") == 0) {
fReadSource = fRTPSource = H264VideoRTPSource::createNew(env(), fRTPSocket,fRTPPayloadFormat,fRTPTimestampFrequency) 最后,调用createSourceObjects创建Source,有了source,就可以通过调用getNextFrame函数获取到数据了,当然也可以再创建Sink,然后由Sink来间接调用getNextFrame。
2、RTSPClient::sendSetupCommand
unsigned RTSPClient::sendSetupCommand(MediaSubsession& subsession, responseHandler* responseHandler,
Boolean streamOutgoing, Boolean streamUsingTCP, Boolean forceMulticastOnUnspecified,
Authenticator* authenticator) {
if (fTunnelOverHTTPPortNum != 0) streamUsingTCP = True; // RTSP-over-HTTP tunneling uses TCP (by definition)
if (fCurrentAuthenticator < authenticator) fCurrentAuthenticator = *authenticator;
u_int32_t booleanFlags = 0;
if (streamUsingTCP) booleanFlags |= 0x1;
if (streamOutgoing) booleanFlags |= 0x2;
if (forceMulticastOnUnspecified) booleanFlags |= 0x4;
return sendRequest(new RequestRecord(++fCSeq, "SETUP", responseHandler, NULL, &subsession, booleanFlags
));
}
void continueAfterSETUP(RTSPClient* rtspClient, int resultCode, char* resultString) {
do {
// Having successfully setup the subsession, create a data sink for it, and call "startPlaying()" on it.
// (This will prepare the data sink to receive data; the actual flow of data from the client won't start happening until later,
// after we've sent a RTSP "PLAY" command.)
scs.subsession->sink = DummySink::createNew(env, *scs.subsession, rtspClient->url());
// perhaps use your own custom "MediaSink" subclass instead
if (scs.subsession->sink == NULL) {
env << *rtspClient << "Failed to create a data sink for the \"" << *scs.subsession
<< "\" subsession: " << env.getResultMsg() << "\n";
break;
}
env << *rtspClient << "Created a data sink for the \"" << *scs.subsession << "\" subsession\n";
scs.subsession->miscPtr = rtspClient; // a hack to let subsession handler functions get the "RTSPClient" from the subsession
scs.subsession->sink->startPlaying(*(scs.subsession->readSource()),
subsessionAfterPlaying, scs.subsession);
// Also set a handler to be called if a RTCP "BYE" arrives for this subsession:
if (scs.subsession->rtcpInstance() != NULL) {
scs.subsession->rtcpInstance()->setByeHandler(subsessionByeHandler, scs.subsession);
}
} while (0);
delete[] resultString;
// Set up the next subsession, if any:
setupNextSubsession(rtspClient);
}
至此,RTSP交互已经完成,再来关注一下RTP_over_TCP的情况,sendSetupCommand的时候会传达streamUsingTCP参数,通知server端,收到回复后handleSETUPResponse用于处理相关事宜:
Boolean RTSPClient::handleSETUPResponse(MediaSubsession& subsession, char const* sessionParamsStr, char const* transportParamsStr, Boolean streamUsingTCP) {
do {
..
// Parse the "Transport:" header parameters:
char* serverAddressStr;
portNumBits serverPortNum;
unsigned char rtpChannelId, rtcpChannelId;
if (!parseTransportParams(transportParamsStr, serverAddressStr, serverPortNum, rtpChannelId, rtcpChannelId)) {
envir().setResultMsg("Missing or bad \"Transport:\" header");
break;
}
delete[] subsession.connectionEndpointName();
subsession.connectionEndpointName() = serverAddressStr;
subsession.serverPortNum = serverPortNum;
subsession.rtpChannelId = rtpChannelId;
subsession.rtcpChannelId = rtcpChannelId;
if (streamUsingTCP) {
// Tell the subsession to receive RTP (and send/receive RTCP) over the RTSP stream:
if (subsession.rtpSource() != NULL) {
subsession.rtpSource()->setStreamSocket(fInputSocketNum, subsession.rtpChannelId); //TCP传输沿用RTSP交互的socket,RTPInterface用于关联这个socket(fInputSocketNum)和session对应的id(rtpChannelId)
// So that we continue to receive & handle RTSP commands and responses from the server
subsession.rtpSource()->enableRTCPReports() = False;
// To avoid confusing the server (which won't start handling RTP/RTCP-over-TCP until "PLAY"), don't send RTCP "RR"s yet
increaseReceiveBufferTo(envir(), fInputSocketNum, 50*1024);
}
if (subsession.rtcpInstance() != NULL) subsession.rtcpInstance()->setStreamSocket(fInputSocketNum, subsession.rtcpChannelId);
RTPInterface::setServerRequestAlternativeByteHandler(envir(), fInputSocketNum, handleAlternativeRequestByte, this);
else {
// Normal case.
// Set the RTP and RTCP sockets' destination address and port from the information in the SETUP response (if present):
netAddressBits destAddress = subsession.connectionEndpointAddress();
if (destAddress == 0) destAddress = fServerAddress;
subsession.setDestinations(destAddress);
}
success = True;
} while (0);
void RTPInterface::setStreamSocket(int sockNum,
unsigned char streamChannelId) {
fGS->removeAllDestinations();
envir().taskScheduler().disableBackgroundHandling(fGS->socketNum()); // turn off any reading on our datagram socket
fGS->reset(); // and close our datagram socket, because we won't be using it anymore
addStreamSocket(sockNum, streamChannelId);
posted on 2019-07-31 13:41
lfc 阅读(423)
评论(0) 编辑 收藏 引用