//------------------------------------------------------------------------------
// File: OutputQ.cpp
//
// Desc: DirectShow base classes - implements COutputQueue class used by an
//       output pin which may sometimes want to queue output samples on a
//       separate thread and sometimes call Receive() directly on the input
//       pin.
//
// Copyright (c) 1992-2001 Microsoft Corporation.  All rights reserved.
//------------------------------------------------------------------------------


#include <streams.h>


//
//  COutputQueue Constructor :
//
//  Determines if a thread is to be created and creates resources
//
//     pInputPin  - the downstream input pin we're queueing samples to
//
//     phr        - changed to a failure code if this function fails
//                  (otherwise unchanged)
//
//     bAuto      - Ask pInputPin if it can block in Receive by calling
//                  its ReceiveCanBlock method and create a thread if
//                  it can block, otherwise not.
//
//     bQueue     - if bAuto == FALSE then we create a thread if and only
//                  if bQueue == TRUE
//
//     lBatchSize - work in batches of lBatchSize
//
//     bBatchExact - Use exact batch sizes so don't send until the
//                  batch is full or SendAnyway() is called
//
//     lListSize  - If we create a thread make the list of samples queued
//                  to the thread have this size cache
//
//     dwPriority - If we create a thread set its priority to this
//
COutputQueue::COutputQueue(
             IPin         *pInputPin,          //  Pin to send stuff to
             HRESULT      *phr,                //  'Return code'
             BOOL          bAuto,              //  Ask pin if queue or not
             BOOL          bQueue,             //  Send through queue
             LONG          lBatchSize,         //  Batch
             BOOL          bBatchExact,        //  Batch exactly to BatchSize
             LONG          lListSize,          //  Cache size for the sample list
             DWORD         dwPriority,         //  Priority for the worker thread
             bool          bFlushingOpt        // flushing optimization
            ) : m_lBatchSize(lBatchSize),
                m_bBatchExact(bBatchExact && (lBatchSize > 1)),
                m_hThread(NULL),
                m_hSem(NULL),
                m_List(NULL),
                m_pPin(pInputPin),
                m_ppSamples(NULL),
                m_lWaiting(0),
                m_pInputPin(NULL),
                m_bSendAnyway(FALSE),
                m_nBatched(0),
                m_bFlushing(FALSE),
                m_bFlushed(TRUE),
                m_bFlushingOpt(bFlushingOpt),
                m_bTerminate(FALSE),
                m_hEventPop(NULL),
                m_hr(S_OK) {
    //  Exact batching only makes sense for batches of more than one
    //  sample, hence the (lBatchSize > 1) term in the initializer above.
    ASSERT(m_lBatchSize > 0);

    //  Don't do any work if the caller has already recorded a failure

    if(FAILED(*phr)) {
        return;
    }

    //  Check the input pin is OK and cache its IMemInputPin interface
    //  (the reference obtained here is dropped in the destructor)

    *phr = pInputPin->QueryInterface(IID_IMemInputPin, (void **)&m_pInputPin);
    if(FAILED(*phr)) {
        return;
    }

    //  See if we should ask the downstream pin.  If the query itself
    //  fails the caller's bQueue value is left as it was.

    if(bAuto) {
        HRESULT hr = m_pInputPin->ReceiveCanBlock();
        if(SUCCEEDED(hr)) {
            bQueue = hr == S_OK;
        }
    }

    //  Create our sample batch

    m_ppSamples = new PMEDIASAMPLE[m_lBatchSize];
    if(m_ppSamples == NULL) {
        *phr = E_OUTOFMEMORY;
        return;
    }

    //  If we're queueing allocate resources : semaphore, list, thread

    if(bQueue) {
        DbgLog((LOG_TRACE, 2, TEXT("Creating thread for output pin")));
        m_hSem = CreateSemaphore(NULL, 0, 0x7FFFFFFF, NULL);
        if(m_hSem == NULL) {
            DWORD dwError = GetLastError();
            *phr = AmHresultFromWin32(dwError);
            return;
        }
        m_List = new CSampleList(NAME("Sample Queue List"),
            lListSize,
            FALSE         // No lock
            );
        if(m_List == NULL) {
            *phr = E_OUTOFMEMORY;
            return;
        }


        DWORD dwThreadId;
        m_hThread = CreateThread(NULL,
            0,
            InitialThreadProc,
            (LPVOID)this,
            0,
            &dwThreadId);
        if(m_hThread == NULL) {
            DWORD dwError = GetLastError();
            *phr = AmHresultFromWin32(dwError);

            //  Without a worker thread nothing will ever drain the list,
            //  so give it back now rather than leaving it allocated for
            //  the lifetime of (and beyond) this object.

            delete m_List;
            m_List = NULL;
            return;
        }
        SetThreadPriority(m_hThread, dwPriority);
    }
    else {
        DbgLog((LOG_TRACE, 2, TEXT("Calling input pin directly - no thread")));
    }
}

