r"""
This module is designed for use in co-processing Python scripts. It provides a
class, Pipeline, which is designed to be used as the base-class for Python
pipeline. Additionally, this module has several other utility functions that are
appropriate for co-processing.
"""
from paraview import simple, servermanager
from paraview.detail import exportnow
# If the user created a filename in a location that doesn't exist by default we'll
# make the directory for them. This can be changed though by setting createDirectoriesIfNeeded
# to False.
createDirectoriesIfNeeded = True
# -----------------------------------------------------------------------------
[docs]class CoProcessor(object):
"""Base class for co-processing Pipelines.
paraview.cpstate Module can be used to dump out ParaView states as
co-processing pipelines. Those are typically subclasses of this. The
subclasses must provide an implementation for the CreatePipeline() method.
**Cinema Tracks**
CoProcessor maintains user-defined information for the Cinema generation in
__CinemaTracks. This information includes track parameter values, data array
names, etc. __CinemaTracks holds this information in the following structure::
{
proxy_reference : {
'ControlName' : [value_1, value_2, ..., value_n],
'arraySelection' : ['ArrayName_1', ..., 'ArrayName_n']
}
}
__CinemaTracks is populated when defining the co-processing pipline through
paraview.cpstate. paraview.cpstate uses accessor instances to set values and
array names through the RegisterCinemaTrack and AddArrayssToCinemaTrack
methods of this class.
"""
def __init__(self):
self.__PipelineCreated = False
# __ProducersMap will have a list of the producers/channels that need to be
# created by the adaptor for this pipeline. The adaptor may be able to generate
# other channels as well though.
self.__ProducersMap = {}
self.__WritersList = []
self.__ViewsList = []
self.__EnableLiveVisualization = False
self.__LiveVisualizationFrequency = 1;
self.__LiveVisualizationLink = None
# __CinemaTracksList is just for Spec-A compatibility (will be deprecated
# when porting Spec-A to pv_introspect. Use __CinemaTracks instead.
self.__CinemaTracksList = []
self.__CinemaTracks = {}
self.__InitialFrequencies = {}
self.__PrintEnsightFormatString = False
self.__TimeStepToStartOutputAt = 0
self.__ForceOutputAtFirstCall = False
self.__FirstTimeStepIndex = None
# a list of arrays requested for each channel, e.g. {'input': ["a point data array name", 0], ["a cell data array name", 1]}
self.__RequestedArrays = None
self.__ImageRootDirectory = ""
self.__DataRootDirectory = ""
self.__CinemaDHelper = None
[docs] def SetUpdateFrequencies(self, frequencies):
"""Set the frequencies at which the pipeline needs to be updated.
Typically, this is called by the subclass once it has determined what
timesteps co-processing will be needed to be done.
frequencies is a map, with key->string name of for the simulation
input, and value is a list of frequencies.
"""
if type(frequencies) != dict:
raise RuntimeError(
"Incorrect argument type: %s, must be a dict" % type(frequencies))
self.__InitialFrequencies = frequencies
[docs] def SetRequestedArrays(self, channelname, requestedarrays):
"""Set which arrays this script will request from the adaptor for the given channel name.
"""
if not self.__RequestedArrays:
self.__RequestedArrays = {}
self.__RequestedArrays[channelname] = requestedarrays
[docs] def SetInitialOutputOptions(self, timeStepToStartOutputAt, forceOutputAtFirstCall):
"""Set the frequencies at which the pipeline needs to be updated.
Typically, this is called by the subclass once it has determined what
timesteps co-processing will be needed to be done.
frequencies is a map, with key->string name of for the simulation
input, and value is a list of frequencies.
"""
self.__TimeStepToStartOutputAt = timeStepToStartOutputAt
self.__ForceOutputAtFirstCall = forceOutputAtFirstCall
[docs] def EnableLiveVisualization(self, enable, frequency=1):
"""Call this method to enable live-visualization. When enabled,
DoLiveVisualization() will communicate with ParaView server if possible
for live visualization. Frequency specifies how often the
communication happens (default is every second)."""
self.__EnableLiveVisualization = enable
self.__LiveVisualizationFrequency = frequency
[docs] def CreatePipeline(self, datadescription):
"""This methods must be overridden by subclasses to create the
visualization pipeline."""
raise RuntimeError("Subclasses must override this method.")
[docs] def LoadRequestedData(self, datadescription):
"""Call this method in RequestDataDescription co-processing pass to mark
the datadescription with information about what fields and grids are
required for this pipeline for the given timestep, if any.
Default implementation uses the update-frequencies set using
SetUpdateFrequencies() to determine if the current timestep needs to
be processed and then requests all fields. Subclasses can override
this method to provide additional customizations. If there is a Live
connection that can also override the initial frequencies."""
# if this is a time step to do live then only the channels that were requested when
# generating the script will be made available even though the adaptor may be able
# to provide other channels. similarly, if only specific arrays were requested when
# generating the script then only those arrays will be provided to the Live connection.
# note that we want the pipeline built before we do the actual first live connection.
if self.__EnableLiveVisualization and self.NeedToOutput(datadescription, self.__LiveVisualizationFrequency) \
and self.__LiveVisualizationLink:
if self.__LiveVisualizationLink.Initialize(servermanager.ActiveConnection.Session.GetSessionProxyManager()):
if self.__RequestedArrays:
for key in self.__RequestedArrays:
for v in self.__RequestedArrays[key]:
inputDescription = datadescription.GetInputDescriptionByName(key)
if inputDescription:
inputDescription.AddField(v[0], v[1])
elif self.__InitialFrequencies:
# __ProducersMap will not be filled up until after the first call to
# DoCoProcessing so we rely on __InitialFrequencies initially but then
# __ProducersMap after that as __InitialFrequencies will be cleared out.
for key in self.__InitialFrequencies:
inputDescription = datadescription.GetInputDescriptionByName(key)
if inputDescription:
inputDescription.AllFieldsOn()
inputDescription.GenerateMeshOn()
else:
for key in self.__ProducersMap:
inputDescription = datadescription.GetInputDescriptionByName(key)
if inputDescription:
inputDescription.AllFieldsOn()
inputDescription.GenerateMeshOn()
return
# if we haven't processed the pipeline yet in DoCoProcessing() we
# must use the initial frequencies to figure out if there's
# work to do this time/timestep. If we don't have Live enabled
# we know that the output frequencies aren't changed and can
# just use the initial frequencies.
if self.__ForceOutputAtFirstCall or self.__InitialFrequencies or not self.__EnableLiveVisualization:
if self.__RequestedArrays:
for key in self.__RequestedArrays:
for v in self.__RequestedArrays[key]:
inputDescription = datadescription.GetInputDescriptionByName(key)
if inputDescription:
inputDescription.AddField(v[0], v[1])
elif self.__InitialFrequencies:
for key in self.__InitialFrequencies:
freqs = self.__InitialFrequencies.get(key, [])
if self.__EnableLiveVisualization or self.IsInModulo(datadescription, freqs):
inputDescription = datadescription.GetInputDescriptionByName(key)
if inputDescription:
inputDescription.AllFieldsOn()
inputDescription.GenerateMeshOn()
else:
# the catalyst pipeline may have been changed by a Live connection
# so we need to regenerate the frequencies
from paraview import cpstate
frequencies = {}
for writer in self.__WritersList:
frequency = writer.parameters.GetProperty("WriteFrequency").GetElement(0)
if self.NeedToOutput(datadescription, frequency) or datadescription.GetForceOutput() == True:
writerinputs = cpstate.locate_simulation_inputs(writer)
for writerinput in writerinputs:
if self.__RequestedArrays:
for key in self.__RequestedArrays:
for v in self.__RequestedArrays[key]:
datadescription.GetInputDescriptionByName(writerinput).AddField(v[0], v[1])
else:
datadescription.GetInputDescriptionByName(writerinput).AllFieldsOn()
datadescription.GetInputDescriptionByName(writerinput).GenerateMeshOn()
for view in self.__ViewsList:
if (view.cpFrequency and self.NeedToOutput(datadescription, view.cpFrequency)) or \
datadescription.GetForceOutput() == True:
viewinputs = cpstate.locate_simulation_inputs_for_view(view)
for viewinput in viewinputs:
if self.__RequestedArrays:
for key in self.__RequestedArrays:
for v in self.__RequestedArrays[key]:
datadescription.GetInputDescriptionByName(viewinput).AddField(v[0], v[1])
else:
datadescription.GetInputDescriptionByName(viewinput).AllFieldsOn()
datadescription.GetInputDescriptionByName(viewinput).GenerateMeshOn()
[docs] def UpdateProducers(self, datadescription):
"""This method will update the producers in the pipeline. If the
pipeline is not created, it will be created using
self.CreatePipeline().
"""
if not self.__PipelineCreated:
self.CreatePipeline(datadescription)
self.__PipelineCreated = True
if self.__EnableLiveVisualization:
# we don't want to use __InitialFrequencies any more with live viz
self.__InitialFrequencies = None
self.__FixupWriters()
else:
simtime = datadescription.GetTime()
for name, producer in self.__ProducersMap.items():
producer.GetClientSideObject().SetOutput(
datadescription.GetInputDescriptionByName(name).GetGrid(),
simtime)
[docs] def WriteData(self, datadescription):
"""This method will update all writes present in the pipeline, as
needed, to generate the output data files, respecting the
write-frequencies set on the writers."""
timestep = datadescription.GetTimeStep()
for writer in self.__WritersList:
frequency = writer.parameters.GetProperty(
"WriteFrequency").GetElement(0)
if self.NeedToOutput(datadescription, frequency) or datadescription.GetForceOutput() == True:
fileName = writer.parameters.GetProperty("FileName").GetElement(0)
paddingamount = writer.parameters.GetProperty("PaddingAmount").GetElement(0)
helperName = writer.GetXMLName()
if helperName == "ExodusIIWriter":
ts = "." + str(timestep).rjust(paddingamount, '0')
writer.FileName = fileName + ts
else:
ts = str(timestep).rjust(paddingamount, '0')
writer.FileName = fileName.replace("%t", ts)
if '/' in writer.FileName and createDirectoriesIfNeeded:
oktowrite = [1.]
import vtk
comm = vtk.vtkMultiProcessController.GetGlobalController()
if comm.GetLocalProcessId() == 0:
import os
newDir = writer.FileName[0:writer.FileName.rfind('/')]
try:
os.makedirs(newDir)
except OSError:
if not os.path.isdir(newDir):
print("ERROR: Cannot make directory for", writer.FileName, ". No data will be written.")
oktowrite[0] = 0.
comm.Broadcast(oktowrite, 1, 0)
if oktowrite[0] == 0:
# we can't make the directory so no reason to update the pipeline
return
writer.UpdatePipeline(datadescription.GetTime())
self.__AppendToCinemaDTable(timestep, "writer_%s" % self.__WritersList.index(writer), writer.FileName)
self.__FinalizeCinemaDTable()
[docs] def WriteImages(self, datadescription, rescale_lookuptable=False,
image_quality=None, padding_amount=0):
"""This method will update all views, if present and write output
images, as needed.
**Parameters**
datadescription
Catalyst data-description object
rescale_lookuptable (bool, optional)
If True, when all lookup tables
are rescaled using current data ranges before saving the images.
Defaults to False.
image_quality (int, optional)
If specified, should be a value in
the range (0, 100) that specifies the image quality. For JPEG, 0
is low quality i.e. max compression, 100 is best quality i.e.
least compression. For legacy reasons, this is inverted for PNG
(which uses lossless compression). For PNG, 0 is no compression
i.e maximum image size, while 100 is most compressed and hence
least image size.
If not specified, for saving PNGs 0 is assumed to minimize
performance impact.
padding_amount (int, optional)
Amount to pad the time index by.
"""
timestep = datadescription.GetTimeStep()
cinema_dirs = []
for view in self.__ViewsList:
if (view.cpFrequency and self.NeedToOutput(datadescription, view.cpFrequency)) or \
datadescription.GetForceOutput() == True:
fname = view.cpFileName
ts = str(timestep).rjust(padding_amount, '0')
fname = fname.replace("%t", ts)
if view.cpFitToScreen != 0:
view.ViewTime = datadescription.GetTime()
if view.IsA("vtkSMRenderViewProxy") == True:
view.ResetCamera()
elif view.IsA("vtkSMContextViewProxy") == True:
view.ResetDisplay()
else:
print(' do not know what to do with a ', view.GetClassName())
view.ViewTime = datadescription.GetTime()
if rescale_lookuptable:
self.RescaleDataRange(view, datadescription.GetTime())
cinemaOptions = view.cpCinemaOptions
if cinemaOptions and 'camera' in cinemaOptions:
if 'composite' in view.cpCinemaOptions and view.cpCinemaOptions['composite'] == True:
dirname, filelist = self.UpdateCinema(view, datadescription,
specLevel="B")
else:
dirname, filelist = self.UpdateCinema(view, datadescription,
specLevel="A")
if dirname:
self.__AppendCViewToCinemaDTable(timestep, "view_%s" % self.__ViewsList.index(view), filelist)
cinema_dirs.append(dirname)
else:
if '/' in fname and createDirectoriesIfNeeded:
oktowrite = [1.]
import vtk
comm = vtk.vtkMultiProcessController.GetGlobalController()
if comm.GetLocalProcessId() == 0:
import os
newDir = fname[0:fname.rfind('/')]
try:
os.makedirs(newDir)
except OSError:
if not os.path.isdir(newDir):
print("ERROR: Cannot make directory for", fname, ". No image will be output.")
oktowrite[0] = 0.
comm.Broadcast(oktowrite, 1, 0)
if oktowrite[0] == 0:
# we can't make the directory so no reason to update the pipeline
return
if image_quality is None and fname.endswith('png'):
# for png quality = 0 means no compression. compression can be a potentially
# very costly serial operation on process 0
quality = 0
elif image_quality is not None:
quality = int(image_quality)
else:
# let simple.SaveScreenshot pick a default.
quality = None
if fname.endswith('png') and view.cpCompression is not None and view.cpCompression != -1:
simple.SaveScreenshot(fname, view,
CompressionLevel=view.cpCompression,
ImageResolution=view.ViewSize)
else:
simple.SaveScreenshot(fname, view,
magnification=view.cpMagnification,
quality=quality)
self.__AppendToCinemaDTable(timestep, "view_%s" % self.__ViewsList.index(view), fname)
if len(cinema_dirs) > 1:
import paraview.tpl.cinema_python.adaptors.paraview.pv_introspect as pv_introspect
pv_introspect.make_workspace_file("cinema\\", cinema_dirs)
self.__FinalizeCinemaDTable()
[docs] def DoLiveVisualization(self, datadescription, hostname, port):
"""This method execute the code-stub needed to communicate with ParaView
for live-visualization. Call this method only if you want to support
live-visualization with your co-processing module."""
if not self.__EnableLiveVisualization:
return
if not self.__LiveVisualizationLink and self.__EnableLiveVisualization:
# Create the vtkLiveInsituLink i.e. the "link" to the visualization processes.
self.__LiveVisualizationLink = servermanager.vtkLiveInsituLink()
# Tell vtkLiveInsituLink what host/port must it connect to
# for the visualization process.
self.__LiveVisualizationLink.SetHostname(hostname)
self.__LiveVisualizationLink.SetInsituPort(int(port))
# Initialize the "link"
self.__LiveVisualizationLink.Initialize(servermanager.ActiveConnection.Session.GetSessionProxyManager())
if self.__EnableLiveVisualization and self.NeedToOutput(datadescription, self.__LiveVisualizationFrequency):
if not self.__LiveVisualizationLink.Initialize(
servermanager.ActiveConnection.Session.GetSessionProxyManager()):
return
time = datadescription.GetTime()
timeStep = datadescription.GetTimeStep()
# stay in the loop while the simulation is paused
while True:
# Update the simulation state, extracts and simulationPaused
# from ParaView Live
self.__LiveVisualizationLink.InsituUpdate(time, timeStep)
# sources need to be updated by insitu
# code. vtkLiveInsituLink never updates the pipeline, it
# simply uses the data available at the end of the
# pipeline, if any.
for source in simple.GetSources().values():
source.UpdatePipeline(time)
# push extracts to the visualization process.
self.__LiveVisualizationLink.InsituPostProcess(time, timeStep)
if (self.__LiveVisualizationLink.GetSimulationPaused()):
# This blocks until something changes on ParaView Live
# and then it continues the loop. Returns != 0 if LIVE side
# disconnects
if (self.__LiveVisualizationLink.WaitForLiveChange()):
break
else:
break
[docs] def CreateProducer(self, datadescription, inputname):
"""Creates a producer proxy for the grid. This method is generally used in
CreatePipeline() call to create producers."""
# Check that the producer name for the input given is valid for the
# current setup.
if not datadescription.GetInputDescriptionByName(inputname):
raise RuntimeError("Simulation input name '%s' does not exist" % inputname)
grid = datadescription.GetInputDescriptionByName(inputname).GetGrid()
if not grid:
# we have a description of this channel but we don't need the grid so return
return
if inputname in self.__ProducersMap:
raise RuntimeError("CreateProducer is being called multiple times for input '%s'" % inputname)
producer = simple.PVTrivialProducer(guiName=inputname)
producer.add_attribute("cpSimulationInput", inputname)
# mark this as an input proxy so we can use cpstate.locate_simulation_inputs()
# to find it
producer.SMProxy.cpSimulationInput = inputname
# we purposefully don't set the time for the PVTrivialProducer here.
# when we update the pipeline we will do it then.
producer.GetClientSideObject().SetOutput(grid, datadescription.GetTime())
if grid.IsA("vtkImageData") == True or \
grid.IsA("vtkStructuredGrid") == True or \
grid.IsA("vtkRectilinearGrid") == True:
extent = datadescription.GetInputDescriptionByName(inputname).GetWholeExtent()
producer.WholeExtent = [extent[0], extent[1], extent[2], extent[3], extent[4], extent[5]]
# Save the producer for easy access in UpdateProducers() call.
self.__ProducersMap[inputname] = producer
producer.UpdatePipeline(datadescription.GetTime())
return producer
[docs] def CreateTemporalProducer(self, datadescription, inputname):
"""Python access to a temporal cache object associated with a specific
one simulation product. Much like CreateProducer, only this ends up with
a temporal cache filter instead of a PVTrivialProducer."""
if not datadescription.GetInputDescriptionByName(inputname):
raise RuntimeError("Simulation input name '%s' does not exist" % inputname)
idd = datadescription.GetInputDescriptionByName(inputname)
cache = idd.GetTemporalCache()
if not cache:
raise RuntimeError("I see no cache for '%s'" % inputname)
return
return servermanager._getPyProxy(cache)
[docs] def ProcessExodusIIWriter(self, writer):
"""Extra work for the ExodusII writer to avoid undesired warnings
and print out a message on how to read the files into Ensight."""
# Disable the warning about not having meta data available since we can
# use this writer for vtkDataSets
writer.IgnoreMetaDataWarning = 1
# optionally print message so that people know what file string to use to open in Ensight
if self.__PrintEnsightFormatString:
pm = servermanager.vtkProcessModule.GetProcessModule()
pid = pm.GetGlobalController().GetLocalProcessId()
if pid == 0:
nump = pm.GetGlobalController().GetNumberOfProcesses()
if nump == 1:
print("Ensight 'Set string' input is '", writer.FileName, ".*'", sep="")
else:
print("Ensight 'Set string' input is '", writer.FileName, ".*." + str(nump) + \
".<" + str(nump) + ":%0." + str(len(str(nump - 1))) + "d>'", sep="")
[docs] def RegisterWriter(self, writer, filename, freq, paddingamount=0, **params):
"""Registers a writer proxy. This method is generally used in
CreatePipeline() to register writers. All writes created as such will
write the output files appropriately in WriteData() is called.
params should be empty as of ParaView 5.9 but is passed in for
backwards compatibility."""
writerParametersProxy = self.WriterParametersProxy(
writer, filename, freq, paddingamount)
writer.FileName = filename
writer.add_attribute("parameters", writerParametersProxy)
for p in params:
v = params[p]
if writer.GetProperty(p) is not None:
wp = writer.GetProperty(p)
wp.SetData(v)
self.__WritersList.append(writer)
helperName = writer.GetXMLName()
if helperName == "ExodusIIWriter":
self.ProcessExodusIIWriter(writer)
return writer
[docs] def WriterParametersProxy(self, writer, filename, freq, paddingamount):
"""Creates a client only proxy that will be synchronized with ParaView
Live, allowing a user to set the filename and frequency.
"""
controller = servermanager.ParaViewPipelineController()
# assume that a client only proxy with the same name as a writer
# is available in "insitu_writer_parameters"
# Since coprocessor sometimes pass writer as a custom object and not
# a proxy, we need to handle that. Just creating any arbitrary writer
# proxy to store the parameters it acceptable. So let's just do that
# when the writer is not a proxy.
writerIsProxy = isinstance(writer, servermanager.Proxy)
helperName = writer.GetXMLName() if writerIsProxy else "XMLPImageDataWriter"
proxy = servermanager.ProxyManager().NewProxy(
"insitu_writer_parameters", helperName)
controller.PreInitializeProxy(proxy)
if writerIsProxy:
# it's possible that the writer can take in multiple input connections
# so we need to go through all of them. the try/except block seems
# to be the best way to figure out if there are multiple input connections
try:
length = len(writer.Input)
for i in range(length):
proxy.GetProperty("Input").AddInputConnection(
writer.Input[i].SMProxy, 0)
except:
proxy.GetProperty("Input").SetInputConnection(
0, writer.Input.SMProxy, 0)
proxy.GetProperty("FileName").SetElement(0, filename)
proxy.GetProperty("WriteFrequency").SetElement(0, freq)
proxy.GetProperty("PaddingAmount").SetElement(0, paddingamount)
controller.PostInitializeProxy(proxy)
controller.RegisterPipelineProxy(proxy)
return proxy
[docs] def RegisterCinemaTrack(self, name, proxy, smproperty, valrange):
"""
Register a point of control (filter's property) that will be varied over in a cinema export.
"""
if not isinstance(proxy, servermanager.Proxy):
raise RuntimeError("Invalid 'proxy' argument passed to RegisterCinemaTrack.")
self.__CinemaTracksList.append({"name": name, "proxy": proxy, "smproperty": smproperty, "valrange": valrange})
proxyDefinitions = self.__CinemaTracks[proxy] if (proxy in self.__CinemaTracks) else {}
proxyDefinitions[smproperty] = valrange
self.__CinemaTracks[proxy] = proxyDefinitions
return proxy
[docs] def AddArraysToCinemaTrack(self, proxy, propertyName, arrayNames):
''' Register user-defined target arrays by name. '''
if not isinstance(proxy, servermanager.Proxy):
raise RuntimeError("Invalid 'proxy' argument passed to AddArraysToCinemaTrack.")
proxyDefinitions = self.__CinemaTracks[proxy] if (proxy in self.__CinemaTracks) else {}
proxyDefinitions[propertyName] = arrayNames
self.__CinemaTracks[proxy] = proxyDefinitions
return proxy
[docs] def RegisterView(self, view, filename, freq, fittoscreen, magnification, width, height,
cinema=None, compression=None):
"""Register a view for image capture with extra meta-data such
as magnification, size and frequency."""
if not isinstance(view, servermanager.Proxy):
raise RuntimeError("Invalid 'view' argument passed to RegisterView.")
view.add_attribute("cpFileName", filename)
view.add_attribute("cpFrequency", freq)
view.add_attribute("cpFitToScreen", fittoscreen)
view.add_attribute("cpMagnification", magnification)
view.add_attribute("cpCinemaOptions", cinema)
view.add_attribute("cpCompression", compression)
view.ViewSize = [width, height]
self.__ViewsList.append(view)
return view
[docs] def Finalize(self):
for writer in self.__WritersList:
if hasattr(writer, 'Finalize'):
writer.Finalize()
for view in self.__ViewsList:
if hasattr(view, 'Finalize'):
view.Finalize()
[docs] def RescaleDataRange(self, view, time):
"""DataRange can change across time, sometime we want to rescale the
color map to match to the closer actual data range."""
reps = view.Representations
for rep in reps:
if not hasattr(rep, 'Visibility') or \
not rep.Visibility or \
not hasattr(rep, 'MapScalars') or \
not rep.MapScalars or \
not rep.LookupTable:
# rep is either not visible or not mapping scalars using a LUT.
continue;
input = rep.Input
input.UpdatePipeline(time) # make sure range is up-to-date
lut = rep.LookupTable
colorArrayInfo = rep.GetArrayInformationForColorArray()
if not colorArrayInfo:
import sys
datarange = [sys.float_info.max, -sys.float_info.max]
else:
if lut.VectorMode != 'Magnitude' or \
colorArrayInfo.GetNumberOfComponents() == 1:
datarange = colorArrayInfo.GetComponentRange(lut.VectorComponent)
else:
# -1 corresponds to the magnitude.
datarange = colorArrayInfo.GetComponentRange(-1)
from paraview.vtk import vtkDoubleArray
import paraview.servermanager
pm = paraview.servermanager.vtkProcessModule.GetProcessModule()
globalController = pm.GetGlobalController()
localarray = vtkDoubleArray()
localarray.SetNumberOfTuples(2)
localarray.SetValue(0,
-datarange[0]) # negate so that MPI_MAX gets min instead of doing a MPI_MIN and MPI_MAX
localarray.SetValue(1, datarange[1])
globalarray = vtkDoubleArray()
globalarray.SetNumberOfTuples(2)
globalController.AllReduce(localarray, globalarray, 0)
globaldatarange = [-globalarray.GetValue(0), globalarray.GetValue(1)]
rgbpoints = lut.RGBPoints.GetData()
numpts = len(rgbpoints) // 4
if globaldatarange[0] != rgbpoints[0] or globaldatarange[1] != rgbpoints[(numpts - 1) * 4]:
# rescale all of the points
oldrange = rgbpoints[(numpts - 1) * 4] - rgbpoints[0]
newrange = globaldatarange[1] - globaldatarange[0]
# only readjust if the new range isn't zero.
if newrange != 0:
newrgbpoints = list(rgbpoints)
# if the old range isn't 0 then we use that ranges distribution
if oldrange != 0:
for v in range(numpts - 1):
newrgbpoints[v * 4] = globaldatarange[0] + (
rgbpoints[v * 4] - rgbpoints[0]) * newrange / oldrange
# avoid numerical round-off, at least with the last point
newrgbpoints[(numpts - 1) * 4] = globaldatarange[1]
else: # the old range is 0 so the best we can do is to space the new points evenly
for v in range(numpts + 1):
newrgbpoints[v * 4] = globaldatarange[0] + v * newrange / (1.0 * numpts)
lut.RGBPoints.SetData(newrgbpoints)
[docs] def UpdateCinema(self, view, datadescription, specLevel):
""" called from catalyst at each timestep to add to the cinema database """
if not view.IsA("vtkSMRenderViewProxy") == True:
return
try:
import paraview.tpl.cinema_python.adaptors.explorers as explorers
import paraview.tpl.cinema_python.adaptors.paraview.pv_explorers as pv_explorers
import paraview.tpl.cinema_python.adaptors.paraview.pv_introspect as pv_introspect
except ImportError as e:
import paraview
paraview.print_error("Cannot import cinema")
paraview.print_error(e)
return
# figure out where to put this store
import os.path
vfname = view.cpFileName
extension = os.path.splitext(vfname)[1]
vfname = vfname[0:vfname.rfind("_")] # strip _num.ext
fname = os.path.join(os.path.dirname(vfname),
"cinema",
os.path.basename(vfname),
"info.json")
def float_limiter(x):
# a shame, but needed to make sure python, javascript and (directory/file)name agree
if isinstance(x, (float)):
return '%.6e' % x # arbitrarily chose 6 significant digits
else:
return x
# what time?
time = datadescription.GetTime()
view.ViewTime = time
formatted_time = float_limiter(time)
# ensure that cinema operates on the specified view
simple.SetActiveView(view)
# Include camera information in the user defined parameters.
# pv_introspect uses __CinemaTracks to customize the exploration.
co = view.cpCinemaOptions
camType = co["camera"]
if "phi" in co:
self.__CinemaTracks["phi"] = co["phi"]
if "theta" in co:
self.__CinemaTracks["theta"] = co["theta"]
if "roll" in co:
self.__CinemaTracks["roll"] = co["roll"]
tracking_def = {}
if "tracking" in co:
tracking_def = co['tracking']
# figure out what we show now
pxystate = pv_introspect.record_visibility()
# a conservative global bounds for consistent z scaling
minbds, maxbds = pv_introspect.max_bounds()
# make sure depth rasters are consistent
view.MaxClipBounds = [minbds, maxbds, minbds, maxbds, minbds, maxbds]
view.LockBounds = 1
disableValues = False if 'noValues' not in co else co['noValues']
if specLevel == "B":
p = pv_introspect.inspect(skip_invisible=True)
else:
p = pv_introspect.inspect(skip_invisible=False)
fs = pv_introspect.make_cinema_store(p, fname, view,
forcetime=formatted_time,
userDefined=self.__CinemaTracks,
specLevel=specLevel,
camType=camType,
extension=extension,
disableValues=disableValues)
# all nodes participate, but only root can writes out the files
pm = servermanager.vtkProcessModule.GetProcessModule()
pid = pm.GetPartitionId()
enableFloatVal = False if 'floatValues' not in co else co['floatValues']
new_files = {}
ret = pv_introspect.explore(fs, p, iSave=(pid == 0),
currentTime={'time': formatted_time},
userDefined=self.__CinemaTracks,
specLevel=specLevel,
camType=camType,
tracking=tracking_def,
floatValues=enableFloatVal,
disableValues=disableValues)
if pid == 0:
fs.save()
new_files[vfname] = ret;
view.LockBounds = 0
# restore what we showed
pv_introspect.restore_visibility(pxystate)
return os.path.basename(vfname), new_files
[docs] def IsInModulo(self, datadescription, frequencies):
"""
Return True if the given timestep in datadescription is in one of the provided frequencies
or output is forced. This can be interpreted as follow::
isFM = IsInModulo(timestep-timeStepToStartOutputAt, [2,3,7])
is similar to::
isFM = (timestep-timeStepToStartOutputAt % 2 == 0) or (timestep-timeStepToStartOutputAt % 3 == 0) or (timestep-timeStepToStartOutputAt % 7 == 0)
The timeStepToStartOutputAt is the first timestep that will potentially be output.
"""
timestep = datadescription.GetTimeStep()
if timestep < self.__TimeStepToStartOutputAt and not self.__ForceOutputAtFirstCall:
return False
for frequency in frequencies:
if frequency > 0 and self.NeedToOutput(datadescription, frequency):
return True
return False
[docs] def NeedToOutput(self, datadescription, frequency):
"""
Return True if we need to output based on the input timestep, frequency and forceOutput. Checks based
__FirstTimeStepIndex, __FirstTimeStepIndex, __ForceOutputAtFirstCall and __TimeStepToStartOutputAt
member variables.
"""
if datadescription.GetForceOutput() == True:
return True
timestep = datadescription.GetTimeStep()
if self.__FirstTimeStepIndex == None:
self.__FirstTimeStepIndex = timestep
if self.__ForceOutputAtFirstCall and self.__FirstTimeStepIndex == timestep:
return True
if self.__TimeStepToStartOutputAt <= timestep and (timestep - self.__TimeStepToStartOutputAt) % frequency == 0:
return True
return False
[docs] def EnableCinemaDTable(self):
""" Enable the normally disabled cinema D table export feature """
self.__CinemaDHelper = exportnow.CinemaDHelper(True,
self.__ImageRootDirectory)
def __AppendCViewToCinemaDTable(self, time, producer, filelist):
"""
This is called every time catalyst writes any cinema image result
to update the Cinema D index of outputs table.
Note, we aggregate file operations later with __FinalizeCinemaDTable.
"""
if self.__CinemaDHelper is None:
return
import vtk
comm = vtk.vtkMultiProcessController.GetGlobalController()
if comm.GetLocalProcessId() == 0:
self.__CinemaDHelper.AppendCViewToCinemaDTable(time, producer, filelist)
def __AppendToCinemaDTable(self, time, producer, filename):
"""
This is called every time catalyst writes any data file or screenshot
to update the Cinema D index of outputs table.
Note, we aggregate file operations later with __FinalizeCinemaDTable.
"""
if self.__CinemaDHelper is None:
return
import vtk
comm = vtk.vtkMultiProcessController.GetGlobalController()
if comm.GetLocalProcessId() == 0:
self.__CinemaDHelper.AppendToCinemaDTable(time, producer, filename)
def __FinalizeCinemaDTable(self):
if self.__CinemaDHelper is None:
return
import vtk
comm = vtk.vtkMultiProcessController.GetGlobalController()
if comm.GetLocalProcessId() == 0:
self.__CinemaDHelper.WriteNow()
[docs] def SetRootDirectory(self, root_directory):
""" Makes Catalyst put all output under this directory. """
self.SetImageRootDirectory(root_directory)
self.SetDataRootDirectory(root_directory)
[docs] def SetImageRootDirectory(self, root_directory):
"""Specify root directory for image extracts"""
if root_directory and not root_directory.endswith("/"):
root_directory = root_directory + "/"
self.__ImageRootDirectory = root_directory
[docs] def SetDataRootDirectory(self, root_directory):
"""Specify root directory for data extracts"""
if root_directory and not root_directory.endswith("/"):
root_directory = root_directory + "/"
self.__DataRootDirectory = root_directory
def __FixupWriters(self):
""" Called once to ensure that all writers obey the root directory directive """
if self.__ImageRootDirectory:
for view in self.__ViewsList:
view.cpFileName = self.__ImageRootDirectory + view.cpFileName
if self.__DataRootDirectory:
for writer in self.__WritersList:
fileName = self.__DataRootDirectory + writer.parameters.GetProperty("FileName").GetElement(0)
writer.parameters.GetProperty("FileName").SetElement(0, fileName)
writer.parameters.FileName = fileName