[Ccrtp-devel] Some ccRTP problems handling RTCP BYE


From: Werner Dittmann
Subject: [Ccrtp-devel] Some ccRTP problems handling RTCP BYE
Date: Thu, 01 May 2008 15:59:44 +0200
User-agent: Thunderbird 2.0.0.9 (X11/20070801)

During the tests for ZRTP I came across some problems in GNU ccRTP as
well. A specific test program causes GNU ccRTP to malfunction during
RTCP session termination (dispatchBYE()). The test program is
attached to this mail. Compile and link it using

g++ rtplock.cpp -lccrtp1 -lccgnu2 -o rtplock

To run it, open two terminal windows and run ./rtplock -r in one window
(start this first) and ./rtplock -s in the second window. On my system
the receiver terminates correctly, but the sender hangs
indefinitely. There are several reasons for this: some lie in the test
program itself, some in the ccRTP stack. Even if the test program
contains "programmer's faults", the ccRTP stack should be robust
enough not to go into an endless wait state.

This problem may show up on faster machines only (I have a dual-core AMD
Athlon Linux system here) because it may also be timing dependent.


The test program

The test program sequentially creates two RTP send/receive sessions
that send or receive data. main() controls this.

- Both sessions use the same SSRC (0xdeadbeef) on the receive _and_
  the transmit sessions

- The first session uses another sending port than the second session
  (first session uses the default, the second session a specific port)

The specific setup and the timing of the two sessions force the second
ccRTP receiver session to send _two_ RTCP BYE messages (SSRC collision in
RTCP packets) for the _same_ SSRC to the transmitter.


ccRTP stack

These two BYE messages cause the transmitter session (that is, the
receiver of the BYE messages) to decrease the number of its
sync source "members" to "-1", which in turn causes dispatchBYE() to
assume it has more than 50 members (because the member counter is an
unsigned int). This in turn leads to a call to getOnlyBye(), which
hangs because nobody sends an RTCP BYE :-) anymore.

This behaviour has several causes in the ccRTP stack.

At the receiver side:

The method QueueRTCPManager::checkSSRCInRTCPPkt() checks for SSRC
collisions, and if it detects a collision it executes the following
code:

...
        } else {
                // New collision
                addConflict(s->getNetworkAddress(),
                            s->getDataTransportPort(),
                            s->getControlTransportPort());
                dispatchBYE("SSRC collision detected when receiving RTCP packet");
                renewLocalSSRC();
                setNetworkAddress(*s,network_address);
                setControlTransportPort(*s,transport_port);
                setControlTransportPort(*s,0);
                sourceLink.initStats();
...

Three topics here:

- The method calls "dispatchBYE()" to signal the other party to close
  the RTP session because of a colliding SSRC. The other party may
  decide to close the session or just go on (AFAIK).

- The next step: it calls "renewLocalSSRC()" to create another local
  SSRC to avoid a further SSRC collision if the other party continues
  to use its SSRC. However, renewLocalSSRC() uses random() to create
  a new SSRC and checks whether it is already in use, but it _never_
  sets the newly generated SSRC as the new local SSRC. This causes the
  detection of the second SSRC collision.

- The next lines set (or reset?) the network address and port of the
  sync source. Why is this done? Why set the port twice? Why do it at
  all?


Now the problems during handling of received RTCP packets:

- QueueRTCPManager::takeInControlPacket() receives the control packet,
  detects that it is a BYE and calls QueueRTCPManager::getBYE().

- getBYE() calls onGotGoodbye() if it was set by the application, then
  calls MembershipBookkeeping::BYESource() and sets the sync source
  into the leaving state. No check is done whether a BYE was already
  received for this source. Which part honors the "leaving" state,
  here or elsewhere?

- BYESource() checks if the sync source is registered. If it is
  registered, it just notes the fact and decreases the member count
  without further checking. The member count may drop well below zero
  (if the counter were a signed int) if several BYE packets for the
  same SSRC are received. BYESource() does not check for this either.

- The next problem: the sync members counter is unsigned, thus the
  "members > 50" check in dispatchBYE() triggers because -1 as an
  unsigned value is bigger than 50 :-) .

Those who are more familiar with the ccRTP code, can you please check
my findings? Maybe we can have a short discussion on how to fix ccRTP.

Regards,
Werner


// Test for ccRTP
//
// Copyright (C) 2008 Werner Dittmann <address@hidden>
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

#include <cstdlib>
#include <cstring>    // strlen
#include <iostream>   // cout, cerr
#include <unistd.h>   // getopt
#include <ccrtp/rtp.h>

#ifdef  CCXX_NAMESPACES
using namespace ost;
using namespace std;
#endif

