Observer Design Pattern & RxJava & @Async

 

Observer Design Pattern

According to GoF definition, observer pattern defines a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically. It is also referred to as the publish-subscribe pattern.

In observer pattern, there are many observers (subscriber objects) that are observing a particular subject (publisher object). Observers register themselves to a subject to get a notification when there is a change made inside that subject.

A observer object can register or unregister from subject at any point of time. It helps is making the objects objects loosely coupled.

1. When to use observer design pattern

As described above, when you have a design a system where multiple entities are interested in any possible update to some particular second entity object, we can use the observer pattern



The flow is very simple to understand. Application creates the concrete subject object. All concrete observers register themselves to be notified for any further update in the state of subject.

As soon as the state of subject changes, subject notifies all the registered observers and the observers can access the updated state and act accordingly.




                                                                Observer Pattern Sequence Diagram


2. Real world example of observer pattern

  • A real world example of observer pattern can be any social media platform such as Facebook or twitter. When a person updates his status – all his followers gets the notification.

    A follower can follow or unfollow another person at any point of time. Once unfollowed, person will not get the notifications from subject in future.

  • In programming, observer pattern is the basis of message oriented applications. When a application has updated it’s state, it notifies the subscribers about updates. Frameworks like HornetQJMS work on this pattern.
  • Similarly, Java UI based programming, all keyboard and mouse events are handled by it’s listeners objects and designated functions. When user click the mouse, function subscribed to the mouse click event is invoked with all the context data passed to it as method argument.

3. Observer design pattern

3.1. Architecture




3.2. Design participants

The observer pattern has four participants.

  • Subject – interface or abstract class defining the operations for attaching and de-attaching observers to the subject.
  • ConcreteSubject – concrete Subject class. It maintain the state of the object and when a change in the state occurs it notifies the attached Observers.
  • Observer – interface or abstract class defining the operations to be used to notify this object.
  • ConcreteObserver – concrete Observer implementations.

4. Observer design pattern example

In below example, I am creating a message publisher of type Subject and three subscribers of type Observer. Publisher will publish the message periodically to all subscribed or attached observers and they will print the updated message to console.

Subject and ConcreteSubject

Subject.java
public interface Subject
{
    public void attach(Observer o);
    public void detach(Observer o);
    public void notifyUpdate(Message m);
}
MessagePublisher.java
import java.util.ArrayList;
import java.util.List;
 
public class MessagePublisher implements Subject {
     
    private List<Observer> observers = new ArrayList<>();
 
    @Override
    public void attach(Observer o) {
        observers.add(o);
    }
 
    @Override
    public void detach(Observer o) {
        observers.remove(o);
    }
 
    @Override
    public void notifyUpdate(Message m) {
        for(Observer o: observers) {
            o.update(m);
        }
    }
}

Observer and ConcreteObservers

Observer.java
public interface Observer
{
    public void update(Message m);
}
MessageSubscriberOne.java
public class MessageSubscriberOne implements Observer
{
    @Override
    public void update(Message m) {
        System.out.println("MessageSubscriberOne :: " + m.getMessageContent());
    }
}
MessageSubscriberTwo.java
public class MessageSubscriberTwo implements Observer
{
    @Override
    public void update(Message m) {
        System.out.println("MessageSubscriberTwo :: " + m.getMessageContent());
    }
}
MessageSubscriberThree.java
public class MessageSubscriberThree implements Observer
{
    @Override
    public void update(Message m) {
        System.out.println("MessageSubscriberThree :: " + m.getMessageContent());
    }
}

State object

This must be an immutable object so that no class can modify it’s content by mistake.

Message.java
public class Message
{
    final String messageContent;
     
    public Message (String m) {
        this.messageContent = m;
    }
 
    public String getMessageContent() {
        return messageContent;
    }
}

Now test the communication between publisher and subscribers.

Main.java
public class Main
{
    public static void main(String[] args)
    {
        MessageSubscriberOne s1 = new MessageSubscriberOne();
        MessageSubscriberTwo s2 = new MessageSubscriberTwo();
        MessageSubscriberThree s3 = new MessageSubscriberThree();
         
        MessagePublisher p = new MessagePublisher();
         
        p.attach(s1);
        p.attach(s2);
         
        p.notifyUpdate(new Message("First Message"));   //s1 and s2 will receive the update
         
        p.detach(s1);
        p.attach(s3);
         
        p.notifyUpdate(new Message("Second Message")); //s2 and s3 will receive the update
    }
}

Program output.

Console
MessageSubscriberOne :: First Message
MessageSubscriberTwo :: First Message
 
MessageSubscriberTwo :: Second Message
MessageSubscriberThree :: Second Message