//
//  COutputQueue Destructor :
//
//  Free all resources -
//
//      Thread,
//      Batched samples
//
COutputQueue::~COutputQueue() {
    DbgLog((LOG_TRACE, 3, TEXT("COutputQueue::~COutputQueue")));

    if(m_hThread != NULL) {
        //  Ask the worker to exit.  The flag is set and the thread woken
        //  while holding the critical section; the lock is dropped again
        //  before we block waiting for the thread to finish.
        {
            CAutoLock lck(this);
            m_bTerminate = TRUE;
            m_hr = S_FALSE;
            NotifyThread();
        }
        DbgWaitForSingleObject(m_hThread);
        EXECUTE_ASSERT(CloseHandle(m_hThread));

        //  The thread frees the samples when asked to terminate

        ASSERT(m_List->GetCount() == 0);
    }
    else {
        FreeSamples();
    }

    //  The list can exist without a thread (thread creation failed in the
    //  constructor), so free it on both paths.  delete on NULL is a no-op.

    delete m_List;

    if(m_hSem != NULL) {
        EXECUTE_ASSERT(CloseHandle(m_hSem));
    }
    delete [] m_ppSamples;

    //  Drop the cached IMemInputPin last : the worker thread calls through
    //  it, so it must not be released until that thread has been joined.

    if(m_pInputPin != NULL) {
        m_pInputPin->Release();
    }
}

//
//  Call the real thread proc as a member function
//
DWORD WINAPI COutputQueue::InitialThreadProc(LPVOID pv) {
    //  Static thread entry point : initialize COM for this thread, then
    //  hand control to the member-function thread procedure of the
    //  COutputQueue instance passed in through pv.
    const HRESULT hrCom = CAMThread::CoInitializeHelper();

    COutputQueue * const pQueue = (COutputQueue *)pv;
    const DWORD dwExitCode = pQueue->ThreadProc();

    //  Balance the initialization only when the helper reported S_OK
    if(S_OK == hrCom) {
        CoUninitialize();
    }

    return dwExitCode;
}

//
//  Thread sending the samples downstream :
//
//  When there is nothing to do the thread sets m_lWaiting (while
//  holding the critical section) and then waits for m_hSem to be
//  set (not holding the critical section)
//
DWORD COutputQueue::ThreadProc() {
    //  Worker loop.  Each pass gathers a batch from the list under the
    //  critical section, then (outside the lock) delivers the batch
    //  downstream and acts on whichever control packet ended the gather.
    //  Returns 0 once m_bTerminate has been observed.
    while(TRUE) {
        BOOL          bWait = FALSE;
        IMediaSample *pSample=0;
        LONG          lNumberToSend=0; // Local copy
        NewSegmentPacket* ppacket=0;

        //
        //  Get a batch of samples and send it if possible
        //  In any case exit the loop if there is a control action
        //  requested
        //
        {
            CAutoLock lck(this);
            while(TRUE) {

                if(m_bTerminate) {
                    FreeSamples();
                    return 0;
                }
                if(m_bFlushing) {
                    FreeSamples();
                    SetEvent(m_evFlushComplete);
                }

                //  Get a sample off the list

                pSample = m_List->RemoveHead();
                // inform derived class we took something off the queue
                if(m_hEventPop) {
                    //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
                    SetEvent(m_hEventPop);
                }

                if(pSample != NULL &&
                    !IsSpecialSample(pSample)) {

                    //  If its just a regular sample just add it to the batch
                    //  and exit the loop if the batch is full

                    m_ppSamples[m_nBatched++] = pSample;
                    if(m_nBatched == m_lBatchSize) {
                        break;
                    }
                }
                else {

                    //  If there was nothing in the queue and there's nothing
                    //  to send (either because there's nothing or the batch
                    //  isn't full) then prepare to wait

                    if(pSample == NULL &&
                        (m_bBatchExact || m_nBatched == 0)) {

                        //  Tell other thread to set the event when there's
                        //  something to do

                        ASSERT(m_lWaiting == 0);
                        m_lWaiting++;
                        bWait      = TRUE;
                    }
                    else {

                        //  We break out of the loop on SEND_PACKET unless
                        //  there's nothing to send

                        if(pSample == SEND_PACKET && m_nBatched == 0) {
                            continue;
                        }

                        if(pSample == NEW_SEGMENT) {
                            // now we need the parameters - we are
                            // guaranteed that the next packet contains them
                            ppacket = (NewSegmentPacket *) m_List->RemoveHead();
                            // we took something off the queue
                            if(m_hEventPop) {
                                //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
                                SetEvent(m_hEventPop);
                            }

                            ASSERT(ppacket);
                        }
                        //  EOS_PACKET falls through here and we exit the loop
                        //  In this way it acts like SEND_PACKET
                    }
                    break;
                }
            }
            if(!bWait) {
                // We look at m_nBatched from the client side so keep
                // it up to date inside the critical section
                lNumberToSend = m_nBatched;  // Local copy
                m_nBatched = 0;
            }
        }

        //  Wait for some more data

        if(bWait) {
            DbgWaitForSingleObject(m_hSem);
            continue;
        }



        //  OK - send it if there's anything to send
        //  We DON'T check m_bBatchExact here because either we've got
        //  a full batch or we dropped through because we got
        //  SEND_PACKET or EOS_PACKET - both of which imply we should
        //  flush our batch

        if(lNumberToSend != 0) {
            long nProcessed;
            if(m_hr == S_OK) {
                ASSERT(!m_bFlushed);
                HRESULT hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
                    lNumberToSend,
                    &nProcessed);
                /*  Don't overwrite a flushing state HRESULT */
                CAutoLock lck(this);
                if(m_hr == S_OK) {
                    m_hr = hr;
                }
                ASSERT(!m_bFlushed);
            }

            //  The samples are released whether or not they were delivered

            while(lNumberToSend != 0) {
                m_ppSamples[--lNumberToSend]->Release();
            }
            if(m_hr != S_OK) {

                //  In any case wait for more data - S_OK just
                //  means there wasn't an error

                DbgLog((LOG_ERROR, 2, TEXT("ReceiveMultiple returned %8.8X"),
                    m_hr));
            }
        }

        //  Check for end of stream

        if(pSample == EOS_PACKET) {

            //  We don't send even end of stream on if we've previously
            //  returned something other than S_OK
            //  This is because in that case the pin which returned
            //  something other than S_OK should have either sent
            //  EndOfStream() or notified the filter graph

            if(m_hr == S_OK) {
                DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
                HRESULT hr = m_pPin->EndOfStream();
                if(FAILED(hr)) {
                    //  The format string takes one argument - supply it
                    DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()"), hr));
                }
            }
        }

        //  Data from a new source

        if(pSample == RESET_PACKET) {
            m_hr = S_OK;
            SetEvent(m_evFlushComplete);
        }

        if(pSample == NEW_SEGMENT) {
            //  The parameter packet should always follow the marker; the
            //  check keeps a retail build (where ASSERT compiles away)
            //  from dereferencing NULL if that guarantee was broken.
            if(ppacket != NULL) {
                m_pPin->NewSegment(ppacket->tStart, ppacket->tStop, ppacket->dRate);
                delete ppacket;
            }
        }
    }
}

//  Send batched stuff anyway
void COutputQueue::SendAnyway() {
    if(!IsQueued()) {

        //  m_bSendAnyway is a private parameter checked in ReceiveMultiple

        m_bSendAnyway = TRUE;
        LONG nProcessed;
        ReceiveMultiple(NULL, 0, &nProcessed);
        m_bSendAnyway = FALSE;

    }
    else {
        CAutoLock lck(this);
        QueueSample(SEND_PACKET);
        NotifyThread();
    }
}

