Este post también está disponible en español.

Domain events are one of the tactical design patterns of Domain Driven Design (DDD).

They appeared after the blue book where Eric Evans introduces the core concepts of DDD. They had a great impact on the community because of the possibilities they provide.

On the strategic side of DDD, the community discovered that focusing on the main events that occurs on the business leads to a better understanding of the problem domain (for example that a customer made a purchase or that a product is ready to be shipped).

Events are a great tool that allows discovering and understanding complex business flows involving different actors. Successful exploratory practices like Event Storming are based on focusing on the main events of a company.

events_sample

The idea of this pattern is to make domain events an explicit part of our model. A domain event is a simple object that contains information about what has happened, for example:
class ProductBought(product: Product, buyer: Buyer, date: LocalDateTime): DomainEvent() {
}

It’s important to notice that domain events are not technical concerns (for example that an entity has been persisted or that a button was clicked), they are pure business concepts. They allow us to enrich our model and the ubiquitous language by introducing facts.

Events are triggered as a consequence of executing a command in our model. The aggregates are responsible for triggering the domain events. For example, if the command PublishCampaign is executed, our Campaign aggregate will generate the CampaignPublished event.

Domain events are published and, as a consequence, different handlers react and handle them. The event handlers are responsible for processing domain events (for example sending an email to notify that the campaign was published, acting on other aggregates to maintain eventual consistency or communicating to an external system so that a related process can be initiated).

An event handler can perform a business action inside the domain or an application action (like sending an SMS).

One of the cons of every event-based architecture is that reducing coupling introduces indirection. It’s harder to find all the effects of every action and to follow the sequence of execution.

External vs internal events

Domain events are always internal to a Bounded Context. They are first-class citizens in our model and our ubiquitous language.

There are many ways to integrate Bounded Contexts, but the most effective is by using events. These events are known as external or integration events and should not be confused with domain events, that are internal. For example, a Sales Bounded Context sends a ProductPurchased integration event to the Shipping Bounded Context so it can ship the product to the customer.

These integration events are also expressed terms of the domain language, but not the language of a particular subdomain like in the case of the ubiquitous language. They use a shared language that represents a contract between many areas of the company, this is known as a Published Language. Changing aspects of these events (attributes, names, etc.) requires a lot of analysis and agreements because a non-consensual change can make existing systems stop working.

Because they occur inside a Bounded Context, domain events can change freely. If something stops working, the code will not compile and the tests won’t pass. These events can also contain direct references to model objects (entities, aggregates, value objects).

On the contrary, integration events reference aggregates only by id and use basic data types. This way we avoid coupling other systems to our model.

external_events

Use cases

Domain events are commonly used in these situations:

  • Ensure eventual consistency between different aggregates of the same Bounded Context. When an aggregate-related command is executed, if there are some additional business rules related to other aggregates, they are usually processed through domain events. For example, if a customer makes more than 3 expensive purchases in a small period of time it is marked as suspected of fraud. One of the rules of aggregates indicates that only one instance should be modified in the same transaction, all dependent changes should happen on separate transactions. To guarantee the eventual consistency between the aggregates we can synchronize the changes using domain events.
  • Coordination between an aggregate root and it’s children. An aggregate root can listen for child entities’ events and perform calculations or aggregation tasks as a consequence. For example, recalculate the order total if the amount of any of it’s rows changes.
  • Allow Bounded Contexts integration by translating them into integration events in the public language. They synchronize the state of the Bounded Contexts and guarantees eventual consistency. This allows the systems to be highly scalable (since no distributed transactions like two-phase commit are used), resilient and with a low level of coupling between them.
  • Replace batch processes that are usually executed at low-traffic hours, processing high amounts of data. Instead of executing complex queries to detect the changes that occurred during the day and then perform complex maintenance tasks, with events is possible to do this kind of tasks as the changes occurs during the day.
  • Allow to have an audit log to inspect the activity over the main entities of the system by persisting the events.
  • Allow to implement Event Sourcing and CQRS patterns.