5. FAQs

  • Can different types of observers register to one subject?

    The nature and functionality of observers can be different but they all must implement the one common Observer interface which the subject support for registering and deregistering.

  • Can I add or remove observers at runtime?

    Yes. We can add or remove the observers at any point of time.

  • Difference between observer pattern and chain of responsibility pattern?

    In an observer pattern, all registered handler objects get notifications at the same time and they process the update at same time.

    But in a chain of responsibility pattern, handler objects in the chain are notified one by one, and this process continues until one object fully handles the notification.

  • Benefits of the observer pattern?

    The subject and observers make a loosely coupled system. They do not need to know each other explicitly. We can independently add or remove observers at any time.

Related Java classes:

Observer Java Doc
Observable Java Doc




RxJava









👏👏👏👏👏👏👏👏👏👏👏👏👏👏👏👏👏👏👏👏👏👏👏👏👏👏👏👏👏👏👏👏👏


  Building @Async REST APIs with Spring @EnableAsync

Learn to create asynchronous controller methods in Spring framework with the help of @Async and @EnableAsync annotations, async thread pool on top of Java ExecutorService framework.

1. Spring @Async rest controller

Spring comes with @EnableAsync annotation and can be applied on application classes for asynchronous behavior. This annotation will look for methods marked with @Async annotation and run in background thread pools. The @Async annotated methods can return CompletableFuture to hold the result of an asynchronous computation.

To enable async configuration in spring, follow these steps:

  1. Create async thread pool

    AsyncConfiguration.java
    @Configuration
    @EnableAsync
    public class AsynchConfiguration
    {
      @Bean(name = "asyncExecutor")
      public Executor asyncExecutor()
      {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(3);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("AsynchThread-");
        executor.initialize();
        return executor;
      }
    }
  2. @Async controller methods

    Methods which shall run asynchronously, annotate them with @Async annotation and method return type should return

    @Async("asyncExecutor")
    public CompletableFuture<EmployeeNames> methodOne() throws InterruptedException {
      //code
    }
  3. Combine async method results

    Inside REST Controller
    CompletableFuture.allOf(methodOne, methodTwo, methodThree).join();

2. Spring @Async rest controller example

In this demo, we will create an REST API which will fetch data from three (3) remote services asynchronously and when responses from all 3 services is available then aggregate the responses. e.g.

  1. Invoke EmployeeName API
  2. Invoke EmployeeAddress API
  3. Invoke EmployeePhone API
  4. Wait for responses from above services
  5. Aggregate all three API responses and build final response to send back to client

2.1. EmployeeName, EmployeeAddress and EmployeePhone APIs to be accessed async way

EmployeeDataController.java
package com.howtodoinjava.example.sampleservice.controller;
 
import java.util.ArrayList;
import java.util.List;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
 
import com.howtodoinjava.example.sampleservice.model.EmployeeAddress;
import com.howtodoinjava.example.sampleservice.model.EmployeeAddresses;
import com.howtodoinjava.example.sampleservice.model.EmployeeName;
import com.howtodoinjava.example.sampleservice.model.EmployeeNames;
import com.howtodoinjava.example.sampleservice.model.EmployeePhone;
 
@RestController
public class EmployeeDataController
{
  private static Logger log = LoggerFactory.getLogger(EmployeeDataController.class);
 
  @RequestMapping(value = "/addresses", method = RequestMethod.GET)
  public EmployeeAddresses getAddresses()
  {
    log.info("get addresses Start");
 
    EmployeeAddresses employeeAddressesList = new EmployeeAddresses();
 
    EmployeeAddress employeeAddress1 = new EmployeeAddress();
    EmployeeAddress employeeAddress2 = new EmployeeAddress();
     
    List<EmployeeAddress> addressList = new ArrayList<EmployeeAddress>();
     
    {
      employeeAddress1.setHouseNo("1111");
      employeeAddress1.setStreetNo("111");
      employeeAddress1.setZipCode("111111");
       
      employeeAddress2.setHouseNo("222");
      employeeAddress2.setStreetNo("222");
      employeeAddress2.setZipCode("222222");
       
      addressList.add(employeeAddress1);
      addressList.add(employeeAddress2);
       
      employeeAddressesList.setEmployeeAddressList(addressList);
    }
 
    return employeeAddressesList;
  }
 
  @RequestMapping(value = "/phones", method = RequestMethod.GET)
  public EmployeePhone getPhoneNumbers()
  {
    log.info("get phones Start");
 
    EmployeePhone employeePhone = new EmployeePhone();
    {
      ArrayList<String> phoneNumberList = new ArrayList<String>();
       
      phoneNumberList.add("100000");
      phoneNumberList.add("200000");
       
      employeePhone.setPhoneNumbers(phoneNumberList);
    }
 
    return employeePhone;
  }
 
