diff --git a/nifi-iotics-processors/src/integration/java/smartrics/iotics/nifi/processors/IoticsFinderIT.java b/nifi-iotics-processors/src/integration/java/smartrics/iotics/nifi/processors/IoticsFinderIT.java index 0f9f65a..18c4226 100644 --- a/nifi-iotics-processors/src/integration/java/smartrics/iotics/nifi/processors/IoticsFinderIT.java +++ b/nifi-iotics-processors/src/integration/java/smartrics/iotics/nifi/processors/IoticsFinderIT.java @@ -61,6 +61,27 @@ public void testProcessorWithInputFlow() { assertThat(thrown.getMessage(), is(equalTo("Nothing back!"))); } + @Test + public void testProcessorWithCorrectInputFlow() { + testRunner.enqueue(""" + { + "expiryTimeout": "2", + "properties": [ + { + "key": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type", + "uri":"http://schema.org/Car" + }, + { + "key": "http://schema.org/identifier", + "stringLiteral":"1874258" + } + ], + "scope": "LOCAL" + } + """); + run(); + } + @Test public void testProcessorEmptyInputFlow() { testRunner.enqueue(""" diff --git a/nifi-iotics-processors/src/integration/java/smartrics/iotics/nifi/processors/IoticsPublisherIT.java b/nifi-iotics-processors/src/integration/java/smartrics/iotics/nifi/processors/IoticsPublisherIT.java index 898a71f..a2c369b 100644 --- a/nifi-iotics-processors/src/integration/java/smartrics/iotics/nifi/processors/IoticsPublisherIT.java +++ b/nifi-iotics-processors/src/integration/java/smartrics/iotics/nifi/processors/IoticsPublisherIT.java @@ -44,18 +44,15 @@ public void init() throws Exception { feed = model.feeds().getFirst(); } - @Test - public void testProcessor() throws IOException, InterruptedException { + public void testProcessor() throws InterruptedException { int count = 5; String content = newRandomContent(count); - testRunner.enqueue(content); testRunner.run(); //assert the input Q is empty and the flowfile is processed testRunner.assertQueueEmpty(); List results = testRunner.getFlowFilesForRelationship(Constants.SUCCESS); - assertThat(results.size(), is(count)); results.forEach(mockFlowFile -> { String outputFlowfileContent = new String(testRunner.getContentAsByteArray(mockFlowFile)); diff --git a/nifi-iotics-processors/src/main/java/smartrics/iotics/nifi/processors/IoticsFinder.java b/nifi-iotics-processors/src/main/java/smartrics/iotics/nifi/processors/IoticsFinder.java index adbd436..7cba9bf 100644 --- a/nifi-iotics-processors/src/main/java/smartrics/iotics/nifi/processors/IoticsFinder.java +++ b/nifi-iotics-processors/src/main/java/smartrics/iotics/nifi/processors/IoticsFinder.java @@ -54,42 +54,68 @@ @Tags({"IOTICS", "DIGITAL TWIN", "SEARCH"}) @CapabilityDescription(""" - Processor for IOTICS search +Processor for IOTICS search. The processor expects an input flow file with a search payload JSON object. +The processor's properties will be supplied - if available - as defaults if not available in the input flow file. +An example payload is: +
+{
+    "expiryTimeout": "2",
+    "text": "something to search",
+    "properties": [
+        {
+            "key": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
+            "uri":"http://schema.org/FooBarBaz"
+        },
+        {
+            "key": "http://schema.org/identifier",
+            "stringLiteral":"1234"
+        }
+    ],
+    "location": {
+        "lat": 53.09873,
+        "lon": 0.987654,
+        "r": 1
+    },
+    "scope": "LOCAL",
+    "responseType": "MINIMAL"
+}
+
+ """) public class IoticsFinder extends AbstractProcessor { public static PropertyDescriptor EXPIRY_TIMEOUT = new PropertyDescriptor .Builder().name("expiryTimeoutSec") .displayName("Expiry Timeout in Seconds") - .description("How long to wait for before disconnecting from receiving search results, in seconds") + .description("How long to wait for before disconnecting from receiving search results, in seconds. This value will be used if none supplied by the inbound flow file.") .required(true) .addValidator(POSITIVE_INTEGER_VALIDATOR) .build(); public static PropertyDescriptor TEXT = new PropertyDescriptor .Builder().name("textFilter") .displayName("Text Filter") - .description("text filter matching label or comment") + .description("Text filter matching label or comment. This filter will be used if none supplied by the inbound flow file.") .required(false) .addValidator(NON_BLANK_VALIDATOR) .build(); public static PropertyDescriptor LOCATION = new PropertyDescriptor .Builder().name("locationFilter") .displayName("Location Filter") - .description("JSON map with the following three keys: 'r', 'lat', 'lon', 'r' is the radius in KM of the circle centered in lat/lon.") + .description("JSON map with the following three keys: 'r', 'lat', 'lon', 'r' is the radius in KM of the circle centered in lat/lon. This object will be used if none supplied by the inbound flow file.") .required(false) .addValidator(new LocationValidator()) .build(); public static PropertyDescriptor PROPERTIES = new PropertyDescriptor .Builder().name("propertiesFilter") .displayName("Properties Filter") - .description("JSON array where each entry is a JSON map with 'key' and one of 'uri', 'stringLiteral', 'literal'. In case 'literal' is specified, an 'dataType' may be supplied, with value one of the valid xsd data types (int, boolean, anyURI, ...)") + .description("JSON array where each entry is a JSON map with 'key' and one of 'uri', 'stringLiteral', 'literal'. In case 'literal' is specified, an 'dataType' may be supplied, with value one of the valid xsd data types (int, boolean, anyURI, ...). This array will be used if none supplied by the inbound flow file.") .required(false) .addValidator(NON_BLANK_VALIDATOR) .build(); public static PropertyDescriptor QUERY_RESPONSE_TYPE = new PropertyDescriptor.Builder() .name("queryResponseType") .displayName("Query Response Type") - .description("query response type: " + ResponseType.FULL.name() + ", " + ResponseType.LOCATED + ", " + ResponseType.MINIMAL) + .description("query response type: " + ResponseType.FULL.name() + ", " + ResponseType.LOCATED + ", " + ResponseType.MINIMAL + ". This value will be used if none supplied by the inbound flow file.") .allowableValues(Arrays.stream(ResponseType.values()) .map(enumValue -> new AllowableValue(enumValue.name(), enumValue.name())) .toArray(AllowableValue[]::new)) @@ -224,7 +250,7 @@ public void onNext(SearchResponse searchResponse) { to.twins().forEach(twin -> { try { Gson gson = new Gson(); - FlowFile flowFile = session.create(session.get()); + FlowFile flowFile = session.create(); try { String json = gson.toJson(twin, MyTwinModel.class); session.write(flowFile, out -> out.write(json.getBytes(StandardCharsets.UTF_8))); @@ -234,7 +260,7 @@ public void onNext(SearchResponse searchResponse) { session.transfer(flowFile, FAILURE); } } catch (Exception e) { - getLogger().warn("unable to process twin {}", twin); + getLogger().warn("unable to process twin {}", twin.id(), e); } }); } diff --git a/nifi-iotics-processors/src/test/resources/car_twin.json b/nifi-iotics-processors/src/test/resources/car_twin.json index efee272..a94feb3 100644 --- a/nifi-iotics-processors/src/test/resources/car_twin.json +++ b/nifi-iotics-processors/src/test/resources/car_twin.json @@ -53,7 +53,7 @@ }, { "key": "http://www.w3.org/2000/01/rdf-schema#label", - "value": "Toyota Camry 1", + "value": "Toyota Camry 10987654", "type": "StringLiteral" }, {