Create App for Sending and Receiving Messages Using a Java-Based Client
- How to create a basic messaging client application with Java
- How to deploy this application to the SAP Event Mesh and test it
Prerequisites
- Installed Java 8: Java download
- Installed Maven 3.x: Maven download
- Installed Git: Git download
- Follow Create Instance of SAP Event Mesh Service and Create Queues and Queue Subscriptions for Event Mesh to create a queue in an instance of Event Mesh.
- Follow Install the Cloud Foundry Command Line Interface(CLI) to download and work with CLI.
- Installed IDE of choice (e.g. Visual Studio with installed Java language support plugin)
- Step 1
To download and install the samples, just clone the repository via: git clone
The git has two applications for you to try out.
Application Scenario Description emjapi-samples-jms-p2p (Point to Point communication) This sample demonstrates how messages can be sent and received from an application deployed on SAP Business Technology Platform. Therefore the messaging sample provides a Spring Boot based application which provides REST endpoints for sending and receiving messages via a queue (or queues) of choice. The REST endpoints are provided via the MessagingServiceRestController
.|
Application Scenario Description emjapi-samples-jms-pubsub (Publish & Subscribe) This sample demonstrates how messages can be sent and received to a topic from an application deployed on SAP Business Technology Platform. This messaging sample provides a Spring Boot based application which provides REST endpoints for sending and receiving messages via a topic of choice. It also offers a REST endpoint to receive a message from a queue. The REST endpoints are provided via the MessagingServiceRestController
.|
Download both the scenarios to your local IDE. After downloading, the project structure will look like this :
The downloaded project has all the dependencies and required client files for both scenarios mentioned above.
The Event Mesh service descriptor is/config/em-config-default.json
. Detailed information on different parameters of the
descriptor can be found in Create Instance of SAP Event Mesh.Replace the content in the .json file with the content of the descriptor in the service instance you have created.
- Step 2
To be able to build, deploy and run the Java message client, ensure the following dependencies are mentioned in the pom.xml.
- enterprise-messaging spring service connector that provides the
MessagingService
- enterprise-messaging core that creates the connection factory
- enterprise-messaging JMS extension that provides the
MessagingServiceJmsConnectionFactory
i.e. the pom.xml should have dependencies as below.
XMLCopy<dependency> <groupId>com.sap.cloud.servicesdk.xbem</groupId> <artifactId>emjapi-connector-sap-cp</artifactId> <version>${version.xbem.client}</version> </dependency> <dependency> <groupId>com.sap.cloud.servicesdk.xbem</groupId> <artifactId>emjapi-core</artifactId> <version>${version.xbem.client}</version> </dependency> <dependency> <groupId>com.sap.cloud.servicesdk.xbem</groupId> <artifactId>emjapi-extension-sap-cp-jms</artifactId> <version>${version.xbem.client}</version> </dependency>
Which files do you have to create for your java client application?
- enterprise-messaging spring service connector that provides the
- Step 3
-
Open the
manifest.yml
file for the projects and make changes to the following parameters:-
applications:
-
name: «Customized name of choice. Should be unique for a space of SCP» -
services:
-
«name of the SAP Event Mesh instance»
-
-
Get the
MessagingService
JavaCopyServiceConnectorConfig config = null; // currently there are no configurations for the MessagingServiceFactory supported Cloud cloud = new CloudFactory().getCloud(); // get a messaging service factory via the service connector MessagingService messagingService = cloud.getSingletonServiceConnector(MessagingService.class, config);
-
Create a
MessagingServiceFactory
object with the help ofMessagingServiceFactoryCreator
and get aMessagingServiceJmsConnectionFactory
.The Connection Factory can be configured with the
MessagingServiceJmsSettings
. In case the reconnection feature is not needed and an individual connection mechanism (for example, through a connection cache) is used these settings can be skipped. The connection factory can be built withmessagingServiceFactory.createConnectionFactory(MessagingServiceJmsConnectionFactory.class,settings)
.JavaCopyMessagingServiceJmsSettings settings = new MessagingServiceJmsSettings(); // settings are preset with default values (see JavaDoc) settings.setMaxReconnectAttempts(5); // use -1 for unlimited attempts settings.setInitialReconnectDelay(3000); settings.setReconnectDelay(3000); MessagingServiceFactory messagingServiceFactory = MessagingServiceFactoryCreator.createFactory(messagingService); MessagingServiceJmsConnectionFactory connectionFactory = messagingServiceFactory.createConnectionFactory(MessagingServiceJmsConnectionFactory.class, settings)
-
Create a connection and a session
JavaCopyConnection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
-
- Step 4
Sending
Open the
MessagingServiceRestController.java
source code. Change the value ofQUEUE_PATH
based on the values of the instance you created.JavaCopy
private static final String QUEUE_PATH = "queue/{queueName}";For sending messages a Connection and a Session are required first. Note that those resources must be closed if they are not needed anymore. As those objects
are implementing theautoclosable
interface they will be closed automatically after the try-catch-block. Now aBytesMessage
can be created. In the next
steps a queue is bound to a producer. The queue must be created on the broker first (via for example, the UI or MM API). Note that the prefix “queue:” is
mandatory. Finally, the message can be sent to the queue.JavaCopytry ( Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { connection.start(); BytesMessage byteMessage = session.createBytesMessage(); byteMessage.writeBytes(message.getBytes()); Queue queue = session.createQueue("queue:" + "<queue-name>"); // even though the JMS API is "createQueue" the queue will not be created on the message broker MessageProducer producer = session.createProducer(queue); producer.send(byteMessage); } catch (JMSException e) { LOG.error("Could not send message={}.", message, e); }
Receiving
In this example, a consumer is listening to a queue. Again a Connection and a Session are required. Note that those resources must be closed if they are not
needed anymore. First, a queue with the mandatory prefix “queue:” is bound to a consumer. Since the messages are sent as aByteMassage
, the message needs to be
converted to say a String.JavaCopytry (Connection connection = connectionFactory.createConnection();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { connection.start(); Queue queue = session.createQueue(QUEUE_PREFIX + queueName); // see comments above MessageConsumer consumer = session.createConsumer(queue); BytesMessage message = (BytesMessage) consumer.receive(); // Blocking call. Define a timeout or use a Message Listener byte[] byteData = new byte[(int) message.getBodyLength()]; message.readBytes(byteData); } catch (JMSException e) { LOG.error("Could not receive message.", e); }
- Step 5
Open the
MessageingServiceRestController.java
source code. Change the value ofTOPIC_PATH
andQUEUE_PATH
based on the values of the instance you created.JavaCopy`private static final String TOPIC_PATH = "topic/{topicName}"; private static final String QUEUE_PATH = "queue/{queueName}";`
Sending
For sending messages a Connection and a Session are required first. Note that those resources must be closed if they are not needed anymore. As those objects are
implementing theautoclosable
interface they will be closed automatically after the try-catch-block. Now aBytesMessage
can be created. In the next steps,
a topic is bound (not created) to a producer. Note, that the prefix “topic:” is mandatory. Finally, the message can be sent to the topic.JavaCopytry ( Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { connection.start(); Topic topic = session.createTopic("topic:" + "<topic-name>"); BytesMessage byteMessage = session.createBytesMessage(); byteMessage.writeBytes(message.getBytes()); MessageProducer producer = session.createProducer(topic); producer.send(byteMessage); } catch (JMSException e) { LOG.error("Could not send message={}.", message, e); }
Receiving
Currently, direct topic subscription is not supported for the default plan. In this example, a consumer is subscribed to a specific topic. Again a Connection
and a Session are needed. Note that those resources must be closed if they are not needed anymore. First a topic (not created) with the mandatory prefix “topic:”
is bound to consumer. Since the messages are sent as aByteMessage
the message needs to be converted to say a StringJavaCopytry ( Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { connection.start(); Topic topic = session.createTopic(TOPIC_PREFIX + topicName); MessageConsumer consumer = session.createConsumer(topic); // Blocking call. Define a timeout or use a Message Listener BytesMessage message = (BytesMessage) consumer.receive(); byte[] byteData = new byte[(int) message.getBodyLength()]; message.readBytes(byteData); } catch (JMSException e) { LOG.error("Could not receive message.", e); }
- Step 6
-
Build the project with maven (
maven clean install
). -
Push it to Cloud Foundry via
cf push
using CLI from the folder where the executable is available. -
After successful deployment, follow Send and Receive Test Event Mesh to test sending and receiving of message using the Java client.
What communication options does Event Mesh offer to connect applications, services and systems?
-