Skip to content

Fix: stop the process even if the wrapper expects more data #322

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/AvTranscoder/file/OutputFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ bool OutputFile::beginWrap()
IOutputStream::EWrappingStatus OutputFile::wrap(const CodedData& data, const size_t streamIndex)
{
if(!data.getSize())
return IOutputStream::eWrappingSuccess;
return IOutputStream::eWrappingSkip;

LOG_DEBUG("Wrap on stream " << streamIndex << " (" << data.getSize() << " bytes for frame "
<< _frameCount.at(streamIndex) << ")")
Expand Down
7 changes: 4 additions & 3 deletions src/AvTranscoder/stream/IOutputStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ class AvExport IOutputStream
**/
enum EWrappingStatus
{
eWrappingSuccess = 0,
eWrappingWaitingForData,
eWrappingError,
eWrappingSuccess = 0, ///< The wrapping succeeded
eWrappingWaitingForData, ///< The wrapper expects more data to complete the writing process
eWrappingSkip, ///< The wrapper receives empty data, so nothing is written
eWrappingError, ///< An error occurred during the wrapping process
};

virtual ~IOutputStream(){};
Expand Down
38 changes: 7 additions & 31 deletions src/AvTranscoder/transcoder/StreamTranscoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ void StreamTranscoder::preProcessCodecLatency()
_currentDecoder = NULL;
}

bool StreamTranscoder::processFrame()
IOutputStream::EWrappingStatus StreamTranscoder::processFrame()
{
std::string msg = "Current process case of the stream is a ";
switch(getProcessCase())
Expand Down Expand Up @@ -677,7 +677,7 @@ bool StreamTranscoder::processFrame()
return processTranscode();
}

bool StreamTranscoder::processRewrap()
IOutputStream::EWrappingStatus StreamTranscoder::processRewrap()
{
assert(_inputStreams.size() == 1);
assert(_outputStream != NULL);
Expand All @@ -692,25 +692,13 @@ bool StreamTranscoder::processRewrap()
switchToGeneratorDecoder();
return processTranscode();
}
return false;
return IOutputStream::eWrappingError;
}

const IOutputStream::EWrappingStatus wrappingStatus = _outputStream->wrap(data);
switch(wrappingStatus)
{
case IOutputStream::eWrappingSuccess:
return true;
case IOutputStream::eWrappingWaitingForData:
// the wrapper needs more data to write the current packet
return processFrame();
case IOutputStream::eWrappingError:
return false;
}

return true;
return _outputStream->wrap(data);
}

bool StreamTranscoder::processTranscode()
IOutputStream::EWrappingStatus StreamTranscoder::processTranscode()
{
assert(_outputStream != NULL);
assert(_currentDecoder != NULL);
Expand Down Expand Up @@ -840,25 +828,13 @@ bool StreamTranscoder::processTranscode()
}
return processTranscode();
}
return false;
return IOutputStream::eWrappingError;
}
}

// Wrap
LOG_DEBUG("wrap (" << data.getSize() << " bytes)")
const IOutputStream::EWrappingStatus wrappingStatus = _outputStream->wrap(data);
switch(wrappingStatus)
{
case IOutputStream::eWrappingSuccess:
return true;
case IOutputStream::eWrappingWaitingForData:
// the wrapper needs more data to write the current packet
return processFrame();
case IOutputStream::eWrappingError:
return false;
}

return true;
return _outputStream->wrap(data);
}

void StreamTranscoder::switchToGeneratorDecoder()
Expand Down
6 changes: 3 additions & 3 deletions src/AvTranscoder/transcoder/StreamTranscoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class AvExport StreamTranscoder
* @brief process a single frame for the current stream
* @return the process status result
*/
bool processFrame();
IOutputStream::EWrappingStatus processFrame();

//@{
// Switch current decoder.
Expand Down Expand Up @@ -139,8 +139,8 @@ class AvExport StreamTranscoder
void addDecoder(const InputStreamDesc& inputStreamDesc, IInputStream& inputStream);
void addGenerator(const InputStreamDesc& inputStreamDesc, const ProfileLoader::Profile& profile);

bool processRewrap();
bool processTranscode();
IOutputStream::EWrappingStatus processRewrap();
IOutputStream::EWrappingStatus processTranscode();

private:
std::vector<InputStreamDesc> _inputStreamDesc; ///< Description of the data to extract from the input stream.
Expand Down
103 changes: 71 additions & 32 deletions src/AvTranscoder/transcoder/Transcoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Transcoder::Transcoder(IOutputFile& outputFile)
, _profileLoader(true)
, _eProcessMethod(eProcessMethodLongest)
, _mainStreamIndex(0)
, _processedFrames(0)
, _outputDuration(0)
{
}
Expand Down Expand Up @@ -159,27 +160,86 @@ void Transcoder::preProcessCodecLatency()
}

bool Transcoder::processFrame()
{
NoDisplayProgress progress;
return processFrame(progress);
}

bool Transcoder::processFrame(IProgress& progress)
{
if(_streamTranscoders.size() == 0)
return false;

// For each stream, process a frame
bool result = true;
for(size_t streamIndex = 0; streamIndex < _streamTranscoders.size(); ++streamIndex)
{
LOG_DEBUG("Process stream " << streamIndex + 1 << "/" << _streamTranscoders.size())
if(!processFrame(progress, streamIndex))
result = false;
}
return result;
}

