from __future__ import absolute_import, print_function
from functools import wraps
from xml.dom.minidom import parseString
from xml.parsers.expat import ExpatError
from inspect import signature
import sys
def _count(values):
"""internal: returns the `number-of-elements` for the argument.
if argument is a list or type, it returns its `len`, else returns 0 for None
and 1 for non-None."""
if type(values) == list or type(values) == tuple:
return len(values)
elif values is None:
return 0
else:
return 1
def _stringify(values):
"""internal method: used to convert values to a string suitable for an xml attribute"""
if type(values) == list or type(values) == tuple:
return " ".join([str(x) for x in values])
elif type(values) == type(True):
return "1" if values else "0"
else:
return str(values)
def _generate_xml(attrs, nested_xmls=[]):
"""internal: used to generate an XML string from the arguments.
`attrs` is a dict with attributes specified using (key, value) for the dict.
`type` key in `attrs` is treated as the XML tag name.
`nested_xmls` is a list of strings that get nested in the resulting xmlstring
returned by this function.
"""
d = {}
d["type"] = attrs.pop("type")
attr_items = filter(lambda item: item[1] is not None, attrs.items())
d["attrs"] = "\n".join(["%s=\"%s\"" % (x, _stringify(y)) for x, y in attr_items])
d["nested_xmls"] = "\n".join(nested_xmls)
xml = """<{type} {attrs}> {nested_xmls} </{type}>"""
return xml.format(**d)
def _undecorate(func):
"""internal: Traverses through nested decorated objects to return the original
object. This is not a general mechanism and only supports decorator chains created
via `_create_decorator`."""
if hasattr(func, "_pv_original_func"):
return _undecorate(func._pv_original_func)
return func
def _create_decorator(kwargs={}, update_func=None, generate_xml_func=None):
"""internal: used to create decorator for class or function objects.
`kwargs`: these are typically the keyword arguments passed to the decorator itself.
must be a `dict`. The decorator will often update the dict to have
defaults or overrides to the parameters passed to the decorator, as appropriate.
`update_func`: if non-None, must be a callable that takes 2 arguments `(decoratedobj, kwargs)`.
The purpose of this callback is to update the kwargs (and return updated version)
for required-yet-missing attributes that can be deduced by introspecting
the `decoratedobj`.
`generate_xml_func`: must be non-None and must be a callable that takes 2 arguments `(decoratedobj, kwargs)`.
The kwargs can be expected to have been updated by calling `update_func`, if non-None.
The callback typically adds an attribute on the decoratedobj for further processing later.
"""
attrs = kwargs.copy()
def decorator(func):
original_func = _undecorate(func)
if update_func is not None:
updated_attrs = update_func(original_func, attrs)
else:
updated_attrs = attrs
generate_xml_func(original_func, updated_attrs)
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
setattr(wrapper, "_pv_original_func", original_func)
return wrapper
return decorator
class smproperty(object):
    """
    Provides decorators for class methods that are to be exposed as
    server-manager properties in ParaView. Only methods that are decorated
    using one of the available decorators will be exposed and be accessible to
    the ParaView UI or the client-side Python scripting API.
    """

    @staticmethod
    def _append_xml(func, xml):
        """Append `xml` to the `_pvsm_property_xmls` list stored on `func`,
        creating that list when `func` does not have one yet."""
        pxmls = getattr(func, "_pvsm_property_xmls", [])
        pxmls.append(xml)
        setattr(func, "_pvsm_property_xmls", pxmls)

    @staticmethod
    def _generate_xml(func, attrs):
        """Build the property XML for `func` from `attrs` and record it on
        `func`. Domain XMLs and hint XMLs previously attached to `func` are
        nested inside the property (hints wrapped in a `Hints` element) and
        then removed from `func`."""
        nested_xmls = []
        if hasattr(func, "_pvsm_domain_xmls"):
            nested_xmls.extend(func._pvsm_domain_xmls)
            delattr(func, "_pvsm_domain_xmls")
        if hasattr(func, "_pvsm_hints_xmls"):
            hints = list(func._pvsm_hints_xmls)
            nested_xmls.append(_generate_xml({"type": "Hints"}, hints))
            delattr(func, "_pvsm_hints_xmls")
        smproperty._append_xml(func, _generate_xml(attrs, nested_xmls))

    @staticmethod
    def _argument_count(func):
        """Number of parameters of `func`, not counting the first one."""
        return len(signature(func).parameters) - 1

    @staticmethod
    def _update_property_defaults(func, attrs):
        """Function used to populate default attribute values for missing
        attributes on a property: `name` and `command` both default to the
        name of `func`. Updates and returns `attrs`."""
        # determine unspecified attributes based on the `func`
        if attrs.get("name", None) is None:
            attrs["name"] = func.__name__
        if attrs.get("command", None) is None:
            attrs["command"] = func.__name__
        return attrs

    @staticmethod
    def _update_vectorproperty_defaults(func, attrs):
        """Function used to populate default attribute values for missing
        attributes on a vector property. Updates and returns `attrs`.

        Raises `AssertionError` when `default_values` is given and its element
        count differs from `number_of_elements`."""
        attrs = smproperty._update_property_defaults(func, attrs)
        if attrs.get("number_of_elements", None) is None:
            attrs["number_of_elements"] = smproperty._argument_count(func)
        if attrs.get("default_values", None) is None:
            attrs["default_values"] = "None"
        else:
            # confirm number_of_elements == len(default_values). Raised
            # explicitly (same exception type as the former `assert`) so the
            # check is not dropped when Python runs with -O.
            expected = int(attrs["number_of_elements"])
            actual = _count(attrs["default_values"])
            if expected != actual:
                raise AssertionError(
                    "number_of_elements (%d) does not match the number of "
                    "default_values (%d)" % (expected, actual))
        # if repeat_command, set number_of_elements_per_command
        # if not set.
        if attrs.get("repeat_command", None) is not None and \
                attrs.get("number_of_elements_per_command", None) is None:
            attrs["number_of_elements_per_command"] = smproperty._argument_count(func)
        return attrs

    @staticmethod
    def _update_proxyproperty_attrs(func, attrs):
        """Populate defaults for a proxy property; same as
        `_update_property_defaults`."""
        return smproperty._update_property_defaults(func, attrs)

    @staticmethod
    def _vectorproperty(xmltype, kwargs):
        """Shared implementation of the `*vector` decorators: `xmltype` is the
        XML tag name, `kwargs` the attributes given by the user (which may
        override `type`)."""
        attrs = {"type": xmltype}
        attrs.update(kwargs)
        return _create_decorator(attrs,
            update_func=smproperty._update_vectorproperty_defaults,
            generate_xml_func=smproperty._generate_xml)

    @staticmethod
    def xml(xmlstr):
        """Decorator that can be used to directly add a ServerManager property XML for a method."""
        def generate(func, attrs):
            smproperty._append_xml(func, xmlstr)
        return _create_decorator(generate_xml_func=generate)

    @staticmethod
    def intvector(**kwargs):
        """Decorator that exposes a method as an `IntVectorProperty`; keyword
        arguments become XML attributes."""
        return smproperty._vectorproperty("IntVectorProperty", kwargs)

    @staticmethod
    def doublevector(**kwargs):
        """Decorator that exposes a method as a `DoubleVectorProperty`; keyword
        arguments become XML attributes."""
        return smproperty._vectorproperty("DoubleVectorProperty", kwargs)

    @staticmethod
    def idtypevector(**kwargs):
        """Decorator that exposes a method as an `IdTypeVectorProperty`; keyword
        arguments become XML attributes."""
        return smproperty._vectorproperty("IdTypeVectorProperty", kwargs)

    @staticmethod
    def stringvector(**kwargs):
        """Decorator that exposes a method as a `StringVectorProperty`; keyword
        arguments become XML attributes."""
        return smproperty._vectorproperty("StringVectorProperty", kwargs)

    @staticmethod
    def proxy(**kwargs):
        """Decorator that exposes a method as a `ProxyProperty`; keyword
        arguments become XML attributes, `name` and `command` default to the
        method name."""
        attrs = {"type": "ProxyProperty"}
        attrs.update(kwargs)
        return _create_decorator(attrs,
            update_func=smproperty._update_proxyproperty_attrs,
            generate_xml_func=smproperty._generate_xml)

    @staticmethod
    def dataarrayselection(name=None):
        """Decorator that adds a pair of `StringVectorProperty` elements using
        `vtkSIDataArraySelectionProperty`: an information-only `<name>Info`
        property and the `<name>` property whose `ArraySelectionDomain` refers
        to it. `name` defaults to the method name; the method name is used as
        the `command` of both."""
        def generate(func, attrs):
            # The template's continuation lines are flush-left on purpose so
            # the emitted text stays exactly as it was.
            xml = """<StringVectorProperty name="{name}Info"
command="{command}"
number_of_elements_per_command="2"
information_only="1"
si_class="vtkSIDataArraySelectionProperty" />
<StringVectorProperty name="{name}"
information_property="{name}Info"
command="{command}"
number_of_elements_per_command="2"
element_types="2 0"
repeat_command="1"
si_class="vtkSIDataArraySelectionProperty">
<ArraySelectionDomain name="array_list">
<RequiredProperties>
<Property function="ArrayList" name="{name}Info" />
</RequiredProperties>
</ArraySelectionDomain>
</StringVectorProperty>
""".format(**attrs)
            smproperty._append_xml(func, xml)
        return _create_decorator({"name": name},
            update_func=smproperty._update_property_defaults,
            generate_xml_func=generate)
