Adding new VisTrails modules
VisTrails features a plugin system that allows users to easily extend the set of modules available for the workflows. Each set of modules is organized into one
package, which is simply a Python source file that respects simple conventions.
Each
VisTrails module behaves in a dataflow fashion: It requests input from ports, performs the necessary computation, and creates the output for its output ports. In the source file, each module is simply a Python class that overrides the
compute() method. If two different modules request an output, the execution infrastructure ensures that each module only gets one call to
compute(). Here's how a typical module is implemented:
class PythonCalc(Module):
def compute(self):
v1 = self.getInputFromPort("value1")
v2 = self.getInputFromPort("value2")
self.setResult("value", self.op(v1, v2))
def op(self, v1, v2):
op = self.getInputFromPort("op")
if op == '+':
return v1 + v2
elif op == '-':
return v1 - v2
elif op == '*':
return v1 * v2
elif op == '/':
return v1 / v2
raise ModuleError("unrecognized operation: '%s'" % op)
Additionally, this class needs to be registered with the runtime
VisTrails type system, so that it's available in the GUI:
def initialize(*args, **keywords):
reg = modules.module_registry
reg.addModule(PythonCalc)
reg.addInputPort(PythonCalc, "value1", (modules.basic_modules.Float, 'the first argument'))
reg.addInputPort(PythonCalc, "value2", (modules.basic_modules.Float, 'the second argument'))
reg.addInputPort(PythonCalc, "op", (modules.basic_modules.String, 'the operation'))
reg.addOutputPort(PythonCalc, "value", (modules.basic_modules.Float, 'the result'))
This is essentially all the code necessary to create a new module within
VisTrails. This is how we created the provenance challenge modules. It is very easy to wrap command line functionality by using appropriate
system() calls. The intermediate results are threaded by using temporary files, and
VisTrails provides a convenient facility for handling these, so modules don't need to be responsible for cleanup. Each module can request unique temporary filenames that are created within a unique temporary directory. The entire provenance package python file is shown below. Notice that the code is essentially just constructing command lines for executing the
FSL and AIR utilities. Notice also the use of the annotation logging API on AIRHeaderFile.compute(). The call to self.annotate() registers a dictionary of key-value pairs in the database for later querying:
import modules
import modules.module_registry
import modules.basic_modules
from modules.vistrails_module import Module, ModuleError, newModule
import os
import os.path
import stat
################################################################################
global_airpath = ""
global_fslpath = ""
global_netpbmpath = ""
class ProvenanceChallenge(Module):
__quiet = True
__programsQuiet = True
def air_cmd_line(self, cmd, *params):
return (global_airpath + cmd + " %s" * len(params)) % params
def fsl_cmd_line(self, cmd, *params):
return ('FSLOUTPUTTYPE=NIFTI_GZ ' + global_fslpath + cmd + " %s" * len(params)) % params
def run(self, cmd):
if not self.__quiet:
print cmd
if self.__programsQuiet:
cmd = '(' + cmd + ') 2>&1 >/dev/null'
r = os.system(cmd)
if r != 0:
raise ModuleError(self, "system call failed: '%s'" % cmd)
class AIRHeaderFile(modules.basic_modules.File):
def get_header_annotations(self):
try:
x = self.interpreter.filePool.createFile()
os.system(global_airpath + 'scanheader ' + self.name + '> ' +
x.name)
f = file(x.name, 'r')
except:
raise ModuleError(self, "Could not open header file " + self.name)
result = {}
for l in f:
l = l[:-1]
if not l:
continue
entries = l.split('=')
if len(entries) != 2:
raise ModuleError(self,
"Error parsing line '%s' of header %s" %
(l[:-1], self.name))
result[entries[0]] = entries[1]
return result
def compute(self):
modules.basic_modules.File.compute(self)
d = self.get_header_annotations()
self.annotate(d)
class AlignWarp(ProvenanceChallenge):
def compute(self):
image = self.getInputFromPort("image")
ref = self.getInputFromPort("reference")
model = self.getInputFromPort("model")
o = self.interpreter.filePool.createFile(suffix='.warp')
cmd = self.air_cmd_line('align_warp',
image.name,
ref.name,
o.name,
'-m',
model,
'-q')
self.run(cmd)
self.setResult("output", o)
class Reslice(ProvenanceChallenge):
def compute(self):
warp = self.getInputFromPort("warp")
o = self.interpreter.filePool.createFile()
cmd = self.air_cmd_line('reslice',
warp.name,
o.name)
self.run(cmd)
self.setResult("output", o)
class SoftMean(ProvenanceChallenge):
def compute(self):
imageList = self.getInputFromPort("imageList")
o = self.interpreter.filePool.createFile(suffix='.hdr')
cmd = self.air_cmd_line('softmean',
o.name,
'y',
'null',
*[f.name for f in imageList])
self.run(cmd)
self.setResult('output', o)
class Slicer(ProvenanceChallenge):
def compute(self):
cmd = ['slicer']
i = self.getInputFromPort("input")
cmd.append(i.name)
if self.hasInputFromPort("slice_x"):
cmd.append('-x')
cmd.append(str(self.getInputFromPort("slice_x")))
elif self.hasInputFromPort("slice_y"):
cmd.append('-y')
cmd.append(str(self.getInputFromPort("slice_y")))
elif self.hasInputFromPort("slice_z"):
cmd.append('-z')
cmd.append(str(self.getInputFromPort("slice_z")))
o = self.interpreter.filePool.createFile(suffix='.pgm')
cmd.append(o.name)
self.run(self.fsl_cmd_line(*cmd))
self.setResult('output', o)
class PGMToPPM(ProvenanceChallenge):
def compute(self):
cmd = ['pgmtoppm white']
i = self.getInputFromPort("input")
cmd.append(i.name)
o = self.interpreter.filePool.createFile(suffix='.ppm')
cmd.append(' >')
cmd.append(o.name)
self.run(" ".join(cmd))
self.setResult('output', o)
class PNMToJpeg(ProvenanceChallenge):
def compute(self):
cmd = ['pnmtojpeg']
i = self.getInputFromPort("input")
cmd.append(i.name)
o = self.interpreter.filePool.createFile(suffix='.jpg')
cmd.append(' >')
cmd.append(o.name)
self.run(" ".join(cmd))
self.setResult('output', o)
################################################################################
def checkProgram(fileName):
return os.path.isfile(fileName) and os.stat(fileName) & 005
def initialize(airpath=None, fslpath=None, netpbmpath=None, *args, **kwargs):
print "Initializing Provenance Challenge Package"
if not airpath:
print "airpath not specified or incorrect: Will assume AIR tools are on the path"
else:
print "Will use AIR tools from ", airpath
global global_airpath
global_airpath = airpath + '/'
if not fslpath:
print "fslpath not specified or incorrect: Will assume fsl tools are on the path"
else:
print "Will use FSL tools from ", fslpath
global global_fslpath
global_fslpath = fslpath + '/'
if not netpbmpath:
print "netpbmpath not specified or incorrect: Will assume netpbm tools are on the path"
else:
print "Will use netpbm tools from ", netpbmpath
global global_netpbmpath
global_netpbmpath = netpbmpath + '/'
reg = modules.module_registry
basic = modules.basic_modules
reg.addModule(ProvenanceChallenge)
reg.addModule(AlignWarp)
reg.addInputPort(AlignWarp, "image", (basic.File, 'the image file to be deformed'))
reg.addInputPort(AlignWarp, "image_header", (basic.File, 'the header of the image to be deformed'))
reg.addInputPort(AlignWarp, "reference", (basic.File, 'the reference image'))
reg.addInputPort(AlignWarp, "reference_header", (basic.File, 'the header of the reference image'))
reg.addInputPort(AlignWarp, "model", (basic.Integer, 'the deformation model'))
reg.addOutputPort(AlignWarp, "output", (basic.File, 'the output deformation'))
reg.addModule(Reslice)
reg.addInputPort(Reslice, "warp", (basic.File, 'the warping to be resliced'))
reg.addOutputPort(Reslice, "output", (basic.File, 'the new slice'))
reg.addModule(SoftMean)
reg.addInputPort(SoftMean, "imageList", (basic.List, 'image files'))
reg.addOutputPort(SoftMean, "output", (basic.File, 'average image'))
reg.addModule(Slicer)
reg.addInputPort(Slicer, "input", (basic.File, 'the input file to be sliced'))
reg.addInputPort(Slicer, "slice_x", (basic.Float, 'sagittal slice with given value'))
reg.addInputPort(Slicer, "slice_y", (basic.Float, 'coronal slice with given value'))
reg.addInputPort(Slicer, "slice_z", (basic.Float, 'axial slice with given value'))
reg.addOutputPort(Slicer, "output", (basic.File, 'slice output'))
reg.addModule(PGMToPPM)
reg.addInputPort(PGMToPPM, "input", (basic.File, "input"))
reg.addOutputPort(PGMToPPM, "output", (basic.File, "output"))
reg.addModule(PNMToJpeg)
reg.addInputPort(PNMToJpeg, "input", (basic.File, "input"))
reg.addOutputPort(PNMToJpeg, "output", (basic.File, "output"))
reg.addModule(AIRHeaderFile)
--
EmanueleSantos - 12 Sep 2006
to top