Observer Design Pattern & RxJava & @Async
Observer Design Pattern
According to the GoF definition, the observer pattern defines a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically. It is also referred to as the publish-subscribe pattern.
In the observer pattern, many observers (subscriber objects) observe a particular subject (publisher object). Observers register themselves with a subject to be notified when a change is made inside that subject.
An observer object can register with or unregister from a subject at any time. This helps keep the objects loosely coupled.
1. When to use observer design pattern
As described above, when you have to design a system in which multiple entities are interested in any update to one particular entity, you can use the observer pattern.
The flow is simple. The application creates the concrete subject object. All concrete observers register themselves with it to be notified of any further change in the subject's state.
As soon as the state of the subject changes, the subject notifies all registered observers, and the observers can access the updated state and act accordingly.
2. Real world example of observer pattern
- A real-world example of the observer pattern is any social media platform such as Facebook or Twitter. When a person updates their status, all of their followers get a notification.
A follower can follow or unfollow another person at any time. Once a follower has unfollowed, they no longer receive notifications from that subject.
- In programming, the observer pattern is the basis of message-oriented applications. When an application updates its state, it notifies its subscribers about the update. Messaging technologies such as HornetQ and JMS work on this pattern.
- Similarly, in Java UI programming, all keyboard and mouse events are handled by listener objects and their designated methods. When the user clicks the mouse, the method subscribed to the mouse-click event is invoked with the context data passed to it as a method argument.
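The listener idea from the examples above can be tried without a UI. The following is a minimal sketch that uses the JDK's java.beans.PropertyChangeSupport as a ready-made subject. The StatusBoard and StatusBoardDemo classes and the "status" property name are made up for illustration and are not part of the article's example.

```java
import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport;
import java.util.ArrayList;
import java.util.List;

// Hypothetical subject: holds a status and publishes every change to its listeners.
class StatusBoard {
    private final PropertyChangeSupport support = new PropertyChangeSupport(this);
    private String status = "";

    void addListener(PropertyChangeListener listener) {
        support.addPropertyChangeListener(listener);
    }

    void removeListener(PropertyChangeListener listener) {
        support.removePropertyChangeListener(listener);
    }

    void setStatus(String newStatus) {
        String oldStatus = this.status;
        this.status = newStatus;
        // No event is fired when the old and new values are equal.
        support.firePropertyChange("status", oldStatus, newStatus);
    }
}

class StatusBoardDemo {
    // Returns the updates that the follower actually observed.
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        StatusBoard board = new StatusBoard();
        PropertyChangeListener follower =
                event -> seen.add(event.getPropertyName() + "=" + event.getNewValue());

        board.addListener(follower);      // "follow"
        board.setStatus("online");        // observed
        board.removeListener(follower);   // "unfollow"
        board.setStatus("away");          // not observed any more
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Only the first status change reaches the follower, because it unregisters before the second one.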
3. Observer design pattern
3.1. Architecture
3.2. Design participants
The observer pattern has four participants.
- Subject – interface or abstract class defining the operations for attaching observers to and detaching them from the subject.
- ConcreteSubject – concrete Subject class. It maintains the state of the object and, when that state changes, notifies the attached Observers.
- Observer – interface or abstract class defining the operations to be used to notify this object.
- ConcreteObserver – concrete Observer implementations.
4. Observer design pattern example
In the example below, I am creating a message publisher of type Subject and three subscribers of type Observer. The publisher publishes each message to all subscribed (attached) observers, and they print the updated message to the console.
Subject and ConcreteSubject
public interface Subject {
    public void attach(Observer o);
    public void detach(Observer o);
    public void notifyUpdate(Message m);
}

import java.util.ArrayList;
import java.util.List;

public class MessagePublisher implements Subject {

    private List<Observer> observers = new ArrayList<>();

    @Override
    public void attach(Observer o) {
        observers.add(o);
    }

    @Override
    public void detach(Observer o) {
        observers.remove(o);
    }

    @Override
    public void notifyUpdate(Message m) {
        for (Observer o : observers) {
            o.update(m);
        }
    }
}
Observer and ConcreteObservers
public interface Observer {
    public void update(Message m);
}

public class MessageSubscriberOne implements Observer {
    @Override
    public void update(Message m) {
        System.out.println("MessageSubscriberOne :: " + m.getMessageContent());
    }
}

public class MessageSubscriberTwo implements Observer {
    @Override
    public void update(Message m) {
        System.out.println("MessageSubscriberTwo :: " + m.getMessageContent());
    }
}

public class MessageSubscriberThree implements Observer {
    @Override
    public void update(Message m) {
        System.out.println("MessageSubscriberThree :: " + m.getMessageContent());
    }
}
State object
This must be an immutable object so that no class can modify its content by mistake.
public class Message {

    // private and final: the content is set once in the constructor and never changes
    private final String messageContent;

    public Message(String m) {
        this.messageContent = m;
    }

    public String getMessageContent() {
        return messageContent;
    }
}
Now test the communication between publisher and subscribers.
public class Main {
    public static void main(String[] args) {
        MessageSubscriberOne s1 = new MessageSubscriberOne();
        MessageSubscriberTwo s2 = new MessageSubscriberTwo();
        MessageSubscriberThree s3 = new MessageSubscriberThree();

        MessagePublisher p = new MessagePublisher();

        p.attach(s1);
        p.attach(s2);

        p.notifyUpdate(new Message("First Message"));   // s1 and s2 will receive the update

        p.detach(s1);
        p.attach(s3);

        p.notifyUpdate(new Message("Second Message"));  // s2 and s3 will receive the update
    }
}
Program output.
MessageSubscriberOne :: First Message
MessageSubscriberTwo :: First Message
MessageSubscriberTwo :: Second Message
MessageSubscriberThree :: Second Message
5. FAQs
- Can different types of observers register to one subject?
The nature and functionality of observers can differ, but they must all implement the one common Observer interface that the subject supports for registering and deregistering.
- Can I add or remove observers at runtime?
Yes. We can add or remove the observers at any point of time.
- Difference between observer pattern and chain of responsibility pattern?
In the observer pattern, every registered observer is notified of each update, and each one processes it independently of the others.
In the chain of responsibility pattern, the handler objects in the chain are offered the request one by one, and this continues only until one handler fully handles it.
- Benefits of the observer pattern?
The subject and observers make a loosely coupled system. They do not need to know each other explicitly. We can independently add or remove observers at any time.
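Adding and removing observers at runtime has one practical wrinkle: an observer may detach itself while the subject is still looping over its observer list. With a plain ArrayList this can throw a ConcurrentModificationException or silently skip an observer. The sketch below is one possible way around that, using CopyOnWriteArrayList so that each notification iterates over a snapshot. SafePublisher and SafePublisherDemo are hypothetical names, and Consumer<String> stands in for the article's Observer and Message types to keep the sketch self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical subject whose observer list tolerates changes during notification.
class SafePublisher {
    private final List<Consumer<String>> observers = new CopyOnWriteArrayList<>();

    void attach(Consumer<String> observer) {
        observers.add(observer);
    }

    void detach(Consumer<String> observer) {
        observers.remove(observer);
    }

    void notifyUpdate(String message) {
        // The for-each loop iterates over a snapshot taken when the loop starts.
        for (Consumer<String> observer : observers) {
            observer.accept(message);
        }
    }
}

class SafePublisherDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SafePublisher publisher = new SafePublisher();

        // One-shot observer: detaches itself after the first message it receives.
        Consumer<String> oneShot = new Consumer<String>() {
            @Override
            public void accept(String message) {
                log.add("oneShot:" + message);
                publisher.detach(this);
            }
        };
        Consumer<String> steady = message -> log.add("steady:" + message);

        publisher.attach(oneShot);
        publisher.attach(steady);
        publisher.notifyUpdate("first");
        publisher.notifyUpdate("second");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The trade-off is that every attach or detach copies the list, which is usually acceptable when notifications are far more frequent than registrations.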
Learn to create asynchronous controller methods in the Spring framework with the help of the @Async and @EnableAsync annotations and an async thread pool built on top of the Java ExecutorService framework.
1. Spring @Async rest controller
Spring provides the @EnableAsync annotation, which can be applied to application configuration classes to enable asynchronous behavior. With it in place, Spring looks for methods marked with the @Async annotation and runs them in background thread pools. @Async-annotated methods can return a CompletableFuture to hold the result of the asynchronous computation.
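As a rough, Spring-free analogy of what "run in a background thread pool and return a CompletableFuture" means, the sketch below submits a task to a JDK ExecutorService and reads the result from the returned future. It only illustrates the idea and is not how Spring implements @Async internally. BackgroundCallDemo and the pool size of 3 are assumptions made for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BackgroundCallDemo {

    // Runs a task on a pool thread and returns the name of the thread that executed it.
    static String workerThreadName() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture<String> result =
                    CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), pool);
            // join() blocks the caller until the background task has completed.
            return result.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread: " + Thread.currentThread().getName());
        System.out.println("worker thread: " + workerThreadName());
    }
}
```

The two printed thread names differ, which shows that the work ran on a pool thread rather than on the calling thread.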
To enable async configuration in spring, follow these steps:
Create async thread pool
AsyncConfiguration.java

@Configuration
@EnableAsync
public class AsyncConfiguration {

    @Bean(name = "asyncExecutor")
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);       // threads kept in the pool
        executor.setMaxPoolSize(3);        // upper bound on threads
        executor.setQueueCapacity(100);    // tasks that may wait for a free thread
        executor.setThreadNamePrefix("AsynchThread-");
        executor.initialize();
        return executor;
    }
}
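To see what these pool settings amount to, here is a sketch of a comparable pool built directly with the JDK's ThreadPoolExecutor: three threads, a bounded queue of 100 waiting tasks, and the same thread-name prefix. PoolSketch is a hypothetical class, and the 60-second keep-alive value is an assumption for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolSketch {

    // Core size 3, max size 3, at most 100 queued tasks, named threads.
    static ThreadPoolExecutor newPool() {
        AtomicInteger counter = new AtomicInteger(1);
        ThreadFactory named =
                runnable -> new Thread(runnable, "AsynchThread-" + counter.getAndIncrement());
        return new ThreadPoolExecutor(3, 3, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100), named);
    }

    // Submits the given number of tasks and returns the names of the threads that ran them.
    static Set<String> threadNamesFor(int tasks) {
        ThreadPoolExecutor pool = newPool();
        Set<String> names = ConcurrentHashMap.newKeySet();
        try {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                pending.add(CompletableFuture.runAsync(
                        () -> names.add(Thread.currentThread().getName()), pool));
            }
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(threadNamesFor(10));
    }
}
```

However many tasks are submitted, at most three distinct thread names appear. Extra tasks wait in the queue, and with the JDK's default policy a task is rejected once all three threads are busy and the queue is full.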
@Async controller methods
Annotate each method that should run asynchronously with @Async, and give it a CompletableFuture return type:

@Async("asyncExecutor")
public CompletableFuture<EmployeeNames> methodOne() throws InterruptedException {
    //code
}
Combine async method results
Inside the REST controller:

CompletableFuture.allOf(methodOne, methodTwo, methodThree).join();
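CompletableFuture.allOf() returns a CompletableFuture<Void>, so the individual results still have to be read from the original futures once it completes. The following sketch shows that with plain JDK futures. CombineDemo and the string payloads are placeholders for the three async method results.

```java
import java.util.concurrent.CompletableFuture;

class CombineDemo {

    static String combine() {
        // Stand-ins for the three async method results.
        CompletableFuture<String> methodOne = CompletableFuture.supplyAsync(() -> "names");
        CompletableFuture<String> methodTwo = CompletableFuture.supplyAsync(() -> "addresses");
        CompletableFuture<String> methodThree = CompletableFuture.supplyAsync(() -> "phones");

        // Blocks until all three futures are complete; the combined future carries no value.
        CompletableFuture.allOf(methodOne, methodTwo, methodThree).join();

        // Each future is already complete here, so join() returns immediately.
        return methodOne.join() + "," + methodTwo.join() + "," + methodThree.join();
    }

    public static void main(String[] args) {
        System.out.println(combine());
    }
}
```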
2. Spring @Async rest controller example
In this demo, we will create a REST API that fetches data from three (3) remote services asynchronously and, once the responses from all three services are available, aggregates them. For example:
- Invoke EmployeeName API
- Invoke EmployeeAddress API
- Invoke EmployeePhone API
- Wait for responses from above services
- Aggregate all three API responses and build final response to send back to client
2.1. EmployeeName, EmployeeAddress and EmployeePhone APIs to be accessed asynchronously
package com.howtodoinjava.example.sampleservice.controller;

import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.howtodoinjava.example.sampleservice.model.EmployeeAddress;
import com.howtodoinjava.example.sampleservice.model.EmployeeAddresses;
import com.howtodoinjava.example.sampleservice.model.EmployeeName;
import com.howtodoinjava.example.sampleservice.model.EmployeeNames;
import com.howtodoinjava.example.sampleservice.model.EmployeePhone;

@RestController
public class EmployeeDataController {

    private static Logger log = LoggerFactory.getLogger(EmployeeDataController.class);

    @RequestMapping(value = "/addresses", method = RequestMethod.GET)
    public EmployeeAddresses getAddresses() {
        log.info("get addresses Start");

        EmployeeAddresses employeeAddressesList = new EmployeeAddresses();
        EmployeeAddress employeeAddress1 = new EmployeeAddress();
        EmployeeAddress employeeAddress2 = new EmployeeAddress();
        List<EmployeeAddress> addressList = new ArrayList<EmployeeAddress>();

        employeeAddress1.setHouseNo("1111");
        employeeAddress1.setStreetNo("111");
        employeeAddress1.setZipCode("111111");

        employeeAddress2.setHouseNo("222");
        employeeAddress2.setStreetNo("222");
        employeeAddress2.setZipCode("222222");

        addressList.add(employeeAddress1);
        addressList.add(employeeAddress2);
        employeeAddressesList.setEmployeeAddressList(addressList);

        return employeeAddressesList;
    }

    @RequestMapping(value = "/phones", method = RequestMethod.GET)
    public EmployeePhone getPhoneNumbers() {
        log.info("get phones Start");

        EmployeePhone employeePhone = new EmployeePhone();
        ArrayList<String> phoneNumberList = new ArrayList<String>();

        phoneNumberList.add("100000");
        phoneNumberList.add("200000");
        employeePhone.setPhoneNumbers(phoneNumberList);

        return employeePhone;
    }

    @RequestMapping(value = "/names", method = RequestMethod.GET)
    public EmployeeNames getEmployeeName() {
        log.info("get names Start");

        EmployeeNames employeeNamesList = new EmployeeNames();
        EmployeeName employeeName1 = new EmployeeName();
        EmployeeName employeeName2 = new EmployeeName();
        List<EmployeeName> employeeList = new ArrayList<EmployeeName>();

        employeeName1.setFirstName("Santa");
        employeeName1.setLastName("Singh");

        employeeName2.setFirstName("Banta");
        employeeName2.setLastName("Singh");

        employeeList.add(employeeName1);
        employeeList.add(employeeName2);
        employeeNamesList.setEmployeeNameList(employeeList);

        return employeeNamesList;
    }
}
2.2. Async thread pool configuration
import java.util.concurrent.Executor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncConfiguration {

    @Bean(name = "asyncExecutor")
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(3);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("AsynchThread-");
        executor.initialize();
        return executor;
    }
}
2.3. Spring @Async controller methods
package com.howtodoinjava.example.async.service;

import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import com.howtodoinjava.example.async.model.EmployeeAddresses;
import com.howtodoinjava.example.async.model.EmployeeNames;
import com.howtodoinjava.example.async.model.EmployeePhone;

@Service
public class AsyncService {

    private static Logger log = LoggerFactory.getLogger(AsyncService.class);

    @Autowired
    private RestTemplate restTemplate;

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    // The paths below match the /names, /addresses and /phones endpoints of the sample service in section 2.1.

    @Async("asyncExecutor")
    public CompletableFuture<EmployeeNames> getEmployeeName() throws InterruptedException {
        log.info("getEmployeeName starts");

        EmployeeNames employeeNameData =
                restTemplate.getForObject("http://localhost:8080/names", EmployeeNames.class);

        log.info("employeeNameData, {}", employeeNameData);
        Thread.sleep(1000L); // Intentional delay
        log.info("employeeNameData completed");
        return CompletableFuture.completedFuture(employeeNameData);
    }

    @Async("asyncExecutor")
    public CompletableFuture<EmployeeAddresses> getEmployeeAddress() throws InterruptedException {
        log.info("getEmployeeAddress starts");

        EmployeeAddresses employeeAddressData =
                restTemplate.getForObject("http://localhost:8080/addresses", EmployeeAddresses.class);

        log.info("employeeAddressData, {}", employeeAddressData);
        Thread.sleep(1000L); // Intentional delay
        log.info("employeeAddressData completed");
        return CompletableFuture.completedFuture(employeeAddressData);
    }

    @Async("asyncExecutor")
    public CompletableFuture<EmployeePhone> getEmployeePhone() throws InterruptedException {
        log.info("getEmployeePhone starts");

        EmployeePhone employeePhoneData =
                restTemplate.getForObject("http://localhost:8080/phones", EmployeePhone.class);

        log.info("employeePhoneData, {}", employeePhoneData);
        Thread.sleep(1000L); // Intentional delay
        log.info("employeePhoneData completed");
        return CompletableFuture.completedFuture(employeePhoneData);
    }
}
2.4. Call async methods and aggregate results
package com.howtodoinjava.example.async.controller;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.howtodoinjava.example.async.model.EmployeeAddresses;
import com.howtodoinjava.example.async.model.EmployeeNames;
import com.howtodoinjava.example.async.model.EmployeePhone;
import com.howtodoinjava.example.async.service.AsyncService;

@RestController
public class AsyncController {

    private static Logger log = LoggerFactory.getLogger(AsyncController.class);

    @Autowired
    private AsyncService service;

    @RequestMapping(value = "/testAsynch", method = RequestMethod.GET)
    public void testAsynch() throws InterruptedException, ExecutionException {
        log.info("testAsynch Start");

        CompletableFuture<EmployeeAddresses> employeeAddress = service.getEmployeeAddress();
        CompletableFuture<EmployeeNames> employeeName = service.getEmployeeName();
        CompletableFuture<EmployeePhone> employeePhone = service.getEmployeePhone();

        // Wait until they are all done
        CompletableFuture.allOf(employeeAddress, employeeName, employeePhone).join();

        log.info("EmployeeAddress--> " + employeeAddress.get());
        log.info("EmployeeName--> " + employeeName.get());
        log.info("EmployeePhone--> " + employeePhone.get());
    }
}
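The controller above only logs the three results. The last step of the flow, building one aggregated response, could be sketched as follows with plain JDK futures. EmployeeSummary and AggregateDemo are hypothetical names, and strings stand in for the EmployeeNames, EmployeeAddresses and EmployeePhone model classes.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical aggregate of the three service responses.
class EmployeeSummary {
    final String names;
    final String addresses;
    final String phones;

    EmployeeSummary(String names, String addresses, String phones) {
        this.names = names;
        this.addresses = addresses;
        this.phones = phones;
    }

    @Override
    public String toString() {
        return "EmployeeSummary[" + names + ", " + addresses + ", " + phones + "]";
    }
}

class AggregateDemo {

    // Completes with the aggregate once all three inputs are complete, without blocking the caller.
    static CompletableFuture<EmployeeSummary> aggregate(CompletableFuture<String> names,
                                                        CompletableFuture<String> addresses,
                                                        CompletableFuture<String> phones) {
        return CompletableFuture.allOf(names, addresses, phones)
                // All inputs are complete inside thenApply, so join() does not block here.
                .thenApply(ignored ->
                        new EmployeeSummary(names.join(), addresses.join(), phones.join()));
    }

    public static void main(String[] args) {
        CompletableFuture<EmployeeSummary> summary = aggregate(
                CompletableFuture.supplyAsync(() -> "names"),
                CompletableFuture.supplyAsync(() -> "addresses"),
                CompletableFuture.supplyAsync(() -> "phones"));
        System.out.println(summary.join());
    }
}
```

In a controller, the aggregated object would be what gets sent back to the client. How to return it (a blocking join() or handing back the future itself) is a design choice that this sketch leaves open.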
2.5. How to run the demo
Download and start both the applications.
Hit the API: http://localhost:8081/testAsynch
Observe the output in console.
2.5.1. With @Async Enabled
2.5.2. Without @Async Enabled
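The difference between the two runs can be approximated without Spring. The sketch below times three slow calls executed one after another (comparable to running without async) against the same calls executed on a pool of three threads (comparable to running with async). TimingDemo, slowCall and the 200 ms delay are assumptions chosen for this example. The demo itself uses a 1-second delay per call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class TimingDemo {

    // Stand-in for a remote call with an artificial delay.
    static String slowCall(String name) {
        try {
            Thread.sleep(200L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name;
    }

    // Three calls on the calling thread, one after another.
    static long sequentialMillis() {
        long start = System.nanoTime();
        slowCall("names");
        slowCall("addresses");
        slowCall("phones");
        return (System.nanoTime() - start) / 1_000_000;
    }

    // The same three calls on three pool threads at once.
    static long parallelMillis() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            long start = System.nanoTime();
            CompletableFuture<String> names = CompletableFuture.supplyAsync(() -> slowCall("names"), pool);
            CompletableFuture<String> addresses = CompletableFuture.supplyAsync(() -> slowCall("addresses"), pool);
            CompletableFuture<String> phones = CompletableFuture.supplyAsync(() -> slowCall("phones"), pool);
            CompletableFuture.allOf(names, addresses, phones).join();
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sequential: " + sequentialMillis() + " ms");
        System.out.println("parallel:   " + parallelMillis() + " ms");
    }
}
```

The sequential run takes roughly the sum of the three delays, while the parallel run takes roughly the longest single delay.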
Drop me your questions related to creating a Spring Boot non-blocking REST API.
Happy Learning !!