r"""This module is used by the CoProcessingScriptGenerator plugin to aid in
capturing ParaView state as a CoProcessing Python script.
This can capture the ParaView state in a Pipeline object that can then be used
in CoProcessing scripts. The entry point into this module is the function
DumpPipeline() which returns the Python trace script. Most of the other
functions can be considered internal.
"""
from paraview import smtrace, smstate, servermanager
import numbers
# Module-level holder for the export state object. It is None until
# initialize_globals() installs a fresh state and again after reset_globals().
_cpstate_globals = None
def reset_globals():
    """Drop the module-level export state.

    After this call get_globals() raises until initialize_globals() is
    invoked again.
    """
    global _cpstate_globals
    _cpstate_globals = None
def initialize_globals():
    """Create a fresh state object, install it as the module-level state and
    return it.

    Every call replaces whatever state was installed before.
    """
    global _cpstate_globals

    class _state(object):
        """Plain attribute container for everything gathered during an export."""

        def __init__(self):
            # per simulation input bookkeeping
            self.simulation_input_map = {}
            self.write_frequencies = {}
            self.channels_needed = []
            # rendering / screenshot related settings
            self.export_rendering = False
            self.view_proxies = []
            self.screenshot_info = {}
            # cinema related settings
            self.cinema_tracks = {}
            self.cinema_arrays = {}
            # live visualization settings
            self.enable_live_viz = False
            self.live_viz_frequency = 0
            # proxy -> traced variable name
            self.variable_to_name_map = {}

    fresh_state = _state()
    _cpstate_globals = fresh_state
    return fresh_state
def get_globals():
    """Return the currently installed export state object.

    Raises:
        RuntimeError: if no state is installed, i.e. initialize_globals() has
            not been called (or reset_globals() was called afterwards).
    """
    global _cpstate_globals
    current = _cpstate_globals
    if current is not None:
        return current
    raise RuntimeError("get_globals called before initialize_globals!")
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
class ProducerAccessor(smtrace.RealProxyAccessor):
    """Accessor used in place of the standard one for proxies that have been
    marked as simulation inputs.

    trace_ctor() is overridden so that the constructor is traced as a
    CreateProducer() call, since the proxy itself is only a dummy.
    """

    def __init__(self, varname, proxy, simname):
        self.SimulationInputName = simname
        smtrace.RealProxyAccessor.__init__(self, varname, proxy)
        # the cpSimulationInput attribute is what lets the proxy be located
        # later on.
        proxy.SMProxy.cpSimulationInput = simname
        self.varname = varname

    def trace_ctor(self, ctor, filter, ctor_args=None, skip_assignment=False):
        """Return the trace lines creating the producer for this input.

        The arguments are accepted for interface compatibility; none of them
        is used to build the trace.
        """
        output = smtrace.TraceOutput()
        output.append("# create a producer from a simulation input")
        producer_line = "%s = coprocessor.CreateProducer(datadescription, '%s')" % \
            (self, self.SimulationInputName)
        output.append(producer_line)

        # self.varname has .'s and *'s stripped, so the keys of cinema_arrays
        # are expected to be stripped the same way. Refers to
        # /Qt/ApplicationComponents/pqExportCatalystScript.cxx
        cinema_arrays = get_globals().cinema_arrays
        if self.varname in cinema_arrays:
            selection = cinema_arrays[self.varname]
            output.append_separated(["# define target arrays of filters."])
            output.append(["coprocessor.AddArraysToCinemaTrack(%s, 'arraySelection', %s)" % (self, selection)])
        output.append_separator()
        return output.raw_data()
# -----------------------------------------------------------------------------
class SliceAccessor(smtrace.RealProxyAccessor):
    """
    Augments traces of slice filters with information to explore the
    parameter space for cinema playback (if enabled).
    """

    def __init__(self, varname, proxy):
        smtrace.RealProxyAccessor.__init__(self, varname, proxy)
        self.varname = varname

    def trace_ctor(self, ctor, filter, ctor_args=None, skip_assignment=False):
        """Return the standard constructor trace, followed by the cinema track
        registration when a track is configured for this filter."""
        base_trace = smtrace.RealProxyAccessor.trace_ctor(
            self, ctor, filter, ctor_args, skip_assignment)
        output = smtrace.TraceOutput(base_trace)
        state = get_globals()
        tracks = state.cinema_tracks

        # nothing to add unless a cinema track was requested for this filter
        if not tracks or self.varname not in tracks:
            return output.raw_data()

        offsets = tracks[self.varname]
        output.append_separated(["# register the filter with the coprocessor's cinema generator"])
        output.append(["coprocessor.RegisterCinemaTrack('slice', %s, 'SliceOffsetValues', %s)" % (self, offsets)])
        if self.varname in state.cinema_arrays:
            selection = state.cinema_arrays[self.varname]
            output.append_separated(["# define target arrays of filters."])
            output.append(["coprocessor.AddArraysToCinemaTrack(%s, 'arraySelection', %s)" % (self, selection)])
        output.append_separator()
        return output.raw_data()
