Working With Broadleaf Messaging and Spring Cloud Stream Config

Overview

Broadleaf uses Spring Cloud Stream as an abstraction layer for interacting with messages and brokers. This allows the framework to support the backing broker implementations that Spring Cloud Stream natively supports (e.g. Kafka, Google Pub/Sub, Amazon Kinesis, Azure Event Hubs). By default, our reference implementation and starter projects use Kafka.

NOTE: With this setup, we highly recommend (and ourselves leverage) the binder's "auto create resources" capabilities, such as the "auto create topics" configuration defined here for the Kafka Binder: https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_configuration_options . Most other Spring Cloud Stream Binders have similar support.
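
As a minimal sketch of what this looks like for the Kafka binder, the fragment below sets the binder's topic auto-creation option explicitly. The binder already defaults this option to true, so setting it mainly makes the intent visible. The broker address is a placeholder assumption for a local setup, not a value taken from the Broadleaf starter projects.

```yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          # Placeholder broker address (assumption); replace with your own
          brokers: localhost:9092
          # Let the binder create missing topics on startup (binder default is true)
          auto-create-topics: true
```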

Tips

If you need to understand the full spectrum of Spring Cloud Stream bindings (for example, because you need to create them yourself, or need to redefine them to prefix/suffix the topics on a shared broker instance), the best approach is to pull a report of all the generated bindings for your specific Flex Package composition. Each Broadleaf service (JAR) defines its own bindings, which can be overridden and redefined, so the full set varies based on how the Flex Package is composed and deployed. Several mechanisms to help with this are outlined below.
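
For instance, prefixing a topic on a shared broker usually comes down to overriding a binding's destination. The sketch below uses Spring Cloud Stream's standard `spring.cloud.stream.bindings.<bindingName>.destination` and `group` properties. The binding name `exampleInput` and the topic and group values are hypothetical placeholders, so substitute the binding names that actually exist in your own deployment.

```yaml
spring:
  cloud:
    stream:
      bindings:
        # "exampleInput" is a hypothetical binding name; use one from your own bindings list
        exampleInput:
          # Prefix the topic so several environments can share one broker instance
          destination: dev-team-a.exampleTopic
          # Consumer group for this binding (placeholder value)
          group: dev-team-a-example-group
```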

Enable Binder Actuator Endpoints

One good way to obtain this list is to start up your specific application locally (or in a controlled environment) with auto-create resources enabled and visit the Spring Boot Actuator endpoints. When the application is launched with the following endpoints active, you can visit the URL that matches your Flex Package to get a list of all registered bindings for that particular deployment, for example:

  • min: "https://localhost:8447/actuator/bindings"
  • indexer: "https://localhost:8462/actuator/bindings"
  • browse: "https://localhost:9447/actuator/bindings"
  • cart: "https://localhost:9458/actuator/bindings"
  • processing: "https://localhost:9461/actuator/bindings"
  • supporting: "https://localhost:9457/actuator/bindings"

NOTE: To enable these actuator endpoints, add the following configuration to the appropriate application yml files:

broadleafdemo:
  actuator:
    anonymous: true
management:
  endpoints:
    web:
      exposure:
        include:
          - bindings
          - "*"

See: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#binding_visualization_control

Utilizing Broadleaf's Environment Report

During application startup, Broadleaf emits an Environment Report that details the active and inactive channels for that particular Flex Package/deployment. This report is written to the console and may look something like this:

* Broadleaf Environment Report
*
* Report of environment factors affecting this autoconfigured application.
* This information is useful when attempting to configure the application
* by understanding what config the system determines is currently being used.
* This information is helpful in diagnosing configuration issues,
* including those that might impact messages and async communication.
* You can disable this report by setting the
* `broadleaf.environment.report.disabled` property to true.
*
* CHANNEL INFORMATION (Note - especially disabled OUTPUT type channels may have
* an impact on listeners perhaps defined in another application.)
*
* Name: com.broadleafcommerce.notification.service.messaging.NotificationAuthenticationMessageConsumer         , Type: INPUT , Enabled: false, Properties: broadleaf.notification.messaging.active                       , Reason: Properties `broadleaf.notification.messaging.active` specified as false
* Name: com.broadleafcommerce.notification.service.messaging.NotificationCheckoutCompletionConsumer            , Type: INPUT , Enabled: false, Properties: broadleaf.notification.messaging.active                       , Reason: Properties `broadleaf.notification.messaging.active` specified as false
* Name: com.broadleafcommerce.notification.service.messaging.NotificationFulfillmentCancelledConsumer          , Type: INPUT , Enabled: false, Properties: broadleaf.notification.messaging.active                       , Reason: Properties `broadleaf.notification.messaging.active` specified as false
* Name: com.broadleafcommerce.notification.service.messaging.NotificationFulfillmentFulfilledConsumer          , Type: INPUT , Enabled: false, Properties: broadleaf.notification.messaging.active                       , Reason: Properties `broadleaf.notification.messaging.active` specified as false
* Name: com.broadleafcommerce.adminuser.tenant.message.TenantSyncPersistenceConsumer                           , Type: INPUT , Enabled: true , Properties: broadleaf.adminuser.data.tenant.sync.active                   , Reason: Explicitly enabled
* Name: com.broadleafcommerce.bulk.messaging.BulkTriggeredJobEventConsumer                                     , Type: INPUT , Enabled: true , Properties: broadleaf.bulk.messaging.active                               , Reason: Not explicitly declared, but matchIfMissing used.
...

Note: You can disable this report by setting the `broadleaf.environment.report.disabled` property to true.

In the console output, you will see all the registered Spring Cloud Stream binder classes associated with the deployment. Broadleaf allows you to turn each of these bindings on and off via the property listed for it. From here, you can look at the source of each of these classes to see all the registered input and output channels, or correlate them back to the registered bindings in the output of the Actuator endpoint mentioned above.
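
As a sketch of how the `Properties` column of the report maps to configuration, the fragment below toggles two of the channel groups shown in the sample output above and also disables the report itself. The property names are taken from that sample; whether they apply to you depends on which services your Flex Package includes.

```yaml
broadleaf:
  notification:
    messaging:
      # Disables the Notification consumers listed in the sample report
      active: false
  bulk:
    messaging:
      # Explicitly enables BulkTriggeredJobEventConsumer instead of relying on matchIfMissing
      active: true
  environment:
    report:
      # Suppresses the Environment Report at startup
      disabled: true
```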

Deploy Kafdrop (if utilizing Kafka)

If you're looking for a visual way to see the topics and consumers locally when running Kafka, a tool like Kafdrop (https://github.com/obsidiandynamics/kafdrop) can be useful. You can add it by including the following service configuration in your `docker-compose.yml` file. The snippet expects a Kafka service named `localkafka` and a network named `kafkanet` to be defined in the same file:

  kafdrop:
    depends_on:
      - localkafka
    image: obsidiandynamics/kafdrop
    restart: "no"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "localkafka:29092"
      JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
    networks:
      - kafkanet

Other Resources Worth Reading: