基础搞明白了,那么live555的RTSP服务器,又是如何创建、启动,如何和Source和Sink建立联系的呢?
主程序中会调用类似下面的代码,创建RTSP服务器:
// Create the RTSP server:
// "createNew()" is given the usage environment, the port number to listen on (554 here)
// and the authentication database "authDB".
RTSPServer* rtspServer = RTSPServer::createNew(*env, 554, authDB);
if (rtspServer == NULL) {
// Creation failed: print the environment's result message and terminate the program.
*env << "Failed to create RTSP server: " << env->getResultMsg() << "\n";
exit(1);
}
父类GenericMediaServer被构建,后续承担基础的服务:
// Constructs the RTSP server around the socket and port handed in by the caller.
// The socket, port and reclamation time are forwarded to the "GenericMediaServer"
// base class; the initializer list then sets up the RTSP-specific members.
RTSPServer::RTSPServer(UsageEnvironment& env,
                       int ourSocket, Port ourPort,
                       UserAuthenticationDatabase* authDatabase,
                       unsigned reclamationSeconds)
  : GenericMediaServer(env, ourSocket, ourPort, reclamationSeconds),
    fHTTPServerSocket(-1),
    fHTTPServerPort(0),
    fClientConnectionsForHTTPTunneling(NULL), // only gets created if it is needed
    fTCPStreamingDatabase(HashTable::create(ONE_WORD_HASH_KEYS)),
    fPendingRegisterOrDeregisterRequests(HashTable::create(ONE_WORD_HASH_KEYS)),
    fRegisterOrDeregisterRequestCounter(0),
    fAuthDB(authDatabase),
    fAllowStreamingRTPOverTCP(True)
{
  // Nothing further to do: every member is set up by the initializer list.
}
GenericMediaServer会创建一个后台任务,用于监听Client的连接:
// Base-class constructor: stores the server socket, port and reclamation time,
// creates the three lookup tables (media sessions, client connections, client
// sessions) and registers a background read handler on the server socket.
GenericMediaServer::GenericMediaServer(UsageEnvironment& env, int ourSocket, Port ourPort,
                                       unsigned reclamationSeconds)
  : Medium(env),
    fServerSocket(ourSocket),
    fServerPort(ourPort),
    fReclamationSeconds(reclamationSeconds),
    fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)),
    fClientConnections(HashTable::create(ONE_WORD_HASH_KEYS)),
    fClientSessions(HashTable::create(STRING_HASH_KEYS))
{
  // Clients on the same host that get killed must not also kill us:
  ignoreSigPipeOnSocket(fServerSocket);

  // Create the background task that listens for clients connecting to us;
  // "incomingConnectionHandler" is registered with "this" as its client data:
  env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket,
                                                   incomingConnectionHandler,
                                                   this);
}
收到Client的连接请求后,incomingConnectionHandler函数会被调用,并最终进入下面的incomingConnectionHandlerOnSocket:
void GenericMediaServer::incomingConnectionHandlerOnSocket(int serverSocket) {
struct sockaddr_in clientAddr;
SOCKLEN_T clientAddrLen = sizeof clientAddr;
int clientSocket = accept(serverSocket, (struct sockaddr*)&clientAddr, &clientAddrLen);
if (clientSocket < 0) {
int err = envir().getErrno();
if (err != EWOULDBLOCK) {
envir().setResultErrMsg("accept() failed: ");
}
return;
}
// Create a new object for handling this connection:
(void)createNewClientConnection(clientSocket, clientAddr);
}
live555会为每个连接调用createNewClientConnection创建一个ClientConnection
// Per-connection RTSP object. The generic part of the connection is set up by the
// "GenericMediaServer::ClientConnection" base class; both the input and the output
// socket members start out as the connection's own socket ("fOurSocket").
RTSPServer::RTSPClientConnection::RTSPClientConnection(RTSPServer& ourServer,
                                                       int clientSocket,
                                                       struct sockaddr_in clientAddr)
  : GenericMediaServer::ClientConnection(ourServer, clientSocket, clientAddr),
    fOurRTSPServer(ourServer),
    fClientInputSocket(fOurSocket),
    fClientOutputSocket(fOurSocket),
    fIsActive(True),
    fRecursionCount(0),
    fOurSessionCookie(NULL)
{
  resetRequestBuffer();
}
GenericMediaServer::ClientConnection会创建一个后台任务,用于监听该连接上的Request:
// Generic per-connection object: remembers the owning server, the connection's
// socket and the client's address, registers itself in the server's table of
// client connections, and starts watching the socket for incoming requests.
GenericMediaServer::ClientConnection::ClientConnection(GenericMediaServer& ourServer,
                                                       int clientSocket,
                                                       struct sockaddr_in clientAddr)
  : fOurServer(ourServer),
    fOurSocket(clientSocket),
    fClientAddr(clientAddr)
{
  // Add ourself to our 'client connections' table (keyed by our own address):
  fOurServer.fClientConnections->Add((char const*)this, this);

  // Start with an empty request buffer:
  resetRequestBuffer();

  // Create the background task that watches this connection for requests:
  // "incomingRequestHandler" is registered for the readable and exception conditions.
  int const conditionsOfInterest = SOCKET_READABLE|SOCKET_EXCEPTION;
  envir().taskScheduler().setBackgroundHandling(fOurSocket, conditionsOfInterest,
                                                incomingRequestHandler, this);
}
当Connect上收到来自Client的请求后,incomingRequestHandler函数会被调用。
// Reads newly arrived bytes from the connection's socket into the request buffer,
// right after the bytes that were already seen, and passes the result of the read
// on to "handleRequestBytes()".
void GenericMediaServer::ClientConnection::incomingRequestHandler() {
  struct sockaddr_in ignoredFromAddr; // "readSocket()" fills in a 'from' address; it is meaningless in this case

  handleRequestBytes(readSocket(envir(), fOurSocket,
                                &fRequestBuffer[fRequestBytesAlreadySeen],
                                fRequestBufferBytesLeft,
                                ignoredFromAddr));
}
void RTSPServer::RTSPClientConnection::handleRequestBytes(int newBytesRead) {
// NOTE: this listing is an excerpt of the request dispatcher. A line holding only "."
// marks code that was left out, so the braces shown here do not balance.
.
// Parse the request text held in "fRequestBuffer" (length: fLastCRLF+2 - fRequestBuffer)
// into the command name, the two URL parts, the CSeq, the session id and the content length:
Boolean parseSucceeded = parseRTSPRequestString((char*)fRequestBuffer, fLastCRLF+2 - fRequestBuffer,
cmdName, sizeof cmdName,
urlPreSuffix, sizeof urlPreSuffix,
urlSuffix, sizeof urlSuffix,
cseq, sizeof cseq,
sessionIdStr, sizeof sessionIdStr,
contentLength);
.
// Dispatch on the command name:
} else if (strcmp(cmdName, "DESCRIBE") == 0) { // handle a DESCRIBE request
handleCmd_DESCRIBE(urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
} else if (strcmp(cmdName, "SETUP") == 0) { // handle a SETUP request
// A client-session object is created here (the surrounding condition is not shown in this excerpt):
clientSession = (RTSPServer::RTSPClientSession*)fOurRTSPServer.createNewClientSessionWithId();
}
if (clientSession != NULL) {
// SETUP is handled by the session object, not by the connection:
clientSession->handleCmd_SETUP(this, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
}
} else if (strcmp(cmdName, "TEARDOWN") == 0
|| strcmp(cmdName, "PLAY") == 0
|| strcmp(cmdName, "PAUSE") == 0
|| strcmp(cmdName, "GET_PARAMETER") == 0
|| strcmp(cmdName, "SET_PARAMETER") == 0) {
// These commands are all forwarded to the existing session, if there is one:
if (clientSession != NULL) {
clientSession->handleCmd_withinSession(this, cmdName, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer); // handles PLAY and the other in-session requests
}
.
}
具体RTSP协议下的Request和Response不作详细介绍,这里只关注流程部分:
1、先看handleCmd_SETUP
// Excerpt of the SETUP handler. A line holding only dots marks code that was left out
// of this listing; the function's closing brace is on the line that follows the excerpt.
//
// The part shown here allocates the per-session array of stream states and asks the
// subsession for the stream parameters of track "trackNum". The results (TCP socket
// number and stream token) are stored in that track's entry of "fStreamStates".
void RTSPServer::RTSPClientSession
::handleCmd_SETUP(RTSPServer::RTSPClientConnection* ourClientConnection,
                  char const* urlPreSuffix, char const* urlSuffix, char const* fullRequestStr) {
  // The state of each stream in this session is managed with a "streamState" entry.
  // (The listing originally spelled this member "fstreamStates", which does not match
  // the "fStreamStates" used everywhere else.)
  fStreamStates = new struct streamState[fNumStreamStates];
....
  subsession->getStreamParameters(fOurSessionId, ourClientConnection->fClientAddr.sin_addr.s_addr,
                                  clientRTPPort, clientRTCPPort,
                                  fStreamStates[trackNum].tcpSocketNum, rtpChannelId, rtcpChannelId,
                                  destinationAddress, destinationTTL, fIsMulticast,
                                  serverRTPPort, serverRTCPPort,
                                  fStreamStates[trackNum].streamToken);
.
}
getStreamParameters这个函数很重要,它将完成source、RTPSink的创建工作,并将其与客户端建立联系:
void OnDemandServerMediaSubsession
::getStreamParameters(unsigned clientSessionId,
netAddressBits clientAddress,
Port const& clientRTPPort,
Port const& clientRTCPPort,
int tcpSocketNum,
unsigned char rtpChannelId,
unsigned char rtcpChannelId,
netAddressBits& destinationAddress,
u_int8_t& /*destinationTTL*/,
Boolean& isMulticast,
Port& serverRTPPort,
Port& serverRTCPPort,
void*& streamToken) {
// NOTE: this listing is an excerpt. Several locals used below ("streamBitrate", "rtpGroupsock",
// "rtcpGroupsock", "rtpSink", "udpSink", "rtpPayloadType") are declared in code that is not shown,
// and the statement that leaves the port-search loop is not shown either.
//
// What the visible code does: create the media source for this client session, find server
// port(s) on which the RTP/RTCP groupsocks can be created, create the RTP sink, and bundle
// everything into a "StreamState" that is returned through "streamToken".
FramedSource* mediaSource
= createNewStreamSource(clientSessionId, streamBitrate);
// Normal case: We're streaming RTP (over UDP or TCP). Create a pair of
// groupsocks (RTP and RTCP), with adjacent port numbers (RTP port number even).
// (If we're multiplexing RTCP and RTP over the same port number, it can be odd or even.)
NoReuse dummy(envir()); // ensures that we skip over ports that are already in use
// Try successive port numbers, starting at "fInitialPortNum":
for (portNumBits serverPortNum = fInitialPortNum; ; ++serverPortNum) {
struct in_addr dummyAddr; dummyAddr.s_addr = 0;
serverRTPPort = serverPortNum;
rtpGroupsock = createGroupsock(dummyAddr, serverRTPPort); // create the socket used for RTP transmission
if (rtpGroupsock->socketNum() < 0) {
delete rtpGroupsock;
continue; // try again
}
if (fMultiplexRTCPWithRTP) {
// Use the RTP 'groupsock' object for RTCP as well:
serverRTCPPort = serverRTPPort;
rtcpGroupsock = rtpGroupsock;
} else {
// Create a separate 'groupsock' object (with the next (odd) port number) for RTCP:
serverRTCPPort = ++serverPortNum;
rtcpGroupsock = createGroupsock(dummyAddr, serverRTCPPort);
if (rtcpGroupsock->socketNum() < 0) {
// The RTCP port is unusable, so give up on this port pair (both groupsocks are released):
delete rtpGroupsock;
delete rtcpGroupsock;
continue; // try again
}
}
// Create the RTP sink on top of the RTP groupsock:
rtpSink = createNewRTPSink(rtpGroupsock, rtpPayloadType, mediaSource);
// Bundle source, sink and groupsocks into a "StreamState"; it is remembered in
// "fLastStreamToken" and handed back to the caller through "streamToken":
streamToken = fLastStreamToken
= new StreamState(*this, serverRTPPort, serverRTCPPort, rtpSink, udpSink,
streamBitrate, mediaSource,
rtpGroupsock, rtcpGroupsock);}
2、再来看handleCmd_PLAY
// Excerpt of the PLAY handler (a line holding only "." marks omitted code; the closing
// braces are not all shown). The visible part starts streaming on every stream state
// that the request applies to.
void RTSPServer::RTSPClientSession
::handleCmd_PLAY(RTSPServer::RTSPClientConnection* ourClientConnection,
ServerMediaSubsession* subsession, char const* fullRequestStr) {
.
// Now, start streaming:
for (i = 0; i < fNumStreamStates; ++i) {
// A NULL "subsession" selects every stream; otherwise only the matching one is started:
if (subsession == NULL /* means: aggregated operation */
|| subsession == fStreamStates[i].subsession) {
unsigned short rtpSeqNum = 0;
unsigned rtpTimestamp = 0;
// Skip entries that have no subsession:
if (fStreamStates[i].subsession == NULL) continue;
// Start this stream. "noteClientLiveness" (with "this") is passed as the RTCP "RR" handler,
// and "handleAlternativeRequestByte" (with the connection) as the alternative-byte handler:
fStreamStates[i].subsession->startStream(fOurSessionId,
fStreamStates[i].streamToken,
(TaskFunc*)noteClientLiveness, this,
rtpSeqNum, rtpTimestamp,
RTSPServer::RTSPClientConnection::handleAlternativeRequestByte, ourClientConnection);
.
} void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId,
void* streamToken,
TaskFunc* rtcpRRHandler,
void* rtcpRRHandlerClientData,
unsigned short& rtpSeqNum,
unsigned& rtpTimestamp,
ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
void* serverRequestAlternativeByteHandlerClientData) {
// Excerpt (a line holding only "." marks omitted code; "streamState" and "destinations"
// are obtained in code that is not shown). The leading "}" on the first line belongs to
// the function that precedes this one in the listing.
.
// Forward the request, with both handlers and their client data, to the stream state:
if (streamState != NULL) {
streamState->startPlaying(destinations, clientSessionId,
rtcpRRHandler, rtcpRRHandlerClientData,
serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData);
.
} void StreamState
::startPlaying(Destinations* dests, unsigned clientSessionId,
TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData,
ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
void* serverRequestAlternativeByteHandlerClientData) {
// Excerpt (a line holding only "." marks omitted code). The leading "}" on the first line
// belongs to the function that precedes this one in the listing.
.
// Only start if we are not already playing and there is a media source:
if (!fAreCurrentlyPlaying && fMediaSource != NULL) {
if (fRTPSink != NULL) {
// Connect the RTP sink to the media source and start it;
// "afterPlayingStreamState" (with "this") is passed as the after-playing callback:
fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
fAreCurrentlyPlaying = True;
.
} Boolean MediaSink::startPlaying(MediaSource& source,
afterPlayingFunc* afterFunc,
void* afterClientData) {
// Excerpt (the line holding only "." marks omitted code). The leading "}" on the first
// line belongs to the function that precedes this one in the listing.
.
// Hand over to "continuePlaying()" and return its result:
return continuePlaying();
}
// (The "}" above belongs to the function that precedes this one in the listing.)
//
// Puts the 'fragmenter' object between the input source and this sink, then
// continues with the parent class's implementation.
Boolean H264or5VideoRTPSink::continuePlaying() {
  if (fOurFragmenter != NULL) {
    // We already have a fragmenter; just point it at the current input source:
    fOurFragmenter->reassignInputSource(fSource);
  } else {
    // No fragmenter has been set up yet, so create it now:
    fOurFragmenter = new H264or5Fragmenter(fHNumber, envir(), fSource, OutPacketBuffer::maxSize,
                                           ourMaxPacketSize() - 12/*RTP hdr size*/);
  }

  // From now on the fragmenter is the source that we read from:
  fSource = fOurFragmenter;

  // Then call the parent class's implementation:
  return MultiFramedRTPSink::continuePlaying();
}
// Starts the packet-sending cycle and always reports success.
Boolean MultiFramedRTPSink::continuePlaying() {
  // Build and send the first packet; doing so also schedules any future sends.
  Boolean const isFirstPacket = True;
  buildAndSendPacket(isFirstPacket);

  return True;
}
绕了好大一个圈,终于到达MultiFramedRTPSink的continuePlaying了,从现在开始,它将循环地获取RTSP服务器需要的RTP数据包,直到收到停止命令。
MultiFramedRTPSink是与帧有关的类,其实它要求每次必须从source获得一个帧的数据。
// Starts a new outgoing RTP packet: writes the RTP header into the output buffer
// (leaving holes for the fields that depend on the actual data), resets the per-packet
// bookkeeping, and then calls "packFrame()" to begin packing frame data.
//
// "isFirstPacket" is remembered in "fIsFirstPacket".
void MultiFramedRTPSink::buildAndSendPacket(Boolean isFirstPacket)
{
  fIsFirstPacket = isFirstPacket;

  // Set up the RTP header:
  unsigned rtpHdr = 0x80000000; // RTP version 2; marker ('M') bit not set (by default; it can be set later)
  rtpHdr |= (fRTPPayloadType << 16);
  rtpHdr |= fSeqNo; // sequence number
  fOutBuf->enqueueWord(rtpHdr); // append this word to the packet

  // Note where the RTP timestamp will go.
  // (We can't fill this in until we start packing payload frames.)
  fTimestampPosition = fOutBuf->curPacketSize();
  fOutBuf->skipBytes(4); // leave a hole in the buffer for the timestamp

  fOutBuf->enqueueWord(SSRC());

  // Allow for a special, payload-format-specific header following the
  // RTP header:
  fSpecialHeaderPosition = fOutBuf->curPacketSize();
  fSpecialHeaderSize = specialHeaderSize();
  fOutBuf->skipBytes(fSpecialHeaderSize);

  // Begin packing as many (complete) frames into the packet as we can:
  fTotalFrameSpecificHeaderSizes = 0;
  fNoFramesLeft = False;
  fNumFramesUsedSoFar = 0;
  packFrame(); // the header is ready; now pack the frame data
}
// Obtains the next piece of frame data for the packet being built.
// If the output buffer holds overflow data, that data is used directly; otherwise a new
// frame is requested from the source and delivered later through "afterGettingFrame".
void MultiFramedRTPSink::packFrame()
{
  if (!fOutBuf->haveOverflowData()) {
    // Normal case: we need to read a new frame from the source.
    if (fSource == NULL) return;

    // Reserve room for this frame's frame-specific header and account for its size:
    fCurFrameSpecificHeaderPosition = fOutBuf->curPacketSize();
    fCurFrameSpecificHeaderSize = frameSpecificHeaderSize();
    fOutBuf->skipBytes(fCurFrameSpecificHeaderSize);
    fTotalFrameSpecificHeaderSizes += fCurFrameSpecificHeaderSize;

    // Ask the source for its next frame, to be written at the buffer's current position:
    fSource->getNextFrame(fOutBuf->curPtr(), fOutBuf->totalBytesAvailable(),
                          afterGettingFrame, this,
                          ourHandleClosure, this);
    return;
  }

  // We have an overflow frame that was too big for the last packet.
  // Use it before reading a new frame from the source:
  unsigned const pendingSize = fOutBuf->overflowDataSize();
  struct timeval const pendingPresentationTime = fOutBuf->overflowPresentationTime();
  unsigned const pendingDuration = fOutBuf->overflowDurationInMicroseconds();
  fOutBuf->useOverflowData();

  afterGettingFrame1(pendingSize, 0, pendingPresentationTime, pendingDuration);
}
source从文件(或某个设备)中读取一帧数据,读完后通过回调函数afterGettingFrame返回给Sink。
void MultiFramedRTPSink::afterGettingFrame(void* clientData,
unsigned numBytesRead, unsigned numTruncatedBytes,
struct timeval presentationTime, unsigned durationInMicroseconds)
{
MultiFramedRTPSink* sink = (MultiFramedRTPSink*) clientData;
sink->afterGettingFrame1(numBytesRead, numTruncatedBytes, presentationTime,
durationInMicroseconds);
}
void MultiFramedRTPSink::afterGettingFrame1(
unsigned frameSize,
unsigned numTruncatedBytes,
struct timeval presentationTime,
unsigned durationInMicroseconds)
{
if (fIsFirstPacket) {
// Record the fact that we're starting to play now:
gettimeofday(&fNextSendTime, NULL);
}
unsigned curFragmentationOffset = fCurFragmentationOffset;
unsigned numFrameBytesToUse = frameSize;
unsigned overflowBytes = 0;
//如果包只已经打入帧数据了,并且不能再向这个包中加数据了,则把新获得的帧数据保存下来。
// If we have already packed one or more frames into this packet,
// check whether this new frame is eligible to be packed after them.
// (This is independent of whether the packet has enough room for this
// new frame; that check comes later.)
if (fNumFramesUsedSoFar > 0) {
//如果包中已有了一个帧,并且不允许再打入新的帧了,则只记录下新的帧。
if ((fPreviousFrameEndedFragmentation && !allowOtherFramesAfterLastFragment())
|| !frameCanAppearAfterPacketStart(fOutBuf->curPtr(), frameSize))
{
// Save away this frame for next time:
numFrameBytesToUse = 0;
fOutBuf->setOverflowData(fOutBuf->curPacketSize(), frameSize,
presentationTime, durationInMicroseconds);
}
}
//表示当前打入的是否是上一个帧的最后一块数据。
fPreviousFrameEndedFragmentation = False;
//下面是计算获取的帧中有多少数据可以打到当前包中,剩下的数据就作为overflow数据保存下来。
if (numFrameBytesToUse > 0) {
// Check whether this frame overflows the packet
if (fOutBuf->wouldOverflow(frameSize)) {
// Don't use this frame now; instead, save it as overflow data, and
// send it in the next packet instead. However, if the frame is too
// big to fit in a packet by itself, then we need to fragment it (and
// use some of it in this packet, if the payload format permits this.)
if (isTooBigForAPacket(frameSize)
&& (fNumFramesUsedSoFar == 0 || allowFragmentationAfterStart())) {
// We need to fragment this frame, and use some of it now:
overflowBytes = computeOverflowForNewFrame(frameSize);
numFrameBytesToUse -= overflowBytes;
fCurFragmentationOffset += numFrameBytesToUse;
} else {
// We don't use any of this frame now:
overflowBytes = frameSize;
numFrameBytesToUse = 0;
}
fOutBuf->setOverflowData(fOutBuf->curPacketSize() + numFrameBytesToUse,
overflowBytes, presentationTime, durationInMicroseconds);
} else if (fCurFragmentationOffset > 0) {
// This is the last fragment of a frame that was fragmented over
// more than one packet. Do any special handling for this case:
fCurFragmentationOffset = 0;
fPreviousFrameEndedFragmentation = True;
}
}
if (numFrameBytesToUse == 0 && frameSize > 0) {
// Send our packet now, because we have filled it up:
sendPacketIfNecessary();
} else {
//需要向包中打入数据。
// Use this frame in our outgoing packet:
unsigned char* frameStart = fOutBuf->curPtr();
fOutBuf->increment(numFrameBytesToUse);
// do this now, in case "doSpecialFrameHandling()" calls "setFramePadding()" to append padding bytes
// Here's where any payload format specific processing gets done:
doSpecialFrameHandling(curFragmentationOffset, frameStart,
numFrameBytesToUse, presentationTime, overflowBytes);
++fNumFramesUsedSoFar;
//更新fNextSendTime,后面delay queue需要用到。
// Update the time at which the next packet should be sent, based
// on the duration of the frame that we just packed into it.
// However, if this frame has overflow data remaining, then don't
// count its duration yet.
if (overflowBytes == 0) {
fNextSendTime.tv_usec += durationInMicroseconds;
fNextSendTime.tv_sec += fNextSendTime.tv_usec / 1000000;
fNextSendTime.tv_usec %= 1000000;
}
//如果需要,就发出包,否则继续打入数据。
// Send our packet now if (i) it's already at our preferred size, or
// (ii) (heuristic) another frame of the same size as the one we just
// read would overflow the packet, or
// (iii) it contains the last fragment of a fragmented frame, and we
// don't allow anything else to follow this or
// (iv) one frame per packet is allowed:
if (fOutBuf->isPreferredSize()
|| fOutBuf->wouldOverflow(numFrameBytesToUse)
|| (fPreviousFrameEndedFragmentation
&& !allowOtherFramesAfterLastFragment())
|| !frameCanAppearAfterPacketStart(
fOutBuf->curPtr() - frameSize, frameSize)) {
// The packet is ready to be sent now
sendPacketIfNecessary();
} else {
// There's room for more frames; try getting another:
packFrame();
}
}void MultiFramedRTPSink::sendPacketIfNecessary()
{
if (fNumFramesUsedSoFar > 0) {
++fPacketCount;
fTotalOctetCount += fOutBuf->curPacketSize();
fOctetCount += fOutBuf->curPacketSize() - rtpHeaderSize
- fSpecialHeaderSize - fTotalFrameSpecificHeaderSizes;
++fSeqNo; // for next time
}
//如果还有剩余数据,则调整缓冲区
if (fOutBuf->haveOverflowData()
&& fOutBuf->totalBytesAvailable() > fOutBuf->totalBufferSize() / 2) {
// Efficiency hack: Reset the packet start pointer to just in front of
// the overflow data (allowing for the RTP header and special headers),
// so that we probably don't have to "memmove()" the overflow data
// into place when building the next packet:
unsigned newPacketStart = fOutBuf->curPacketSize()-
(rtpHeaderSize + fSpecialHeaderSize + frameSpecificHeaderSize());
fOutBuf->adjustPacketStart(newPacketStart);
} else {
// Normal case: Reset the packet start pointer back to the start:
fOutBuf->resetPacketStart();
}
fOutBuf->resetOffset();
fNumFramesUsedSoFar = 0;
if (fNoFramesLeft) {
// We're done:
onSourceClosure(this);
} else {
//如果还有数据,则在下一次需要发送的时间再次打包发送。
// We have more frames left to send. Figure out when the next frame
// is due to start playing, then make sure that we wait this long before
// sending the next packet.
struct timeval timeNow;
gettimeofday(&timeNow, NULL);
int secsDiff = fNextSendTime.tv_sec - timeNow.tv_sec;
int64_t uSecondsToGo = secsDiff * 1000000
+ (fNextSendTime.tv_usec - timeNow.tv_usec);
if (uSecondsToGo < 0 || secsDiff < 0) { // sanity check: Make sure that the time-to-delay is non-negative:
uSecondsToGo = 0;
}
// Delay this amount of time:
nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecondsToGo,
(TaskFunc*) sendNext, this);}
}
// The following is called after each delay between packet sends:
void MultiFramedRTPSink::sendNext(void* firstArg) {
MultiFramedRTPSink* sink = (MultiFramedRTPSink*)firstArg;
sink->buildAndSendPacket(False); //继续循环
}
posted on 2017-02-08 16:47
lfc 阅读(711)
评论(0) 编辑 收藏 引用