AxonFramework

Implementing a Command Query Responsibility Segregation (CQRS) architecture in Java from scratch can be challenging, but it becomes quite elegant with AxonFramework. The framework provides the central building blocks of CQRS and encourages the use of event sourcing by offering several event store implementations. The main advantage of event sourcing is the ability to trigger a replay of the stored events and re-create the view-side projections from them. This is particularly interesting if the view side keeps the domain model projection in a non-persistent store, such as a cache.
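To make the idea of rebuilding a projection concrete, here is a minimal plain-Java sketch that does not use AxonFramework at all. The names MoneyDeposited and BalanceProjection and the list standing in for the event store are invented for illustration only; the point is just that an in-memory view can be thrown away and recomputed from the stored events.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProjectionReplaySketch {

    /** Hypothetical event: an amount deposited to an account. */
    static class MoneyDeposited {
        final String accountId;
        final int amount;

        MoneyDeposited(String accountId, int amount) {
            this.accountId = accountId;
            this.amount = amount;
        }
    }

    /** Non-persistent view-side projection: balance per account, held in memory. */
    static class BalanceProjection {
        private final Map<String, Integer> balances = new HashMap<>();

        /** Applies a single event to the projection. */
        void on(MoneyDeposited event) {
            balances.merge(event.accountId, event.amount, Integer::sum);
        }

        /** Drops the current state and rebuilds it from the stored events. */
        void replay(List<MoneyDeposited> storedEvents) {
            balances.clear();
            storedEvents.forEach(this::on);
        }

        int balanceOf(String accountId) {
            return balances.getOrDefault(accountId, 0);
        }
    }

    public static void main(String[] args) {
        // A plain list stands in for the event store (assumption for this sketch).
        List<MoneyDeposited> eventStore = Arrays.asList(
                new MoneyDeposited("acc-1", 100),
                new MoneyDeposited("acc-2", 50),
                new MoneyDeposited("acc-1", 25));

        // A fresh projection stands in for a cache that was lost on restart.
        BalanceProjection projection = new BalanceProjection();
        projection.replay(eventStore);
        System.out.println(projection.balanceOf("acc-1")); // prints 125
    }
}
```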

AxonFramework offers two ways of registering event listeners. Subscribing event processors are registered on the EventSource in such a way that event delivery is performed synchronously, in the same thread that publishes the event. Events delivered in the past can't be replayed for subscribing event processors. Tracking event processors are registered on the EventSource in such a way that a special token is maintained for every event processor, indicating the last event that processor has received. AxonFramework then tries to deliver every event between the last delivered one and the latest one to that processor.
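The difference between the two delivery styles can be shown with a small plain-Java model. This is a sketch, not Axon code: EventLog, TrackingReader and the integer token are hypothetical names made up for this example, and real Axon tokens are more elaborate. The push-style listener only sees events published after it was registered, while the token-based reader catches up from wherever its token points, and starts from the beginning if no token exists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ProcessorSketch {

    /** Append-only event log that also pushes new events to registered listeners. */
    static class EventLog {
        private final List<String> events = new ArrayList<>();
        private final List<Consumer<String>> listeners = new ArrayList<>();

        void subscribe(Consumer<String> listener) {
            listeners.add(listener);
        }

        void publish(String event) {
            events.add(event);
            // Push style: delivered synchronously in the publishing thread.
            listeners.forEach(listener -> listener.accept(event));
        }

        /** Returns a copy of all events from the given position onwards. */
        List<String> readFrom(int position) {
            return new ArrayList<>(events.subList(position, events.size()));
        }
    }

    /** Token-based style: pulls events and remembers the last handled position. */
    static class TrackingReader {
        private final EventLog log;
        private final Consumer<String> handler;
        private Integer token; // index of the last handled event; null means no token yet

        TrackingReader(EventLog log, Consumer<String> handler) {
            this.log = log;
            this.handler = handler;
        }

        /** Delivers every event between the last handled one and the latest one. */
        void poll() {
            int next = (token == null) ? 0 : token + 1;
            for (String event : log.readFrom(next)) {
                handler.accept(event);
                token = next++;
            }
        }

        /** Forgetting the token makes the next poll start from the first event. */
        void deleteToken() {
            token = null;
        }
    }

    public static void main(String[] args) {
        EventLog log = new EventLog();
        log.publish("A"); // published before anyone listens

        List<String> pushed = new ArrayList<>();
        List<String> tracked = new ArrayList<>();
        log.subscribe(pushed::add);
        TrackingReader reader = new TrackingReader(log, tracked::add);

        log.publish("B");
        reader.poll();
        System.out.println(pushed);  // prints [B]
        System.out.println(tracked); // prints [A, B]

        reader.deleteToken();
        reader.poll();
        System.out.println(tracked); // prints [A, B, A, B]
    }
}
```

Note that the handler in this sketch simply receives the events a second time after the token is deleted; a real projection would have to clear or overwrite its state to cope with that.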

Starting with AxonFramework 3.x, event replay has been removed from the public API, but it can still be performed quite easily.

Tracking Processor

First of all, we need a tracking event processor. This can be registered either by providing a global configuration for all event processors:

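import org.axonframework.config.EventHandlingConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration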
public class CommandConfiguration {

    @Autowired
    public void configureProcessors(EventHandlingConfiguration eventHandlingConfiguration) {
        eventHandlingConfiguration.usingTrackingProcessors();
    }
}

I prefer to register only certain processors explicitly by name, putting their handlers into a processing group:

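import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;

// All handlers in this class belong to the processor named "myProcessor".
@ProcessingGroup("myProcessor")
public class MyHandler {

    @EventHandler
    public void on(final MyEvent event) {
        ...
    }
}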

@Configuration
public class CommandConfiguration {

    @Autowired
    public void configureProcessors(EventHandlingConfiguration eventHandlingConfiguration) {
        eventHandlingConfiguration.registerTrackingProcessor("myProcessor");
    }
}

After this registration, AxonFramework creates a token for each tracking processor. In my case, it can be seen in the token_entry table of the database.
A very nice annotation-based approach to registration has been described by Michiel Rook in his blog.

Accessing the token

AxonFramework uses an internal abstraction called TokenStore to read and write tokens. Look at the TokenStore interface if you are interested in more details. Essentially, there are three implementations of this interface: InMemoryTokenStore, JdbcTokenStore and JpaTokenStore. In my case, the JpaTokenStore is used.
TokenStore does not offer a way to delete the current token, so I use my own implementation for this. Since I rely on Spring Data JPA for persistence, I simply defined a TokenJpaRepository in my project to get direct access to the tokens:

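import org.axonframework.eventhandling.tokenstore.jpa.TokenEntry;
import org.springframework.data.jpa.repository.JpaRepository;

// Token entries are identified by processor name and segment (TokenEntry.PK).
public interface TokenJpaRepository extends JpaRepository<TokenEntry, TokenEntry.PK> {

}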

Providing the replay functionality

After all preparations have been finished, the replay can be implemented. We need to stop the tracking processor, delete its token and start the processor again:

import java.util.function.Supplier;

import lombok.extern.slf4j.Slf4j;
import org.axonframework.config.EventHandlingConfiguration;
import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.tokenstore.jpa.TokenEntry;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class TrackingProcessorService {

    private final TokenJpaRepository repository;
    private final EventHandlingConfiguration eventHandlingConfiguration;

    public TrackingProcessorService(EventHandlingConfiguration eventHandlingConfiguration, TokenJpaRepository repository) {
        this.eventHandlingConfiguration = eventHandlingConfiguration;
        this.repository = repository;
    }

    public void startReplay(String name) {
        final Supplier<? extends RuntimeException> notFoundSupplier = () -> new IllegalArgumentException("Processor " + name + " not registered.");
        // Tokens are stored per processor name and segment; this assumes a single segment (0).
        final TokenEntry.PK id = new TokenEntry.PK(name, 0);
        if (this.repository.findOne(id) == null) {
            throw notFoundSupplier.get();
        }
        final EventProcessor processor = this.eventHandlingConfiguration.getProcessor(name).orElseThrow(notFoundSupplier);
        // Stop the processor, delete its token, then start it again so that it has no token to resume from.
        processor.shutDown();
        this.repository.delete(id);
        processor.start();
    }
}

To start a replay, just call the service with the name of the processor. In my example, this is:

trackingProcessorService.startReplay("myProcessor");

I implemented Michiel Rook's annotation-based registration of tracking processors and wired the restart of all registered tracking processors to the restart of the component.
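A rough sketch of how such wiring might look is given below. It is not the actual implementation: replayAll and the list of processor names are assumptions for illustration, and the Consumer is only a stand-in for a method reference such as trackingProcessorService::startReplay. The sketch relies on the fact that startReplay throws an IllegalArgumentException for an unknown processor, and collects those names instead of aborting the whole loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

public class ReplayAllSketch {

    /**
     * Triggers a replay for every given processor name and returns the names
     * for which the replay was rejected with an IllegalArgumentException.
     */
    static List<String> replayAll(Collection<String> processorNames, Consumer<String> startReplay) {
        List<String> failed = new ArrayList<>();
        for (String name : processorNames) {
            try {
                startReplay.accept(name);
            } catch (IllegalArgumentException e) {
                // One unknown processor should not prevent the others from replaying.
                failed.add(name);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // Stand-in for trackingProcessorService::startReplay; only "myProcessor" is known here.
        Consumer<String> startReplay = name -> {
            if (!"myProcessor".equals(name)) {
                throw new IllegalArgumentException("Processor " + name + " not registered.");
            }
        };
        System.out.println(replayAll(Arrays.asList("myProcessor", "unknown"), startReplay)); // prints [unknown]
    }
}
```

In a Spring application, a method like this could be invoked from a startup hook of the component, with the collected processor names and the real service passed in.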