// if a stream failed to process
if(!_streamTranscoders.at(streamIndex)->processFrame())
{
bool Transcoder::processFrame(IProgress& progress, const size_t& streamIndex)
{
LOG_DEBUG("Process stream " << streamIndex + 1 << "/" << _streamTranscoders.size())

IOutputStream::EWrappingStatus status = _streamTranscoders.at(streamIndex)->processFrame();
switch(status)
{
case IOutputStream::eWrappingSuccess:
if(streamIndex == 0)
_processedFrames++;

if(!continueProcess(progress))
return false;
return true;

case IOutputStream::eWrappingWaitingForData:
// the wrapper needs more data to write the current packet
if(streamIndex == 0)
_processedFrames++;

if(!continueProcess(progress))
return false;

return processFrame(progress, streamIndex);

case IOutputStream::eWrappingSkip:
return true;

case IOutputStream::eWrappingError:
// if a stream failed to process
LOG_WARN("Failed to process the stream transcoder at index " << streamIndex)

// if this is the end of the main stream
if(streamIndex == _mainStreamIndex) {
if(streamIndex == _mainStreamIndex)
LOG_INFO("End of process because the main stream at index " << _mainStreamIndex << " failed to process a new frame.")
return false;
}
}

return false;
}
}

bool Transcoder::continueProcess(IProgress& progress) {
const float expectedOutputDuration = getExpectedOutputDuration();
const float progressDuration = getCurrentOutputDuration();

// check if JobStatusCancel
if(progress.progress((progressDuration > expectedOutputDuration) ? expectedOutputDuration : progressDuration,
expectedOutputDuration) == eJobStatusCancel)
{
LOG_INFO("End of process because the job was canceled.")
return false;
}

// check progressDuration
if(_eProcessMethod == eProcessMethodBasedOnDuration && progressDuration >= expectedOutputDuration)
{
LOG_INFO("End of process because the output program duration ("
<< progressDuration << "s) is equal or upper than " << expectedOutputDuration << "s.")
return false;
}

return true;
}

Expand All @@ -205,36 +265,15 @@ ProcessStat Transcoder::process(IProgress& progress)
const float expectedOutputDuration = getExpectedOutputDuration();
LOG_INFO("The expected output duration of the program will be " << expectedOutputDuration << "s.")

size_t frame = 0;
bool frameProcessed = true;
while(frameProcessed)
{
LOG_DEBUG("Process frame " << frame)
frameProcessed = processFrame();
++frame;

const float progressDuration = getCurrentOutputDuration();

// check if JobStatusCancel
if(progress.progress((progressDuration > expectedOutputDuration) ? expectedOutputDuration : progressDuration,
expectedOutputDuration) == eJobStatusCancel)
{
LOG_INFO("End of process because the job was canceled.")
break;
}

// check progressDuration
if(_eProcessMethod == eProcessMethodBasedOnDuration && progressDuration >= expectedOutputDuration)
{
LOG_INFO("End of process because the output program duration ("
<< progressDuration << "s) is equal or upper than " << expectedOutputDuration << "s.")
break;
}
LOG_INFO("Process frame " << _processedFrames);
frameProcessed = processFrame(progress);
}

_outputFile.endWrap();

LOG_INFO("End of process: " << ++frame << " frames processed")
LOG_INFO("End of process: " << ++_processedFrames << " frames processed")

LOG_INFO("Get process statistics")
ProcessStat processStat;
Expand Down
26 changes: 21 additions & 5 deletions src/AvTranscoder/transcoder/Transcoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,11 @@ class AvExport Transcoder

/**
* @brief Process the next frame of all streams.
* @param progress: choose a progress, or create your own in C++ or in bindings by inherit IProgress class.
* @return if a frame was processed or not.
*/
bool processFrame();
bool processFrame(IProgress& progress);
bool processFrame(); ///< Call processFrame with no display of progression

/**
* @brief Process all the streams, and ended the process depending on the transcode politic.
Expand Down Expand Up @@ -190,6 +192,19 @@ class AvExport Transcoder
*/
void manageSwitchToGenerator();

/**
* @brief Process the next frame of the specified stream.
* @return whether a frame was processed or not.
*/
bool processFrame(IProgress& progress, const size_t& streamIndex);

/**
* @brief Check whether the process is canceled or not, and whether the process reached the ending condition.
* @note The progress is updated in this function.
* @return whether the process must continue or stop.
*/
bool continueProcess(IProgress& progress);

/**
* @brief Fill the given ProcessStat to summarize the process.
*/
Expand All @@ -205,10 +220,11 @@ class AvExport Transcoder
ProfileLoader _profileLoader; ///< Objet to get existing profiles, and add new ones for the Transcoder.

EProcessMethod _eProcessMethod; ///< Processing policy
size_t
_mainStreamIndex; ///< Index of stream used to stop the process.
float _outputDuration; ///< Duration of output media used to stop the process of transcode in case of
/// eProcessMethodBasedOnDuration.

size_t _mainStreamIndex; ///< Index of stream used to stop the process.
size_t _processedFrames; ///< Counter for the number of processed frames.

float _outputDuration; ///< Duration of output media used to stop the process of transcode in case of eProcessMethodBasedOnDuration.
};
}

Expand Down