VTK/Parallel Pipeline
Introduction
VTK uses a form of parallelism called data parallelism. In this form, the data is divided amongst the processes, and each process performs the same operation on its piece of data. Advantages of data parallelism include scalability, simplified load balancing, and reduced communications overhead.
Figure 1 shows how VTK filters can be setup when running in parallel. In this example, each reader reads in a piece of the data. Usually, this can be done with very little communication between processes. Then, each reader feeds into a pipeline identical with that on the other processes. Because many of the filters in VTK use algorithms that independently process each point or cell, these filters can run in parallel with little or no communication between them. Let us see a simple example of how that works.
Consider the simple, 2D grid that is partitioned into three pieces shown in Figure 2. Assume that the three pieces reside on separate processes of a distributed memory computer. Because no process has global information, communications costs will be minimized if each process performs its operation only on its local information. Of course, we can only do this if the end result of the parallel operation is equivalent to the same operation on the global data.
Figure 3 demonstrates a clip filter (vtkClipDataSet for example) processing data in parallel. Each process is given the same parameters for the cut plane. The cut plane is then applied independently on each piece of the data. When the pieces are brought back together, we see that the result is the entire data set properly cut by the plane.
Not all visualization algorithms can operate on pieces without information on neighboring cells. Consider the operation of extracting external faces as shown in Figure 4. The external face operation identifies all faces that have no local neighbors. When we bring the pieces together, we see that some internal faces have been incorrectly identified as being external. These false positives on the faces occur whenever two neighboring cells are placed in separate processes.
The external face operation in our example fails because some important global information is missing from the local processing. The processes need some data that is not local to them, but they do not need all the data. They only need to know about cells in other partitions that are neighbors to the local cells.
We can solve this local/global problem with the introduction of ghost cells. Ghost cells are cells that belong to one partition of the data and are duplicated on other partitions. The introduction of ghost cells is performed through neighborhood information and organized in levels. For a given partition, any cell neighboring a cell in the partition but not belonging to the partition itself is a ghost cell 1. Any cell neighboring a ghost cell at level 1 that does not belong to level 1 or the original partition is at level 2. Further levels are defined recursively. We define ghost cells in this way because it provides a simple distance metric to the cells of a partition and allows filters to easily specify the minimal or near minimal set of ghost cells required for proper operation.
Let us apply the use of ghost cells to our example of extracting external faces. Figure 5 shows the same partitions with a layer of ghost cells added. When the external face algorithm is run again, some faces are still inappropriately classified as external. However, all of these faces are attached to ghost cells. These ghost faces are easily culled, and the end result is the appropriate external faces.
VTK Pipeline Support for Data Parallelism
Demand-driven data parallelism is natively supported by VTK's execution mechanism. This is achieved by utilizing various pipeline passes and a specific set of meta-data and request objects. An introduction to VTK's pipeline can be found in the VTK User's Guide and this page. Unless you are familiar with the VTK pipeline, we recommend taking a look at these documents before continuing with this one. Also note that certain keys pertaining to parallelism have changed since VTK 6.1 and are described in more detail here.
The three main pipeline passes pertaining to data parallelism are RequestInformation, RequestUpdateExtent and RequestData:
RequestInformation
This is where data sources (e.g. readers) provide meta-data about their capabilities and what data they can produce. Filters downstream can also modify this meta-data when they can add/reduce capability or change what data can be made available downstream. This pass can be usually ignored with respect to data parallelism. The only exception is that readers that can produce data in a partitioned way need to notify the pipeline by providing the CAN_HANDLE_PIECE_REQUEST() key as follows:
<source lang="cpp"> int vtkSphereSource::RequestInformation(
vtkInformation *vtkNotUsed(request), vtkInformationVector **vtkNotUsed(inputVector), vtkInformationVector *outputVector)
{
// get the info object vtkInformation *outInfo = outputVector->GetInformationObject(0);
outInfo->Set(CAN_HANDLE_PIECE_REQUEST(), 1);
return 1;
} </source>
A serial reader should not set this key. This case is described in more detail below.
RequestUpdateExtent
This pass is where consumers can request a specific subset of the available data from upstream. Upstream filters can further modify this request to fit their needs. There are 3 specific keys that are used to implement data parallelism:
- UPDATE_NUMBER_OF_PIECES: This key together with UPDATE_PIECE_NUMBER controls how data should be partitioned by the data source. It is usually set equal to the number of MPI ranks in the current MPI group.
- UPDATE_PIECE_NUMBER: This key determines which partition should be loaded on the current process. It is usually set to the MPI rank of the current process.
- UPDATE_NUMBER_OF_GHOST_LEVELS: This key determines the number of ghost levels requested by a particular filter. Filters should usually increment the number of ghost levels requested by downstream by the number of ghost levels they need.
These keys are usually set by a data consumer and possibly modified by filters upstream. Usually UPDATE_NUMBER_OF_GHOST_LEVELS is the only one modified by filters. It is also possible to set them manually as follows:
<source lang="cpp">
afilter->UpdateInformation(); vtkInformation* outInfo = afilter->GetOutputInformation(0); outInfo->Set(vtkStreamingDemandDrivenPipeline::UPDATE_PIECE_NUMBER(), controller->GetLocalProcessId()); outInfo->Set(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_PIECES(), controller->GetNumberOfProcesses()); outInfo->Set(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_GHOST_LEVELS(), 0);
// or in short
afilter->UpdateInformation(); vtkInformation* outInfo = afilter->GetOutputInformation(0); vtkStreamingDemandDrivenPipeline::SetUpdateExtent(outInfo, controller->GetLocalProcessId(), controller->GetNumberOfProcesses(), 0);
</source>
More commonly, partitioning information is controlled by data members of a data sink such as a mapper or a writer. For example, vtkPolyDataMapper can be used to control partitioning as follows:
<source lang="cpp">
mapper->SetPiece(controller->GetLocalProcessId()); mapper->SetNumberOfPieces(controller->GetNumberOfProcesses());
renWin->Render();
</source>
Note that some readers do not support reading data in parallel. Such readers do not set the CAN_HANDLE_PIECE_REQUEST() meta-data during RequestInformation(). When this key is not set, the pipeline will ask the reader to provide the whole data for UPDATE_PIECE_NUMBER() == 0 and empty data for all other pieces. If parallelism is to be achieved with a serial reader, the developer needs to use a filter such as D3 or vtkTransmitXXXPiece() to partition data after a serial read.
Also note that these keys are automatically copied by the pipeline upstream (except to a reader that does not support parallel reading) so filtes do not need to implement RequestUpdateExtent() unless they want to modify the values requested by downstream.
RequestData
This is the pass where algorithms produce data based on the 3 update keys described above. A parallel reader is free to partition the data in any way it chooses as long as each partition is unique with the exception of ghost levels. Embarrasingly parallel filters do not need to worry about these keys. Also note that filters that request ghost levels are not expected to remove ghost levels from their output.
Structured Data Readers and Filters
The VTK pipeline has the ability to request a subset of a structured dataset from a data source to further reduce I/O or data generation overhead. All structured data (vtkImageData, vtkUniformGrid, vtkRectilinearGrid and vtkStructuredGrid) readers are required to produce a WHOLE_EXTENT value during RequestInformation(), this provides information about the maximum logical extents a reader can read or produce. Data sinks or filters can ask for a subset of this whole extent by setting the UPDATE_EXTENT key during RequestUpdateExtent(). Then data sources can use this key to minimize data they have to produce. Readers that are capable of honoring the UPDATE_EXTENT() request should set the CAN_PRODUCE_SUB_EXTENT() during RequestInformation().
It is very important to note that the interaction between update pieces and extents changed in VTK 6.2. In 6.2, update piece and update extent requests are treated independently except by data sources. This means that a filter should make changes to UPDATE_EXTENT independent of which piece they are processing and expect that they will receive a subset of the update extent if the number of pieces is larger than 1. This is because readers are expected to partition the UPDATE_EXTENT further based on the piece request they received. The pipeline handles this automatically for readers that set CAN_PRODUCE_SUB_EXTENT() and passes them the appropriate UPDATE_EXTENT() using the following code:
<source lang="cpp">
if (outInfo->Has(vtkAlgorithm::CAN_PRODUCE_SUB_EXTENT())) { int piece = outInfo->Get(UPDATE_PIECE_NUMBER()); int ghost = outInfo->Get(UPDATE_NUMBER_OF_GHOST_LEVELS());
vtkExtentTranslator* et = vtkExtentTranslator::New(); int execExt[6]; et->PieceToExtentThreadSafe(piece, numPieces, ghost, uExt, execExt, vtkExtentTranslator::BLOCK_MODE, 0); et->Delete(); outInfo->Set(vtkStreamingDemandDrivenPipeline::UPDATE_EXTENT(), execExt, 6); }
</source>
Such readers can safely ignore all piece and ghost level requests. Note that a structured reader may want to handle all data partitioning internally to optimize I/O (for example when using MPI I/O). Such readers should not set CAN_PRODUCE_SUB_EXTENT() but set CAN_HANDLE_PIECE_REQUEST() and handle both UPDATE_EXTENT() and pieces/ghosts internally.
Note: Handling of UPDATE_EXTENT() is always optional for all readers. Readers that for some reason cannot read a subset of structured data can choose to produce data larger than UPDATE_EXTENT. On the other hand, filters that set CAN_HANDLE_PIECE_REQUEST() cannot ignore piece requests and have to produce unique datasets based on the piece request.
Structured data filters need to handle the case where the extent of the data they receive in RequestData() is different than the update extent they requested if they are expected to work properly in data parallel mode. Note that as of VTK 6.2, most imaging filters cannot handle this case and cannot be used in data parallel pipelines.