  @RequestMapping(value = "/names", method = RequestMethod.GET)
  public EmployeeNames getEmployeeName()
  {
    log.info("get names Start");
 
    EmployeeNames employeeNamesList = new EmployeeNames();
 
    EmployeeName employeeName1 = new EmployeeName();
    EmployeeName employeeName2 = new EmployeeName();
     
    List<EmployeeName> employeeList = new ArrayList<EmployeeName>();
    {
      employeeName1.setFirstName("Santa");
      employeeName1.setLastName("Singh");
    }
    {
      employeeName2.setFirstName("Banta");
      employeeName2.setLastName("Singh");
    }
 
    employeeList.add(employeeName1);
    employeeList.add(employeeName2);
 
    employeeNamesList.setEmployeeNameList(employeeList);
 
    return employeeNamesList;
  }
}

2.2. Async thread pool configuration

AsyncConfiguration.java
import java.util.concurrent.Executor;
 
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
@Configuration
@EnableAsync
public class AsyncConfiguration
{
  @Bean(name = "asyncExecutor")
  public Executor asyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(3);
    executor.setMaxPoolSize(3);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("AsynchThread-");
    executor.initialize();
    return executor;
  }
}

2.3. Spring @Async controller methods

AsyncService.java
package com.howtodoinjava.example.async.service;
 
import java.util.concurrent.CompletableFuture;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
 
import com.howtodoinjava.example.async.model.EmployeeAddresses;
import com.howtodoinjava.example.async.model.EmployeeNames;
import com.howtodoinjava.example.async.model.EmployeePhone;
 
@Service
public class AsyncService {
 
  private static Logger log = LoggerFactory.getLogger(AsyncService.class);
 
  @Autowired
  private RestTemplate restTemplate;
 
  @Bean
  public RestTemplate restTemplate() {
    return new RestTemplate();
  }
 
  @Async("asyncExecutor")
  public CompletableFuture<EmployeeNames> getEmployeeName() throws InterruptedException
  {
    log.info("getEmployeeName starts");
 
    EmployeeNames employeeNameData = restTemplate.getForObject("http://localhost:8080/name", EmployeeNames.class);
 
    log.info("employeeNameData, {}", employeeNameData);
    Thread.sleep(1000L);  //Intentional delay
    log.info("employeeNameData completed");
    return CompletableFuture.completedFuture(employeeNameData);
  }
 
  @Async("asyncExecutor")
  public CompletableFuture<EmployeeAddresses> getEmployeeAddress() throws InterruptedException
  {
    log.info("getEmployeeAddress starts");
 
    EmployeeAddresses employeeAddressData = restTemplate.getForObject("http://localhost:8080/address", EmployeeAddresses.class);
 
    log.info("employeeAddressData, {}", employeeAddressData);
    Thread.sleep(1000L);  //Intentional delay
    log.info("employeeAddressData completed");
    return CompletableFuture.completedFuture(employeeAddressData);
  }
 
  @Async("asyncExecutor")
  public CompletableFuture<EmployeePhone> getEmployeePhone() throws InterruptedException
  {
    log.info("getEmployeePhone starts");
 
    EmployeePhone employeePhoneData = restTemplate.getForObject("http://localhost:8080/phone", EmployeePhone.class);
 
    log.info("employeePhoneData, {}", employeePhoneData);
    Thread.sleep(1000L);  //Intentional delay
    log.info("employeePhoneData completed");
    return CompletableFuture.completedFuture(employeePhoneData);
  }
}

2.4. Call async methods and aggregate results

package com.howtodoinjava.example.async.controller;
 
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
 
import com.howtodoinjava.example.async.model.EmployeeAddresses;
import com.howtodoinjava.example.async.model.EmployeeNames;
import com.howtodoinjava.example.async.model.EmployeePhone;
import com.howtodoinjava.example.async.service.AsyncService;
 
@RestController
public class AsyncController {
 
  private static Logger log = LoggerFactory.getLogger(AsyncController.class);
 
  @Autowired
  private AsyncService service;
 
  @RequestMapping(value = "/testAsynch", method = RequestMethod.GET)
  public void testAsynch() throws InterruptedException, ExecutionException
  {
    log.info("testAsynch Start");
 
    CompletableFuture<EmployeeAddresses> employeeAddress = service.getEmployeeAddress();
    CompletableFuture<EmployeeNames> employeeName = service.getEmployeeName();
    CompletableFuture<EmployeePhone> employeePhone = service.getEmployeePhone();
 
    // Wait until they are all done
    CompletableFuture.allOf(employeeAddress, employeeName, employeePhone).join();
     
    log.info("EmployeeAddress--> " + employeeAddress.get());
    log.info("EmployeeName--> " + employeeName.get());
    log.info("EmployeePhone--> " + employeePhone.get());
  }
}

2.5. How to run the demo

Download and start both the applications.

Hit the API: http://localhost:8081/testAsynch.

Observe the output in console.

2.5.1. With @Aync Enabled
With Aync Methods Enabled


2.5.2. Without Aync Enabled
Without Aync Methods Enabled


Drop me your questions related to creating spring boot non blocking rest api.

Happy Learning !!

References:

https://spring.io/guides/gs/async-method/

Comments

Popular posts from this blog

Hibernate (Java) -- by jps sasadara

JAVA uml Based cording <<< by jps sasadara >>>