class smdomain(object):
    """
    Provides decorators that add domains to properties.
    """

    @staticmethod
    def _append_xml(func, xml):
        """Append `xml` to the `_pvsm_domain_xmls` list stored on `func`,
        creating that list when `func` does not have one yet."""
        domains = getattr(func, "_pvsm_domain_xmls", [])
        domains.append(xml)
        setattr(func, "_pvsm_domain_xmls", domains)

    @staticmethod
    def _generate_xml(func, attrs):
        """Build a domain element without children from `attrs` and record it
        on `func`."""
        smdomain._append_xml(func, _generate_xml(attrs, []))

    @staticmethod
    def _simple_domain(defaults, kwargs):
        """Shared implementation of the domains that have no child elements:
        `defaults` holds the tag name and default domain name, `kwargs` the
        user-supplied attributes, which take precedence."""
        attrs = dict(defaults)
        attrs.update(kwargs)
        return _create_decorator(attrs,
            generate_xml_func=smdomain._generate_xml)

    @staticmethod
    def xml(xmlstr):
        """Decorator that directly adds the given domain XML string."""
        def generate(func, attrs):
            smdomain._append_xml(func, xmlstr)
        return _create_decorator({},
            generate_xml_func=generate)

    @staticmethod
    def doublerange(**kwargs):
        """Decorator that adds a `DoubleRangeDomain` (named "range" unless
        overridden); keyword arguments become XML attributes."""
        return smdomain._simple_domain(
            {"type": "DoubleRangeDomain", "name": "range"}, kwargs)

    @staticmethod
    def intrange(**kwargs):
        """Decorator that adds an `IntRangeDomain` (named "range" unless
        overridden); keyword arguments become XML attributes."""
        return smdomain._simple_domain(
            {"type": "IntRangeDomain", "name": "range"}, kwargs)

    @staticmethod
    def filelist(**kwargs):
        """Decorator that adds a `FileListDomain` (named "files" unless
        overridden); keyword arguments become XML attributes."""
        return smdomain._simple_domain(
            {"type": "FileListDomain", "name": "files"}, kwargs)

    @staticmethod
    def datatype(dataTypes, **kwargs):
        """Decorator that adds a `DataTypeDomain` (named "input_type" unless
        overridden) with one nested `DataType` element per entry of
        `dataTypes`. `dataTypes` is an iterable of values; a single string is
        accepted as one value."""
        attrs = {"type": "DataTypeDomain", "name": "input_type"}
        attrs.update(kwargs)
        # A bare string would otherwise be iterated character by character and
        # produce one DataType element per letter.
        if isinstance(dataTypes, str):
            type_names = [dataTypes]
        else:
            type_names = list(dataTypes)

        def generate(func, attrs):
            type_xmls = [_generate_xml({"type": "DataType", "value": atype}, [])
                         for atype in type_names]
            smdomain._append_xml(func, _generate_xml(attrs, type_xmls))
        return _create_decorator(attrs,
            generate_xml_func=generate)