Some usage examples might be:

  • Verify a new payment method of a VISA credit card (by doing a $1 purchase and then doing a refund) after adding it to the user payment methods.
  • Send a welcome email after a new user signs up.
  • Add a “Best buyer” badge to a user after doing 10 purchases on the same month.
  • When a user creates an account with a referral, a gift card is generated for the referrer.
  • After creating an account the user obtains a 10% discount to be used on the first month.
  • After purchasing a product:
    • Send a confirmation email to the user.
    • Send a Slack notification to the business.
    • Notify the shipping department.

Structure of a domain event

Domain events are simple classes (POJOs) and are immutable.

Since they refer to something that occurred, its name is written in past tense (for example Product Shipped). In general events are created as a reaction to some command, so its name usually refers to the executed command (for example ShipProduct).

They usually have a timestamp with the date and time of the occurrence. They also have information about the involved entities.

When they are translated into integration events, they usually add an unique ID that allows other systems to detect if the event was already processed or not, ignoring duplicates. Depending on the publishing mechanism to other systems, it’s possible for an event to be sent more than once.

They usually are serializable so they can be transmitted between processes and be easily persisted.

abstract class DomainEvent {
	val id = UUID.randomUUID().toString()
	val ocurredOn: LocalDateTime = SystemClock().now()

	override fun equals(other: Any?) = other is DomainEvent && other.id == id

	override fun hashCode() = id.hashCode()
}

class ProductShipped(
	val productId: ProductId, 
	val address: Address, 
	val customerId: CustomerId
): DomainEvent() {
}

Decoupling

When different actions or business rules must be executed in response to a command, we can be tempted to do everything inside the same application service (or use case if we are using Clean Architecture):

class PurchaseProductHandler(
	private val orders: OrderRepository,
	private val emailNotifications: EmailNotifications,
	private val slackNotifications: SlackNotifications,
	private val bestBuyerBadgeChecker: BestBuyerBadgeChecker,
) {
	fun handle(command: PurchaseProductCommand) {
		val order = Order.create(orders.nextId(), command.productId, command.customerId)
		orders.add(order)
		
		emailNotifications.notifyNewPurchase(order)
		slackNotifications.notifyNewPurchase(order)
		bestBuyerBadgeChecker.productPurchased(command.customerId)
	}
}

This handler of the PurchaseProduct command (an application service or use case) is coupled to all the side effects of purchasing a product. This violates the principles of Single-Responsibility and Open-Closed making our code harder to maintain and increasing the risks of introducing bugs when making changes to different features.

With domain events we can give our application service the responsibility to execute the command in a single aggregate and then we can put each one of the other side effects on their own event handler to be handled separately:

class PurchaseProductHandler(
	private val orders: OrderRepository,
	private val eventPublisher: DomainEventPublisher,
) {
	fun handle(command: PurchaseProductCommand) {
		val order = Order.create(orders.nextId(), command.productId, command.customerId)
		orders.add(order)
		eventPublisher.publish(order.recordedEvents)
	}
}

class SendEmailOnProductPurchased(
	private val emailNotifications: EmailNotifications,
): DomainEventHandler<ProductPurchased> {
	fun on(event: ProductPurchased) {
		emailNotifications.notifyNewPurchase(event.order)
	}
}

class NotifySlackOnProductPurchased(
	private val slackNotifications: SlackNotifications,
): DomainEventHandler<ProductPurchased> {
	fun on(event: ProductPurchased) {
		slackNotifications.notifyNewPurchase(event.order)
	}
}

class BestBuyerBadgeOnProductPurchased(
	private val bestBuyerBadgeChecker: BestBuyerBadgeChecker,
): DomainEventHandler<ProductPurchased> {
	fun on(event: ProductPurchased) {
		bestBuyerBadgeChecker.productPurchased(event.order.customerId)
	}
}

Because events are modeled explicitly, it’s easier to find if a certain side effect is implemented or not and where it is. With the original approach we will have to look in the code to find if the rule was implemented or not.

Synchronous or asynchronous

Domain events can be implemented in a synchronous (executed immediately in the same process) or asynchronous way. Integration events are always executed asynchronously.

Layers

Publishing and handling of events is an application layer concern and not a domain layer concern. Domain layer only deals with pure business matters (what a domain expert can understand) and not infrastructure issues like event dispatching.

Event handlers are usually located inside the application layer because they usually depend on some infrastructure services (although some authors also consider putting handlers within the domain). An event handler is very similar to an application service or use case. It is responsible for orchestrating domain objects such as aggregates or domain services. They can also invoke external services (send emails, publish integration events in other Bounded Contexts, etc.).

