Creating Custom Adapter Modules
- How to create Transporter and Formatter modules
Prerequisites
- Step 1
A transporter module is the interface that interacts with external data sources by obtaining data from a data source or outputting data to a data destination.
SAP HANA Streaming Analytics supports two types of transporters: row-based and stream-based.
Row-based transporters obtain and output data in row format, such as a database transporter. These transporters work with
AdapterRow
instances, which are containers for one or more records or rows as they flow from one module (transporter, formatter, or Streaming Analytics connector) to the next. You can add multiple records as objects within a List of a singleAdapterRow
object. TheAdapterRow
has a timestamp and block flags that control how its records are communicated to and from Streaming Analytics.Stream-based transporters deal with streaming data, such as a socket transporter. These transporters work with
ByteStream
orByteBuffer
instances, which represent a continuous stream of data.In this tutorial, we will be creating a row based transporter module as it lends itself well to
MQTT
.Before we begin, you can check out the
$STREAMING_HOME/adapters/framework/examples/src
directory for source code of sample transporters.The full source code for the Transporter Module is provided in the
Appendix
SectionFirst, we will set up the Custom Adapter Project.
- Start by opening your IDE and creating a new java project called
mqtt-input
- Create a package
com.sap
- Create a Java class called
MqttTransporter.java
- Create a Java class called
MqttCB.java
. The code for this file is provided in the appendix section of this tutorial. - We will now add a number of
.jar
dependencies to our class path:- Java PAHO library
- The other dependencies will be from the Adapter Toolkit and can be found in
%STREAMING_HOME%\adapters\framework\libj
Commons-configuration-<version>.jar
Streaming-client.jar
Streaming-system.jar
Streaming-adapter-framework.jar
Then, have
MqttTransporter
extend theTransporter
class.We will start by defining a number of instance variables, which will be assigned values in the
init()
method (more on that later).MqttClient client
;String topic
;MqttCB cb
;
Having done this, we will need to implement a number of abstract methods in Transporter. We will cover the methods in the same order they will be called by the adapter framework.
The first abstract method we will implement is
void init()
. The purpose of this method is to prepare the module for the actions it is responsible for performing. We will use this method to initialize various global variables as well as grab the user defined parameters for the adapter.-
First, we want to get the Topic parameter value. This value is set by the streaming developer when configuring the adapter in Studio.
We can get the value of Topic by calling:
javaCopyutility.getParameters().getString("MQTTInputTransporterParamet ers.Topic");
The
MQTTInputTransporterParameters
prefix is defined in our adapter configuration file. -
Next, create an
MqttClient
. The constructor takesserverURI
- the address of the server to connect to, specified as aURI
andclientId
- a client identifier that is unique on the server being connected to.We will use the
MosquittoServerAddress
defined by the streaming developer and a unique stringjavaCopyclient = new MqttClient(utility.getParameters().getString("MQTTInputTranspo rterParameters.MosquittoServerAddress"), "MQTT_ESP");
-
Connect the
MqttClient
withclient.connect();
-
Subscribe the
MqttClient
to the topic withclient.subscribe(topic);
-
Instantiate an
MqttCB
object and assign it to ourMqttClient
.MqttCB
is a customMqttCallback
class written for this adapter. The code for it is provided in the appendix section of this tutorial.javaCopycb = new MqttCB(); client.setCallback(cb);
The second abstract method we have to implement is
void start()
. The purpose of this method is to perform any necessary tasks when the adapter is started. For our purposes, it is not necessary to
include any instructions in this method so we will leave it empty.The third and most important method to implement is
void execute()
. When the adapter framework calls this method, it is expected to run continuously until the adapter is requested to stop or until the adapter completes its work.-
As such, we will wrap our functionality in a loop that iterates until the adapter has been issued a stop request. Following this loop – and ending the method – is an instruction to change the adapter
RunState
to done.javaCopywhile(!utility.isStopRequested()) { //steps b-d } utility.setAdapterState(RunState.RS_DONE);
-
While the adapter has not been requested to stop, we will continuously check for new
MQTT
messages. ThetakeNewMsg()
method will returnnull
if there are no new messages, or take the message out of the message queue and return it. When a new message is received, we will process it within theif
statement.javaCopyString msg; if ((msg = cb.takeNewMsg()) != null){ //steps c-d }
-
Once we have received a message, we need to create an
AdapterRow
and send it to ourFormatter
module.javaCopyAdapterRow row = utility.createRow(cb.getRcvdMsg()); utility.sendRow(row);
The fourth overridden method is
void stop()
. Its purpose is to perform any necessary tasks when the adapter is stopped. We will use this method to disconnect ourMqttClient
by issuingjavaCopyclient.disconnect();
The fifth and last method is
void destroy()
. Its purpose is to perform any cleanup tasks for your input or output transporter. For our purposes, it is not necessary to include any instructions in this method so we will leave it empty. - Start by opening your IDE and creating a new java project called
- Step 2
A formatter module converts between the data format of the transporter module and Streaming Analytics.
SAP HANA Streaming Analytics supports two types of formatters: row-based and stream-based formatters.
Row-based formatters obtain and output data in row format. They work with
AdapterRow
instances, which are containers for one or more records or rows as they flow from one module (transporter, formatter, or Streaming Analytics connector) to the next. You can add multiple records as objects within a List of a singleAdapterRow
object. TheAdapterRow
has a timestamp and block flags that control how its records are communicated to and from Streaming Analytics.Stream-based formatters deal with streaming data. These formatters work with
ByteStream
instances, which represent a continuous stream of data. Full documentation on writing a Formatter module can be found in the section Building a Custom Formatter Module.Before we begin, you can check out the
$STREAMING_HOME/adapters/framework/examples/src
directory for source code of sample formatters.The full source code for the Formatter Module is provided in the
Appendix
Section-
Start by opening your IDE and navigating to the java project called
mqtt-input
that we created for our Transporter Module (complete the pre-requisite tutorial if you have not yet completed this). Note that it is also valid to create a new project and create your Formatter module separately but for simplicity, we will be creating them in the same project. -
Create a Java class called
Mqttformatter.java
-
Have
MqttFormatter
extend theRowFormatter
class. Note that it is also possible to create a custom Formatter module that extends theStreamingFormatter
class. For the purposes of this tutorial, we will be extended theRowFormatter
class. Similar to our Transporter module, we will have to implement a number of abstract methods (only 3 this time though).
The purpose of the
void init()
method is to prepare the formatter module to convert between data formats. For example, obtain properties from the adapter configuration file and perform any required initialization tasks. Our Formatter module is very simple and does not require any initialization instructions.The second method is
AdapterRow convert(AdapterRow in)
.-
First, we will test whether the received
AdapterRow
is non-empty. If this is the case, we will simply send theAdapterRow
back.javaCopyif (in.getDataList().isEmpty()){ return in; }
-
If we have reached this point in the method, the received
AdapterRow
is non-empty. Our particular Formatter will convert aMQTT
message (String) to something usable by Streaming Analytics - anAepRecord
. First, we will create the desiredAepRecord.
-
Create a new
AepRecord
.javaCopyAepRecord tempRecord = new AepRecord();
-
Set the operation of the record.
javaCopytempRecord.setOpCode(Operation.INSERT);
-
Get the data list inside the
AepRecord
, and add the first object of the data list in theAdapterRow
.javaCopytempRecord.getValues().add(in.getData());
-
Now that we have the
AepRecord
and would like to send it to Streaming Analytics, we will convert the receivedAdapterRow
by replacing its data list value at index 0 with the newAepRecord
-tempRecord
.javaCopyin.setData(0, tempRecord);
We have now finished converting the
MQTT String
to anAepRecord
so we will return it.javaCopyreturn in;
The last method to implement is
void destroy()
which is intended for performing clean-up actions for the formatter. Our formatter does not require any destroy instructions. -
- Step 3
Now that we have written our
Transporter
andFormatter
modules, we need to package them in a.jar
file. If you have been following this tutorial, you should have a single java project containingMqttTransporter.java
,MqttFormatter.java
andMqttCB.java
. Build a.jar
containing all of these files. The process for doing so varies withIDE
so, if you have questions, it is best to consult yourIDE's
help pages. Before building the.jar
file, verify that you will be building it with the sameJRE
version as included in your Streaming Analytics install. Name the newly created.jar
file"mqtt-input.jar"
.For the question below, select all of the correct answers, and click Validate.
Which of the following files are packaged into the mqtt-input.jar file:
- Step 4
MqttTransporter.java
javaCopypackage com.sap; import java.io.IOException; import javax.security.auth.login.LoginException; import com.sybase.esp.sdk.exception.EntityNotFoundException; import com.sybase.esp.sdk.exception.ServerErrorException; import com.sybase.esp.adapter.framework.utilities.*; import com.sybase.esp.adapter.framework.RunState; import com.sybase.esp.adapter.framework.module.Transporter; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; public class MqttTransporter extends Transporter{ MqttClient client; String topic; MqttCB cb; @Override public void init() throws MqttException { utility.getAdapterLogger().info("MqttTransporter is initializing:"+utility.getParameters().getString("MQTTInputTransporterParameters.Topic") + "," + utility.getParameters().getString("MQTTInputTransporterParameters.MosquittoServerAddress")); topic = utility.getParameters().getString("MQTTInputTransporterParameters.Tppic"); client = new MqttClient(utility.getParameters().getString("MQTTInputTransporterParameters.MosquittoServerAddress"), "MQTT_ESP"); client.connect(); client.subscribe(topic); cb = new MqttCB(); client.setCallback(cb); } @Override public void start() throws IOException { utility.getAdapterLogger().info("MqttTransporter is starting."); } int count = 0; @Override public void execute() throws IOException, EntityNotFoundException, LoginException, ServerErrorException, InterruptedException { while(utility.isStopRequested() == false){ String msg; if ((msg = cb.takeNewMsg()) != null){ utility.getAdapterLogger().info("Got message: " + msg); AdapterRow row = utility.createRow(msg); utility.sendRow(row); utility.getAdapterLogger().info("Sent row to formatter"); } } utility.setAdapterState(RunState.RS_DONE); } @Override public void stop() throws MqttException { utility.getAdapterLogger().info("MqttTransporter is stopping"); client.disconnect(); } @Override public void destroy() { utility.getAdapterLogger().info("MqttTransporter is destroying"); } }
MqttCB.java
javaCopypackage com.sap; import java.util.LinkedList; import java.util.Queue; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; public class MqttCB implements MqttCallback{ private final Queue<String> msgs = new LinkedList(); @Override public void connectionLost(Throwable arg0) { System.out.print("Connection lost..."); } @Override public void deliveryComplete(IMqttDeliveryToken arg0) { System.out.println("Delivery complete."); } public String takeNewMsg(){ return msgs.poll(); } @Override public void messageArrived(final String topic, final MqttMessage msg) { System.out.println("Received message: " + msg); msgs.add(msg.toString()); } }
MqttFormatter.java
javaCopypackage com.sap; import com.sybase.esp.adapter.framework.AepRecord; import com.sybase.esp.adapter.framework.module.RowFormatter; import com.sybase.esp.adapter.framework.utilities.AdapterRow; import com.sybase.esp.sdk.Stream.Operation; public class MqttFormatter extends RowFormatter { @Override public void init() { utility.getAdapterLogger().info("MqttFormatter is initializing"); } @Override public AdapterRow convert(AdapterRow in ) { utility.getAdapterLogger().info("MqttFormatter is converting"); if ( in .getDataList().isEmpty()) { return in; } AepRecord tempRecord = new AepRecord(); tempRecord.setOpcode(Operation.INSERT); tempRecord.getValues().add( in .getData()); in .setData(0, tempRecord); utility.getAdapterLogger().info("MqttFormatter is done converting"); return in; } @Override public void destroy() { utility.getAdapterLogger().info("MqttFormatter is destroying"); } }