随笔-118  评论-133  文章-4  trackbacks-0

引言

主要目的是为了搞清楚众多的socket

正文

1、RTSPClient::sendDescribeCommand

unsigned RTSPClient::sendDescribeCommand(responseHandler* responseHandler, Authenticator* authenticator) {
  
if (fCurrentAuthenticator < authenticator) fCurrentAuthenticator = *authenticator;
  return sendRequest(
new RequestRecord(++fCSeq, "DESCRIBE", responseHandler));
}
unsigned RTSPClient::sendRequest(RequestRecord* request) {
  char
* cmd = NULL;
  
do {
    
Boolean connectionIsPending = False;
    
if (!fRequestsAwaitingConnection.isEmpty()) {
      
// A connection is currently pending (with at least one enqueued request).  Enqueue this request also:
      connectionIsPending 
= True;
    } 
else if (fInputSocketNum < 0) { // we need to open a connection
      
int connectResult = openConnection();
      
if (connectResult < 0) break; // an error occurred
      
else if (connectResult == 0) {
    
// A connection is pending
        connectionIsPending 
= True;
      } 
// else the connection succeeded.  Continue sending the command.
    }
    
if (connectionIsPending) {
      fRequestsAwaitingConnection.enqueue(request);
      return request
->cseq();
    }
........
    if (send(fOutputSocketNum, cmd, strlen(cmd), 0< 0) {
      char 
const* errFmt = "%s send() failed: ";
      unsigned 
const errLength = strlen(errFmt) + strlen(request->commandName());
      char
* err = new char[errLength];
      sprintf(err, errFmt, request
->commandName());
      envir().setResultErrMsg(err);
      delete[] err;
      break;
    }

    
// The command send succeeded, so enqueue the request record, so that its response (when it comes) can be handled.
    
// However, note that we do not expect a response to a POST command with RTSP-over-HTTP, so don't enqueue that.
    int cseq = request->cseq();

    
if (fTunnelOverHTTPPortNum == 0 || strcmp(request->commandName(), "POST") != 0) {
      
fRequestsAwaitingResponse.enqueue(request); //加入fRequestsAwaitingResponse队列
    } 
else {
      delete request;
    }

    delete[] cmd;
    return cseq;
  } 
while (0);
RTSP交互用的TCP socket,就是在openConnection里面创建的:
int RTSPClient::openConnection() {
  
do {
    
// Set up a connection to the server.  Begin by parsing the URL:

    char
* username;
    char
* password;
    NetAddress destAddress;
    portNumBits urlPortNum;
    char 
const* urlSuffix;
    
if (!parseRTSPURL(envir(), fBaseURL, username, password, destAddress, urlPortNum, &urlSuffix)) break;
    portNumBits destPortNum 
= fTunnelOverHTTPPortNum == 0 ? urlPortNum : fTunnelOverHTTPPortNum;
    
if (username != NULL || password != NULL) {
      fCurrentAuthenticator.setUsernameAndPassword(username, password);
      delete[] username;
      delete[] password;
    }

    
// We don't yet have a TCP socket (or we used to have one, but it got closed).  Set it up now.
    fInputSocketNum = setupStreamSocket(envir(), 0); //RTSP交互TCP socket
    
if (fInputSocketNum < 0) break;
    ignoreSigPipeOnSocket(fInputSocketNum); 
// so that servers on the same host that get killed don't also kill us
    if (fOutputSocketNum < 0) fOutputSocketNum = fInputSocketNum;

    
// Connect to the remote endpoint:
    fServerAddress 
= *(netAddressBits*)(destAddress.data());
    
int connectResult = connectToServer(fInputSocketNum, destPortNum); //建立tcp连接
    
if (connectResult < 0) break;
    
else if (connectResult > 0) {
      
// The connection succeeded.  Arrange to handle responses to requests sent on it:
      envir().taskScheduler().setBackgroundHandling(fInputSocketNum, SOCKET_READABLE|SOCKET_EXCEPTION,
                            (TaskScheduler::BackgroundHandlerProc
*)&incomingDataHandler, this); //在建立连接时马上向任务计划中加入处理从这个TCP接收数据的socket handler:RTSPClient::incomingDataHandler()
  

    }
    return connectResult;
while (0);

  resetTCPSockets();
  return 
-1;
}
收到Response后,inCommingDataHandler被调用,辗转调用handleResponseBytes:
void RTSPClient::incomingDataHandler(void* instance, int /*mask*/) {
  RTSPClient
* client = (RTSPClient*)instance;
  client
->incomingDataHandler1();
}

void RTSPClient::incomingDataHandler1() {
  struct sockaddr_in dummy; 
// 'from' address - not used

  
int bytesRead = readSocket(envir(), fInputSocketNum, (unsigned char*)&fResponseBuffer[fResponseBytesAlreadySeen], fResponseBufferBytesLeft, dummy);
  
handleResponseBytes(bytesRead);

}
void RTSPClient::handleResponseBytes(int newBytesRead) {
      // Find the handler function for "cseq":
      RequestRecord
* request;
      
while ((request = fRequestsAwaitingResponse.dequeue()) != NULL) {
        
if (request->cseq() < cseq) { // assumes that the CSeq counter will never wrap around
          
// We never received (and will never receive) a response for this handler, so delete it:
          
if (fVerbosityLevel >= 1 && strcmp(request->commandName(), "POST") != 0) {
            envir() 
<< "WARNING: The server did not respond to our \"" << request->commandName() << "\" request (CSeq: "
            
<< request->cseq() << ").  The server appears to be buggy (perhaps not handling pipelined requests properly).\n";
          }
          delete request;
        } 
else if (request->cseq() == cseq) {
          
// This is the handler that we want. Remove its record, but remember it, so that we can later call its handler:
          
foundRequest = request;
          break;
        else { // request->cseq() > cseq
          
// No handler was registered for this response, so ignore it.
          break;
        }
      }

      
if (foundRequest == NULL) {
         
// Hack: The response didn't have a "CSeq:" header; assume it's for our most recent request:
          foundRequest = fRequestsAwaitingResponse.dequeue();
      }

      
if (foundRequest != NULL) {
       
Boolean needToResendCommand = False// by default
       
if (responseCode == 200) {
         
// Do special-case response handling for some commands:
         
if (strcmp(foundRequest->commandName(), "SETUP"== 0) {
           
if (!handleSETUPResponse(*foundRequest->subsession(), sessionParamsStr, transportParamsStr, foundRequest->booleanFlags()&0x1)) break;
          } 
else if (strcmp(foundRequest->commandName(), "PLAY"== 0) {
           
if (!handlePLAYResponse(*foundRequest->session(), *foundRequest->subsession(), scaleParamsStr, speedParamsStr, rangeParamsStr, rtpInfoParamsStr)) break;
          } 
else if (strcmp(foundRequest->commandName(), "TEARDOWN"== 0) {
           
if (!handleTEARDOWNResponse(*foundRequest->session(), *foundRequest->subsession())) break;
          } 
else if (strcmp(foundRequest->commandName(), "GET_PARAMETER"== 0) {
           
if (!handleGET_PARAMETERResponse(foundRequest->contentStr(), bodyStart, responseEnd)) break;
          }
        } 
    if (foundRequest != NULL && foundRequest->handler() != NULL) {
      
int resultCode;
      char
* resultString;
      
if (responseSuccess) {
       
if (responseCode == 200) {
          resultCode 
= 0;
          resultString 
= numBodyBytes > 0 ? strDup(bodyStart) : strDup(publicParamsStr);
          
// Note: The "strDup(bodyStart)" call assumes that the body is encoded without interior '\0' bytes
        } else {
          resultCode 
= responseCode;
          resultString 
= strDup(responseStr);
          envir().setResultMsg(responseStr);
        }
        (*foundRequest->handler())(this, resultCode, resultString); //调用Request的回调函数(handler)
      } 
else {
       
// An error occurred parsing the response, so call the handler, indicating an error:
        handleRequestError(foundRequest);
      }
    }
DESCRIBE Request收到回复后,根据回复的sdp创建MediaSession(内部再创建MediaSubsession):
void continueAfterDESCRIBE(RTSPClient* rtspClient, int resultCode, char* resultString) {
  
do {
    UsageEnvironment
& env = rtspClient->envir(); // alias
    StreamClientState
& scs = ((ourRTSPClient*)rtspClient)->scs; // alias

    
if (resultCode != 0) {
      env 
<< *rtspClient << "Failed to get a SDP description: " << resultString << "\n";
      delete[] resultString;
      break;
    }

    char
* const sdpDescription = resultString;
    env 
<< *rtspClient << "Got a SDP description:\n" << sdpDescription << "\n";

    
// Create a media session object from this SDP description:
    scs.session 
= MediaSession::createNew(env, sdpDescription);
    delete[] sdpDescription; 
// because we don't need it anymore
    if (scs.session == NULL) {
      env 
<< *rtspClient << "Failed to create a MediaSession object from the SDP description: " << env.getResultMsg() << "\n";
      break;
    } 
else if (!scs.session->hasSubsessions()) {
      env 
<< *rtspClient << "This session has no media subsessions (i.e., no \"m=\" lines)\n";
      break;
    }

    
// Then, create and set up our data source objects for the session.  We do this by iterating over the session's 'subsessions',
    // calling "MediaSubsession::initiate()"and then sending a RTSP "SETUP" command, on each one.
    
// (Each 'subsession' will have its own data source.)
    scs.iter = new MediaSubsessionIterator(*scs.session);
    
setupNextSubsession(rtspClient);  } 
    return;
while (0);
setupNextSubsession会遍历MediaSession里面的MediaSubsession,分别调用initiate进行初始化,并调用SETUP Request:
void setupNextSubsession(RTSPClient* rtspClient) {
  UsageEnvironment
& env = rtspClient->envir(); // alias
  StreamClientState
& scs = ((ourRTSPClient*)rtspClient)->scs; // alias

  scs.subsession 
= scs.iter->next();
  
if (scs.subsession != NULL) {
    
if (!scs.subsession->initiate()) {
      env << *rtspClient << "Failed to initiate the \"" << *scs.subsession << "\" subsession: " << env.getResultMsg() << "\n";
      setupNextSubsession(rtspClient); 
// give up on this subsession; go to the next one
    } 
else {
      env 
<< *rtspClient << "Initiated the \"" << *scs.subsession << "\" subsession (";
      
if (scs.subsession->rtcpIsMuxed()) {
        env 
<< "client port " << scs.subsession->clientPortNum();
      } 
else {
        env 
<< "client ports " << scs.subsession->clientPortNum() << "-" << scs.subsession->clientPortNum()+1;
      }
      env 
<< ")\n";

      
// Continue setting up this subsession, by sending a RTSP "SETUP" command:
      
rtspClient->sendSetupCommand(*scs.subsession, continueAfterSETUP, False, REQUEST_STREAMING_OVER_TCP);
    }
    return;
  }

  
// We've finished setting up all of the subsessions.  Now, send a RTSP "PLAY" command to start the streaming:
  if (scs.session->absStartTime() != NULL) {
    
// Special case: The stream is indexed by 'absolute' time, so send an appropriate "PLAY" command:
    rtspClient->sendPlayCommand(*scs.session, continueAfterPLAY, scs.session->absStartTime(), scs.session->absEndTime());
  } 
else {
    scs.duration 
= scs.session->playEndTime() - scs.session->playStartTime();
    
rtspClient->sendPlayCommand(*scs.session, continueAfterPLAY);
  }
}
Boolean MediaSubsession::initiate(int useSpecialRTPoffset) {

    
// Create a new socket:
    
if (isSSM()) {
      fRTPSocket 
= new Groupsock(env(), tempAddr, fSourceFilterAddr, 0);
    } 
else {
      
fRTPSocket = new Groupsock(env(), tempAddr, 0255); //创建RTP传输用socket,端口后会根据sdp来决定是server端指定还是client自动分配(setup的时候反馈给server端)  
    }
    if (fRTPSocket == NULL) {
      env().setResultMsg(
"MediaSession::initiate(): unable to create RTP and RTCP sockets");
      break;
    }

    
// Get the client port number:
    Port clientPort(
0);
    
if (!getSourcePort(env(), fRTPSocket->socketNum(), clientPort)) {
      break;
    }
    fClientPortNum 
= ntohs(clientPort.num());

    
if (fMultiplexRTCPWithRTP) {
      
// Use this RTP 'groupsock' object for RTCP as well:
      fRTCPSocket = fRTPSocket;
      success 
= True;
      break;
    }

    
// Make sure we can use the next (i.e., odd) port number, for RTCP:
    portNumBits rtcpPortNum 
= fClientPortNum|1;
    
if (isSSM()) {
      fRTCPSocket 
= new Groupsock(env(), tempAddr, fSourceFilterAddr, rtcpPortNum);
    } 
else {
      
fRTCPSocket = new Groupsock(env(), tempAddr, rtcpPortNum, 255);    
    }
    if (fRTCPSocket != NULL && fRTCPSocket->socketNum() >= 0) {
      
// Success! Use these two sockets.
      success 
= True;
      break;
    }

    
// Try to use a big receive buffer for RTP - at least 0.1 second of
    
// specified bandwidth and at least 50 KB
    unsigned rtpBufSize 
= fBandwidth * 25 / 2// 1 kbps * 0.1 s = 12.5 bytes
    
if (rtpBufSize < 50 * 1024)
      rtpBufSize 
= 50 * 1024;
    increaseReceiveBufferTo(env(), fRTPSocket->socketNum(), rtpBufSize);

    
if (isSSM() && fRTCPSocket != NULL) {
      
// Special case for RTCP SSM: Send RTCP packets back to the source via unicast:
      fRTCPSocket
->changeDestinationParameters(fSourceFilterAddr,0,~0);
    }

    
// Create "fRTPSource" and "fReadSource":
    
if (!createSourceObjects(useSpecialRTPoffset)) break;

    if (fReadSource == NULL) {
      env().setResultMsg(
"Failed to create read source");
      break;
    }

    
// Finally, create our RTCP instance. (It starts running automatically)
    
if (fRTPSource != NULL && fRTCPSocket != NULL) {
      
// If bandwidth is specified, use it and add 5for RTCP overhead.
      
// Otherwise make a guess at 500 kbps.
      unsigned totSessionBandwidth
= fBandwidth ? fBandwidth + fBandwidth / 20 : 500;
      fRTCPInstance 
= RTCPInstance::createNew(env(), fRTCPSocket,
                          totSessionBandwidth,
                          (unsigned char 
const*)
                          fParent.CNAME(),
                          
NULL /* we're a client */,
                          fRTPSource);
      
if (fRTCPInstance == NULL) {
        env().setResultMsg(
"Failed to create RTCP instance");
        break;
      }
    }
Boolean MediaSubsession::createSourceObjects(int useSpecialRTPoffset) {
      
// Check "fCodecName" against the set of codecs that we support,
      
// and create our RTP source accordingly
      
// (Later make this code more efficient, as this set grows #####)
      
// (Also, add more fmts that can be implemented by SimpleRTPSource#####)

      } 
else if (strcmp(fCodecName, "H264"== 0) {
        fReadSource 
= fRTPSource = H264VideoRTPSource::createNew(env(), fRTPSocket,fRTPPayloadFormat,fRTPTimestampFrequency)
最后,调用createSourceObjects创建Source,有了source,就可以通过调用getNextFrame函数获取到数据了,当然也可以再创建Sink,然后由Sink来间接调用getNextFrame。

2、RTSPClient::sendSetupCommand

unsigned RTSPClient::sendSetupCommand(MediaSubsession& subsession, responseHandler* responseHandler,
                                      
Boolean streamOutgoing, Boolean streamUsingTCPBoolean forceMulticastOnUnspecified,
                      Authenticator
* authenticator) {
  
if (fTunnelOverHTTPPortNum != 0) streamUsingTCP = True// RTSP-over-HTTP tunneling uses TCP (by definition)
  
if (fCurrentAuthenticator < authenticator) fCurrentAuthenticator = *authenticator;

  u_int32_t booleanFlags 
= 0;
  
if (streamUsingTCP) booleanFlags |= 0x1;
  
if (streamOutgoing) booleanFlags |= 0x2;
  
if (forceMulticastOnUnspecified) booleanFlags |= 0x4;
  return sendRequest(
new RequestRecord(++fCSeq, "SETUP", responseHandler, NULL&subsession, booleanFlags
));
}
void continueAfterSETUP(RTSPClient* rtspClient, int resultCode, char* resultString) {
  
do {
 

    
// Having successfully setup the subsession, create a data sink for it, and call "startPlaying()" on it.
    
// (This will prepare the data sink to receive data; the actual flow of data from the client won't start happening until later,
    // after we've sent a RTSP "PLAY" command.)

    scs.subsession
->sink = DummySink::createNew(env, *scs.subsession, rtspClient->url());
      
// perhaps use your own custom "MediaSink" subclass instead
    
if (scs.subsession->sink == NULL) {
      env 
<< *rtspClient << "Failed to create a data sink for the \"" << *scs.subsession
      << "\" subsession: " << env.getResultMsg() << "\n";
      break;
    }

    env 
<< *rtspClient << "Created a data sink for the \"" << *scs.subsession << "\" subsession\n";
    scs.subsession
->miscPtr = rtspClient; // a hack to let subsession handler functions get the "RTSPClient" from the subsession
    scs.subsession
->sink->startPlaying(*(scs.subsession->readSource()),
                       subsessionAfterPlaying, scs.subsession);
    
// Also set a handler to be called if a RTCP "BYE" arrives for this subsession:
    
if (scs.subsession->rtcpInstance() != NULL) {
      scs.subsession
->rtcpInstance()->setByeHandler(subsessionByeHandler, scs.subsession);
    }
  } 
while (0);
  delete[] resultString;

  
// Set up the next subsession, if any:
  setupNextSubsession(rtspClient);
}
至此,RTSP交互已经完成,再来关注一下RTP_over_TCP的情况,sendSetupCommand的时候会传达streamUsingTCP参数,通知server端,收到回复后handleSETUPResponse用于处理相关事宜
Boolean RTSPClient::handleSETUPResponse(MediaSubsession& subsession, char const* sessionParamsStr, char const* transportParamsStr, Boolean streamUsingTCP) {
  
do {
..
    
// Parse the "Transport:" header parameters:
    char
* serverAddressStr;
    portNumBits serverPortNum;
    unsigned char rtpChannelId, rtcpChannelId;
    
if (!parseTransportParams(transportParamsStr, serverAddressStr, serverPortNum, rtpChannelId, rtcpChannelId)) {
      envir().setResultMsg("Missing or bad \"Transport:\" header");
      break;
    }
    delete[] subsession.connectionEndpointName();
    subsession.connectionEndpointName() 
= serverAddressStr;
    subsession.serverPortNum 
= serverPortNum;
    subsession.rtpChannelId 
= rtpChannelId;
    subsession.rtcpChannelId 
= rtcpChannelId;

    
if (streamUsingTCP) {
      // Tell the subsession to receive RTP (and send/receive RTCP) over the RTSP stream:
      
if (subsession.rtpSource() != NULL) {
       
subsession.rtpSource()->setStreamSocket(fInputSocketNum, subsession.rtpChannelId); //TCP传输沿用RTSP交互的socket,RTPInterface用于关联这个socket(fInputSocketNum)和session对应的id(rtpChannelId)
       
// So that we continue to receive & handle RTSP commands and responses from the server
        subsession.rtpSource()
->enableRTCPReports() = False;
       
// To avoid confusing the server (which won't start handling RTP/RTCP-over-TCP until "PLAY"), don't send RTCP "RR"s yet
        increaseReceiveBufferTo(envir(), fInputSocketNum, 50*1024);
      }
      
if (subsession.rtcpInstance() != NULLsubsession.rtcpInstance()->setStreamSocket(fInputSocketNum, subsession.rtcpChannelId);
      RTPInterface::setServerRequestAlternativeByteHandler(envir(), fInputSocketNum, handleAlternativeRequestByte, this);
    else {
     
// Normal case.
     
// Set the RTP and RTCP sockets' destination address and port from the information in the SETUP response (if present):
      netAddressBits destAddress = subsession.connectionEndpointAddress();
     
if (destAddress == 0) destAddress = fServerAddress;
      subsession.setDestinations(destAddress);
    }

    success 
= True;
  } 
while (0);
void RTPInterface::setStreamSocket(int sockNum,
                   unsigned char streamChannelId) {
  fGS
->removeAllDestinations();
  envir().taskScheduler().disableBackgroundHandling(fGS
->socketNum()); // turn off any reading on our datagram socket
  fGS
->reset(); // and close our datagram socket, because we won't be using it anymore

  
addStreamSocket(sockNum, streamChannelId);
关于RTPInterface更详细的介绍,请参考以下链接:https://blog.csdn.net/xy365/article/details/20942979
posted on 2019-07-31 13:41 lfc 阅读(424) 评论(0)  编辑 收藏 引用
只有注册用户登录后才能发表评论。