# -----------------------------------------------------------------------------
class ContourAccessor(smtrace.RealProxyAccessor):
    """
    Augments traces of contour filters with information to explore the
    parameter space for cinema playback (if enabled).
    """

    def __init__(self, varname, proxy):
        smtrace.RealProxyAccessor.__init__(self, varname, proxy)
        self.varname = varname

    def trace_ctor(self, ctor, filter, ctor_args=None, skip_assignment=False):
        """Return the standard constructor trace, followed by the cinema track
        registration when a track is configured for this filter.

        Raises:
            RuntimeError: propagated from get_globals() when the export state
                has not been initialized.
        """
        original_trace = smtrace.RealProxyAccessor.trace_ctor(
            self, ctor, filter, ctor_args, skip_assignment)
        trace = smtrace.TraceOutput(original_trace)
        # The state must be fetched explicitly: there is no module-level
        # name `cpstate_globals`, so using it unbound raises NameError.
        cpstate_globals = get_globals()
        if cpstate_globals.cinema_tracks and self.varname in cpstate_globals.cinema_tracks:
            valrange = cpstate_globals.cinema_tracks[self.varname]
            trace.append_separated(["# register the filter with the coprocessor's cinema generator"])
            trace.append(["coprocessor.RegisterCinemaTrack('contour', %s, 'Isosurfaces', %s)" % (self, valrange)])
            if self.varname in cpstate_globals.cinema_arrays:
                arrays = cpstate_globals.cinema_arrays[self.varname]
                trace.append_separated(["# define target arrays of filters."])
                trace.append(["coprocessor.AddArraysToCinemaTrack(%s, 'arraySelection', %s)" % (self, arrays)])
            trace.append_separator()
        return trace.raw_data()
# -----------------------------------------------------------------------------
class ClipAccessor(smtrace.RealProxyAccessor):
    """
    Augments traces of clip filters with information to explore the
    parameter space for cinema playback (if enabled).
    """

    def __init__(self, varname, proxy):
        smtrace.RealProxyAccessor.__init__(self, varname, proxy)
        self.varname = varname

    def trace_ctor(self, ctor, filter, ctor_args=None, skip_assignment=False):
        """Return the standard constructor trace, followed by the cinema track
        registration when a track is configured for this filter."""
        base_trace = smtrace.RealProxyAccessor.trace_ctor(
            self, ctor, filter, ctor_args, skip_assignment)
        output = smtrace.TraceOutput(base_trace)
        state = get_globals()
        tracks = state.cinema_tracks

        # nothing to add unless a cinema track was requested for this filter
        if not tracks or self.varname not in tracks:
            return output.raw_data()

        offsets = tracks[self.varname]
        output.append_separated(["# register the filter with the coprocessor's cinema generator"])
        output.append(["coprocessor.RegisterCinemaTrack('clip', %s, 'OffsetValues', %s)" % (self, offsets)])
        if self.varname in state.cinema_arrays:
            selection = state.cinema_arrays[self.varname]
            output.append_separated(["# define target arrays of filters."])
            output.append(["coprocessor.AddArraysToCinemaTrack(%s, 'arraySelection', %s)" % (self, selection)])
        output.append_separator()
        return output.raw_data()