Transactions

Classic authors (Eric Evans, Vaughn Vernon) say that each aggregate must be executed in a single atomic transaction. Side effects that produce changes in other aggregates must be implemented using eventual consistency.

This is recommended because in large-scale applications, large transactions generate many database locks affecting scalability due to concurrency issues. Also, atomic changes are not usually required by the business.

transactions

Other authors, such as Jimmy Bogard, says that there is not problem with sharing the same transaction to make changes to different aggregates as long as the side effects are directly related to the command that was executed. If a cascade of effects occurs (a command generates an event which in turn generates a command and then another event is generated…) it’s no longer recommended. It’s also discouraged if the operations are slow and therefore whoever executes the command must wait.

single_transaction

My recommendation is to focus on understanding well the business requirements so you can make a good trade-off, balancing all the pros and cons. A mixed scheme can also be used where some handlers are processed synchronously in the same transaction and others asynchronously in other transactions.

Publish strategies

1) Observer pattern

The most simple way to implement domain events is with the Observer pattern.

With this implementation, each aggregate manages a collection of observers who are notified when an event occurs.

All observers implement a common interface, this protects the aggregate from knowing their particular implementations.

interface Observer {
	fun on(event: DomainEvent)
}

class Customer {
	private val observers = mutableListOf<Observer>()
	// ...

	fun subscribe(observer: Observer) {
		observers.add(observer)
	}

	private fun notify(event: DomainEvent) {
		observers.forEach { it.on(event) }
	}

	fun move(newAddress: Address) {
		// ...
		notify(CustomerMoved(id, newAddress))
	}
}

The problem with this approach is in the subscription and unsubscription of the aggregate. An application service should subscribe observers (or itself) to the aggregate and then remember to unsubscribe them (to avoid unwanted effects). Publishers and observers are coupled with this approach.

Another problem is caused because the event is published before the aggregate is persisted. In case of a persistence error, the change will not occur (because the aggregate’s new state will not be saved) but the event fill be fired, notifying of a fact that didn’t happen.

With this implementation handlers’ execution cannot be asynchronous.

2) In-Memory Bus

An in-memory bus can be used to decouple the publishers and the observes. This allows the subscription and publication to be made by an intermediary, the bus.

This implementation also allows the asynchronous execution of the handlers, the bus can determine when it’s best to notify each handler.

One of the problems with this approach is that you need to pass the bus reference throughout the domain. In addition to being cumbersome, it couples the domain to technical issues.

class Customer {
	// ...

	fun move(newAddress: Address, bus: MessageBus) {
		// ...
		bus.publish(CustomerMoved(id, newAddress))
	}
}

3) DomainEvents static class

This implementation, popularized by Udi Dahan, implies doing things that we usually don’t like to do: use static classes/singletons and coupling our domain to technical concerns.

interface EventHandler {
	fun on(event: DomainEvent)
}

object DomainEvents {
	private val handlers = mutableListOf<EventHandler>()

	fun register(handler: EventHandler) {
		handlers.add(handler)
	}

	fun raise(event: DomainEvent) {
		handlers.forEach { it.on(event) }
	}
}

class Customer {
	// ...

	fun move(newAddress: Address) {
		// ...
		DomainEvents.raise(CustomerMoved(id, newAddress))
	}
}

One problem of this implementation is that because it uses a static class or singleton, the reference is shared by multiple threads. This can bring concurrency problems and the need for synchronization, in addition to the problems associated with sharing global state.

4) Returning events

All previous methods published events from inside the aggregate. This has the following problems:

  • The handler is executed before the new aggregate’s state is persisted. This handler could notify that a change has occurred and yet it hasn’t happened yet. It could also be querying outdated information.
  • If the database transaction fails at the moment of saving the aggregate (for example because of a concurrency problem) the event has already been processed. For example, you could be sending an email to a user about an event that ultimately did not happen.

In this implementation, detailed by Jan Kronquist, each method of the aggregate that generates events returns them directly to the caller (application service or use case for example). The caller is in charge of publishing them (perhaps by using a bus or domain event publisher) and can coordinate with a repository to ensure that the publishing occurs after the persistence.