class smhint(object):
    """Provides decorators that add hints to proxies and properties."""

    @staticmethod
    def _generate_xml(func, attrs):
        """Build a hint element from `attrs` and add it to the
        `_pvsm_hints_xmls` list kept on `func`."""
        collected = getattr(func, "_pvsm_hints_xmls", [])
        collected.append(_generate_xml(attrs, []))
        setattr(func, "_pvsm_hints_xmls", collected)

    @staticmethod
    def xml(xmlstr):
        """Decorator that adds the given hint XML string as is."""
        def generate(func, attrs):
            collected = getattr(func, "_pvsm_hints_xmls", [])
            collected.append(xmlstr)
            setattr(func, "_pvsm_hints_xmls", collected)
        return _create_decorator({},
            generate_xml_func=generate)

    @staticmethod
    def filechooser(extensions, file_description):
        """Decorator that adds a `FileChooser` hint carrying the given
        `extensions` and `file_description` attributes."""
        attrs = {
            "type": "FileChooser",
            "extensions": extensions,
            "file_description": file_description,
        }
        return _create_decorator(attrs,
            generate_xml_func=smhint._generate_xml)
def get_qualified_classname(classobj):
    """Return the name of `classobj` prefixed with its module name and a dot,
    or just the class name when the class lives in `__main__`."""
    module_name = classobj.__module__
    class_name = classobj.__name__
    if module_name != "__main__":
        return "%s.%s" % (module_name, class_name)
    return class_name