# TODO: Make the Slice, Contour & Clip accessors share an interface to reduce
# code duplication. ArrayAccessor could be used as this interface.
# -----------------------------------------------------------------------------
class ArrayAccessor(smtrace.RealProxyAccessor):
    """Augments traces of filters by defining names of arrays to be explored."""

    def __init__(self, varname, proxy):
        smtrace.RealProxyAccessor.__init__(self, varname, proxy)
        self.varname = varname

    def trace_ctor(self, ctor, filter, ctor_args=None, skip_assignment=False):
        """Return the standard constructor trace, followed by the array
        selection when one is configured for this filter."""
        base_trace = smtrace.RealProxyAccessor.trace_ctor(
            self, ctor, filter, ctor_args, skip_assignment)
        output = smtrace.TraceOutput(base_trace)
        cinema_arrays = get_globals().cinema_arrays

        # no array selection for this filter: the plain trace is all there is
        if self.varname not in cinema_arrays:
            return output.raw_data()

        selection = cinema_arrays[self.varname]
        output.append_separated(["# define target arrays of filters."])
        output.append(["coprocessor.AddArraysToCinemaTrack(%s, 'arraySelection', %s)" % (self, selection)])
        output.append_separator()
        return output.raw_data()
# -----------------------------------------------------------------------------
class ViewAccessor(smtrace.RealProxyAccessor):
    """Accessor for views. Overrides trace_ctor() to trace registering of the
    view with the coprocessor. (I wonder if this registering should be moved to
    the end of the state for better readability of the generated state files.)
    """

    def __init__(self, varname, proxy, proxyname):
        smtrace.RealProxyAccessor.__init__(self, varname, proxy)
        self.ProxyName = proxyname

    def trace_ctor(self, ctor, filter, ctor_args=None, skip_assignment=False):
        """Return the standard constructor trace, followed by a RegisterView()
        call when screenshot information exists for this view.

        The screenshot entry must hold exactly 8 values; this is checked with
        an assert.
        """
        base_trace = smtrace.RealProxyAccessor.trace_ctor(
            self, ctor, filter, ctor_args, skip_assignment)
        output = smtrace.TraceOutput(base_trace)
        screenshots = get_globals().screenshot_info

        # views without screenshot settings are traced unchanged
        if self.ProxyName not in screenshots:
            return output.raw_data()

        output.append_separated([
            "# register the view with coprocessor",
            "# and provide it with information such as the filename to use,",
            "# how frequently to write the images, etc."])
        params = screenshots[self.ProxyName]
        assert len(params) == 8
        register_args = tuple(params[index] for index in range(8))
        head_line = "coprocessor.RegisterView(%s," % self
        args_line = " filename='%s', freq=%s, fittoscreen=%s, magnification=%s, width=%s, height=%s, cinema=%s, compression=%s)" % register_args
        time_line = "%s.ViewTime = datadescription.GetTime()" % self
        output.append([head_line, args_line, time_line])
        output.append_separator()
        return output.raw_data()
# -----------------------------------------------------------------------------
class WriterFilter(smtrace.PipelineProxyFilter):
    """Proxy filter used when tracing writers."""

    def should_never_trace(self, prop):
        """Overridden to never trace the 'WriteFrequency', 'FileName' and
        'PaddingAmount' properties on writers; every other property is
        delegated to the base class."""
        suppressed = ["WriteFrequency", "FileName", "PaddingAmount"]
        if prop.get_property_name() not in suppressed:
            return super(WriterFilter, self).should_never_trace(prop)
        return True
