Lagom Message Broker API Testing


Many of us have already worked with Lagom, so I will skip the introduction. Let’s quickly go through the important APIs that Lagom provides:

  • Service API
  • Message Broker API
  • Persistence API

In this blog, we will see how to test the Message Broker API provided by Lagom. Before doing that, let’s briefly see what this API provides.

Lagom’s Message Broker API lets services communicate with each other asynchronously by providing a distributed publish-subscribe model that services can use to share data via topics.
We all know how important it is to write test cases for the code we write, so Lagom also provides ways to test code that uses the Message Broker API.

Lagom lets decoupled services be tested in isolation by providing in-memory implementations of the Message Broker API. Tests run fast because they do not need to start a real broker to publish and consume messages.
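
To make the idea of an in-memory broker concrete, here is a minimal plain-Java sketch of publish-subscribe over topics. It is only a model of the concept and not Lagom's implementation: the class names InMemoryBroker and InMemoryBrokerDemo are hypothetical, and messages are plain strings to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, simplified model of an in-memory publish-subscribe broker.
// It is NOT Lagom's implementation; it only illustrates topics and subscribers.
class InMemoryBroker {
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    // Register a handler that is called for every message published to topicId.
    void subscribe(String topicId, Consumer<String> handler) {
        subscribers.computeIfAbsent(topicId, id -> new CopyOnWriteArrayList<>()).add(handler);
    }

    // Deliver the message synchronously to every handler subscribed to topicId.
    void publish(String topicId, String message) {
        subscribers.getOrDefault(topicId, Collections.<Consumer<String>>emptyList())
            .forEach(handler -> handler.accept(message));
    }
}

class InMemoryBrokerDemo {
    // Publishes three messages and returns what the "hello-events" subscriber received.
    static List<String> run() {
        InMemoryBroker broker = new InMemoryBroker();
        List<String> received = new ArrayList<>();
        broker.subscribe("hello-events", received::add);
        broker.publish("hello-events", "hi");
        broker.publish("other-topic", "ignored"); // nobody subscribed to this topic
        broker.publish("hello-events", "hello");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [hi, hello]
    }
}
```

Because everything happens in memory, a test built on such a stand-in needs no running broker, which is the same benefit Lagom's test implementations aim for.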

Enough talking, let’s see some code!

Here is an upstream service named HelloService, which is responsible for producing messages to the topic with the ID hello-events:



public interface HelloService extends Service {

  ServiceCall<NotUsed, String> hello(String id);

  ServiceCall<GreetingMessage, Done> useGreeting(String id);

  Topic<HelloEvent> helloEvents();

  @Override
  default Descriptor descriptor() {
    return named("hello")
      .withCalls(
        pathCall("/api/hello/:id", this::hello),
        pathCall("/api/hello/:id", this::useGreeting))
      .withTopics(topic("hello-events", this::helloEvents))
      .withAutoAcl(true);
  }
}

There is also a consumer service named StreamService, which consumes from the hello-events topic of HelloService.

There are two ways to consume the messages.
The first is to consume them in StreamServiceImpl itself; the second is to create a consumer class and bind it as an EagerSingleton in the module.
Which one to choose depends on your use case.

Here we will bind the consumer class as an EagerSingleton, since we want to pull messages continuously from the hello-events topic of HelloService.

import akka.Done;
import akka.stream.javadsl.Flow;
import org.knoldus.hello.api.HelloEvent;
import org.knoldus.hello.api.HelloService;

import javax.inject.Inject;
import java.util.concurrent.CompletableFuture;

/**
 * This subscribes to the HelloService event stream.
 */
public class StreamSubscriber {

  @Inject
  public StreamSubscriber(HelloService helloService, StreamRepository repository) {
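    // Create a subscriber
    helloService.helloEvents().subscribe()
      // And subscribe to it with at-least-once processing semantics.
      .atLeastOnce(
        Flow.<HelloEvent>create().mapAsync(1, event -> {
          // Consume the event and do some processing here, e.g. store it via the repository.
          return CompletableFuture.completedFuture(Done.getInstance());
        })
      );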

  }
}

So, we have two services: HelloService, which produces messages to the topic, and StreamService, which consumes them from HelloService. It is important to test each of them in isolation.

Let’s first see how to test the consumer class.
To test the consumer implementation in isolation from the producer, we need to start the service under test, i.e. StreamService, along with a stub of the upstream service that produces data into the topic, in our case HelloService. We can then verify whether the messages are consumed properly from the topic. The steps to follow are:

I. Setting up the server:
i. Create a server with a modified Setup where the upstream HelloService is replaced with a HelloServiceStub.

final ServiceTest.Setup setup = defaultSetup().withCassandra(true).configureBuilder(builder -> builder.overrides(
        bind(HelloService.class).to(HelloServiceStub.class)
));

ii. Get the client for a service from the test server.

HelloService service = testServer.client(HelloService.class);

II. Stubbing the upstream service:
i. Declare an instance of ProducerStub. This instance is bound when the server is started and the HelloServiceStub is created.

private static ProducerStub<HelloEvent> producer;

ii. Inject a ProducerStubFactory into the constructor of the stub to obtain a ProducerStub for the appropriate topic.

iii. Request the producer for a specific topic: the stubbed method that implements the TopicCall must return the Topic bound to the ProducerStub created in the constructor.

III. In the test case:
Use the ProducerStub to send messages to the topic and interact normally with the service under test to verify the Service code.
Here is the whole test class with the server setup and stub of the upstream service along with the test cases:



import akka.Done;
import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.api.broker.Topic;
import com.lightbend.lagom.javadsl.testkit.ProducerStub;
import com.lightbend.lagom.javadsl.testkit.ProducerStubFactory;
import com.lightbend.lagom.javadsl.testkit.ServiceTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.knoldus.hello.api.GreetingMessage;
import org.knoldus.hello.api.HelloEvent;
import org.knoldus.hello.api.HelloService;
import scala.concurrent.duration.FiniteDuration;

import javax.inject.Inject;

import static com.lightbend.lagom.javadsl.testkit.ServiceTest.defaultSetup;
import static com.lightbend.lagom.javadsl.testkit.ServiceTest.eventually;
import static com.lightbend.lagom.javadsl.testkit.ServiceTest.startServer;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static play.inject.Bindings.bind;

public class ConsumerTest {

  private static volatile ServiceTest.TestServer testServer;
  // Assigned when the server starts and HelloServiceStub is instantiated.
  private static ProducerStub<HelloEvent> producer;
  private static HelloService service;

  @BeforeClass
  public static void setUp() {
    // Replace the upstream HelloService with the stub defined below.
    final ServiceTest.Setup setup = defaultSetup().withCassandra(true).configureBuilder(builder -> builder.overrides(
      bind(HelloService.class).to(HelloServiceStub.class)
    ));
    testServer = startServer(setup);
    service = testServer.client(HelloService.class);
  }

  @AfterClass
  public static void tearDown() {
    // Stop the test server so that it does not leak into other test classes.
    if (testServer != null) {
      testServer.stop();
      testServer = null;
    }
  }

  @Test
  public void shouldEventsConsumeMessageFromMessageService() {
    producer.send(new HelloEvent.GreetingMessageChanged("Shivangi", "hello"));
    eventually(new FiniteDuration(10, SECONDS), () -> {
      // Here you can use a service client instance to interact with the service
      // and assert that the message was processed as expected.
    });
    assertEquals("hello-events", service.helloEvents().topicId().value());
  }

  static class HelloServiceStub implements HelloService {

    @Inject
    HelloServiceStub(ProducerStubFactory producerStubFactory) {
      producer = producerStubFactory.producer("hello-events");
    }

    // The service calls are not exercised by this test, so they are left unimplemented.
    @Override
    public ServiceCall<NotUsed, String> hello(String id) {
      return null;
    }

    @Override
    public ServiceCall<GreetingMessage, Done> useGreeting(String id) {
      return null;
    }

    // Return the topic bound to the ProducerStub created in the constructor.
    @Override
    public Topic<HelloEvent> helloEvents() {
      return producer.topic();
    }
  }
}
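
In the test above, the body of eventually is left as a comment. The reason it is needed at all is that the consumer processes messages asynchronously, so an assertion made right after producer.send may run too early. Here is a minimal plain-Java sketch of that "retry until it passes or time runs out" idea. The Eventually and EventuallyDemo classes are hypothetical and only model the behaviour; they are not how Lagom implements ServiceTest.eventually.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: retries an assertion until it passes or the deadline expires.
// A plain-Java model of the idea, not Lagom's ServiceTest.eventually.
class Eventually {
    static void eventually(Duration max, Duration interval, Runnable assertion) {
        long deadline = System.nanoTime() + max.toNanos();
        while (true) {
            try {
                assertion.run();
                return; // the assertion passed
            } catch (AssertionError failure) {
                if (System.nanoTime() >= deadline) {
                    throw failure; // out of time: surface the last failure
                }
                pause(interval);
            }
        }
    }

    static void pause(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}

class EventuallyDemo {
    // Simulates a consumer that processes one message on a background thread.
    static int run() {
        AtomicInteger processed = new AtomicInteger();
        new Thread(() -> {
            Eventually.pause(Duration.ofMillis(200)); // pretend processing takes a while
            processed.incrementAndGet();
        }).start();

        // Asserting immediately would fail; retrying gives the consumer time to catch up.
        Eventually.eventually(Duration.ofSeconds(5), Duration.ofMillis(50), () -> {
            if (processed.get() != 1) {
                throw new AssertionError("message not processed yet");
            }
        });
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 1
    }
}
```

In a real consumer test, the assertion inside eventually would call the service under test and check whatever state the consumed message is expected to change.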

We have seen how to test the consumer. Now let’s see how to test the producer in isolation from the consumer. The steps to follow are:

Setting up the server:
i. Using the ServiceTest, create a client representing the producer service.
ii. Use the created client to subscribe to the published topic.
iii. Finally, after interacting with the service to cause the emission of some events, assert that the events were published on the topic.



import akka.stream.javadsl.Source;
import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.javadsl.TestSink;
import com.lightbend.lagom.javadsl.persistence.PersistentEntityRegistry;
import com.lightbend.lagom.javadsl.testkit.ServiceTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.knoldus.hello.api.GreetingMessage;
import org.knoldus.hello.api.HelloEvent;
import org.knoldus.hello.api.HelloService;
import scala.concurrent.duration.FiniteDuration;

import java.util.concurrent.TimeUnit;

import static com.lightbend.lagom.javadsl.testkit.ServiceTest.defaultSetup;
import static com.lightbend.lagom.javadsl.testkit.ServiceTest.startServer;
import static org.junit.Assert.assertEquals;

public class ProducerTest {

  private static ServiceTest.TestServer testServer;
  private static HelloService service;
  private static PersistentEntityRegistry persistentEntityRegistry;

  @BeforeClass
  public static void setUp() {
    testServer = startServer(defaultSetup().withCassandra(true));
    // Client representing the producer service.
    service = testServer.client(HelloService.class);
    persistentEntityRegistry = testServer.injector().instanceOf(PersistentEntityRegistry.class);
  }

  @AfterClass
  public static void tearDown() {
    // Stop the test server so that it does not leak into other test classes.
    if (testServer != null) {
      testServer.stop();
      testServer = null;
    }
  }

  @Test
  public void shouldEmitGreetingsMessageWhenHelloEntityEmitsAnEvent() {
    // Interact with the service so that it emits an event.
    service.useGreeting("user_id").invoke(new GreetingMessage("hi")).toCompletableFuture().join();

    // Subscribe to the published topic through the client.
    Source<HelloEvent, ?> source = service.helloEvents().subscribe().atMostOnceSource();
    TestSubscriber.Probe<HelloEvent> probe =
      source.runWith(TestSink.probe(testServer.system()), testServer.materializer());

    // Assert that the event was published on the topic.
    HelloEvent helloEvent = probe.request(1).expectNext(FiniteDuration.apply(25, TimeUnit.SECONDS));
    assertEquals("user_id", helloEvent.getName());
  }
}
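
The probe in this test asks for one element and then waits up to a timeout for it, because the event reaches the topic asynchronously. As a rough mental model of that "request and expect within a timeout" pattern, here is a plain-Java sketch built on a blocking queue. ProbeModel and ProbeModelDemo are hypothetical names, and this is not how Akka's TestSubscriber.Probe is implemented.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a test probe: elements arrive asynchronously and the
// test waits for the next one, failing if nothing arrives within the timeout.
class ProbeModel<T> {
    private final BlockingQueue<T> received = new LinkedBlockingQueue<>();

    // Called by the "publisher" side whenever an element is emitted.
    void onNext(T element) {
        received.add(element);
    }

    // Called by the test: blocks until an element is available or the timeout expires.
    T expectNext(long timeout, TimeUnit unit) {
        try {
            T next = received.poll(timeout, unit);
            if (next == null) {
                throw new AssertionError("no element received within the timeout");
            }
            return next;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}

class ProbeModelDemo {
    // Emits one element from another thread and returns what the probe observed.
    static String run() {
        ProbeModel<String> probe = new ProbeModel<>();
        new Thread(() -> probe.onNext("user_id")).start();
        return probe.expectNext(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints user_id
    }
}
```

The generous timeout in the real test plays the same role as the five seconds here: it bounds how long the test waits before declaring that nothing was published.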

I hope you have understood the topic well. If you have any doubts, please leave a message in the comments section below.
Thank you for reading. Happy blogging 🙂
