Apache NiFi ExecuteScript: Groovy script to replace JSON values via a mapping file
I am working with Apache NiFi 0.5.1 on a Groovy script that replaces incoming JSON values with the ones contained in a mapping file. The mapping file looks like this (it is a simple .txt):

    header1;header2;header3
    a;some text;a2

I have started with the following:

    import groovy.json.JsonBuilder
    import groovy.json.JsonSlurper
    import org.apache.nifi.processor.io.StreamCallback
    import java.nio.charset.StandardCharsets

    def flowFile = session.get()
    if (flowFile == null) {
        return
    }

    flowFile = session.write(flowFile, { inputStream, outputStream ->
        // Hardcoded input for now
        def content = """
        {
          "field1": "a",
          "field2": "a",
          "field3": "a"
        }"""

        def slurped = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurped)
        builder.content.field1 = "a"
        builder.content.field2 = "some text"
        builder.content.field3 = "a2"
        outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
    // REL_SUCCESS is bound into the script by ExecuteScript
    session.transfer(flowFile, REL_SUCCESS)

This first step works fine, although it is hardcoded and far from ideal. My initial thought was to use ReplaceTextWithMapping to perform the substitutions, but it does not work well with complex mapping files (e.g. multi-column ones). I would like to take this further, but I am not sure how to go about it. First of all, instead of passing in the entire hardcoded JSON, I would like to read the incoming flowfile. How is that possible in NiFi? Before running the script as part of ExecuteScript, I output a .json file with the content via UpdateAttribute, where filename = myresultingjson.json. Furthermore, I know how to load a .txt file with Groovy (String mappingContent = new File('/path/to/file').getText('UTF-8')), but how do I use the loaded file to perform the substitutions, so that the resulting JSON looks like this:

    {
      "field1": "a",
      "field2": "some text",
      "field3": "a2"
    }

Thank you for your help,
I.
Edit:
A first modification to the script allows me to read from the InputStream:

    import groovy.json.JsonBuilder
    import groovy.json.JsonSlurper
    import org.apache.commons.io.IOUtils
    import org.apache.nifi.processor.io.StreamCallback
    import java.nio.charset.StandardCharsets

    def flowFile = session.get()
    if (flowFile == null) {
        return
    }

    flowFile = session.write(flowFile, { inputStream, outputStream ->
        // Read the whole incoming flowfile content as a UTF-8 string
        def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

        def slurped = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurped)
        builder.content.field1 = "a"
        builder.content.field2 = "some text"
        builder.content.field3 = "a2"
        outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
    // REL_SUCCESS is bound into the script by ExecuteScript
    session.transfer(flowFile, REL_SUCCESS)

I have then moved on to testing the approach with ConfigSlurper, and wrote a generic class before injecting the logic into the Groovy ExecuteScript:

    import groovy.json.JsonBuilder
    import groovy.json.JsonSlurper

    class TestLoadingMappings {

        static void main(String[] args) {

            def content = '''
            {"field2":"a",
             "field3": "a"
            }
            '''

            println "this content of json file" + content

            def slurped = new JsonSlurper().parseText(content)
            def builder = new JsonBuilder(slurped)

            println "this content of builder " + builder

            def propertiesFile = new File("d:\\myfile.txt")
            Properties props = new Properties()
            props.load(new FileInputStream(propertiesFile))
            def config = new ConfigSlurper().parse(props).flatten()

            println "this content of config " + config

            config.each { k, v ->
                if (builder[k]) {
                    builder[k] = v
                }
            }
            println(builder.toPrettyString())
        }
    }

I get a groovy.lang.MissingPropertyException, and this is because the mapping is not that straightforward. All fields/properties (from field1 to field3) come in the InputStream with the same value (e.g. a), which means that every time field2, for example, has that value, the same value can be valid for the other two properties. However, I cannot have a mapping entry that simply maps "field2" : "sometext", because the actual mapping is driven by the first value in the mapping file. Here is an example:

    {
      "field1": "a",
      "field2": "a",
      "field3": "a"
    }

In the mapping file I have:
a;some text;a2
However, field1 needs to map to a (the first value in the file), or stay the same, if you wish. field2 needs to map to the value in the last column (a2), and field3 needs to map to 'some text' in the middle column.
Can you help with this? Is it something I can achieve with Groovy and ExecuteScript? If needed, I can split the config file into two.
Also, I have had a quick look at the other option (PutDistributedMapCache), and I am not sure I have understood how to load key-value pairs into a distributed map cache. It looks like you need to have a DistributedMapCacheClient, and I am not sure whether this is easy to implement.
Thank you!
Edit 2:
Some more progress: I now have the mapping working, but I am not sure why it fails when reading the second line of the properties file:

    "a" sometext
    "a2" anothertext

    class TestLoadingMappings {

        static void main(String[] args) {

            def content = '''
            {"field2":"a",
             "field3":"a"
            }
            '''

            println "this content of json file" + content

            def slurper = new JsonSlurper().parseText(content)
            def builder = new JsonBuilder(slurper)

            println "this content of builder " + builder

            assert builder.content.field2 == "a"
            assert builder.content.field3 == "a"

            def propertiesFile = new File('d:\\mytest.txt')
            Properties props = new Properties()
            props.load(new FileInputStream(propertiesFile))

            println "this content of properties " + props

            def config = new ConfigSlurper().parse(props).flatten()

            config.each { k, v ->
                if (builder.content.field2) {
                    builder.content.field2 = config[k]
                }
                if (builder.content.field3) {
                    builder.content.field3 = config[k]
                }

                println(builder.toPrettyString())
                println "this builder " + builder
            }
        }
    }

I get back:

    this builder {"field2":"sometext","field3":"sometext"}

Any idea why?
Thank you very much
Edit 3 (moved from below)
I have written the following:

    import groovy.json.JsonBuilder
    import groovy.json.JsonSlurper

    class TestLoadingMappings {

        static void main(String[] args) {

            def content = '''
            {"field2":"a",
             "field3":"a"
            }
            '''

            def slurper = new JsonSlurper().parseText(content)
            def builder = new JsonBuilder(slurper)

            println "this content of builder " + builder

            def propertiesFile = new File('d:\\properties.txt')
            Properties props = new Properties()
            props.load(new FileInputStream(propertiesFile))

            def conf = new ConfigSlurper().parse(props).flatten()

            conf.each { k, v ->
                if (builder.content[k]) {
                    builder.content[k] = v
                }
                println("this prints resulting json :" + builder.toPrettyString())
            }
        }
    }

However, I had to change the structure of the mapping file to the following:

    "field1"="substitutiontext"
    "field2"="substitutiontext2"

I have then 'incorporated' the ConfigSlurper into the ExecuteScript script, as follows:

    import groovy.json.JsonBuilder
    import groovy.json.JsonSlurper
    import org.apache.commons.io.IOUtils
    import org.apache.nifi.processor.io.StreamCallback
    import java.nio.charset.StandardCharsets

    def flowFile = session.get()
    if (flowFile == null) {
        return
    }

    flowFile = session.write(flowFile, { inputStream, outputStream ->
        def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

        def slurped = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurped)
        outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))

        def propertiesFile = new File('d:\\properties.txt')
        Properties props = new Properties()
        props.load(new FileInputStream(propertiesFile))
        def conf = new ConfigSlurper().parse(props).flatten()
        conf.each { k, v ->
            if (builder.content[k]) {
                builder.content[k] = v
            }
        }
        outputStream.write(content.toString().getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
    // REL_SUCCESS is bound into the script by ExecuteScript
    session.transfer(flowFile, REL_SUCCESS)

The problem seems to be that I cannot really replicate the logic of my original mapping file with something similar to the one I created for TestLoadingMappings. As mentioned in my previous comments/edits, the mapping should work in this way:

    field2 = if a, substitute with "some text"
    field3 = if a, substitute with a2
    ...
    field2 = if b, substitute with "some other text"
    field3 = if b, substitute with b2

and so on.
In a nutshell, the mappings are driven by the incoming value in the InputStream (which varies), and that value conditionally maps to different values depending on the JSON attribute. Can you please recommend a better way to achieve this mapping via Groovy/ExecuteScript? I have flexibility in modifying the mapping file; can you see a way I can change it in order to achieve the desired mappings?
Thanks
I have some examples on how to read in a flow file containing JSON:

http://funnifi.blogspot.com/2016/02/executescript-explained-split-fields.html
http://funnifi.blogspot.com/2016/05/validating-json-in-nifi-with.html
http://funnifi.blogspot.com/2016/02/executescript-processor-replacing-flow.html

You've got the right structure above; basically, you can use the "inputStream" variable in the closure to read the incoming flow file contents. If you want to read it all in at once (which you will likely need to do for JSON), you can use IOUtils.toString() followed by a JsonSlurper, as is done in the examples in the links above.
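The read-then-parse step can be tried outside NiFi. The following is only a minimal sketch: the ByteArrayInputStream and the sample JSON are stand-ins (assumptions) for the inputStream that session.write() passes to the closure, and Groovy's getText('UTF-8') is used so the snippet has no commons-io dependency. Inside ExecuteScript, IOUtils.toString(inputStream, StandardCharsets.UTF_8) should play the same role.

```groovy
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets

// Stand-in for the inputStream handed to the session.write() closure (assumption for this sketch)
def sampleJson = '{"field1": "a", "field2": "a", "field3": "a"}'
InputStream inputStream = new ByteArrayInputStream(sampleJson.getBytes(StandardCharsets.UTF_8))

// Read the whole stream into a String in one go, then parse it as JSON
def content = inputStream.getText('UTF-8')
def parsed = new JsonSlurper().parseText(content)

// The parsed result behaves like a Map keyed by field name
println parsed.keySet()
```

Once parsed, fields can be reached either as parsed.field2 or as parsed['field2'], which is what makes a key-driven mapping loop possible.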
For your mapping file, especially if your JSON is "flat", you could have a Java properties file, mapping the name of the field to the new value:
field2=some text
field3=a2
Check out ConfigSlurper for reading in properties files.
Once you have slurped the incoming JSON file and read in the mapping file, you can get at the individual fields of the JSON using array notation instead of direct member notation. So let's say I read the properties into a ConfigSlurper, and I want to overwrite any existing property in my input JSON (called "json" for the example) with the one from the properties file. That might look like the following:

    config.parse(props).flatten().each { k, v ->
        if (json[k]) {
            json[k] = v
        }
    }

You can then continue on with the outputStream.write().
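To make the properties-based approach concrete, here is a small standalone sketch. It loads the two-line properties content shown earlier from an in-memory string (an assumption made so the snippet runs without a file on disk), and the sample input JSON is likewise made up for illustration.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Mapping content as it would appear in the properties file (inlined here for the sketch)
def props = new Properties()
props.load(new StringReader('field2=some text\nfield3=a2'))

// Sample flat input JSON (assumption)
def json = new JsonSlurper().parseText('{"field1": "a", "field2": "a", "field3": "a"}')

// Overwrite only the fields that already exist in the input
new ConfigSlurper().parse(props).flatten().each { k, v ->
    if (json[k]) {
        json[k] = v
    }
}

println JsonOutput.toJson(json)
```

Note that this maps by field name only: every field2 gets the same replacement regardless of its incoming value, so it fits the "flat" one-rule-per-field case rather than value-dependent rules.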
Instead of reading your mappings from a file, you could also load them into a distributed cache via the PutDistributedMapCache processor. You can read from a DistributedMapCacheServer in your ExecuteScript; I have an example here:
http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html
If your mapping is complex, you may want to use the TransformJSON processor, which will be available in the next release of NiFi (0.7.0). The associated Jira case is here:
https://issues.apache.org/jira/browse/nifi-361
Edit:
In response to your edits, I didn't realize you had multiple rules for various values. In that case a properties file is probably not the best way to represent the mappings. Instead you could use JSON, something like:

    {
      "field2": {
        "a": "some text",
        "b": "some other text"
      },
      "field3": {
        "a": "a2",
        "b": "b2"
      }
    }

Then you can use a JsonSlurper to read in the mappings file. Here is an example of using the above mapping file:

    import groovy.json.JsonBuilder
    import groovy.json.JsonSlurper
    import org.apache.commons.io.IOUtils
    import org.apache.nifi.processor.io.StreamCallback
    import java.nio.charset.StandardCharsets

    def flowFile = session.get()
    if (flowFile == null) {
        return
    }

    // Load the mapping rules once, outside the stream callback
    def mappingJson = new File('/users/mburgess/mappings.json').text

    flowFile = session.write(flowFile, { inputStream, outputStream ->
        def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        def inJson = new JsonSlurper().parseText(content)
        def mappings = new JsonSlurper().parseText(mappingJson)

        // For each field, look up the replacement for its current value
        inJson.each { k, v ->
            inJson[k] = mappings[k][v]
        }

        // Serialize through JsonBuilder so the output is valid JSON
        outputStream.write(new JsonBuilder(inJson).toPrettyString().getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
    session.transfer(flowFile, REL_SUCCESS)

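The lookup itself can be exercised without NiFi. The sketch below inlines the nested mapping JSON from above and applies it to two made-up inputs. The applyMappings closure is a hypothetical helper, not part of NiFi or Groovy, and it adds one assumption beyond the script above: a field with no matching rule (such as field1) keeps its incoming value instead of failing or becoming null.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Same nested structure as the mapping file above: field name -> incoming value -> replacement
def mappings = new JsonSlurper().parseText('''
{
  "field2": { "a": "some text", "b": "some other text" },
  "field3": { "a": "a2", "b": "b2" }
}
''')

// Hypothetical helper: returns a new map with each value replaced when a rule exists,
// and left unchanged otherwise (the fallback is an assumption of this sketch)
def applyMappings = { Map input, Map rules ->
    input.collectEntries { k, v ->
        def replacement = rules[k]?.get(v)
        [(k): replacement != null ? replacement : v]
    }
}

def slurper = new JsonSlurper()
def resultA = applyMappings(slurper.parseText('{"field1": "a", "field2": "a", "field3": "a"}'), mappings)
def resultB = applyMappings(slurper.parseText('{"field1": "b", "field2": "b", "field3": "b"}'), mappings)

println JsonOutput.toJson(resultA)
println JsonOutput.toJson(resultB)
```

Building a new map with collectEntries, rather than mutating the parsed input in place, keeps the original values available if they are needed later in the script.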