# -----------------------------------------------------------------------------
class WriterAccessor(smtrace.RealProxyAccessor):
    """Accessor for writers. Overrides trace_ctor() to use the actual writer
    proxy name instead of the dummy-writer proxy's name. Also updates the
    write_frequencies maintained in cpstate_globals with the write frequencies
    for the writer.
    """

    def __init__(self, varname, proxy):
        smtrace.RealProxyAccessor.__init__(self, varname, proxy)
        frequency = proxy.GetProperty("WriteFrequency").GetElement(0)
        # Find the simulation inputs this writer is connected to, if any, and
        # record the writer's frequency for each of them.
        connected_inputs = locate_simulation_inputs(proxy)
        state = get_globals()
        for input_name in connected_inputs:
            known = state.write_frequencies[input_name]
            if frequency not in known:
                known.append(frequency)
                known.sort()
            if input_name not in state.channels_needed:
                state.channels_needed.append(input_name)

    def get_proxy_label(self, xmlgroup, xmlname):
        """Return the "servermanager.<group>.<ctor>" expression used to
        construct the real writer.

        The constructor name comes from the prototype's XML label when the
        prototype exists, else from `xmlname` (a warning is printed).
        """
        pxm = servermanager.ProxyManager()
        prototype = pxm.GetPrototypeProxy(xmlgroup, xmlname)
        if prototype:
            label = prototype.GetXMLLabel()
        else:
            # a bit of a hack but we assume that there's a stub of some
            # writer that's not available in this build but is available
            # with the build used by the simulation code (probably through a
            # plugin); this stub must have the proper name in the
            # coprocessing hints
            print("WARNING: Could not find", xmlname, "writer in", xmlgroup,
                  "XML group. This is not a problem as long as the writer is available with "
                  "the ParaView build used by the simulation code.")
            label = xmlname
        ctor_name = servermanager._make_name_valid(label)
        # TODO: use servermanager.ProxyManager().NewProxy() instead
        # we create the writer proxy such that it is not registered with the
        # ParaViewPipelineController, so its state is not sent to ParaView Live.
        return "servermanager.%s.%s" % (xmlgroup, ctor_name)

    def trace_ctor(self, ctor, filter, ctor_args=None, skip_assignment=False):
        """Return the constructor trace for the real writer, followed by the
        RegisterWriter() call.

        The incoming `ctor` and `filter` are replaced by the label derived
        from the "WriterProxy" hint and by a WriterFilter instance.
        """
        writer = self.get_object()
        hint = writer.GetHints().FindNestedElementByName("WriterProxy")
        xmlgroup = hint.GetAttribute("group")
        xmlname = hint.GetAttribute("name")
        frequency = self.get_object().GetProperty("WriteFrequency").GetElement(0)
        filename = self.get_object().GetProperty("FileName").GetElement(0)
        padding = self.get_object().GetProperty("PaddingAmount").GetElement(0)
        real_ctor = self.get_proxy_label(xmlgroup, xmlname)
        base_trace = smtrace.RealProxyAccessor.trace_ctor(
            self, real_ctor, WriterFilter(), ctor_args, skip_assignment)
        output = smtrace.TraceOutput(base_trace)
        output.append_separated([
            "# register the writer with coprocessor",
            "# and provide it with information such as the filename to use,",
            "# how frequently to write the data, etc."])
        register_line = "coprocessor.RegisterWriter(%s, filename='%s', freq=%s, paddingamount=%s)" % \
            (self, filename, frequency, padding)
        output.append(register_line)
        output.append_separator()
        return output.raw_data()
# -----------------------------------------------------------------------------
def cp_hook(varname, proxy):
    """Callback to create our special accessors instead of the standard ones.

    The proxy -> varname association is always recorded in the export state.
    An accessor is returned only for proxies registered in the "sources"
    group; for anything else NotImplementedError is raised.
    """
    registered_name = smtrace.Trace.get_registered_name(proxy, "sources")
    state = get_globals()
    state.variable_to_name_map[proxy] = varname

    # NOTE(review): nothing visible here creates a ViewAccessor or fills
    # view_proxies -- confirm whether a lookup in the "views" group is missing.
    if not registered_name:
        raise NotImplementedError

    if registered_name in state.simulation_input_map:
        return ProducerAccessor(varname, proxy, state.simulation_input_map[registered_name])
    if proxy.GetHints() and proxy.GetHints().FindNestedElementByName("WriterProxy"):
        return WriterAccessor(varname, proxy)
    if ("servermanager.Slice" in proxy.__class__().__str__() and
            "Plane object" in proxy.__getattribute__("SliceType").__str__()):
        return SliceAccessor(varname, proxy)
    if ("servermanager.Clip" in proxy.__class__().__str__() and
            "Plane object" in proxy.__getattribute__("ClipType").__str__()):
        return ClipAccessor(varname, proxy)
    if "servermanager.Contour" in proxy.__class__().__str__():
        return ContourAccessor(varname, proxy)
    return ArrayAccessor(varname, proxy)
# -----------------------------------------------------------------------------
class cpstate_filter_proxies_to_serialize(object):
    """Filter used to skip views and representations when export_rendering is
    disabled."""

    def __call__(self, proxy):
        """Return True when `proxy` should be serialized into the state."""
        state = get_globals()
        if not smstate.visible_representations()(proxy):
            return False
        if state.export_rendering:
            return True
        # rendering is not exported: drop everything that belongs to it
        return proxy.GetXMLGroup() not in ["views", "representations"]
