Further Readings on Custom Adapters
- How to enable guaranteed delivery
- How to implement schema discovery
- How to create an output adapter
- How to create stream based Transport and Formatter Modules
- Pre-defined Transporter and Formatter modules
- How to debug a Custom Adapter
Prerequisites
- Step 1
This is done in the custom transporter module. SAP documentation regarding the process can be found in section Enabling Guaranteed Delivery for an Input Transporter.
- Step 2
-
Add the following two commands to your
.cnxmlfile:htmlCopy<Internal id="x_unixCmdDisc" label="Discovery Command" type="string" default=""$STREAMING_HOME/adapters/framework/bin/discover.sh" "$STREAMING_HOME/adapters/framework/instances/mqtt_input/adapter_config.xml""/> <Internal id="x_winCmdDisc" label="Discovery Command" type="string" default=""%STREAMING_HOME%/adapters/framework/bin/discover.bat" "%STREAMING_HOME%/adapters/framework/instances/mqtt_input/adapter_con fig.xml""/> -
Implement sampling or non-sampling schema discovery in your custom transporter module. Consult section Implementing Schema Discovery in a Custom Adapter for more details.
For the question below, select the correct answer, and click Validate.
Which file should be edited in order to implement schema discovery?
-
- Step 3
If you have followed this tutorial to create an input adapter, only minor changes will have to be applied to convert it into an output adapter.
-
EspConnector-
You will need to choose either
EspSubscriberorEspMultiStreamSubscriberinstead of theEspPublisherwe have used. -
To make this change, edit the
<Module type="espconnector">element of your.cnxmlfile. Specifically, specify the chosenEspSubscriberin the<InstanceName>element
-
-
Formatter moduleThe convert(
AdapterRow in) method will need to be edited to convert Streaming Analytics objects toStrings. You can change it to something along the lines of the following:java Object obj = in.getData(0); in.setData(0, obj.toString()); return in; -
Transporter moduleInstead of the following in execute():
java AdapterRow row = utility.createRow(msg); utility.sendRow(row);You will need something along the lines of the following:
java AepRecord record = (AepRecord)row.getData(0); if(record != null) { String value = record.getValues().toString(); myDataSink.send(value); }Where
myDataSinkis the object you are outputting your data to. -
Mqtt_input.cnxml-
Particularly the type attribute for the Adapter element should be changed to “output”
htmlCopy<Adapter type="output" …> -
You may consider
find > replace allfrom input to output. This will require changing themqtt_inputfolder name in%STREAMING_HOME%/adapters/framework/instances/mqtt_input -
Consider changing the name of this file to
mqtt_output.cnxml
-
-
Adapter_config.xml,modulesdefine.xml,parametersdefine.xsdYou may consider changing “Input” to “Output” in names
-
<MQTTInputTransporterParameters> -
<Name>MQTT Input</Name>
-
-
Mqtt.cclThis project assumes the adapter is an input adapter, you will need to make a new project. However, doing so is outside the scope of this tutorial.
-
- Step 4
- Transporter
Change Execute() function according to following:
-
For input stream based transporters:
-
Create a
ByteBufferobject and load data into it by calling<ByteBuffer>.put<Type>(data) -
Call
utility.sendRowsBuffer()
-
-
For output stream based transporters:
-
Call
utility.getRowsBuffer()which will return aByteBufferobject. -
Call
ByteBuffer.get<Type>()to get data from the object.
-
-
Full
ByteBufferdocumentation can be found atByteBufferDocumentation regarding custom
Transportermodules can be found in section Building a Custom Transporter Module.- Formatter
-
Have your custom formatter class extend
com.sybase.esp.adapter.framework.module.StreamingFormatter -
Implement the following functions:
-
The
init()function.Prepare your formatter module to convert between data formats; for example, obtain properties from the adapter configuration file and perform any required initialization tasks.
-
The
start()function.Perform any necessary tasks when the adapter is started.
-
The
execute()function.Here is an example of the execute() function for a formatter that converts row-based data into stream-based:
javaCopy
public void execute() throws Exception { OutputStream output = utility.getOutputStream(); while(!utility.isStopRequested()) { AdapterRow row = utility.getRow(); if(row != null) { AepRecord record = (AepRecord)row.getData(0); String str = record.getValues().toString() + "\n"; output.write(str.getBytes()); } } } -
For a formatter that converts from stream-based data into row-based, use:
-
utility.getInputStream()to obtain theInputStream -
utility.createRow()to create theAdapterRowobjects -
utility.sendRow()to send the rows to the next module specified in the adapter configuration file
-
-
The
stop()functionPerform any necessary tasks when the adapter is stopped.
-
The
destroy()function.Perform clean-up actions for your formatter.
-
Documentation regarding custom Formatter modules can be found in section Building a Custom Formatter Module.
See the
$STREAMING_HOME/adapters/framework/examples/srcdirectory for source code for sample modules - Transporter
- Step 5
If your transporter or formatter modules don’t require very complex implementation, they may already be provided with your Streaming Analytics installation.
You can find a listing of the predefined Formatter modules in section Formatters Currently Available from SAP as well as their location.
You can find a listing of the predefined Transporter modules in section Transporters Currently Available from SAP as well as their location.
- Step 6
-
Comment out the
ATTACH INPUT ADAPTERstatement in thecclcode provided in the Appendix of this tutorial. Thecclshould simply be as follows:sqlCopy
CREATE INPUT WINDOW InputWindow1 SCHEMA ( Message string ) PRIMARY KEY ( Message ) KEEP ALL ROWS ; -
Uncomment the
<ProjectName>and<StreamName>elements in<EspPublisherParameters> -
Start the project in Studio by pressing the deploy button.

-
Start the adapter by running
start.shand pass it the full path to youradapter_config.xml.%STREAMING_HOME%\adapters\framework\bin\start.bat%STREAMING_HOME%\adapters\framework\instances\mqtt_input\adapter_config.xml -
The adapter will use the
<EspProject>element properties set in youradapter_config.xmlfile to connect to the project in studio and will use the<MQTTInputTransporterParameters>element properties as arguments.
-
- Step 7
There are two main ways to debug custom adapters:
-
The first is to use the
Eclipsedebugger. Steps for accomplishing this are outlined in this documentation. This debugger can be used to set break points and/or step through code. -
The second is to simply use print statements.
-
For adapter run using the
ATTACHstatementUse
utility.getAdapterLogger().info(String)in your customTransporterandFormattermodules to log information.Streaming Analytics log info can be found using HANA Studio.
-
In the
SAP HANA Administration Consoleperspective, double click yourSYSTEMin the Systems panel -
Navigate to the
Diagnosis Filestab. -
Look for a
.outfile corresponding to the project you are working on. You can open the file for read in HANA Studio by double clicking it. -
utility.getAdapterLogger().info()will log messages to<working-directory>\ projects\default.mqtt.0\logs\adapterframework.log.
-
-
For standalone adapter run without the
ATTACHstatement-
utility.getAdapterLogger().info()can also be used -
utility.getAdapterLogger().info()will log messages to the console in which the adapter start command was issued as well as%STREAMING_HOME%/adapters/framework/bin/frameworkadapter.log -
System.out.println()messages will also appear in the console.
-
-
Click the Done button below once you have completed this tutorial.
-