class smproxy(object):
    """
    Provides decorators for class objects that should be exposed to
    ParaView.
    """

    @staticmethod
    def _update_proxy_defaults(classobj, attrs):
        """Fill in missing proxy attributes: `name` defaults to the class
        name, `class` to the qualified class name and `label` to `name`.
        Updates and returns `attrs`."""
        if attrs.get("name", None) is None:
            attrs["name"] = classobj.__name__
        if attrs.get("class", None) is None:
            attrs["class"] = get_qualified_classname(classobj)
        if attrs.get("label", None) is None:
            attrs["label"] = attrs["name"]
        return attrs

    @staticmethod
    def _generate_xml(classobj, attrs):
        """Assemble the `ServerManagerConfiguration` XML for `classobj` and
        store it on the (undecorated) class as `_pvsm_proxy_xml`.

        The proxy element nests, in order: the reload-button property (unless
        `support_reload` is falsy in `attrs`), property XMLs attached to the
        class itself, property XMLs of the decorated methods found in the
        class `__dict__` sorted by the line number of each method, and a
        `Hints` element when hints are attached to the class.

        Raises `RuntimeError` when a method carries more than one property
        definition."""
        nested_xmls = []
        classobj = _undecorate(classobj)
        if hasattr(classobj, "_pvsm_property_xmls"):
            val = getattr(classobj, "_pvsm_property_xmls")
            if isinstance(val, list):
                nested_xmls += val
            else:
                nested_xmls.append(val)

        prop_xmls_dict = {}
        prop_ordering = {}
        for pname, val in classobj.__dict__.items():
            val = _undecorate(val)
            if callable(val) and hasattr(val, "_pvsm_property_xmls"):
                pxmls = getattr(val, "_pvsm_property_xmls")
                if len(pxmls) > 1:
                    # the two literals are concatenated; the trailing space
                    # keeps "same" and "method" apart in the message.
                    raise RuntimeError("Multiple property definitions on the same "
                                       "method are not supported.")
                prop_xmls_dict[pname] = pxmls[0]
                prop_ordering[pname] = val.__code__.co_firstlineno

        # sort properties by the line numbers of their functions
        nested_xmls += [prop_xmls_dict[key]
                        for key in sorted(prop_xmls_dict, key=prop_ordering.get)]

        if attrs.get("support_reload", True):
            nested_xmls.insert(0, """
<Property name="Reload Python Module" panel_widget="command_button">
<Documentation>Reload the Python module.</Documentation>
</Property>""")

        if hasattr(classobj, "_pvsm_hints_xmls"):
            hints = list(classobj._pvsm_hints_xmls)
            nested_xmls.append(_generate_xml({"type": "Hints"}, hints))

        proxyxml = _generate_xml(attrs, nested_xmls)
        groupxml = _generate_xml({"type": "ProxyGroup", "name": attrs.get("group")},
                                 [proxyxml])
        smconfig = _generate_xml({"type": "ServerManagerConfiguration"}, [groupxml])
        setattr(classobj, "_pvsm_proxy_xml", smconfig)

    @staticmethod
    def source(**kwargs):
        """Decorates a class as a `SourceProxy` in the "sources" group using
        `vtkSIPythonSourceProxy`; keyword arguments override these defaults
        and become XML attributes."""
        attrs = {}
        attrs["type"] = "SourceProxy"
        attrs["group"] = "sources"
        attrs["si_class"] = "vtkSIPythonSourceProxy"
        attrs.update(kwargs)
        return _create_decorator(attrs,
            update_func=smproxy._update_proxy_defaults,
            generate_xml_func=smproxy._generate_xml)

    @staticmethod
    def filter(**kwargs):
        """Decorates a filter: same as `source` with the group defaulting to
        "filters"."""
        attrs = {"group": "filters"}
        attrs.update(kwargs)
        return smproxy.source(**attrs)

    @staticmethod
    def reader(file_description, extensions=None, filename_patterns=None, is_directory=False, **kwargs):
        """
        Decorates a reader. Either `filename_patterns` or `extensions` must be
        provided, otherwise `RuntimeError` is raised. A `ReaderFactory` hint is
        attached and the class is then decorated as a `source` with `kwargs`.
        """
        if extensions is None and filename_patterns is None:
            raise RuntimeError("Either `filename_patterns` or `extensions` must be provided for a reader.")

        attrs = {"type": "ReaderFactory"}
        attrs["file_description"] = file_description
        attrs["extensions"] = extensions
        attrs["filename_patterns"] = filename_patterns
        # None-valued attributes are left out of the generated XML
        attrs["is_directory"] = "1" if is_directory else None
        _xml = _generate_xml(attrs, [])

        def decorator(func):
            f = smhint.xml(_xml)(func)
            return smproxy.source(**kwargs)(f)
        return decorator

    @staticmethod
    def writer(file_description, extensions, **kwargs):
        """
        Decorates a writer. Both `file_description` and `extensions` are
        required; `AssertionError` is raised when either is None. A
        `WriterFactory` hint is attached and the class is then decorated as a
        `WriterProxy` in the "writers" group.
        """
        # Raised explicitly (same exception type as the former `assert`) so
        # the check is not dropped when Python runs with -O.
        if file_description is None or extensions is None:
            raise AssertionError(
                "`file_description` and `extensions` must be provided for a writer.")

        attrs = {"type": "WriterFactory"}
        attrs["file_description"] = file_description
        attrs["extensions"] = extensions
        _xml = _generate_xml(attrs, [])

        def decorator(func):
            f = smhint.xml(_xml)(func)
            return smproxy.source(group="writers", type="WriterProxy", **kwargs)(f)
        return decorator