# -----------------------------------------------------------------------------
class NewStyleWriters(object):
    """Helper to dump configured writer proxies, which are not in the pipeline,
    into the script."""

    def __init__(self, make_temporal_script=False):
        self.__make_temporal_script = make_temporal_script
        self.__cnt = 1

    def __make_name(self, name):
        """
        Emulate the name uniqueness that trace brings to variable names by
        appending a running counter.
        This may not be necessary because we register everything we make
        immediately so var names can probably conflict.
        """
        unique_name = smtrace.Trace.get_varname(name) + str(self.__cnt)
        self.__cnt += 1
        return unique_name

    def make_trace(self):
        """Gather trace for the writer proxies that are not in the trace
        pipeline but rather in the new export state.

        Returns a list of script lines, or an empty list when no enabled
        writer was found.
        """
        lines = ["", "# Now any catalyst writers"]
        pxm = servermanager.ProxyManager()
        global_options = pxm.GetProxy("export_global", "catalyst")
        # todo should use ExportDepot
        for entry in pxm.GetProxiesInGroup("export_writers"):
            registered_name = entry[0]
            writer_proxy = pxm.GetProxy('export_writers', registered_name)
            if not writer_proxy.HasAnnotation('enabled'):
                continue
            xmlname = writer_proxy.GetXMLName()
            if xmlname == "Cinema image options":
                # skip the array and property export information we stuff in
                # this proxy
                continue

            # note: this logic is not truly correct. the way this is setup,
            # there is no good way to really find the variable name used for
            # the input since the names that smtrace assigns are already
            # cleaned up at this point. Ideally, this class should have been
            # written as a true `Accessor` so it could be traced correctly.
            # Right now, this is a hack to attempt to get a reasonable name
            # that works in most cases.
            name_parts = registered_name.split('|')
            inputname = servermanager._make_name_valid(name_parts[0])
            inputname = inputname[0].lower() + inputname[1:]
            writername = name_parts[1]

            xmlgroup = writer_proxy.GetXMLGroup()
            padding = global_options.GetProperty("FileNamePadding").GetElement(0)
            frequency = writer_proxy.GetProperty("WriteFrequency").GetElement(0)
            filename = writer_proxy.GetProperty("CatalystFilePattern").GetElement(0)

            state = get_globals()
            for input_name in locate_simulation_inputs(writer_proxy):
                known = state.write_frequencies[input_name]
                if frequency not in known:
                    known.append(frequency)
                    known.sort()
                if input_name not in state.channels_needed:
                    state.channels_needed.append(input_name)

            prototype = pxm.GetPrototypeProxy(xmlgroup, xmlname)
            label = prototype.GetXMLLabel() if prototype else xmlname
            varname = self.__make_name(label)

            # Write pass array proxy
            if writer_proxy.GetProperty("ChooseArraysToWrite").GetElement(0) == 1:
                point_prop = writer_proxy.GetProperty("PointDataArrays")
                point_arrays = [point_prop.GetElement(i)
                                for i in range(point_prop.GetNumberOfElements())]
                cell_prop = writer_proxy.GetProperty("CellDataArrays")
                cell_arrays = [cell_prop.GetElement(i)
                               for i in range(cell_prop.GetNumberOfElements())]
                lines.append(
                    "%s_arrays = PassArrays(Input=%s, PointDataArrays=%s, CellDataArrays=%s)" %
                    (inputname, inputname, point_arrays, cell_arrays))
                inputname = "%s_arrays" % inputname

            # Actual writer
            lines.append("%s = servermanager.writers.%s(Input=%s)" % (varname, writername, inputname))

            # set various writer properties, except for the filename since
            # that will be set later. point, cell and field arrays should
            # already be set
            skipped = ['CatalystFilePattern', 'CellDataArrays', 'ChooseArraysToWrite', 'EdgeDataArrays',
                       'FieldDataArrays', 'FileName', 'FileNameSuffix', 'Filenamesuffix', 'Input',
                       'PointDataArrays', 'WriteFrequency', ]
            for propname in writer_proxy.ListProperties():
                if propname in skipped:
                    continue
                prop = writer_proxy.GetProperty(propname)
                if not len(prop):
                    continue
                data = prop.GetData()
                if isinstance(data, servermanager.Proxy):
                    # proxy-valued property: only emitted when the proxy got a
                    # variable name during tracing
                    state = get_globals()
                    if data in state.variable_to_name_map:
                        lines.append("%s.%s = %s" % (varname, propname, state.variable_to_name_map[data]))
                elif hasattr(prop, 'GetElement'):
                    value = prop.GetElement(0)
                    # strings are quoted, everything else is written as is
                    if isinstance(value, str):
                        template = "%s.%s = '%s'"
                    else:
                        template = "%s.%s = %s"
                    lines.append(template % (varname, propname, value))

            if self.__make_temporal_script:
                register_line = "STP.RegisterWriter(%s, '%s', tp_writers)" % (
                    varname, filename)
            else:
                register_line = "coprocessor.RegisterWriter(%s, filename='%s', freq=%s, paddingamount=%s)" % (
                    varname, filename, frequency, padding)
            lines.append(register_line)
            lines.append("")

        if len(lines) == 2:
            return []  # don't clutter output if there are no writers
        return lines