class Customer {
	// ...

	fun move(newAddress: Address): List<DomainEvent> {
		// ...
		return listOf(CustomerMoved(id, newAddress))
	}
}

With this approach it’s possible to decouple the domain from the publication and therefore from infrastructure concerns. It has the downside of increasing the complexity of method’s signatures.

5) RecordedEvents

This approach, described by Jimmy Bogard, is a variant of the previous one. The difference is that instead of returning the events on each method, the aggregate registers them in a collection of generated events (recorded events). After the execution of the aggregate, the application service / use case can read all the events generated from this property and publish them directly or indirectly through a bus or publisher.

class Customer {
	private val recordedEvents = mutableListOf<DomainEvent>()
	// ...

	fun move(newAddress: Address) {
		// ...
		recordedEvents.add(CustomerMoved(id, newAddress))
	}

	fun getRecordedEvents() = recordedEvents.toList()
}

The publishers, after publishing the events, usually clean the aggregate recorded events’ collection so that it can be reused.

If we want the handlers to be executed in the same transaction, we can publish the events within the aggregate’s repository:

class PostgreSQLCustomerRepository(
	private val db: DB, 
	private val eventPublisher: DomainEventPublisher
): CustomerRepository {
	fun update(customer: Customer) {
		db.transactional {
			db.execute("UPDATE customers SET ...")
			eventPublisher.publish(customer.recordedEvents)			
		}
	}
}

In this way we centralize the publication of the events for each aggregate and we ensure that they are executed within the same transaction atomically.

Asynchrony

If we want the publishing of the events to happen asynchronously, we will surely use some message broker like RabbitMQ or Kafka. We then will have workers that receive the events and execute them in another process.

To do this, we should publish the events with an asynchronous DomainEventPublisher after commiting the transaction that performs the aggregate’s persistence:

class PostgreSQLCustomerRepository(
	private val db: DB, 
	private val eventPublisher: AsyncDomainEventPublisher
): CustomerRepository {
	fun update(customer: Customer) {
		db.transactional {
			db.execute("UPDATE customers SET ...")
		}
		eventPublisher.publish(customer.recordedEvents)			
	}
}

One problem with this approach is that if the publishing fails, the events are not triggered, however the aggregate has already been modified.

If, on the other hand, we decide to publish the events before saving the aggregate, we have the opposite problem: the database operation could fail and yet the events have already been triggered.

Another problem is that the handlers could try to query information from the aggregate but would find a previous version where the changes haven’t yet occurred.

A solution to this problem is to use the Outbox pattern.

Outbox pattern

The Outbox pattern ensures that events are published reliably.

To ensure this, a database table is used as an “outbox” for events. Recorded events are saved in this table in the same transaction that persists the aggregate. This ensures that the events and the aggregate are saved atomically.

Then, a worker process reads this table, publishes the events and marks them as processed (or deletes them from the table).

outbox

One aspect to consider is that, when the worker is calling the handlers, it may fail (having already sent the event to some handlers and not to others). If this occurs, the event is not marked as processed. This would make the worker send the event again to some handlers in the next execution. This delivery strategy is called At-Least-Once, handlers must be designed to deal with duplicate events (they must be idempotent).

Event Sourcing

Event sourcing is a persistence pattern. Instead of saving the aggregate’s last state, all the domain events that changed it’s state are saved. Every time you want to fetch the aggregate from the persistence engine, all the events are processed again and it’s state is reconstituted.

Many times we want to know the current state of our world, but other times we want to know how we got there.

The chronological events that occured to an aggregate are called the Event Stream.

event_sourcing

This persistence strategy has some benefits:

  • Because there are no modifications but only INSERTs, it’s extremely performant, no locks are generated. An application using Event Sourcing has very low latency and high scalability.
  • By having the complete history of domain events, a lot of useful information and analytics is available to us. We can analyze usage trends, use history for debugging, etc.
  • If we have to develop a new feature (for example a new report), we’ll have all the information from the beginning of the application. If only the last state is saved, we may not have part of the information (because we didn’t register it) and therefore that report can only show the information since the creation of the new feature.