void
COutputQueue::NewSegment(
    REFERENCE_TIME tStart,
    REFERENCE_TIME tStop,
    double dRate) {
    //  Pass a new-segment notification downstream, in order with the data.
    //  Nothing is sent (in either mode) once the sticky m_hr is not S_OK.
    if(!IsQueued()) {
        if(S_OK == m_hr) {
            //  Push out any partial batch first so the notification does
            //  not overtake samples that were received before it
            if(m_bBatchExact) {
                SendAnyway();
            }
            m_pPin->NewSegment(tStart, tStop, dRate);
        }
    }
    else {
        if(m_hr == S_OK) {
            //
            // we need to queue the new segment to appear in order in the
            // data, but we need to pass parameters to it. Rather than
            // take the hit of wrapping every single sample so we can tell
            // special ones apart, we queue special pointers to indicate
            // special packets, and we guarantee (by holding the
            // critical section) that the packet immediately following a
            // NEW_SEGMENT value is a NewSegmentPacket containing the
            // parameters.
            NewSegmentPacket * ppack = new NewSegmentPacket;
            if(ppack == NULL) {
                return;
            }
            ppack->tStart = tStart;
            ppack->tStop = tStop;
            ppack->dRate = dRate;

            CAutoLock lck(this);

            //  The pair is added to the list directly rather than through
            //  QueueSample() : on failure QueueSample() would Release()
            //  the parameter packet as though it were a media sample, and
            //  a half-queued pair would break the guarantee above.

            if(NULL == m_List->AddTail(NEW_SEGMENT)) {
                delete ppack;
                return;
            }
            if(NULL == m_List->AddTail((IMediaSample*) ppack)) {
                //  We hold the lock, so the tail is still our marker
                m_List->RemoveTail();
                delete ppack;
                return;
            }
            NotifyThread();
        }
    }
}


//
//  End of Stream is queued to output device
//
void COutputQueue::EOS() {
    //  The critical section is held for the whole call, including the
    //  direct downstream EndOfStream() in the non-queued case.
    CAutoLock lck(this);
    if(!IsQueued()) {
        //  Flush any partial batch ahead of the end-of-stream
        if(m_bBatchExact) {
            SendAnyway();
        }
        if(m_hr == S_OK) {
            DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
            m_bFlushed = FALSE;
            HRESULT hr = m_pPin->EndOfStream();
            if(FAILED(hr)) {
                //  The format string takes one argument - supply it
                DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()"), hr));
            }
        }
    }
    else {
        //  Queued : the worker thread delivers EndOfStream() when it
        //  reaches the EOS_PACKET marker
        if(m_hr == S_OK) {
            m_bFlushed = FALSE;
            QueueSample(EOS_PACKET);
            NotifyThread();
        }
    }
}

//
//  Flush all the samples in the queue
//
void COutputQueue::BeginFlush() {
    if(!IsQueued()) {
        //  Direct mode : pass downstream first to avoid deadlocks, then
        //  mark ourselves as flushing under the lock.
        m_pPin->BeginFlush();

        CAutoLock lck(this);
        m_bFlushing = TRUE;

        //  From now on every incoming sample is discarded
        if(S_OK == m_hr) {
            m_hr = S_FALSE;
        }
        return;
    }

    //  Queued mode.  Blocking of receives is assumed to be done by the
    //  filter in which we are a component.
    {
        CAutoLock lck(this);

        //  Discard all queued data and everything that arrives from now on
        m_bFlushing = TRUE;
        if(S_OK == m_hr) {
            m_hr = S_FALSE;
        }

        //  Optimization : nothing has gone out since the last flush, so
        //  there is no need to call downstream again
        if(m_bFlushed && m_bFlushingOpt) {
            return;
        }

        //  Make sure EndFlush() really waits for the flush to complete
        m_evFlushComplete.Reset();

        NotifyThread();
    }

    //  Lock released - now pass the flush downstream
    m_pPin->BeginFlush();
}

//
// leave flush mode - pass this downstream
void COutputQueue::EndFlush() {
    {
        CAutoLock lck(this);
        ASSERT(m_bFlushing);

        //  Short cut matching the one in BeginFlush() : nothing was sent
        //  downstream, so just leave flush mode locally.
        if(m_bFlushingOpt && m_bFlushed && IsQueued()) {
            m_hr = S_OK;
            m_bFlushing = FALSE;
            return;
        }
    }

    //  Synchronizing with the pushing thread and stopping further data
    //  going downstream were both done in BeginFlush().  The critical
    //  section is deliberately NOT held while we wait here - holding it
    //  would deadlock.

    if(!IsQueued()) {
        FreeSamples();
    }
    else {
        m_evFlushComplete.Wait();
    }

    //  The caller has guaranteed no samples will arrive before EndFlush()
    //  returns, so the flags are updated without taking the lock.

    m_bFlushing = FALSE;
    m_bFlushed  = TRUE;

    //  Tell the downstream pin, then clear the sticky return code

    m_pPin->EndFlush();

    m_hr = S_OK;
}