# -----------------------------------------------------------------------------
def _record_update_frequency(cpstate_globals, sim_input_name, frequency):
    """Record that `sim_input_name` is needed every `frequency` steps and mark
    it as a required channel."""
    frequencies = cpstate_globals.write_frequencies[sim_input_name]
    if frequency not in frequencies:
        frequencies.append(frequency)
        frequencies.sort()
    if sim_input_name not in cpstate_globals.channels_needed:
        cpstate_globals.channels_needed.append(sim_input_name)


def _format_coprocessor_script(state, write_frequencies, arrays):
    """Wrap the traced `state` lines into the CreateCoProcessor() definition
    and return the complete script text."""
    parts = [
        "\n",
        "# ----------------------- CoProcessor definition -----------------------\n\n",
        "def CreateCoProcessor():\n",
        "  def _CreatePipeline(coprocessor, datadescription):\n",
        "    class Pipeline:\n",
    ]

    # add the traced code as the body of the Pipeline class, dropping the
    # import lines emitted by the tracer.
    for original_line in state:
        for line in original_line.split("\n"):
            if "import *" in line or "#### import the simple" in line:
                continue
            parts.append("      " + line + "\n" if line else "\n")

    parts += [
        "    return Pipeline()\n",
        "\n",
        "  class CoProcessor(coprocessing.CoProcessor):\n",
        "    def CreatePipeline(self, datadescription):\n",
        "      self.Pipeline = _CreatePipeline(self, datadescription)\n",
        "\n",
        "  coprocessor = CoProcessor()\n",
        "  # these are the frequencies at which the coprocessor updates.\n",
        "  freqs = " + str(write_frequencies) + "\n",
        "  coprocessor.SetUpdateFrequencies(freqs)\n",
    ]
    if arrays:
        parts.append("  if requestSpecificArrays:\n")
        for channel_name in arrays:
            parts.append("    arrays = " + str(arrays[channel_name]) + "\n")
            parts.append("    coprocessor.SetRequestedArrays('" + channel_name + "', arrays)\n")
    parts += [
        "  coprocessor.SetInitialOutputOptions(timeStepToStartOutputAt,forceOutputAtFirstCall)\n",
        "\n",
        "  if imageRootDirectory:\n",
        "    coprocessor.SetImageRootDirectory(imageRootDirectory)\n",
        "  if dataRootDirectory:\n",
        "    coprocessor.SetDataRootDirectory(dataRootDirectory)\n",
        "\n",
        "  if make_cinema_table:\n",
        "    coprocessor.EnableCinemaDTable()\n",
        "\n",
        "  return coprocessor\n",
    ]
    return "".join(parts)


