NiFi and the Hortonworks Registry

Introduction

The Hortonworks Registry is a service running on your Hortonworks DataFlow cluster that allows you to centrally store and distribute schemas describing how the data you manipulate is organized.
The Registry is a web application offering:
  • A web interface to add and modify schemas
  • A REST API that can be used by any other service to retrieve schema information
The Registry retains the previous versions of a schema each time you update an existing one.
A schema can be edited online, directly in the web GUI, and it is automatically checked for syntax errors.
Currently, only the Apache Avro schema specification is supported by the Registry. If you need information on how to write a schema for the Hortonworks Registry, have a close look at the official Apache Avro documentation.
With the Registry in place, you can share a common schema, representing how the information is organized, between various NiFi instances and processors. Each schema represents an object, its fields, and the type of each field (string, boolean, …). You create schemas to quickly parse JSON objects, CSV lines, or whatever content you may have.
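As a minimal illustration (the record and field names here are just an example), an Avro schema describing a user with a string name, a boolean flag and an optional e-mail address could look like this:
{
  "type": "record",
  "name": "User",
  "namespace": "com.example",
  "fields": [
    { "name": "name", "type": "string" },
    { "name": "active", "type": "boolean" },
    { "name": "email", "type": ["null", "string"], "default": null }
  ]
}
The union type ["null", "string"] with a null default is the usual Avro way of marking a field as optional.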
With the Registry and its schemas in place, you can use record-oriented flow files inside NiFi. In this case (you will see later how to implement and use them), the content of a flow file becomes a record formatted according to your schemas. Each record can then be manipulated more easily by NiFi, as it knows about the fields the record contains, thanks to the Registry.
To use this feature inside NiFi, you need record-oriented processors. Unfortunately, not every processor has a record-oriented version, but with the ones supplied you can build great flows.
There are record-oriented processors in NiFi to:
  • Convert from one schema to another (ConvertRecord)
  • Perform lookup tasks like GeoIP, key/value lookup, script lookup (LookupRecord)
  • Add, modify or update fields inside records (UpdateRecord)
  • Route flow files based on record content (QueryRecord)
  • Split or group similar records based on their content (SplitRecord and PartitionRecord)
  • Read and write to Kafka topics (ConsumeKafkaRecord_0_10 and PublishKafkaRecord_0_10)
  • Execute an SQL statement (INSERT, UPDATE, DELETE or user-specified) (PutDatabaseRecord)
  • Insert a record into an Elasticsearch cluster (PutElasticsearchHttpRecord)
 
Refer to the NiFi RecordPath Guide to see how you can manipulate record-oriented flow files.
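To give a quick taste of the notation (the field names are illustrative), RecordPath addresses fields inside a record much like a file system path addresses files:
/name          the top-level field "name"
/user/name     the field "name" inside the nested "user" record
//name         every field called "name", wherever it appears in the record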
 

Registry installation

Prior to the installation, you will have to create an empty database and assign a username and password to connect to it. The database servers supported so far are MySQL and PostgreSQL.
Here is a SQL script to create a MySQL database:
CREATE DATABASE registry;
CREATE USER 'registry'@'%' IDENTIFIED BY 'password';
GRANT ALL PRIVILEGES ON registry.* TO 'registry'@'%' WITH GRANT OPTION;
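Note that the '%' host wildcard allows the 'registry' user to connect from any host; on a production cluster you may want to restrict it to the node where the Registry will actually run.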
As for any other HDP or HDF service, you use Ambari to add the Registry service, then select the host you want it to run on. You will be asked to enter the information about the database connection.
Enter the configuration screen and provide the parameters to connect to the database defined above.

1.Registry_config.PNG

If you leave the parameters unchanged, the Registry will run the web GUI on ports 7788 and 7789. By default, it tries to connect to a MySQL instance installed on the same node as the Registry itself.
But, as shown in the screenshot above, you can change the web GUI port (here set to 8080) and also point to a MySQL database running on a completely different server.
Once the installation is complete, the Registry web GUI will be accessible at http://<your.host>:7788
 

Adding information into the Registry

Launch a web browser and go to the main page of the Registry. On a fresh install, the page will look like this:

2.empty_registry.PNG

To add a schema to the Registry, click on the big plus sign in the upper right corner. You will get a page where you have to input some information:
  • A name: this is the name that will be used by your flow to uniquely identify the schema involved
  • A description
  • The type: for the moment, only the Avro schema type is supported and proposed
  • Schema group: a name to group similar schemas
  • Compatibility: sets the compatibility policy for the schema. Once set, this cannot be changed
  • To allow a schema to evolve over time by creating multiple versions, select the Evolve checkbox. Deselecting Evolve thus means that you can have one and only one version of a schema.
2b.empty_schema.PNG

Then enter your schema text in the big square on the right. Automatic schema validation runs online, so you will see immediately whether there are errors. Refer to the Apache Avro documentation for a description of how to build a valid schema.
In later examples on this site, we will describe various schemas you can build to:
  • Manipulate JSON, applied to Apache log management and Tweets indexing
  • Read, write or convert records to CSV format
 

Using the Registry in NiFi

To be able to use the Registry inside any NiFi flow, you have to define some Controller Services.
For each record processor you use, you will need to select a Record Reader and a Record Writer. These are two kinds of NiFi Controller Services that:
  • Read incoming content using the given schema.
  • Write the result using a possibly different schema.
Each Record Reader and Writer will reference a Schema Registry Controller Service that is the actual service pointing to the Registry.
You can configure the Controller Services from the properties screen of the processor or by going to the flow configuration screen:

3.Flow_configure.PNG

On the Controller Services tab, click on the “plus” (+) icon and select the type of controller you want to configure.
Let’s start first with the Hortonworks Schema Registry:

4.Adding_schema_registry.PNG

Select “HortonworksSchemaRegistry” and click “Add”. A line is inserted into the Controller Services list; click on the pencil icon on the right to configure it.

5.Configure_schema_registry.PNG

You need to enter a valid URL for your schema registry. The URL is of the form:
                http://<host where registry is running>:<port>/api/v1
By default, the port is 7788, but you may have changed it.
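For example, with the Registry running on a (hypothetical) host named registry.example.com and the default port kept, you would enter:
                http://registry.example.com:7788/api/v1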

6.Parameters_schema_registry.PNG

You may consider changing the two remaining parameters, which control the size and expiry of the schema cache. Once downloaded, a schema from the Registry is cached locally for 1 hour before NiFi checks the Registry to see whether a new version has been published.
Keep this in mind when doing tests, as your new changes in schemas may not be reflected immediately to your NiFi flow.
Then click “Apply”.
Next, use the same “plus” icon to add a Record Reader and a Record Writer.
Depending on your source and target, you will set up different kinds of Readers:
  • Avro reader
  • CSV reader
  • GROK reader – to parse any kind of structured content, like log lines
  • JSON path reader – if the input is already JSON formatted but you don’t have access to its schema, you can use this reader to populate the fields of your target schema using JSON path notation.
You add properties named after the fields of your schema and, as values, you use JSON path expressions like “$.user.name”. Each field is then filled using the given JSON path (see the example after this list).
  • JSON tree reader – to read a record formatted against the specified schema
  • Scripted reader – where the interpretation of the read data is done by means of a script
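Here is the JSON path example announced above; the input document and field names are hypothetical. Given this incoming JSON:
{ "user": { "name": "alice" }, "text": "hello world" }
a JSON path reader configured with the two properties below would fill the "name" and "text" fields of the target schema with "alice" and "hello world":
name : $.user.name
text : $.text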
Each Record Reader and Writer must know which schema in the Registry to use when reading or writing data. So you either specify a fixed value in the Schema Name property or use an existing attribute of the flow file (like ${schema.name}).
You must also tell the Reader or Writer how to connect to the Registry by associating it, via the Schema Registry property, with the HortonworksSchemaRegistry controller created in the previous step.
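Putting it together, a typical JSON tree reader configuration could look like this (a sketch, assuming the schema name travels in the schema.name flow file attribute):
Schema Access Strategy : Use 'Schema Name' Property
Schema Registry        : HortonworksSchemaRegistry
Schema Name            : ${schema.name}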

7.Reader_sample.PNG
The Schema Access Strategy defines where to find the name of the schema associated with the data. It can be a property of the Reader configuration or an attribute inside the flow files.
In the case of a Record Writer, the “Schema Write Strategy” describes how to convey the schema used when writing record-oriented data. It can be a single attribute or a set of attributes; for example, the writer can simply put the schema name into the schema.name attribute of the outgoing flow file, or embed the whole schema in the content.
Record-oriented data is written inside the flow file content. Therefore, if you need to manipulate record-oriented data with a processor that is not record-oriented, you can still extract text directly from the flow file content.