VTK/Streaming
Streaming pieces
Streaming a number of pieces is pretty straightforward. Here is an example
<source lang="cpp"> alg->UpdateInformation(); vtkStreamingDemandDrivenPipeline* exec = vtkStreamingDemandDrivenPipeline::SafeDownCast(alg->GetExecutive()); if (!exec || exec->GetMaximumNumberOfPieces() < 10)
{ die(); }
for (int i=0; i<10; i++)
{ exec->SetUpdateExtent(0, i, 10, 0); alg->Update(); // do something with alg->GetOutputDataObject(0) }
</source>
This example streams a pipeline that ends with the algorithm alg in 10 pieces. The best example of streaming pieces is vtkPolyDataMapper:
<source lang="cpp"> void vtkPolyDataMapper::Render(vtkRenderer *ren, vtkActor *act) {
// ...
nPieces = this->NumberOfPieces * this->NumberOfSubPieces;
for(int i=0; i<this->NumberOfSubPieces; i++) { // If more than one pieces, render in loop. currentPiece = this->NumberOfSubPieces * this->Piece + i; input->SetUpdateExtent(currentPiece, nPieces, this->GhostLevel); this->RenderPiece(ren, act); }
} </source>
For a good example of streaming extents, see vtkMemoryLimitImageDataStreamer and its superclass vtkImageDataStreamer.
Streaming time steps
This example demonstrates how a filter can stream multiple time steps. The example we will use to demonstrate this functionality is vtkTemporalStatistics.
Streaming happens over multiple executions of the filter. Therefore, we need a way of keeping track of which time step we are currently processing. For this purpose, vtkTemporalStatistics uses an data member called CurrentTimeIndex. CurrentTimeIndex is initialized to 0 in the constructor.
<source lang="cpp"> vtkTemporalStatistics::vtkTemporalStatistics() {
this->CurrentTimeIndex = 0;
} </source>
In RequestUpdateExtent(), we request the current time index. This is initially set to 0 but will be incremented after the streaming starts.
<source lang="cpp"> int vtkTemporalStatistics::RequestUpdateExtent(
vtkInformation *vtkNotUsed(request), vtkInformationVector **inputVector, vtkInformationVector *vtkNotUsed(outputVector))
{
vtkInformation *inInfo = inputVector[0]->GetInformationObject(0);
double *inTimes = inInfo->Get(vtkStreamingDemandDrivenPipeline::TIME_STEPS()); if (inTimes) { inInfo->Set(vtkStreamingDemandDrivenPipeline::UPDATE_TIME_STEPS(), &inTimes[this->CurrentTimeIndex], 1); }
return 1;
} </source>
In RequestData(), we first process the current time step. We then increment the current time index and set CONTINUE_EXECUTING if there are more time steps to process. This tells the executive to perform another pass of the REQUEST_UPDATE_EXTENT and REQUEST_DATA. This will lead to another call to RequestUpdateExtent() and RequestData() with the next time index. This will continue until the filter removes CONTINUE_EXECUTING from the request.
<source lang="cpp"> int vtkTemporalStatistics::RequestData(vtkInformation *request,
vtkInformationVector **inputVector, vtkInformationVector *outputVector)
{
vtkInformation *inInfo = inputVector[0]->GetInformationObject(0); vtkInformation *outInfo = outputVector->GetInformationObject(0);
vtkDataObject *input = vtkDataObject::GetData(inInfo); vtkDataObject *output = vtkDataObject::GetData(outInfo);
if (this->CurrentTimeIndex == 0) { // First execution, initialize arrays. this->InitializeStatistics(input, output); } else { // Subsequent execution, accumulate new data. this->AccumulateStatistics(input, output); }
this->CurrentTimeIndex++;
if ( this->CurrentTimeIndex < inInfo->Length(vtkStreamingDemandDrivenPipeline::TIME_STEPS())) { // There is still more to do. request->Set(vtkStreamingDemandDrivenPipeline::CONTINUE_EXECUTING(), 1); } else { // We are done. Finish up. this->PostExecute(input, output); request->Remove(vtkStreamingDemandDrivenPipeline::CONTINUE_EXECUTING()); this->CurrentTimeIndex = 0; }
return 1;
} </source>