def get_plugin_xmls(module_or_package):
    """helper function called by vtkPVPythonAlgorithmPlugin to discover
    all "proxy" decorated classes in the module or package. We don't recurse
    into the package; one simply needs to export all classes that form the
    ParaView plugin in the __init__.py for the package.

    Accepts a module or any object providing `items()`; anything else yields
    an empty list. Returns the `_pvsm_proxy_xml` strings of the entries that
    have one, in iteration order."""
    from inspect import ismodule

    if ismodule(module_or_package):
        entries = module_or_package.__dict__.items()
    elif hasattr(module_or_package, "items"):
        entries = module_or_package.items()
    else:
        entries = []

    # look through decorator wrappers before checking for the stored XML
    originals = (_undecorate(value) for _, value in entries)
    return [getattr(obj, "_pvsm_proxy_xml")
            for obj in originals
            if hasattr(obj, "_pvsm_proxy_xml")]
def get_plugin_name(module_or_package):
    """helper function called by vtkPVPythonAlgorithmPlugin to discover
    ParaView plugin name, if any. A module that defines
    `paraview_plugin_name` yields that value as a string; otherwise the
    `__name__` of the argument is returned."""
    from inspect import ismodule

    if not ismodule(module_or_package) or \
            not hasattr(module_or_package, "paraview_plugin_name"):
        return module_or_package.__name__
    return str(module_or_package.paraview_plugin_name)