//  COutputQueue::QueueSample
//
//  private method to Send a sample to the output queue
//  The critical section MUST be held when this is called

void COutputQueue::QueueSample(IMediaSample *pSample) {
    //  Append to the tail of the list; nothing more to do on success.
    if(m_List->AddTail(pSample) != NULL) {
        return;
    }

    //  The list refused the entry.  A real sample is released here;
    //  special marker values are not samples and are left alone.
    if(IsSpecialSample(pSample)) {
        return;
    }
    pSample->Release();
}

//
//  COutputQueue::Receive()
//
//  Send a single sample by the multiple sample route
//  (NOTE - this could be optimized if necessary)
//
//  On return the sample will have been Release()'d
//

HRESULT COutputQueue::Receive(IMediaSample *pSample) {
    //  Treat the single sample as an array of one; the processed count
    //  is not reported back to the caller.
    LONG lDelivered;
    HRESULT hr = ReceiveMultiple(&pSample, 1, &lDelivered);
    return hr;
}

//
//  COutputQueue::ReceiveMultiple()
//
//  Send a set of samples to the downstream pin
//
//      ppSamples           - array of samples
//      nSamples            - how many
//      nSamplesProcessed   - How many were processed
//
//  On return all samples will have been Release()'d
//

HRESULT COutputQueue::ReceiveMultiple(
    IMediaSample **ppSamples,
    long nSamples,
    long *nSamplesProcessed) {
    //  The critical section is held for the whole call, including the
    //  downstream ReceiveMultiple() in the direct case.
    CAutoLock lck(this);
    //  Either call directly or queue up the samples

    if(!IsQueued()) {

        //  If we already had a bad return code then just return

        if(S_OK != m_hr) {

            //  If we've never received anything since the last Flush()
            //  and the sticky return code is not S_OK we must be
            //  flushing
            //  ((!A || B) is equivalent to A implies B)
            ASSERT(!m_bFlushed || m_bFlushing);

            //  We're supposed to Release() them anyway!
            *nSamplesProcessed = 0;

            //  Logged once per call (as the queued path below does),
            //  not once per discarded sample
            DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (direct) : Discarding %d samples code 0x%8.8X"),
                nSamples, m_hr));
            for(long i = 0; i < nSamples; i++) {
                ppSamples[i]->Release();
            }

            return m_hr;
        }
        //
        //  If we're flushing the sticky return code should be S_FALSE
        //
        ASSERT(!m_bFlushing);
        m_bFlushed = FALSE;

        //  On entry a partial batch may only be pending in exact mode
        ASSERT(m_nBatched < m_lBatchSize);
        ASSERT(m_nBatched == 0 || m_bBatchExact);

        //  Loop processing the samples in batches.
        //  iDone is declared outside the loop because it is needed for
        //  the processed count afterwards.

        LONG iLost = 0;
        long iDone = 0;
        while(iDone < nSamples || (m_nBatched != 0 && m_bSendAnyway)) {

            //pragma message (REMIND("Implement threshold scheme"))
            ASSERT(m_nBatched < m_lBatchSize);
            if(iDone < nSamples) {
                m_ppSamples[m_nBatched++] = ppSamples[iDone++];
            }

            //  Send when the batch is full, or when the caller's samples
            //  are used up and either SendAnyway() asked for it or we are
            //  not batching exactly.  Testing iDone == nSamples (rather
            //  than nSamples == 0) stops a partial non-exact batch being
            //  left behind, which the assertion above forbids.

            if(m_nBatched == m_lBatchSize ||
                (iDone == nSamples && (m_bSendAnyway || !m_bBatchExact))) {
                //  Start from zero in case the downstream pin fails
                //  without writing the count
                LONG nDone = 0;
                DbgLog((LOG_TRACE, 4, TEXT("Batching %d samples"),
                    m_nBatched));

                if(m_hr == S_OK) {
                    m_hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
                        m_nBatched,
                        &nDone);
                }
                iLost += m_nBatched - nDone;
                for(LONG i = 0; i < m_nBatched; i++) {
                    m_ppSamples[i]->Release();
                }
                m_nBatched = 0;
            }
        }
        *nSamplesProcessed = iDone - iLost;
        if(*nSamplesProcessed < 0) {
            *nSamplesProcessed = 0;
        }
        return m_hr;
    }
    else {
        /*  We're sending to our thread */

        if(m_hr != S_OK) {
            *nSamplesProcessed = 0;
            DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (queued) : Discarding %d samples code 0x%8.8X"),
                nSamples, m_hr));
            for(int i = 0; i < nSamples; i++) {
                ppSamples[i]->Release();
            }
            return m_hr;
        }
        m_bFlushed = FALSE;
        for(long i = 0; i < nSamples; i++) {
            QueueSample(ppSamples[i]);
        }
        *nSamplesProcessed = nSamples;

        //  In exact mode only wake the thread once a full batch is
        //  available between its current batch and the list

        if(!m_bBatchExact ||
            m_nBatched + m_List->GetCount() >= m_lBatchSize) {
            NotifyThread();
        }
    }

    return S_OK;
}