It also has some disadvantages:

  • Depending on the number of events that an aggregate has, its reconstitution may not be very performant. In practice, this doesn’t happen very often (an aggregate usually doesn’t have more than 15 or 20 events) but if so, we can save incremental snapshots of the state of the aggregate at certain points in time. With this approach we can recreate the aggregate from the last snapshot and not from the beginning. By having all the events we can always regenerate the snapshots if we need to.
  • It adds complexity to the solution, it usually requires the use of CQRS to be able to have the queries separated from the commands since is not performant to make the queries that the UI requires on the events table. In this case, denormalized tables are usually created for each query and updated as new events are generated. The components in charge of performing these updates are called projections.
  • Adding new properties or renaming events is often problematic because the history cannot be changed easily. It’s usually necessary to manage different versions of the events, which adds more complexity.

Sagas and Process Managers

If we use eventual consistency we’ll’ have to coordinate business processes that involve many aggregates so that they eventually become consistent with each other.

Suppose a customer buys a product. After purchasing it, a stock reservation must be made and finally a shipping order must be generated. This entire process must be consistent (we cannot buy a product if there’s no stock or generate an invoice without a shipment being made).

The saga is in charge of the coordination of this process and in the case that a step cannot be carried out, it generates compensating actions to undo what has already been done.

The saga listens for events triggered by the relevant components and issues commands to other components. If a step fails, it takes care of generating a compensatory command so that the state of the system is consistent.

saga_events

A saga may need to keep state to know the operations already executed and to be able to calculate the compensatory actions.

There are two ways to implement the saga pattern:

  • Choreography. In choreography there isn’t a central component organizing the entire process, each component that participates exchanges events and acts according to them.

    The choreography does not have a central point of failure because the responsibilities are distributed. The execution flow can be difficult to follow and there is a risk of introducing cyclic dependencies.

  • Orchestration. In the orchestration there is a central component that listens to events, triggers commands and organizes the entire flow.

For complex workflows it’s easier to see the entire process. It introduces a single point of failure.

Saga is often confused with a process manager. A saga is a process (centralized or not) that is usually in charge of compensatory actions in a multi-transactional flow. A process manager is responsible for orchestrating an entire process in a centralized way. When we implement sagas with orchestration we usually use process managers.

The classic example of a saga is when you want to buy a tour package:

saga

Policies

The practice of event storming uses colored post-its to represent the different elements of the domain and the relationships between them.

event_storming

The most common flow is as follows:

  • Users use a read model (the result of a CQRS query) in the UI in order to determine which action they want to perform. That action generates a command.
  • The command is executed by an aggregate.
  • The aggregate triggers a domain event.
  • The domain event updates a read model that is later consumed by the user.
  • The cycle begins again.

Some events are caught by another type of object, a policy. A policy is a handler that determines that on some events, some commands must be executed.

For example, a system can have a “WelcomePolicy” that every time a users is registered decides to send him a welcome email. There could be a “GreatBuyerDiscountPolicy” where if a user exceeds some amount of purchases in a month, a 40% discount is applied for future purchases in the same month.

These policies are nothing more than event handlers, but they represent domain concepts that are usually present in the ubiquitous language when you talk to business experts.

Passage of time events

Domain events are typically triggered as a result of the execution of a command. But this is not always the case.

There is another very common case: temporal events. A temporal event is a domain event that occurs at a specific date or time. For example, it’s friday, it’s 12 o’clock, it’s the last week of the month, it’s Christmas, it’s the day to pay salaries, etc.

These temporary processes often run inside cron jobs or scheduled commands. These implementations mean that a certain portion of the domain logic is not fully encapsulated within the model (the information of when some relevant temporal event occurs, for example).

One solution to this problem is to explicitly model the passage of time using domain events. With this approach we use a cron or scheduler to emit generic time events (like an hour passed, a day passed, etc.) and then we can have handlers that listens to them and triggers more relevant domain events (it’s Black Friday, Christmas bound pay day, etc.).

A handler can listen to “day passed” events and check when a trial period ends, for example.

Another handler might determine that an invoice is overdue and that a notification needs to be sent to the customer.

Conclusion

Domain events are a great tool to consider when modeling complex domains. They tend to help creating decoupled systems, encapsulated domains, and model business concepts more explicitly.

Like any other tool, it has trade-offs and multiple ways to implement it.

Is up to us to analyze our particular contexts and decide the best way to use them.

References