class PacketsPattern
{
public:
    inline const InetHostAddress&
    getDestinationAddress() const
    { return destinationAddress; }

    inline const tpport_t
    getDestinationPort() const
    { return destinationPort; }

    uint32
    getPacketsNumber() const
    { return packetsNumber; }

    uint32
    getSsrc() const
    { return 0xdeadbeef; }

    const unsigned char*
    getPacketData(uint32 i)
    { return data[i%2]; }

    const size_t
    getPacketSize(uint32 i)
    { return strlen((char*)data[i%2]) + 1 ; }

private:
    static const InetHostAddress destinationAddress;
    static const uint16 destinationPort = 10002;
    static const uint32 packetsNumber = 10;
    static const uint32 packetsSize = 12;
    static const unsigned char* data[];
};

const InetHostAddress PacketsPattern::destinationAddress =
    InetHostAddress("localhost");

const unsigned char* PacketsPattern::data[] = {
    (unsigned char*)"0123456789\n",
    (unsigned char*)"987654321\n"
};

PacketsPattern pattern;

class
Test
{
public:
    virtual int
    doTest() = 0;
};

/**
 * SymmetricRTPSession using the standard address / port during receiving
 * and using other ports to send
 *
 * The next two classes show how to use <code>SymmetricRTPSession</code>
 * in the same way as <code>RTPSession</code>. This is straightforward,
 * just don't do any configuration or initialization.
 */
class
SendPacketTransmissionTest : public Test, public Thread, public TimerPort
{
public:
    void
    run()
    {
        doTest();
        cout << "exiting thread" << endl;
    }

    int doTest()
    {
        // should be valid?
        //RTPSession tx();
        SymmetricRTPSession tx(pattern.getSsrc(), InetHostAddress("localhost"));
        tx.setSchedulingTimeout(10000);
        tx.setExpireTimeout(1000000);

        tx.startRunning();

        tx.setPayloadFormat(StaticPayloadFormat(sptPCMU));
        if (!tx.addDestination(pattern.getDestinationAddress(),
                               pattern.getDestinationPort()) ) {
            return 1;
        }

        // 2 packets per second (packet duration of 500ms)
        uint32 period = 500;
        uint16 inc = tx.getCurrentRTPClockRate()/2;
        TimerPort::setTimer(period);
        uint32 i;
        for (i = 0; i < pattern.getPacketsNumber(); i++ ) {
            tx.putData(i*inc,
                       pattern.getPacketData(i),
                       pattern.getPacketSize(i));
            cout << "Sent some data: " << i << ", " << i*inc << ", "
                 << TimerPort::getTimer() << endl;
            Thread::sleep(TimerPort::getTimer());
            TimerPort::incTimer(period);
        }
        tx.putData(i*inc, (unsigned char*)"exit", 5);
        cout << "Sent exit string: " << i << ", " << i*inc << ", "
             << TimerPort::getTimer() << endl;
        Thread::sleep(TimerPort::getTimer());
        cout << "exiting thread" << endl;
        return 0;
    }
};


class
RecvPacketTransmissionTest : public Test, public Thread
{
public:
    void
    run() {
        doTest();
        cout << "exiting thread" << endl;
    }

    int
    doTest() {
        SymmetricRTPSession rx(pattern.getSsrc(),
                               pattern.getDestinationAddress(),
                               pattern.getDestinationPort());

        rx.setSchedulingTimeout(10000);
        rx.setExpireTimeout(1000000);

        rx.startRunning();
        rx.setPayloadFormat(StaticPayloadFormat(sptPCMU));
        // arbitrary number of loops to provide time to start transmitter
        if (!rx.addDestination(pattern.getDestinationAddress(),
                               pattern.getDestinationPort()+2) ) {
            return 1;
        }
        for ( int i = 0; i < 5000 ; i++ ) {
            const AppDataUnit* adu;
            while ( (adu = rx.getData(rx.getFirstTimestamp())) ) {
                cerr << "got some data: " << adu->getData() << endl;
                if (*adu->getData() == 'e') {
                    cout << " returning from thread" << endl;
                    delete adu;
                    return 0;
                }
                delete adu;
            }
            Thread::sleep(70);
        }
        return 0;
    }
};


/**
 * Another SymmetricRTPSession, using a different port as receive port.
 *
 */