//  Get ready for new data - cancels sticky m_hr
void COutputQueue::Reset() {
    if(!IsQueued()) {
        //  Direct mode : just clear the sticky return code
        m_hr = S_OK;
    }
    else {
        //  Queue a RESET_PACKET and wake the worker.  The lock is scoped
        //  so that it is released BEFORE we wait : the worker thread has
        //  to take the same critical section to pull the packet off the
        //  list, and only then clears m_hr and signals the event.
        //  Waiting with the lock held would deadlock.
        {
            CAutoLock lck(this);
            QueueSample(RESET_PACKET);
            NotifyThread();
        }
        m_evFlushComplete.Wait();
    }
}

//  Remove and Release() all queued and Batched samples
void COutputQueue::FreeSamples() {
    CAutoLock lck(this);
    if(IsQueued()) {
        while(TRUE) {
            IMediaSample *pSample = m_List->RemoveHead();
            // inform derived class we took something off the queue
            if(m_hEventPop) {
                //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
                SetEvent(m_hEventPop);
            }

            if(pSample == NULL) {
                break;
            }
            if(!IsSpecialSample(pSample)) {
                pSample->Release();
            }
            else {
                if(pSample == NEW_SEGMENT) {
                    //  Free NEW_SEGMENT packet
                    NewSegmentPacket *ppacket =
                        (NewSegmentPacket *) m_List->RemoveHead();
                    // inform derived class we took something off the queue
                    if(m_hEventPop) {
                        //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
                        SetEvent(m_hEventPop);
                    }

                    ASSERT(ppacket != NULL);
                    delete ppacket;
                }
            }
        }
    }
    for(int i = 0; i < m_nBatched; i++) {
        m_ppSamples[i]->Release();
    }
    m_nBatched = 0;
}

//  Notify the thread if there is something to do
//
//  The critical section MUST be held when this is called
void COutputQueue::NotifyThread() {
    ASSERT(IsQueued());

    //  Nothing to signal unless the worker has recorded that it is waiting
    if(m_lWaiting == 0) {
        return;
    }

    //  Release the semaphore by the recorded wait count and clear the count
    ReleaseSemaphore(m_hSem, m_lWaiting, NULL);
    m_lWaiting = 0;
}

//  See if there's any work to do
//  Returns
//      TRUE  if there is nothing on the queue and nothing in the batch
//            and all data has been sent
//      FALSE otherwise
//
BOOL COutputQueue::IsIdle() {
    CAutoLock lck(this);

    //  Busy if we have a worker thread that is not parked waiting for
    //  work, or if there are samples in the current batch.
    const bool bThreadBusy = IsQueued() && m_lWaiting == 0;
    if(bThreadBusy || m_nBatched != 0) {
        return FALSE;
    }

    //  If we're idle it shouldn't be possible for there to be anything
    //  left on the work queue
    ASSERT(!IsQueued() || m_List->GetCount() == 0);

    return TRUE;
}


void COutputQueue::SetPopEvent(HANDLE hEvent) {
    //  Remember the event handle that gets set each time an entry is
    //  taken off the list; NULL switches the notification off.
    this->m_hEventPop = hEvent;
}