def DumpPipeline(export_rendering, simulation_input_map, screenshot_info,
                 cinema_tracks, cinema_arrays, enable_live_viz, live_viz_frequency):
    """Method that will dump the current pipeline and return it as a string trace.

    export_rendering
      boolean telling if we want to export rendering
    simulation_input_map
      string->string map with key being the proxyname while value being the
      simulation input name.
    screenshot_info
      map with information about screenshots

      * key -> view proxy name
      * value -> [filename, writefreq, fitToScreen, magnification, width, height,
        cinemacamera options, compressionlevel]
    cinema_tracks
      map with information about cinema tracks to record

      * key -> proxy name
      * value -> argument ranges
    cinema_arrays
      map with information about value arrays to be exported

      * key -> proxy name
      * value -> list of array names
    enable_live_viz
      boolean telling if we want to enable Catalyst Live connection
    live_viz_frequency
      integer telling how often to update Live connection. only used if
      enable_live_viz is True

    The module-level export state is initialized on entry and reset on exit,
    including when an exception is raised.
    """
    # reset the global variables.
    initialize_globals()
    try:
        cpstate_globals = get_globals()
        cpstate_globals.export_rendering = export_rendering
        cpstate_globals.simulation_input_map = simulation_input_map
        cpstate_globals.screenshot_info = screenshot_info
        cpstate_globals.cinema_tracks = cinema_tracks
        cpstate_globals.cinema_arrays = cinema_arrays
        cpstate_globals.enable_live_viz = enable_live_viz
        cpstate_globals.live_viz_frequency = live_viz_frequency

        # Initialize the write frequency map
        for sim_input_name in cpstate_globals.simulation_input_map.values():
            cpstate_globals.write_frequencies[sim_input_name] = []

        # Start trace. The callback is unregistered even when tracing fails so
        # that it cannot leak into traces generated later.
        proxy_filter = cpstate_filter_proxies_to_serialize()
        smtrace.RealProxyAccessor.register_create_callback(cp_hook)
        try:
            state = smstate.get_state(filter=proxy_filter, raw=True)
        finally:
            smtrace.RealProxyAccessor.unregister_create_callback(cp_hook)

        # add in the new style writer proxies
        state = state + NewStyleWriters().make_trace()

        # iterate over all views that were saved in state and update write
        # frequencies
        if export_rendering:
            pxm = servermanager.ProxyManager()
            for key, vtuple in screenshot_info.items():
                view = pxm.GetProxy("views", key)
                if not view:
                    continue
                image_write_frequency = int(vtuple[1])
                # Locate which simulation inputs this view is connected to, if
                # any, and update the write_frequencies accordingly.
                for sim_input_name in locate_simulation_inputs_for_view(view):
                    _record_update_frequency(
                        cpstate_globals, sim_input_name, image_write_frequency)

        if enable_live_viz:
            for key in simulation_input_map:
                _record_update_frequency(
                    cpstate_globals, simulation_input_map[key], live_viz_frequency)

        # collect [array name, association] pairs for every needed channel;
        # 0 is used for point data arrays and 1 for cell data arrays.
        pxm = servermanager.ProxyManager()
        arrays = {}
        for channel_name in cpstate_globals.channels_needed:
            arrays[channel_name] = []
            p = pxm.GetProxy("sources", channel_name)
            if p:
                point_info = p.GetPointDataInformation()
                for i in range(point_info.GetNumberOfArrays()):
                    arrays[channel_name].append([point_info.GetArray(i).GetName(), 0])
                cell_info = p.GetCellDataInformation()
                for i in range(cell_info.GetNumberOfArrays()):
                    arrays[channel_name].append([cell_info.GetArray(i).GetName(), 1])

        return _format_coprocessor_script(
            state, cpstate_globals.write_frequencies, arrays)
    finally:
        # cleanup globals state
        reset_globals()
# ------------------------------------------------------------------------------
def run(filename=None):
    """Create a dummy pipeline and save the coprocessing state in the filename
    specified, if any, else dumps it out on stdout."""
    from paraview import simple, servermanager

    # Build a small demo pipeline; the calls are made for their effect on the
    # ParaView session, the returned proxies are not needed here.
    simple.Wavelet(registrationName="Wavelet1")
    simple.Contour()
    simple.Show()
    view = simple.Render()
    # create a new 'Parallel PolyData Writer'
    simple.ParallelPolyDataWriter()

    viewname = servermanager.ProxyManager().GetProxyName("views", view.SMProxy)
    # screenshot_info entries need all 8 values documented by DumpPipeline():
    # [filename, writefreq, fitToScreen, magnification, width, height,
    #  cinemacamera options, compressionlevel]. The last two are placeholders
    # that are written verbatim into the generated RegisterView() call.
    script = DumpPipeline(export_rendering=True,
                          simulation_input_map={"Wavelet1": "input"},
                          screenshot_info={viewname: ['image.png', '1', '1', '2', '400', '400', None, None]},
                          cinema_tracks={},
                          cinema_arrays={},
                          enable_live_viz=False,
                          live_viz_frequency=1)
    if filename:
        # the context manager closes the file even if the write fails
        with open(filename, "w") as f:
            f.write(script)
    else:
        print("# *** Generated Script Begin ***")
        print(script)
        print("# *** Generated Script End ***")
# When executed as a script, build the demo pipeline and print the generated
# script to stdout (run() is called without a filename).
if __name__ == "__main__":
    run()