
AxonFramework

Implementing the Command Query Responsibility Segregation (CQRS) architecture in Java from scratch can be a challenging task, but it becomes very elegant with AxonFramework. The framework provides building blocks for the central CQRS concepts and fosters the use of event sourcing by offering several event store implementations. The main advantage of event sourcing is the ability to replay events and re-create the view-side projections from them. This is particularly interesting if the view side uses a non-persistent store for the domain model projection, such as a cache.
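The replay idea can be illustrated without any framework: if every state change is stored as an event, a lost projection can be recomputed by folding over the event history. A minimal sketch in plain Java (the Credited event and the balance projection are hypothetical, not Axon API):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal illustration of event-sourcing replay: the projection is a plain
// in-memory map that can be dropped and rebuilt at any time by folding over
// the stored event history.
public class ReplaySketch {

    // A hypothetical domain event: an account was credited by an amount.
    public record Credited(String account, long amount) {}

    // The view-side projection: current balance per account.
    public static Map<String, Long> project(List<Credited> history) {
        Map<String, Long> balances = new HashMap<>();
        for (Credited event : history) {
            balances.merge(event.account(), event.amount(), Long::sum);
        }
        return balances;
    }

    public static void main(String[] args) {
        List<Credited> eventStore = List.of(
            new Credited("alice", 10),
            new Credited("bob", 5),
            new Credited("alice", 7));
        // Replaying the full history re-creates the projection from scratch.
        System.out.println(project(eventStore).get("alice")); // 17
    }
}
```

As long as the event store is complete, the projection itself is disposable – which is exactly why a cache-based view side is a natural fit.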

AxonFramework offers two ways of registering event listeners. Subscribing event processors are registered on the event bus in such a way that event delivery is performed synchronously in the same thread; events delivered in the past cannot be replayed for them. Tracking event processors are registered in such a way that a special token is maintained for every event processor, indicating the last event the processor has received. AxonFramework then tries to deliver all events between the last delivered one and the latest to the event processor.
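The tracking mechanics can be sketched in plain Java (a simplified illustration, not Axon's actual TrackingToken API): the processor remembers the position of the last handled event and, on start, catches up from there.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified sketch of a tracking processor: a token (here just an index)
// records the next event to deliver, so the processor can catch up after
// a restart -- or replay everything when the token is reset.
public class TrackingSketch {

    private final List<String> eventLog = new ArrayList<>();
    private int token = 0; // position of the next event to deliver

    public void append(String event) { eventLog.add(event); }

    // Deliver all events between the token and the head of the log.
    public void catchUp(Consumer<String> handler) {
        while (token < eventLog.size()) {
            handler.accept(eventLog.get(token));
            token++;
        }
    }

    // Resetting the token triggers a full replay on the next catchUp.
    public void resetToken() { token = 0; }
}
```

Conceptually, resetting this position is what deleting the token entry achieves for a real tracking processor, as described below.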

Starting with AxonFramework 3.x, event replay has been removed from the public API, but it can still be performed quite easily.

Tracking Processor

First of all, we need a tracking event processor. This can be registered either by providing a global configuration for all event processors:

@Configuration
public class CommandConfiguration {

    @Autowired
    public void configureProcessors(EventHandlingConfiguration eventHandlingConfiguration) {
        eventHandlingConfiguration.usingTrackingProcessors();
    }
}

I prefer to register only certain processors by naming them explicitly and putting them into a processing group:

@ProcessingGroup("myProcessor")
public class MyHandler {

    @EventHandler
    public void on(final MyEvent event) {
       ...
    }
}

@Configuration
public class CommandConfiguration {

    @Autowired
    public void configureProcessors(EventHandlingConfiguration eventHandlingConfiguration) {
        eventHandlingConfiguration.registerTrackingProcessor("myProcessor");
    }
}

After such a registration, AxonFramework creates a token for each tracking processor, which can be seen in the token_entry table of the database (in my case).
A very nice annotation-based registration approach has been provided by Michiel Rook in his blog.

Accessing the token

AxonFramework uses an internal abstraction called TokenStore to read and write tokens. Have a look at the TokenStore interface if you are interested in the details. Essentially, there are three implementations of this interface: InMemoryTokenStore, JdbcTokenStore and JpaTokenStore. In my case, the JpaTokenStore is used.
The TokenStore does not offer any way to delete the current token, so I'm using my own implementation for this. Since I'm relying on Spring Data JPA for persistence, I simply defined a TokenJpaRepository in my project to get random access to the tokens:

public interface TokenJpaRepository extends JpaRepository<TokenEntry, TokenEntry.PK> {

}
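Conceptually, a token store is just a map from (processor name, segment) to the stored token. The following hypothetical in-memory sketch (not Axon's actual TokenStore interface) mirrors the primary key of the token_entry table:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, stripped-down token store: tokens are keyed by processor
// name and segment, mirroring the primary key of the token_entry table.
public class SimpleTokenStore {

    public record Key(String processorName, int segment) {}

    private final Map<Key, Long> tokens = new HashMap<>();

    public void storeToken(String processor, int segment, long token) {
        tokens.put(new Key(processor, segment), token);
    }

    public Long fetchToken(String processor, int segment) {
        return tokens.get(new Key(processor, segment));
    }

    // Deleting the token is what makes a replay possible: without a stored
    // position, a tracking processor starts from the beginning of the stream.
    public void deleteToken(String processor, int segment) {
        tokens.remove(new Key(processor, segment));
    }
}
```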

Providing the replay functionality

After all preparations are finished, the replay can be implemented. We need to stop the tracking processor, delete the token and start the processor again:

@Component
@Slf4j
public class TrackingProcessorService {

    private final TokenJpaRepository repository;
    private final EventHandlingConfiguration eventHandlingConfiguration;

    public TrackingProcessorService(EventHandlingConfiguration eventHandlingConfiguration, TokenJpaRepository repository) {
        this.eventHandlingConfiguration = eventHandlingConfiguration;
        this.repository = repository;
    }

    public void startReplay(String name) {
        final TokenEntry.PK id = new TokenEntry.PK(name, 0);
        final TokenEntry tokenEntry = this.repository.findOne(id);
        final Supplier<? extends RuntimeException> notFoundSupplier =
            () -> new IllegalArgumentException("Processor " + name + " not registered.");
        if (tokenEntry == null) {
            throw notFoundSupplier.get();
        }
        this.eventHandlingConfiguration.getProcessor(name).orElseThrow(notFoundSupplier).shutDown();
        this.repository.delete(id);
        this.eventHandlingConfiguration.getProcessor(name).orElseThrow(notFoundSupplier).start();
    }
}

If you want to start a replay, just call this method with the name of the processor. In my example this is:

trackingProcessorService.startReplay("myProcessor");

I implemented the annotation-based registration of tracking processors from Michiel Rook's post and wired a restart of all registered tracking processors into the restart of the component.

OpenHAB Ansible Docker

For some time now, I have been building a smart home based on OpenHAB. After playing around with a local installation and an installation on a Raspberry Pi, I decided to change the hosting platform. I'm running a full-fledged HP ProLiant G6 server at home (for other reasons) with Debian 8 installed on it. For all systems I provision, I use Ansible and Docker. This article gives some insights into the installation.
continue reading…

meanjs

Mean.js is a popular new JavaScript full stack using MongoDB for persistence. To be more precise, it incorporates MongoDB, ExpressJS, AngularJS, and Node.js and allows for very fast and lightweight application development. The use of the scaffolder Yeoman speeds up the creation of modules, routes, controllers, views and other boilerplate JavaScript code. As a result, a Single Page Application can be generated very quickly. In addition, a user module with Passport authorization is included, supporting a local strategy and several social backend strategies.

In order to leverage the backend authorization capabilities in an enterprise environment, I combined it with an LDAP authorization backend. Since passport-ldapauth solves exactly this problem, it was natural to use this component. In the following post, I provide the configuration needed for the implementation, inspired by the author of the module.

continue reading…

During the last weeks, a local building contractor was involved in the reconstruction of our house, renewing the basement drainage. As a result, a new basement drainage has been installed and a new hopper for the sewage pumps has been placed. To be honest, it is a lot of heavy work performed by the builder, including digging, insulating, pounding and other dirty stuff – but in the end the system works if a sewage pump is removing the drainage water. In contrast to the earthworks, which require much experience and manpower that I don't have, I took over the plumbing and electrical work on the pumps. In order to have a fail-over system, I installed two pumps, where the second pump is triggered if the first one fails. To remain operational after a short circuit of the first pump, I put the second pump on a separate phase (in Europe, we have a three-phase power supply, 220V each, shifted by 120° to each other, not like the split-phase system in the US). Having this system installed, you get some periodic work to do: after all, you want to make sure by regular testing that the second pump is operating if the first one has failed. Since I'm lazy and like inventing and constructing stuff more than executing regular test procedures, I decided to implement a monitoring system using some cheap electronic and computer components: a Raspberry Pi and Tinkerforge hardware. continue reading…

Ranked

I've just started to develop a small application in Scala running on a standard enterprise Java stack. You can find more details on GitHub: https://github.com/holisticon/ranked.

Today, I'll post some details on the persistence layer. The idea is to use Scala case classes with JPA as the persistence layer. For simple attributes it looks very neat:

/**
 * Superclass for all persistent entities.
 */
@MappedSuperclass
abstract class PersistentEntity(
  @BeanProperty @(Column @field)(name = "ID")@(Id @field)@(GeneratedValue @field)(strategy = GenerationType.AUTO) id: Long,
  @BeanProperty @(Column @field)(name = "VERSION")@(Version @field) version: Long) {
  def this() = this(-1, -1);
}

/**
 * Represents a player. A player has a name, initial and current ELO ranking.
 */
@Entity
@Table(name = "PLAYER")
case class Player(
  @BeanProperty @(Column @field)(name = "NAME") name: String) extends PersistentEntity {

  def this() = this(null);
}

/**
 * Represents a team. A team consists of at least one player and might have a name.
 */
@Entity
@Table(name = "TEAM")
case class Team(
  @BeanProperty @(Column @field)(name = "NAME") name: String) extends PersistentEntity {

  def this() = this(null);
}

Stay tuned about the development progress.

If you are interested in Xtext and the new features introduced in the upcoming version 2.0, you might want to install and try them out. Since it will be officially released together with Eclipse Indigo, you have to execute some manual steps. In order to install the new features, you need to enter two additional update sites into your update manager and download the update site containing Xtext itself. The following steps worked for me:

Thanks to Dennis Huebner for the hints.


JFace Databinding enables an easy binding between values inside of data models and SWT/JFace widgets. No more boring listeners to implement – just create observables and connect them using the data binding context. There are several brilliant articles written about it. My favorites are those from Ralf Ebert and Lars Vogel.

One of the interesting aspects of data binding is data validation. The update strategies, responsible for propagating changes in models or widgets, can be supplied with validators that make sure the data changes are legal. At the same time, the JSR-303 Bean Validation specification defines a modern, standardized way of data validation. In this post, I combine these subjects and use JSR-303 in JFace Databinding validators.

One of the core insights of JSR-303 is the idea of annotating the data validation constraints on the data itself. It is indeed a good observation that validation code strongly depends on the data structure and semantics. To follow this idea consistently, the application developer should have to care about validation as little as possible while implementing business logic. A much better idea is to encapsulate the entire validation into domain-specific types. Let me demonstrate this with an example; imagine the following class:

public class Customer {
  private String name;
  private String address;
  private String zip;
  private String city;
}

This is perfectly reasonable, but now consider not only the data storage/transport aspects but also the validation aspects. A standard approach would be to put the following validator logic into the data binding:

public class CustomerComposite {
[...]
  public void bindValues(Customer model, DataBindingContext dbc) {
    UpdateValueStrategy m2t = new UpdateValueStrategy();
    m2t.setAfterGetValidator(new IValidator() {
      @Override
      public IStatus validate(Object value) {
        String name = (String) value;
        // reject missing names and names containing illegal characters
        if (name == null || !Helper.isRegex(name, "[A-Za-z -]*")) {
          return ValidationStatus.error("Wrong name");
        }
        return ValidationStatus.ok();
      }
    });
    dbc.bindValue(WidgetProperties.text(SWT.Modify).observe(namefield),
      BeanProperties.value(Customer.class, "name").observe(model),
      new UpdateValueStrategy(), m2t);

    m2t = new UpdateValueStrategy();
    m2t.setAfterGetValidator(new IValidator() {
      @Override
      public IStatus validate(Object value) {
        String zipCode = (String) value;
        // reject anything but exactly five digits
        if (zipCode == null || zipCode.length() != 5 || !Helper.isRegex(zipCode, "[0-9]*")) {
          return ValidationStatus.error("Wrong zip code");
        }
        return ValidationStatus.ok();
      }
    });
    dbc.bindValue(WidgetProperties.text(SWT.Modify).observe(zipfield),
      BeanProperties.value(Customer.class, "zip").observe(model),
      new UpdateValueStrategy(), m2t);
  [...]
  }

That is quite a lot of code – and remember that JFace Databinding code like this cannot be reused in other parts of the application. Let's put the validation logic on the data declaration the way JSR-303 proposes:

public class Customer {
  @NotNull
  @Pattern(regexp = "[A-Za-z -]*")
  private String name;
  private String addressLine;
  @Size(min=1, max=5)
  @Pattern(regexp = "[0-9]*")
  private String zip;
  @NotNull
  @Pattern(regexp = "[A-Za-z -]*")
  private String city;
}
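The annotations keep the constraints on the data, but the idea of domain-specific types mentioned above can be taken even further: a value type that simply cannot hold an invalid value. A hypothetical sketch (this ZipCode type is not part of the Customer example):

```java
// Hypothetical value type that encapsulates its own validation: a zip code
// can only ever be constructed from a string of exactly five digits.
public final class ZipCode {

    private final String value;

    public ZipCode(String value) {
        // the invariant is enforced once, at construction time
        if (value == null || !value.matches("[0-9]{5}")) {
            throw new IllegalArgumentException("Invalid zip code: " + value);
        }
        this.value = value;
    }

    public String getValue() { return value; }
}
```

Business logic receiving a ZipCode never needs to re-validate it – the type itself guarantees the invariant.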

As a next step, let us develop an update strategy factory which creates update strategies with an embedded validator for JSR-303 Bean Validation constraints.

public class BeanValidator implements IValidator {
  private final ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
  private final Class<?> beanType;
  private final String propertyName;

  public BeanValidator(Class<?> beanType, String propertyName) {
    this.beanType = beanType;
    this.propertyName = propertyName;
  }

  @Override
  @SuppressWarnings("unchecked")
  public IStatus validate(Object value) {
    // validate the single property value against the constraints declared
    // on the bean class, without requiring a fully populated bean instance
    Set<ConstraintViolation<Object>> violations = factory.getValidator()
      .validateValue((Class<Object>) beanType, propertyName, value, Default.class);
    if (!violations.isEmpty()) {
      List<IStatus> statusList = new ArrayList<IStatus>();
      for (ConstraintViolation<Object> cv : violations) {
        statusList.add(ValidationStatus.error(cv.getMessage()));
      }
      return new MultiStatus(Activator.PLUGIN_ID, IStatus.ERROR,
        statusList.toArray(new IStatus[statusList.size()]), "Validation errors", null);
    }
    return ValidationStatus.ok();
  }
}

public class StrategyFactory {
  public static UpdateValueStrategy getStrategy(Class<?> beanType, String propertyName) {
    UpdateValueStrategy strategy = new UpdateValueStrategy();
    strategy.setAfterConvertValidator(new BeanValidator(beanType, propertyName));
    return strategy;
  }
}

Using the StrategyFactory, the validation code inside the composite becomes trivial:

public class CustomerComposite {
[...]
  public void bindValues(Customer model, DataBindingContext dbc) {
    dbc.bindValue(WidgetProperties.text(SWT.Modify).observe(namefield),
      BeanProperties.value(Customer.class, "name").observe(model),
      new UpdateValueStrategy(), StrategyFactory.getStrategy(Customer.class, "name"));

    dbc.bindValue(WidgetProperties.text(SWT.Modify).observe(zipfield),
      BeanProperties.value(Customer.class, "zip").observe(model),
      new UpdateValueStrategy(), StrategyFactory.getStrategy(Customer.class, "zip"));
  [...]
}

An important property of the introduced validation approach is that it can be reused in other application layers (e.g. in the service layer or in the data access layer). In other words, you can use the same validation logic across the entire application and just remain valid…