Creating Custom Adapter Modules
- How to create Transporter and Formatter modules
Prerequisites
- Step 1
A transporter module is the interface that interacts with external data sources by obtaining data from a data source or outputting data to a data destination.
SAP HANA Streaming Analytics supports two types of transporters: row-based and stream-based.
Row-based transporters obtain and output data in row format, such as a database transporter. These transporters work with
AdapterRowinstances, which are containers for one or more records or rows as they flow from one module (transporter, formatter, or Streaming Analytics connector) to the next. You can add multiple records as objects within a List of a singleAdapterRowobject. TheAdapterRowhas a timestamp and block flags that control how its records are communicated to and from Streaming Analytics.Stream-based transporters deal with streaming data, such as a socket transporter. These transporters work with
ByteStreamorByteBufferinstances, which represent a continuous stream of data.In this tutorial, we will be creating a row based transporter module as it lends itself well to
MQTT.Before we begin, you can check out the
$STREAMING_HOME/adapters/framework/examples/srcdirectory for source code of sample transporters.The full source code for the Transporter Module is provided in the
AppendixSectionFirst, we will set up the Custom Adapter Project.
- Start by opening your IDE and creating a new java project called
mqtt-input - Create a package
com.sap - Create a Java class called
MqttTransporter.java - Create a Java class called
MqttCB.java. The code for this file is provided in the appendix section of this tutorial. - We will now add a number of
.jardependencies to our class path:- Java PAHO library
- The other dependencies will be from the Adapter Toolkit and can be found in
%STREAMING_HOME%\adapters\framework\libjCommons-configuration-<version>.jarStreaming-client.jarStreaming-system.jarStreaming-adapter-framework.jar
Then, have
MqttTransporterextend theTransporterclass.We will start by defining a number of instance variables, which will be assigned values in the
init()method (more on that later).MqttClient client;String topic;MqttCB cb;
Having done this, we will need to implement a number of abstract methods in Transporter. We will cover the methods in the same order they will be called by the adapter framework.
The first abstract method we will implement is
void init(). The purpose of this method is to prepare the module for the actions it is responsible for performing. We will use this method to initialize various global variables as well as grab the user defined parameters for the adapter.-
First, we want to get the Topic parameter value. This value is set by the streaming developer when configuring the adapter in Studio.
We can get the value of Topic by calling:
javaCopyutility.getParameters().getString("MQTTInputTransporterParamet ers.Topic");The
MQTTInputTransporterParametersprefix is defined in our adapter configuration file. -
Next, create an
MqttClient. The constructor takesserverURI- the address of the server to connect to, specified as aURIandclientId- a client identifier that is unique on the server being connected to.We will use the
MosquittoServerAddressdefined by the streaming developer and a unique stringjavaCopyclient = new MqttClient(utility.getParameters().getString("MQTTInputTranspo rterParameters.MosquittoServerAddress"), "MQTT_ESP"); -
Connect the
MqttClientwithclient.connect(); -
Subscribe the
MqttClientto the topic withclient.subscribe(topic); -
Instantiate an
MqttCBobject and assign it to ourMqttClient.MqttCBis a customMqttCallbackclass written for this adapter. The code for it is provided in the appendix section of this tutorial.javaCopycb = new MqttCB(); client.setCallback(cb);
The second abstract method we have to implement is
void start(). The purpose of this method is to perform any necessary tasks when the adapter is started. For our purposes, it is not necessary to
include any instructions in this method so we will leave it empty.The third and most important method to implement is
void execute(). When the adapter framework calls this method, it is expected to run continuously until the adapter is requested to stop or until the adapter completes its work.-
As such, we will wrap our functionality in a loop that iterates until the adapter has been issued a stop request. Following this loop – and ending the method – is an instruction to change the adapter
RunStateto done.javaCopywhile(!utility.isStopRequested()) { //steps b-d } utility.setAdapterState(RunState.RS_DONE); -
While the adapter has not been requested to stop, we will continuously check for new
MQTTmessages. ThetakeNewMsg()method will returnnullif there are no new messages, or take the message out of the message queue and return it. When a new message is received, we will process it within theifstatement.javaCopyString msg; if ((msg = cb.takeNewMsg()) != null){ //steps c-d } -
Once we have received a message, we need to create an
AdapterRowand send it to ourFormattermodule.javaCopyAdapterRow row = utility.createRow(cb.getRcvdMsg()); utility.sendRow(row);
The fourth overridden method is
void stop(). Its purpose is to perform any necessary tasks when the adapter is stopped. We will use this method to disconnect ourMqttClientby issuingjavaCopyclient.disconnect();The fifth and last method is
void destroy(). Its purpose is to perform any cleanup tasks for your input or output transporter. For our purposes, it is not necessary to include any instructions in this method so we will leave it empty. - Start by opening your IDE and creating a new java project called
- Step 2
A formatter module converts between the data format of the transporter module and Streaming Analytics.
SAP HANA Streaming Analytics supports two types of formatters: row-based and stream-based formatters.
Row-based formatters obtain and output data in row format. They work with
AdapterRowinstances, which are containers for one or more records or rows as they flow from one module (transporter, formatter, or Streaming Analytics connector) to the next. You can add multiple records as objects within a List of a singleAdapterRowobject. TheAdapterRowhas a timestamp and block flags that control how its records are communicated to and from Streaming Analytics.Stream-based formatters deal with streaming data. These formatters work with
ByteStreaminstances, which represent a continuous stream of data. Full documentation on writing a Formatter module can be found in the section Building a Custom Formatter Module.Before we begin, you can check out the
$STREAMING_HOME/adapters/framework/examples/srcdirectory for source code of sample formatters.The full source code for the Formatter Module is provided in the
AppendixSection-
Start by opening your IDE and navigating to the java project called
mqtt-inputthat we created for our Transporter Module (complete the pre-requisite tutorial if you have not yet completed this). Note that it is also valid to create a new project and create your Formatter module separately but for simplicity, we will be creating them in the same project. -
Create a Java class called
Mqttformatter.java -
Have
MqttFormatterextend theRowFormatterclass. Note that it is also possible to create a custom Formatter module that extends theStreamingFormatterclass. For the purposes of this tutorial, we will be extended theRowFormatterclass. Similar to our Transporter module, we will have to implement a number of abstract methods (only 3 this time though).
The purpose of the
void init()method is to prepare the formatter module to convert between data formats. For example, obtain properties from the adapter configuration file and perform any required initialization tasks. Our Formatter module is very simple and does not require any initialization instructions.The second method is
AdapterRow convert(AdapterRow in).-
First, we will test whether the received
AdapterRowis non-empty. If this is the case, we will simply send theAdapterRowback.javaCopyif (in.getDataList().isEmpty()){ return in; } -
If we have reached this point in the method, the received
AdapterRowis non-empty. Our particular Formatter will convert aMQTTmessage (String) to something usable by Streaming Analytics - anAepRecord. First, we will create the desiredAepRecord.-
Create a new
AepRecord.javaCopyAepRecord tempRecord = new AepRecord(); -
Set the operation of the record.
javaCopytempRecord.setOpCode(Operation.INSERT); -
Get the data list inside the
AepRecord, and add the first object of the data list in theAdapterRow.javaCopytempRecord.getValues().add(in.getData());
-
Now that we have the
AepRecordand would like to send it to Streaming Analytics, we will convert the receivedAdapterRowby replacing its data list value at index 0 with the newAepRecord-tempRecord.javaCopyin.setData(0, tempRecord);We have now finished converting the
MQTT Stringto anAepRecordso we will return it.javaCopyreturn in;The last method to implement is
void destroy()which is intended for performing clean-up actions for the formatter. Our formatter does not require any destroy instructions. -
- Step 3
Now that we have written our
TransporterandFormattermodules, we need to package them in a.jarfile. If you have been following this tutorial, you should have a single java project containingMqttTransporter.java,MqttFormatter.javaandMqttCB.java. Build a.jarcontaining all of these files. The process for doing so varies withIDEso, if you have questions, it is best to consult yourIDE'shelp pages. Before building the.jarfile, verify that you will be building it with the sameJREversion as included in your Streaming Analytics install. Name the newly created.jarfile"mqtt-input.jar".For the question below, select all of the correct answers, and click Validate.
Which of the following files are packaged into the mqtt-input.jar file:
- Step 4
MqttTransporter.javajavaCopypackage com.sap; import java.io.IOException; import javax.security.auth.login.LoginException; import com.sybase.esp.sdk.exception.EntityNotFoundException; import com.sybase.esp.sdk.exception.ServerErrorException; import com.sybase.esp.adapter.framework.utilities.*; import com.sybase.esp.adapter.framework.RunState; import com.sybase.esp.adapter.framework.module.Transporter; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; public class MqttTransporter extends Transporter{ MqttClient client; String topic; MqttCB cb; @Override public void init() throws MqttException { utility.getAdapterLogger().info("MqttTransporter is initializing:"+utility.getParameters().getString("MQTTInputTransporterParameters.Topic") + "," + utility.getParameters().getString("MQTTInputTransporterParameters.MosquittoServerAddress")); topic = utility.getParameters().getString("MQTTInputTransporterParameters.Tppic"); client = new MqttClient(utility.getParameters().getString("MQTTInputTransporterParameters.MosquittoServerAddress"), "MQTT_ESP"); client.connect(); client.subscribe(topic); cb = new MqttCB(); client.setCallback(cb); } @Override public void start() throws IOException { utility.getAdapterLogger().info("MqttTransporter is starting."); } int count = 0; @Override public void execute() throws IOException, EntityNotFoundException, LoginException, ServerErrorException, InterruptedException { while(utility.isStopRequested() == false){ String msg; if ((msg = cb.takeNewMsg()) != null){ utility.getAdapterLogger().info("Got message: " + msg); AdapterRow row = utility.createRow(msg); utility.sendRow(row); utility.getAdapterLogger().info("Sent row to formatter"); } } utility.setAdapterState(RunState.RS_DONE); } @Override public void stop() throws MqttException { utility.getAdapterLogger().info("MqttTransporter is stopping"); client.disconnect(); } @Override public void destroy() { utility.getAdapterLogger().info("MqttTransporter is destroying"); } }MqttCB.javajavaCopypackage com.sap; import java.util.LinkedList; import java.util.Queue; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; public class MqttCB implements MqttCallback{ private final Queue<String> msgs = new LinkedList(); @Override public void connectionLost(Throwable arg0) { System.out.print("Connection lost..."); } @Override public void deliveryComplete(IMqttDeliveryToken arg0) { System.out.println("Delivery complete."); } public String takeNewMsg(){ return msgs.poll(); } @Override public void messageArrived(final String topic, final MqttMessage msg) { System.out.println("Received message: " + msg); msgs.add(msg.toString()); } }MqttFormatter.javajavaCopypackage com.sap; import com.sybase.esp.adapter.framework.AepRecord; import com.sybase.esp.adapter.framework.module.RowFormatter; import com.sybase.esp.adapter.framework.utilities.AdapterRow; import com.sybase.esp.sdk.Stream.Operation; public class MqttFormatter extends RowFormatter { @Override public void init() { utility.getAdapterLogger().info("MqttFormatter is initializing"); } @Override public AdapterRow convert(AdapterRow in ) { utility.getAdapterLogger().info("MqttFormatter is converting"); if ( in .getDataList().isEmpty()) { return in; } AepRecord tempRecord = new AepRecord(); tempRecord.setOpcode(Operation.INSERT); tempRecord.getValues().add( in .getData()); in .setData(0, tempRecord); utility.getAdapterLogger().info("MqttFormatter is done converting"); return in; } @Override public void destroy() { utility.getAdapterLogger().info("MqttFormatter is destroying"); } }