NiFi for Apache - the flow using records and registry

Presentation

In a previous guide, we set up MiNiFi on web servers to export Apache access log events to a central NiFi server. Then we saw an example of a flow built on this NiFi server to handle these events. That flow used standard NiFi processors, manipulating each event as a string. Now we will build a new flow that achieves the same purpose but uses a record-oriented approach.
Along the way we will discover how easy record-oriented flow files are to work with and how they can speed up the deployment of a flow.

Pieces needed from before

From other tutorials on this site, we have:
  1. The Registry service up and running (see …) with the HortonworksSchemaRegistry Controller Service configured in your central NiFi server.
  2. The MiNiFi agent installed and configured on our Web servers.
  3. A central NiFi server with only one remote processor called “Remote NiFi” to receive log events from your MiNiFi agent(s).

Prepare the required Controller Services

HortonworksSchemaRegistry

0a-Controller_HWregistry.PNG

GrokReader

Use 'Schema Name' Property => the schema name (apacheraw) is given as the value of the "Schema Name" property of this controller.
Schema Registry = HortonworksSchemaRegistry
Grok Pattern File stays empty because we are satisfied with the default patterns supplied with NiFi.
Grok Expression = %{COMBINEDLOG} -> we need to know which field names are used by the pattern, because these names must match the field names in the schema used to write the records.

0b-Controller_GrokReader.PNG
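For reference, here is a minimal sketch of what the matching part of the Avro schema (registered as apacheraw) could look like. It is only an illustration built from the field names used later in this flow; the full schema can be downloaded at the end of this guide.

{
  "type" : "record",
  "name" : "apacheraw",
  "fields" : [
    { "name" : "clientip",    "type" : ["null", "string"] },
    { "name" : "ident",       "type" : ["null", "string"] },
    { "name" : "auth",        "type" : ["null", "string"] },
    { "name" : "timestamp",   "type" : ["null", "string"] },
    { "name" : "verb",        "type" : ["null", "string"] },
    { "name" : "request",     "type" : ["null", "string"] },
    { "name" : "httpversion", "type" : ["null", "string"] },
    { "name" : "response",    "type" : ["null", "string"] },
    { "name" : "bytes",       "type" : ["null", "long"] },
    { "name" : "referrer",    "type" : ["null", "string"] },
    { "name" : "agent",       "type" : ["null", "string"] },
    { "name" : "type",        "type" : ["null", "string"] }
  ]
}

Fields such as type, host, logfile, website and GeoIP are not produced by the Grok pattern itself; they are declared in the full schema and filled in later by the QueryRecord, LookupRecord and UpdateRecord processors.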

JsonRecordSetWriter

Use 'Schema Name' Property => the schema name (apacheraw) is given as the value of the "Schema Name" property of this controller.
Schema Registry = HortonworksSchemaRegistry

0e-Controller_JSONTreeWriter.PNG

JsonTreeReader

Use 'Schema Name' Property => the schema name attribute (${schema.name}) is the value of the "Schema Name" property of this controller; therefore, whatever schema was used to write the record, we will be able to read it.
Schema Registry = HortonworksSchemaRegistry

0d-Controller_JSONTreeReader.PNG

IPLookupService

The IP lookup service of NiFi requires the GeoLite2 database from MaxMind. The file is freely downloadable from their website. Several databases are available; we will use the one resolving IPs down to the city level.
Download and extract the file into a directory on each of your NiFi servers. Make sure that the directory is at least readable by the Linux user running NiFi (in our case, nifi).

MaxMind Database file: /opt/nifi-custom/GeoLite2-City.mmdb in our case.

Choose the lookups to be done (each lookup option represents one lookup iteration, so it can become compute intensive if you are handling a lot of records). We need at least "Geo Enrichment" to get the country, region, city and coordinates associated with an IP (if any are found in the database).
The value to be looked up (the IP address) is taken from the record field referenced by the user-defined 'ip' property of the LookupRecord processors. See the configuration of the LookupRecord processor later on.

0g-Controller_IPLookup.PNG

Add and configure the needed processors

The QueryRecord processor

With this processor, we will be able to:
  • Extract each field of log lines with the GrokReader controller
  • Write a JSON representation with the JsonWriter controller
  • Set "Include Zero Record FlowFiles" to false, or you will receive a flow file containing an empty record set on every relationship whose SELECT returns nothing.
  • Query the "agent" field, containing the web browser UserAgent string, so we can add a field telling which kind of entity probably initiated the web request.
This processor allows querying the record(s) contained in a single flow file with a standard SQL SELECT statement. You write an SQL statement selecting fields from a table called "flowfile". By combining a literal value in the SELECT list with a WHERE clause, we can add a new field whose value differs according to the query the record matches:
SELECT field1, field2, 'VALUE' AS field3 FROM flowfile WHERE field1 LIKE '%string%'
So a new field, field3 (it must be present in your schema, of course), is added to every record whose field1 contains "string". Records that do not match simply do not appear in the output of that relationship.
We thus create four relationships, with the following SELECT statements:
  • Human:
select "timestamp", "verb", "response", "clientip","auth","ident","request","agent","bytes","referrer","httpversion",'Human' as type
from flowfile
where lower(agent) not like '%bot%'
and lower(agent) not like '%wget%'
and lower(agent) not like '%spider%'
and lower(agent) not like '%bark%'
and lower(agent) not like '%check_http%'
  • Nagios:
select "timestamp", "verb", "response", "clientip","auth","ident","request","agent","bytes","referrer","httpversion",'Nagios' as type
from flowfile
where lower(agent) like '%check_http%'
  • Wget
select "timestamp", "verb", "response", "clientip","auth","ident","request","agent","bytes","referrer","httpversion",'Wget' as type
from flowfile
where lower(agent) like '%wget%'
  • Robot:
select "timestamp", "verb", "response", "clientip","auth","ident","request","agent","bytes","referrer","httpversion",'Robot' as type
from flowfile
where lower(agent) like '%bot%'
or lower(agent) like '%spider%'
or lower(agent) like '%bark%'
 
Of course, the above selection is not an exact science and must be seen more as an example than a real UserAgent interpretation.

1-QueryRecord.PNG
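As an illustration of how the routing works (the agent strings below are hypothetical), a single incoming flow file containing three records with the following agents would be distributed like this:

agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'  -> Human relationship, type = 'Human'
agent = 'check_http/v2.2 (monitoring-plugins 2.2)'   -> Nagios relationship, type = 'Nagios'
agent = 'Wget/1.19.4 (linux-gnu)'                    -> Wget relationship, type = 'Wget'

Each relationship receives a flow file containing only the records that matched its SELECT statement.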

Each relationship defined above will be connected to the next processor, a LookupRecord processor.

The LookupRecord processor

We add a LookupRecord processor that will:
  • Read the records using the JSON Reader controller
  • Write the records after lookup using the JSON Writer controller
  • Use the IPLookupService to geographically enrich the IP address that initiated the web connection.
  • Set the Result RecordPath to "/GeoIP", the field defined in our schema to hold the resulting object of the IP lookup. The IP lookup service uses a fixed schema, which is detailed in the help page of the service; we just had to append it to our schema under the field where we want to see this information. /GeoIP is our choice; everything below it is imposed by the IP lookup service.
  • Choose "Route to success" for the Routing Strategy. In this case, whatever the result of the lookup, the flow file is routed to success. If we wanted to route based on whether the lookup succeeded or not, we would have chosen "Route to matched or unmatched".
Thus, in this case, even if the IP lookup doesn't find the IP in the database, the flow file is routed to the success relationship.
  • Add a user-defined property called "ip" whose value is the record path of the field holding the IP to look up; the IP lookup service expects a key called "ip". See the example below the screenshot.
2-LookupRecord.PNG
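As a concrete illustration (the exact record path depends on your schema; /clientip is the field holding the client IP address in the schema used here), the user-defined property could look like this:

ip -> /clientip

With this setting, the LookupRecord processor passes the value of the clientip field of each record to the IPLookupService under the "ip" key.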

Connect the success relationship to the next processor, an UpdateRecord processor.

The UpdateRecord processor

It will:
  • Read incoming records using the JSON Reader controller
  • Write records using the JSON Writer controller
  • Have a Replacement Value Strategy of "Literal Value" -> the value of each property (whose name designates the field to update) contains a literal value or an expression that will be evaluated into a value.
The other choice for the replacement strategy is "Record Path Value". In that case, the value you give for a field is a record path pointing to another field of the record. Nice examples of this configuration are given in the NiFi documentation: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.4.0/org.apache.nifi.processors.standard.UpdateRecord/additionalDetails.html
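As a quick, hypothetical illustration (neither field exists in our schema), with "Record Path Value" a property such as:

/destination -> /source

would copy, for each record, the value of the source field into the destination field.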
We will use this processor to add some interesting information not part of the log line itself, like:
  • Field host: holds the originating hostname, i.e. the name of the web server running MiNiFi
  • Field logfile: retains the name of the log file in which this event was found
  • Field website: in an Apache virtual host configuration where each virtual host creates its own log file, we derive the site name from the name of the log file.
  • We will also use it to reformat the timestamp field coming from the event into a format understood by ElasticSearch by default.
So we have these property names with their values:
  1. /timestamp -> ${field.value:toDate("dd/MMM/yyyy:HH:mm:ss Z"):format("yyyy-MM-dd'T'HH:mm:ssZZ")}
  2. /host -> ${s2s.host}
  3. /logfile -> ${tailfile.original.path}
  4. /website -> ${tailfile.original.path:substringAfterLast('/'):substringBeforeLast('.')}
In the first expression, note the use of ${field.value}, which allows us to build an expression manipulating the current content of the record field given as the property name.
The other three expressions use attributes added to the flow file by the MiNiFi agent. A worked example is shown below.
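For instance, with the values of the sample record shown at the end of this guide (the original Apache timestamp is reconstructed here from the dd/MMM/yyyy:HH:mm:ss Z format, so it is an assumption):

/timestamp : 01/Oct/2017:20:24:35 +0200 -> 2017-10-01T20:24:35+0200
/logfile   : /var/log/apache2/gallery.log (value of tailfile.original.path)
/website   : /var/log/apache2/gallery.log -> gallery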

3-UpdateRecord.PNG

The ElasticSearch processor

The last processor to add to our flow is the one inserting the flow file records into an ElasticSearch index. We use the record-oriented version of the ElasticSearch HTTP processor: PutElasticsearchHttpRecord.
This processor will:
  • Use the HTTP transport to connect to the ElasticSearch cluster, so the URL must be of the form http://<node1>:9200. It seems that when using the REST API (HTTP) from NiFi you can only specify one node of your ElasticSearch cluster. If this node goes missing, your flow files will be queued in the retry relationship of this processor; this is why we loop that relationship back to the processor itself.
  • Use the JSON Reader controller to read the record in JSON format
  • The name of the index is an expression creating an index name of the form "webrecords-2017-10" (depending on your ElasticSearch settings, the index may be created automatically if it doesn't exist). See the example expression below.
  • The type of document: this is the ElasticSearch name used to identify the type of the inserted documents. This parameter is mandatory, but you are free to choose its value.
  • The Index Operation to apply: in our case, this is “Index”, because we want to index a new document.
4-PutESRecord.PNG
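The exact index name expression is only visible in the screenshot above, so the following NiFi Expression Language snippet is just one possible way (an assumption) to produce names such as webrecords-2017-10 from the current date:

webrecords-${now():format('yyyy-MM')}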
 

Conclusion

Our full flow looks like this:

ApacheFlowWithRecord.PNG

By using fewer processors, we are able to achieve the same result as the non-record-oriented flow seen previously.
Here we are not using the SplitText processor because the record-oriented processors smoothly handle any incoming flow file that contains multiple lines. For each flow file, the record reader and writer automatically manage multiple records per flow file.
In the non-record-oriented flow, we need to split each flow file into single-line flow files because of the handling we do: we store the result of parsing the "agent" field (UserAgent) in an attribute. If multiple lines contain UserAgents from different sources, we cannot represent them correctly with only one attribute.
The same goes for the creation of the JSON document, which is built from values contained in attributes, so one flow file can only hold the information for one JSON object.
Therefore, we can say that the record-oriented NiFi flow gives us less to worry about: if MiNiFi sends one flow file with multiple log lines, no problem, it is handled without you even noticing.
 
You can download the XML template for this flow: Nifi_for_Apache-flow-with-records.xml
You can also download a full Avro schema for this flow, to be uploaded to the Hortonworks Registry server: Schema_ApacheJSON.txt
A sample JSON record created by this flow:
[ {
  "timestamp" : "2017-10-01T20:24:35+0200",
  "verb" : "GET",
  "response" : "200",
  "clientip" : "213.136.91.195",
  "auth" : "-",
  "ident" : "-",
  "request" : "/index.php?/category/669/created-monthly-list-2017-9&lang=nl_NL",
  "agent" : "Mozilla/5.0 (compatible; MJ12bot/v1.4.7; http://mj12bot.com/)",
  "bytes" : 7192,
  "referrer" : "-",
  "httpversion" : "1.1",
  "GeoIP" : {
    "geo" : {
      "city" : null,
      "accuracy" : 200,
      "metroCode" : null,
      "timeZone" : null,
      "latitude" : 51.2993,
      "longitude" : 9.491,
      "country" : {
        "name" : "Germany",
        "isoCode" : "DE"
      },
      "subdivisions" : [ ],
      "continent" : "Europe",
      "postalCode" : null
    },
    "isp" : null,
    "domainName" : null,
    "connectionType" : null,
    "anonymousIp" : null
  },
  "website" : "gallery",
  "logfile" : "/var/log/apache2/gallery.log",
  "host" : "websites",
  "type" : "Bot"
} ]

 
Note the starting and ending [ ], meaning that this is an array of records that are read and written by the processors.