Further Readings on Custom Adapters
- How to enable guaranteed delivery
- How to implement schema discovery
- How to create an output adapter
- How to create stream-based Transporter and Formatter modules
- Pre-defined Transporter and Formatter modules
- How to debug a Custom Adapter
Prerequisites
- Step 1
Guaranteed delivery is enabled in the custom transporter module. SAP documentation describing the process can be found in the section Enabling Guaranteed Delivery for an Input Transporter.
- Step 2
- Add the following two commands to your `.cnxml` file:

  ```xml
  <Internal id="x_unixCmdDisc" label="Discovery Command" type="string"
    default=""$STREAMING_HOME/adapters/framework/bin/discover.sh" "$STREAMING_HOME/adapters/framework/instances/mqtt_input/adapter_config.xml""/>
  <Internal id="x_winCmdDisc" label="Discovery Command" type="string"
    default=""%STREAMING_HOME%/adapters/framework/bin/discover.bat" "%STREAMING_HOME%/adapters/framework/instances/mqtt_input/adapter_config.xml""/>
  ```
- Implement sampling or non-sampling schema discovery in your custom transporter module. Consult the section Implementing Schema Discovery in a Custom Adapter for more details.
- Step 3
If you have followed this tutorial to create an input adapter, only minor changes are needed to convert it into an output adapter.
- EspConnector
  - You will need to choose either `EspSubscriber` or `EspMultiStreamSubscriber` instead of the `EspPublisher` we have used.
  - To make this change, edit the `<Module type="espconnector">` element of your `.cnxml` file. Specifically, specify the chosen subscriber in the `<InstanceName>` element.
- Formatter module
  - The `convert(AdapterRow in)` method will need to be edited to convert Streaming Analytics objects to `String`s. You can change it to something along the lines of the following:

    ```java
    Object obj = in.getData(0);
    in.setData(0, obj.toString());
    return in;
    ```
- Transporter module
  - Instead of the following in `execute()`:

    ```java
    AdapterRow row = utility.createRow(msg);
    utility.sendRow(row);
    ```

    you will need something along the lines of the following:

    ```java
    AepRecord record = (AepRecord) row.getData(0);
    if (record != null) {
        String value = record.getValues().toString();
        myDataSink.send(value);
    }
    ```

    where `myDataSink` is the object you are outputting your data to. (A combined sketch of the formatter and transporter changes appears at the end of this step.)
- mqtt_input.cnxml
  - In particular, the `type` attribute of the `<Adapter>` element should be changed to `"output"`:

    ```xml
    <Adapter type="output" …>
    ```
  - You may consider doing a find-and-replace of "input" with "output". This will also require renaming the `mqtt_input` folder in `%STREAMING_HOME%/adapters/framework/instances/mqtt_input`.
  - Consider renaming this file to `mqtt_output.cnxml`.
- adapter_config.xml, modulesdefine.xml, parametersdefine.xsd
  - You may consider changing "Input" to "Output" in names such as:
    - `<MQTTInputTransporterParameters>`
    - `<Name>MQTT Input</Name>`
- mqtt.ccl
  - This project assumes the adapter is an input adapter, so you will need to create a new project. However, doing so is outside the scope of this tutorial.
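To tie the changes above together, here is a minimal sketch of what the two edited methods might look like in an output adapter. It is illustrative only: `MqttDataSink` and its `send(String)` method are hypothetical stand-ins for whatever destination you write to, the loop and stop-check pattern is borrowed from the formatter example in Step 4, and imports for the framework classes (`AdapterRow`, `AepRecord`, the module base classes) are omitted because their package paths are not shown in this tutorial. Depending on how your modules are chained in `adapter_config.xml`, the transporter may receive an `AepRecord` (as in the snippet above) or an already-converted `String`.

```java
// Sketch only, not the authoritative implementation.
// Take the framework imports and the exact method signatures from the sample
// modules under $STREAMING_HOME/adapters/framework/examples/src.

// Formatter module: convert Streaming Analytics objects to Strings.
public AdapterRow convert(AdapterRow in) throws Exception {
    Object obj = in.getData(0);        // the record handed over by EspSubscriber
    in.setData(0, obj.toString());     // replace it with its String representation
    return in;
}

// Transporter module: read rows from the framework and push them to the data sink,
// instead of creating and sending rows as the input adapter did.
public void execute() throws Exception {
    MqttDataSink myDataSink = new MqttDataSink();       // hypothetical output destination
    while (!utility.isStopRequested()) {
        AdapterRow row = utility.getRow();
        if (row == null) {
            continue;
        }
        AepRecord record = (AepRecord) row.getData(0);  // mirrors the snippet above
        if (record != null) {
            String value = record.getValues().toString();
            myDataSink.send(value);                     // hand the value to your destination
        }
    }
}
```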
- Step 4
- Transporter

  Change the `execute()` function as follows:
  - For input stream-based transporters:
    - Create a `ByteBuffer` object and load data into it by calling `<ByteBuffer>.put<Type>(data)`.
    - Call `utility.sendRowsBuffer()`.
  - For output stream-based transporters:
    - Call `utility.getRowsBuffer()`, which will return a `ByteBuffer` object.
    - Call `ByteBuffer.get<Type>()` to get data from the object.
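As a quick refresher on `java.nio.ByteBuffer` itself (this is plain Java, independent of the adapter framework), the sketch below shows the typed `put`/`get` calls referred to above. How the buffer is handed to or obtained from the framework is covered by `utility.sendRowsBuffer()` and `utility.getRowsBuffer()` as listed, and is not shown here.

```java
import java.nio.ByteBuffer;

public class ByteBufferRefresher {
    public static void main(String[] args) {
        // Writing side: what an input stream-based transporter does before
        // handing the buffer to the framework.
        ByteBuffer buffer = ByteBuffer.allocate(64);
        buffer.putInt(42);                    // <ByteBuffer>.put<Type>(data)
        buffer.putDouble(3.14);
        buffer.put("hello".getBytes());

        // Reading side: what an output stream-based transporter does after
        // obtaining the buffer from the framework.
        buffer.flip();                        // switch the buffer from writing to reading
        int i = buffer.getInt();              // ByteBuffer.get<Type>()
        double d = buffer.getDouble();
        byte[] rest = new byte[buffer.remaining()];
        buffer.get(rest);

        System.out.println(i + " " + d + " " + new String(rest));
    }
}
```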
Full `ByteBuffer` documentation can be found in the Java `java.nio.ByteBuffer` API documentation.
Documentation regarding custom Transporter modules can be found in section Building a Custom Transporter Module.
- Formatter
  - Have your custom formatter class extend `com.sybase.esp.adapter.framework.module.StreamingFormatter`.
  - Implement the following functions:
    - The `init()` function. Prepare your formatter module to convert between data formats; for example, obtain properties from the adapter configuration file and perform any required initialization tasks.
    - The `start()` function. Perform any necessary tasks when the adapter is started.
    - The `execute()` function. Here is an example of the `execute()` function for a formatter that converts row-based data into stream-based:

      ```java
      public void execute() throws Exception {
          OutputStream output = utility.getOutputStream();
          while (!utility.isStopRequested()) {
              AdapterRow row = utility.getRow();
              if (row != null) {
                  AepRecord record = (AepRecord) row.getData(0);
                  String str = record.getValues().toString() + "\n";
                  output.write(str.getBytes());
              }
          }
      }
      ```
    - For a formatter that converts from stream-based data into row-based, use:
      - `utility.getInputStream()` to obtain the `InputStream`
      - `utility.createRow()` to create the `AdapterRow` objects
      - `utility.sendRow()` to send the rows to the next module specified in the adapter configuration file

      (A skeleton formatter for this direction is sketched after this list.)
    - The `stop()` function. Perform any necessary tasks when the adapter is stopped.
    - The `destroy()` function. Perform clean-up actions for your formatter.
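As referenced above, here is a minimal skeleton for the stream-based-to-row-based direction. This is a sketch under assumptions: the five functions and the `utility` calls are those listed in this step, but framework imports such as `AdapterRow` are omitted (their package paths are not shown in this tutorial), line-by-line parsing of the input is illustrative only, and the exact method signatures should be taken from the sample modules and from Building a Custom Formatter Module.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;

import com.sybase.esp.adapter.framework.module.StreamingFormatter;

// Sketch only: a formatter that converts stream-based data (assumed here to be
// newline-delimited text) into rows. Framework types such as AdapterRow are used
// as in this step's examples; their imports are omitted.
public class LineToRowFormatter extends StreamingFormatter {

    public void init() throws Exception {
        // Obtain any properties you need from the adapter configuration file.
    }

    public void start() throws Exception {
        // Tasks to perform when the adapter is started.
    }

    public void execute() throws Exception {
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(utility.getInputStream()));
        while (!utility.isStopRequested()) {
            String line = reader.readLine();           // one record per line (assumption)
            if (line == null) {
                break;                                  // end of the input stream
            }
            AdapterRow row = utility.createRow(line);   // wrap the value in an AdapterRow
            utility.sendRow(row);                       // pass it to the next module
        }
    }

    public void stop() throws Exception {
        // Tasks to perform when the adapter is stopped.
    }

    public void destroy() throws Exception {
        // Clean-up actions for your formatter.
    }
}
```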
Documentation regarding custom Formatter modules can be found in section Building a Custom Formatter Module.

See the `$STREAMING_HOME/adapters/framework/examples/src` directory for source code for sample modules.
- Step 5
If your transporter or formatter modules do not require a very complex implementation, suitable modules may already be provided with your Streaming Analytics installation.

You can find a listing of the predefined Formatter modules, along with their locations, in the section Formatters Currently Available from SAP.

You can find a listing of the predefined Transporter modules, along with their locations, in the section Transporters Currently Available from SAP.
- Step 6
- Comment out the `ATTACH INPUT ADAPTER` statement in the CCL code provided in the Appendix of this tutorial. The CCL should simply be as follows:

  ```sql
  CREATE INPUT WINDOW InputWindow1
  SCHEMA ( Message string )
  PRIMARY KEY ( Message )
  KEEP ALL ROWS ;
  ```
- Uncomment the `<ProjectName>` and `<StreamName>` elements in `<EspPublisherParameters>`.
- Start the project in Studio by pressing the deploy button.
- Start the adapter by running `start.sh` (`start.bat` on Windows) and passing it the full path to your `adapter_config.xml`:

  ```
  %STREAMING_HOME%\adapters\framework\bin\start.bat %STREAMING_HOME%\adapters\framework\instances\mqtt_input\adapter_config.xml
  ```
- The adapter will use the `<EspProject>` element properties set in your `adapter_config.xml` file to connect to the project in Studio, and will use the `<MQTTInputTransporterParameters>` element properties as arguments.
- Step 7
There are two main ways to debug custom adapters:
- The first is to use the Eclipse debugger. Steps for accomplishing this are outlined in this documentation. The debugger can be used to set breakpoints and/or step through code.
- The second is to simply use print statements or the adapter logger. (A short logging sketch appears at the end of this step.)
- For an adapter run using the `ATTACH` statement:

  Use `utility.getAdapterLogger().info(String)` in your custom Transporter and Formatter modules to log information. The Streaming Analytics log output can be found using HANA Studio:
  - In the SAP HANA Administration Console perspective, double-click your SYSTEM in the Systems panel.
  - Navigate to the Diagnosis Files tab.
  - Look for a `.out` file corresponding to the project you are working on. You can open the file for reading in HANA Studio by double-clicking it.
  - `utility.getAdapterLogger().info()` will log messages to `<working-directory>\projects\default.mqtt.0\logs\adapterframework.log`.
- For a standalone adapter run without the `ATTACH` statement:
  - `utility.getAdapterLogger().info()` can also be used.
  - `utility.getAdapterLogger().info()` will log messages to the console in which the adapter start command was issued, as well as to `%STREAMING_HOME%/adapters/framework/bin/frameworkadapter.log`.
  - `System.out.println()` messages will also appear in the console.
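As referenced above, here is a minimal sketch of using the adapter logger and plain print statements inside a module's `execute()` loop. The surrounding class and the `utility` field are the ones built earlier in this tutorial; the loop structure mirrors the formatter example in Step 4, and the log messages themselves are just placeholders.

```java
// Sketch only: logging from inside a custom module built in this tutorial.
// `utility` is the framework object already used throughout the tutorial;
// the surrounding class and imports are assumed to be the ones created earlier.
public void execute() throws Exception {
    utility.getAdapterLogger().info("execute() started");

    while (!utility.isStopRequested()) {
        AdapterRow row = utility.getRow();
        if (row == null) {
            continue;
        }

        // Visible in adapterframework.log (ATTACH case) or in the console and
        // frameworkadapter.log (standalone case), as described above.
        utility.getAdapterLogger().info("processing row: " + row.getData(0));

        // Plain print statements also reach the console when running standalone.
        System.out.println("processing row: " + row.getData(0));
    }

    utility.getAdapterLogger().info("execute() stopping");
}
```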