class
ZrtpSendPacketTransmissionTest : public Test, public Thread, public TimerPort
{
public:
    void
    run()
    {
        cout << "thread id: " << hex << Thread::getId() << dec << endl;
        doTest();
        cout << "exiting thread" << endl;
    }

    int doTest()
    {
        // should be valid?
        //RTPSession tx();
        SymmetricRTPSession tx(pattern.getSsrc(),
                               pattern.getDestinationAddress(),
                               pattern.getDestinationPort()+2);

        tx.setSchedulingTimeout(10000);
        tx.setExpireTimeout(1000000);

        tx.startRunning();

        tx.setPayloadFormat(StaticPayloadFormat(sptPCMU));
        if (!tx.addDestination(pattern.getDestinationAddress(),
                               pattern.getDestinationPort()) ) {
            return 1;
        }
        cout << "thread id tx: " << hex << tx.Thread::getId() << dec << endl;

        // 2 packets per second (packet duration of 500ms)
        uint32 period = 500;
        uint16 inc = tx.getCurrentRTPClockRate()/2;
        TimerPort::setTimer(period);
        uint32 i;
        for (i = 0; i < pattern.getPacketsNumber(); i++ ) {
            tx.putData(i*inc,
                       pattern.getPacketData(i),
                       pattern.getPacketSize(i));
            cout << "Sent some data: " << i << ", " << i*inc << ", "
                 << TimerPort::getTimer() << endl;
            Thread::sleep(TimerPort::getTimer());
            TimerPort::incTimer(period);
        }
        tx.putData(i*inc, (unsigned char*)"exit", 5);
        cout << "Sent exit string: " << i << ", " << i*inc << ", "
             << TimerPort::getTimer() << endl;
        Thread::sleep(200);
        cout << "before return" << endl;
        return 0;
    }
};

class
ZrtpRecvPacketTransmissionTest : public Test, public Thread
{
public:
    void
    run() {
        doTest();
        cout << "exiting thread" << endl;
    }

    int
    doTest() {
        SymmetricRTPSession rx(pattern.getSsrc(),
                               pattern.getDestinationAddress(),
                               pattern.getDestinationPort());

        rx.setSchedulingTimeout(10000);
        rx.setExpireTimeout(1000000);

        rx.startRunning();
        rx.setPayloadFormat(StaticPayloadFormat(sptPCMU));

        if (!rx.addDestination(pattern.getDestinationAddress(),
                               pattern.getDestinationPort()+2) ) {
            return 1;
        }

        // arbitrary number of loops to provide time to start transmitter
        for ( int i = 0; i < 700 ; i++ ) {
            const AppDataUnit* adu;
            while ( (adu = rx.getData(rx.getFirstTimestamp())) ) {
                cerr << "got some data: " << adu->getData() << endl;
                if (*adu->getData() == 'e') {
                    cout << " returning from thread" << endl;
                    delete adu;
                    return 0;
                }
                delete adu;
            }
            Thread::sleep(70);
        }
        return 0;
    }
};


int main(int argc, char *argv[])
{
    int result = 0;
    bool send = false;
    bool recv = false;

    int c;   // getopt() returns int; a plain char may never compare equal to -1

    /* check args */
    while (1) {
        c = getopt(argc, argv, "rs");
        if (c == -1) {
            break;
        }
        switch (c) {
        case 'r':
            recv = true;
            break;
        case 's':
            send = true;
            break;
        default:
            cerr << "Wrong Arguments" << endl;
        }
    }

    if (send || recv) {
        if (send) {
            cout << "Running as sender" << endl;
        }
        else {
            cout << "Running as receiver" << endl;
        }
    }
    else {
        cerr << "No send or receive argument specified" << endl;
        exit(1);
    }

    // accept as parameter if must run as --send or --recv

//#if 0
     RecvPacketTransmissionTest *rx;
     SendPacketTransmissionTest *tx;

    // run several tests in parallel threads
    if ( send ) {
        tx = new SendPacketTransmissionTest();
        tx->start();
        tx->join();
        cout << "After join tx" << endl;
    } else if ( recv ) {
        rx = new RecvPacketTransmissionTest();
        rx->start();
        rx->join();
        cout << "After join rx" << endl;
    }
//#endif
//#if 0
    ZrtpRecvPacketTransmissionTest *zrx;
    ZrtpSendPacketTransmissionTest *ztx;

    if ( send ) {
        ztx = new ZrtpSendPacketTransmissionTest();
        ztx->start();
        ztx->join();
        cout << "After join ztx" << endl;
    } else if ( recv ) {
        zrx = new ZrtpRecvPacketTransmissionTest();
        zrx->start();
        zrx->join();
        cout << "After join zrx" << endl;
    }
//#endif

    exit(result);
}

/** EMACS **
 * Local variables:
 * mode: c++
 * c-default-style: ellemtel
 * c-basic-offset: 4
 * End:
 */
