- Developers
- Tutorials
- Further Readings on Custom Adapters
Further Readings on Custom Adapters
-
Join the conversation on Facebook
-
Join the conversation on Twitter
-
Subscribe to the YouTube Channel
-
Join the conversation on LinkedIn
-
View our projects on GitHub
-
Share via email
Further Readings on Custom Adapters
You will learn
- How to enable guaranteed delivery
- How to implement schema discovery
- How to create an output adapter
- How to create stream based Transport and Formatter Modules
- Pre-defined Transporter and Formatter modules
- How to debug a Custom Adapter
Prerequisites
This is done in the custom transporter module. SAP documentation regarding the process can be found in section Enabling Guaranteed Delivery for an Input Transporter.
-
Add the following two commands to your
.cnxml
file:<Internal id="x_unixCmdDisc" label="Discovery Command" type="string" default=""$STREAMING_HOME/adapters/framework/bin/discover.sh" "$STREAMING_HOME/adapters/framework/instances/mqtt_input/adapter_config.xml""/> <Internal id="x_winCmdDisc" label="Discovery Command" type="string" default=""%STREAMING_HOME%/adapters/framework/bin/discover.bat" "%STREAMING_HOME%/adapters/framework/instances/mqtt_input/adapter_con fig.xml""/>
-
Implement sampling or non-sampling schema discovery in your custom transporter module. Consult section Implementing Schema Discovery in a Custom Adapter for more details.
For the question below, select the correct answer, and click Validate.
If you have followed this tutorial to create an input adapter, only minor changes will have to be applied to convert it into an output adapter.
-
EspConnector
-
You will need to choose either
EspSubscriber
orEspMultiStreamSubscriber
instead of theEspPublisher
we have used. -
To make this change, edit the
<Module type="espconnector">
element of your.cnxml
file. Specifically, specify the chosenEspSubscriber
in the<InstanceName>
element
-
-
Formatter module
The convert(
AdapterRow in
) method will need to be edited to convert Streaming Analytics objects toStrings
. You can change it to something along the lines of the following:java Object obj = in.getData(0); in.setData(0, obj.toString()); return in;
-
Transporter module
Instead of the following in execute():
java AdapterRow row = utility.createRow(msg); utility.sendRow(row);
You will need something along the lines of the following:
java AepRecord record = (AepRecord)row.getData(0); if(record != null) { String value = record.getValues().toString(); myDataSink.send(value); }
Where
myDataSink
is the object you are outputting your data to. -
Mqtt_input.cnxml
-
Particularly the type attribute for the Adapter element should be changed to “output”
<Adapter type="output" …>
-
You may consider
find > replace all
from input to output. This will require changing themqtt_input
folder name in%STREAMING_HOME%/adapters/framework/instances/mqtt_input
-
Consider changing the name of this file to
mqtt_output.cnxml
-
-
Adapter_config.xml
,modulesdefine.xml
,parametersdefine.xsd
You may consider changing “Input” to “Output” in names
-
<MQTTInputTransporterParameters>
-
<Name>MQTT Input</Name>
-
-
Mqtt.ccl
This project assumes the adapter is an input adapter, you will need to make a new project. However, doing so is outside the scope of this tutorial.
- Transporter
Change Execute() function according to following:
-
For input stream based transporters:
-
Create a
ByteBuffer
object and load data into it by calling<ByteBuffer>.put<Type>(data)
-
Call
utility.sendRowsBuffer()
-
-
For output stream based transporters:
-
Call
utility.getRowsBuffer()
which will return aByteBuffer
object. -
Call
ByteBuffer.get<Type>()
to get data from the object.
-
-
Full ByteBuffer
documentation can be found at ByteBuffer
Documentation regarding custom Transporter
modules can be found in section Building a Custom Transporter Module.
- Formatter
-
Have your custom formatter class extend
com.sybase.esp.adapter.framework.module.StreamingFormatter
-
Implement the following functions:
-
The
init()
function.Prepare your formatter module to convert between data formats; for example, obtain properties from the adapter configuration file and perform any required initialization tasks.
-
The
start()
function.Perform any necessary tasks when the adapter is started.
-
The
execute()
function.Here is an example of the execute() function for a formatter that converts row-based data into stream-based:
public void execute() throws Exception { OutputStream output = utility.getOutputStream(); while(!utility.isStopRequested()) { AdapterRow row = utility.getRow(); if(row != null) { AepRecord record = (AepRecord)row.getData(0); String str = record.getValues().toString() + "\n"; output.write(str.getBytes()); } } } -
For a formatter that converts from stream-based data into row-based, use:
-
utility.getInputStream()
to obtain theInputStream
-
utility.createRow()
to create theAdapterRow
objects -
utility.sendRow()
to send the rows to the next module specified in the adapter configuration file
-
-
The
stop()
functionPerform any necessary tasks when the adapter is stopped.
-
The
destroy()
function.Perform clean-up actions for your formatter.
-
Documentation regarding custom Formatter modules can be found in section Building a Custom Formatter Module.
See the $STREAMING_HOME/adapters/framework/examples/src
directory for source code for sample modules
If your transporter or formatter modules don’t require very complex implementation, they may already be provided with your Streaming Analytics installation.
You can find a listing of the predefined Formatter modules in section Formatters Currently Available from SAP as well as their location.
You can find a listing of the predefined Transporter modules in section Transporters Currently Available from SAP as well as their location.
-
Comment out the
ATTACH INPUT ADAPTER
statement in theccl
code provided in the Appendix of this tutorial. Theccl
should simply be as follows:
CREATE INPUT WINDOW InputWindow1 SCHEMA ( Message string ) PRIMARY KEY ( Message ) KEEP ALL ROWS ; -
Uncomment the
<ProjectName>
and<StreamName>
elements in<EspPublisherParameters>
-
Start the project in Studio by pressing the deploy button.
-
Start the adapter by running
start.sh
and pass it the full path to youradapter_config.xml
.%STREAMING_HOME%\adapters\framework\bin\start.bat
%STREAMING_HOME%\adapters\framework\instances\mqtt_input\adapter_config.xml
-
The adapter will use the
<EspProject>
element properties set in youradapter_config.xml
file to connect to the project in studio and will use the<MQTTInputTransporterParameters>
element properties as arguments.
There are two main ways to debug custom adapters:
-
The first is to use the
Eclipse
debugger. Steps for accomplishing this are outlined in this documentation. This debugger can be used to set break points and/or step through code. -
The second is to simply use print statements.
-
For adapter run using the
ATTACH
statementUse
utility.getAdapterLogger().info(String)
in your customTransporter
andFormatter
modules to log information.Streaming Analytics log info can be found using HANA Studio.
-
In the
SAP HANA Administration Console
perspective, double click yourSYSTEM
in the Systems panel -
Navigate to the
Diagnosis Files
tab. -
Look for a
.out
file corresponding to the project you are working on. You can open the file for read in HANA Studio by double clicking it. -
utility.getAdapterLogger().info()
will log messages to<working-directory>\ projects\default.mqtt.0\logs\adapterframework.log.
-
-
For standalone adapter run without the
ATTACH
statement-
utility.getAdapterLogger().info()
can also be used -
utility.getAdapterLogger().info()
will log messages to the console in which the adapter start command was issued as well as%STREAMING_HOME%/adapters/framework/bin/frameworkadapter.log
-
System.out.println()
messages will also appear in the console.
-
-
Click the Done button below once you have completed this tutorial.