def get_plugin_version(module_or_package):
    """helper function called by vtkPVPythonAlgorithmPlugin to discover
    ParaView plugin version, if any. A module that defines
    `paraview_plugin_version` yields that value as a string; anything else
    yields "(unknown)"."""
    from inspect import ismodule

    if not ismodule(module_or_package) or \
            not hasattr(module_or_package, "paraview_plugin_version"):
        return "(unknown)"
    return str(module_or_package.paraview_plugin_version)
def load_plugin(filepath, default_modulename=None):
    """helper function called by vtkPVPythonAlgorithmPlugin to load
    a python file.

    The file at `filepath` is executed as a module named `default_modulename`
    or, when that is not given, the file's base name without extension. On
    success the module is registered in `sys.modules` under that name and
    returned. Exceptions raised while executing the file propagate to the
    caller."""
    import sys

    # should we scope these under a plugins namespace?
    if default_modulename:
        modulename = default_modulename
    else:
        import os.path
        modulename = "%s" % os.path.splitext(os.path.basename(filepath))[0]

    # Only the availability probe is guarded. An ImportError raised by the
    # plugin's own imports must reach the caller instead of triggering the
    # legacy loader, which would run the plugin a second time or hide the
    # real error behind a missing `imp` module.
    try:
        # for Python 3.5+
        from importlib.util import spec_from_file_location, module_from_spec
    except ImportError:
        # for Python 3.3 and 3.4
        import imp
        module = imp.load_source(modulename, filepath)
    else:
        spec = spec_from_file_location(modulename, filepath)
        module = module_from_spec(spec)
        spec.loader.exec_module(module)

    sys.modules[modulename] = module
    return module
def reload_plugin_module(module):
    """helper function to reload a plugin module previously loaded via
    load_plugin. The module is loaded again from its source file under its
    current name; an argument that is not a module, or a module for which no
    source file is reported, is returned unchanged."""
    from inspect import getsourcefile, ismodule

    if not ismodule(module):
        return module
    source_path = getsourcefile(module)
    if not source_path:
        return module
    return load_plugin(source_path, module.__name__)