Cloud Native Integration: A Working Demo

This repository contains a working demonstration of how to quickly implement a Cloud Native architecture approach (with Enterprise Integration patterns as a central concern) as distilled in What is Cloud Native Integration.

Demo Description

Let's distill the high level system architecture as described by the Cloud Native Integration document:

Cloud Native Integration: The View From Space

We notice a few distinct architectural layers:

Event Mesh The event mesh handles peer-to-peer event communication in a fashion that allows for several Cloud Native characteristics such as high availability, reliability, and location-agnostic behaviour between peers. The event mesh acts as a rendezvous point between eventing peers (event emitters and event receivers), and provides event emitters a graph of event receivers that may span clusters or even clouds.

Event Sink The event sink provides a port to the underlying event bus our integrations process events on.

Event Bus The event bus provides a service bus so that event processors, which often need to integrate multiple different data sources to produce event aggregate level output, are able to communicate with each other in an asynchronous fashion. This decouples event emitters and receivers, as well as the event aggregate behaviour that distills our series of events into meaningful business level data. As a result, integrations are bound to their domain and are likely decomposed along their bounded context. It is along the event bus that the stream processing and integration work that satisfies our enterprise features is performed. Inevitably, these stream processors and integration components aggregate events into sources of truth to maintain consistency and state across the enterprise.

Event Store The event store represents a persistent source of truth for events and event aggregates. We define event aggregates as events that attempt to provide consistency boundaries for transactions, distribution, and concurrency. In our view, while events may be represented in a transient store such as Kafka, event aggregates define consistency across our data plane and represent a conceptual whole for the state of our domain entities at a point in time. As a result, our event store should be viewed as a conceptual whole that can offer capabilities as a source of truth for volatile, short term events, and a source of truth for long standing event aggregates.

Representing these Ports, Adapters and Layers

This demo seeks to display these architectural techniques by leveraging the Red Hat Integration platform.

Event Mesh To provide event mesh capabilities, this demo leverages Apache Qpid Dispatch Router to provide a service and event communication control plane with capabilities for high availability, resilience, and multi and hybrid cloud, as well as policy that can be applied over the mesh to ensure governance is properly enforced as our event emitters and event receivers communicate in a peer-to-peer fashion.

Event Sink To provide an event sink as a port into our underlying business logic and sources of truth, we leverage Apache Camel and a set of complementary cloud native tooling.

Event Bus The event bus provides a normalized means of asynchronous behaviour for event consumers and emitters. As Cloud Native Integration depends on and features cloud native capabilities, we leverage Knative Eventing to provide a cloud native service bus abstraction with Apache Kafka as the underlying persistence engine for our communication over our service bus channels.

Event Store As Apache Kafka is the persistence store for our volatile events and event aggregates that travel along our service bus, this demonstration relies on a traditional OLTP store as the inevitable source of truth for event aggregates. While an OLTP store is not required as a source of truth for event aggregates in our view of the world, it demonstrates the complementary nature of Cloud Native Integration to traditional legacy enterprise deployments.

Getting Started

Quick Note: This demonstration uses Openshift 4.x, which relies on Kubernetes 1.16 and above. As a result, despite the use of Operator Hub and Red Hat distributed Operators, the steps outlined in this document may be followed on any Kubernetes distribution that is 1.16 or higher, with matching community versions of the operators deployed.

Installing Operators

From the Openshift Operator Hub, install the following operators (in our case, we’ll install these operators with cluster admin credentials, and will allow these operators to observe all namespaces). A CLI-based alternative is sketched after the list.

  • Red Hat Integration - AMQ Streams
  • Red Hat Integration - AMQ Certificate Manager
  • Red Hat Integration - AMQ Interconnect (this operator will need to be installed in multiple namespaces due to its limitation of watching a single K8s namespace)
  • Openshift Serverless Operator
  • Camel K Operator
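
Operators can also be installed from the CLI by creating OLM Subscription resources rather than clicking through the Operator Hub UI. The package, channel, and catalog source names below are assumptions and should be checked against your catalog (for example with oc get packagemanifests -n openshift-marketplace); the AMQ Streams operator is shown as a sketch:

# Sketch only: package/channel/source names vary by catalog and version.
# Verify with: oc get packagemanifests -n openshift-marketplace
cat <<'EOF' | oc apply -f -
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
  name: amq-streams
  namespace: openshift-operators
spec:
  name: amq-streams                     # assumed package name
  channel: stable                       # assumed channel
  source: redhat-operators
  sourceNamespace: openshift-marketplace
EOF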

We should now find the Operators running in the openshift-operators namespace: OpenshiftOperators

Preparing For Deployment

At this point, we have installed the required operators; however, our environment still isn’t ready to start laying down deployments.

Install Knative Serving

To install Knative Serving, which provides our serverless framework via the Openshift Serverless Operator, there are a few more steps.

  • Create the knative-serving namespace with a user that possesses the cluster-admin role:
oc create namespace knative-serving
  • Apply the following Knative Serving yaml to the cluster:
apiVersion: operator.knative.dev/v1alpha1
kind: KnativeServing
metadata:
    name: knative-serving
    namespace: knative-serving

Upon successful installation, there should be a result similar to the following:

oc get pods -w -n knative-serving 
NAME                                READY     STATUS    RESTARTS   AGE
activator-7db4dc788c-spsxb          1/1       Running   0          1m
activator-7db4dc788c-zqnnd          1/1       Running   0          45s
autoscaler-659dc48d89-swtmn         1/1       Running   0          59s
autoscaler-hpa-57fdfbb45c-hsd54     1/1       Running   0          49s
autoscaler-hpa-57fdfbb45c-nqz5c     1/1       Running   0          49s
controller-856b4bd96d-29xlv         1/1       Running   0          54s
controller-856b4bd96d-4c7nn         1/1       Running   0          46s
kn-cli-downloads-7558874f44-qdr99   1/1       Running   0          1m
webhook-7d9644cb4-8xkzt             1/1       Running   0          57s
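
As a small scripted alternative to watching pods, one can wait on the KnativeServing resource itself; a sketch, assuming the resource reports a Ready condition (which current versions of the operator do):

# Block until the KnativeServing control plane reports Ready, or time out after 5 minutes
oc wait --for=condition=Ready knativeserving/knative-serving -n knative-serving --timeout=300s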

Install Knative Eventing

Knative Eventing provides functionality around our Cloud Event abstraction and forms the operational basis of our cloud native service bus.

To install Knative Eventing we need to perform the following steps (with a cluster admin user):

  • Create the knative-eventing namespace
oc create namespace knative-eventing

Upon creation of the namespace, we will want to create the Knative Eventing components by applying the following CR.

In our case, as we’ll have need of it later, we’ll install the Multi Tenant Channel Based Broker as our default Knative Eventing Broker implementation:

apiVersion: operator.knative.dev/v1alpha1
kind: KnativeEventing
metadata:
  name: knative-eventing
  namespace: knative-eventing
spec:
  defaultBrokerClass: MTChannelBasedBroker
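
A minimal way to apply this resource directly from a shell (assuming a cluster admin session) is the following sketch:

# Apply the KnativeEventing CR shown above without saving it to a file first
cat <<'EOF' | oc apply -f -
apiVersion: operator.knative.dev/v1alpha1
kind: KnativeEventing
metadata:
  name: knative-eventing
  namespace: knative-eventing
spec:
  defaultBrokerClass: MTChannelBasedBroker
EOF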

Upon applying this yaml, something similar to this should be true:

oc get pods -w -n knative-eventing 
NAME                                   READY     STATUS    RESTARTS   AGE
broker-controller-67b56668bd-sgxwg     1/1       Running   0          1m
eventing-controller-544dc9945d-pz2cl   1/1       Running   0          1m
eventing-webhook-6c774678b5-lzfkn      1/1       Running   0          1m
imc-controller-78b8566465-smpb7        1/1       Running   0          1m
imc-dispatcher-57869b44c5-s7t92        1/1       Running   0          1m

At this point, we have installed the knative eventing controllers, dispatchers and webhooks; however, we only have support for the in memory channel, which means we will not be able to use a persistent approach to knative eventing brokers and their respective channels in our cluster.

Installing Kafka Channels

In our above Cloud Native Integration schematic, what lies at the heart of our event bus is Apache Kafka, so for Knative Eventing to follow this architectural construct, we need our broker channels to be persisted by Kafka. While this component is not generally available, it has reached a considerable maturity level and relies largely on its surrounding ecosystem, which is generally available.

In this demonstration, we will apply a KafkaChannel installation that creates a set of CRs, controllers for our channels, requisite service accounts, and webhooks to instrument creation, removal, and discovery of KafkaChannels that may be associated with a Knative Eventing Broker.

Issue the following command (this assumes cluster admin permissions for the user issuing commands and that the previous knative-eventing steps have taken place):

oc apply -f ./src/main/install/kafka-channel/kafka-channel-install.yaml
clusterrole.rbac.authorization.k8s.io/kafka-addressable-resolver created
clusterrole.rbac.authorization.k8s.io/kafka-channelable-manipulator created
clusterrole.rbac.authorization.k8s.io/kafka-ch-controller created
serviceaccount/kafka-ch-controller created
clusterrole.rbac.authorization.k8s.io/kafka-ch-dispatcher created
serviceaccount/kafka-ch-dispatcher created
clusterrole.rbac.authorization.k8s.io/kafka-webhook created
serviceaccount/kafka-webhook created
clusterrolebinding.rbac.authorization.k8s.io/kafka-ch-controller created
clusterrolebinding.rbac.authorization.k8s.io/kafka-ch-dispatcher created
clusterrolebinding.rbac.authorization.k8s.io/kafka-webhook created
customresourcedefinition.apiextensions.k8s.io/kafkachannels.messaging.knative.dev created
configmap/config-kafka created
configmap/config-leader-election-kafka created
service/kafka-webhook created
deployment.apps/kafka-ch-controller created
mutatingwebhookconfiguration.admissionregistration.k8s.io/defaulting.webhook.kafka.messaging.knative.dev created
validatingwebhookconfiguration.admissionregistration.k8s.io/validation.webhook.kafka.messaging.knative.dev created
secret/messaging-webhook-certs created
deployment.apps/kafka-webhook created

We should now see new controller, dispatcher, and webhook pods in our knative-eventing namespace:

NAME                                   READY     STATUS    RESTARTS   AGE
broker-controller-67b56668bd-sgxwg     1/1       Running   0          1h
eventing-controller-544dc9945d-pz2cl   1/1       Running   0          1h
eventing-webhook-6c774678b5-lzfkn      1/1       Running   0          1h
imc-controller-78b8566465-smpb7        1/1       Running   0          1h
imc-dispatcher-57869b44c5-s7t92        1/1       Running   0          1h
kafka-ch-controller-7f88b8c776-crhm4   1/1       Running   0          1m
kafka-webhook-b47dc9767-hmnm4          1/1       Running   0          1m

We should also now have an API resource that describes how channels are bound to Kafka topics in our Knative Eventing framework.

oc api-resources | grep messaging.knative.dev
channels                              ch               messaging.knative.dev                 true         Channel
inmemorychannels                      imc              messaging.knative.dev                 true         InMemoryChannel
kafkachannels                         kc               messaging.knative.dev                 true         KafkaChannel
subscriptions                         sub              messaging.knative.dev                 true         Subscription

If we have gotten this far, we have most of our infrastructure assembled from an operator perspective, and now it's time to begin installing our concrete implementation.

Installing the Demo

At this point our operators are installed and we have mostly configured our environment; however, we still have a few housekeeping tasks to take care of:

  • We need to create and install a trust store so that we may communicate to and between clusters in a secure fashion
  • We need to ensure proper trust and authority is distributed to the correct places in our cluster

Using the AMQ Certificate Manager Operator

We will use the cert-manager operator that we’ve previously provisioned to issue certificates that we’ll need to wire secure connections across the cluster.

Creating a CA to use across the cluster

For our purposes, we will create a Certificate Authority using OpenSSL. Initially, the following command should be issued to generate the private key for our CA:

openssl  genrsa -des3 -out cloudEventMeshDemoCA.key 2048

OpenSSL will prompt for a passphrase. It is recommended to use a passphrase even in a development environment, and especially when cluster resources may be accessible from outside of the cluster.

We’ll use the key to create a root certificate that will act as our certificate authority:

openssl req -x509 -new -nodes -key cloudEventMeshDemoCA.key -sha256 -days 1825 -out cloudEventMeshDemoCA.crt

At this point, you will be asked for the passphrase again, and as always, it is apropos to use one and not skip this step. While creating this CA, OpenSSL will ask for OUs, DNs, etc., and it may be important to use meaningful values for these as they may be required during later configuration.

Upon completing this process, we will now have an available CA to sign with, and we’ll use the AMQ distribution of the certificate manager to establish our CA as a certificate issuer across the cluster.

For convenience's sake, we have included a secret in this demo that we should apply to wherever our certificate manager operator lives (in our case, the project openshift-operators):

oc apply -f ./src/main/k8s/CA/cloud/cloud-native-event-mesh-ca-secret.yaml

This could also be accomplished by creating a secret from the CA private and public key created above.
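
As a sketch of that manual route (assuming the secret name matches the issuer below, and noting that cert-manager expects an unencrypted private key in a tls-type secret):

# Remove the passphrase from the CA key (cert-manager cannot use an encrypted key);
# the output file name here is just an example
openssl rsa -in cloudEventMeshDemoCA.key -out cloudEventMeshDemoCA-unencrypted.key

# Create the CA key pair secret next to the certificate manager operator
oc create secret tls cloud-native-event-mesh-demo-ca-pair \
  --cert=cloudEventMeshDemoCA.crt \
  --key=cloudEventMeshDemoCA-unencrypted.key \
  -n openshift-operators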

Now that we have created our CA keypair secret, let’s create a certificate manager issuer that uses our CA:

apiVersion: certmanager.k8s.io/v1alpha1
kind: ClusterIssuer
metadata:
  name: cloud-native-event-mesh-demo-cert-issuer
spec:
  ca:
    secretName: cloud-native-event-mesh-demo-ca-pair

Upon applying the ClusterIssuer resource, we should have a working cluster issuer:

oc describe clusterissuer cloud-native-event-mesh-demo-cert-issuer 
Name:         cloud-native-event-mesh-demo-cert-issuer
Namespace:    
Labels:       <none>
Annotations:  kubectl.kubernetes.io/last-applied-configuration={"apiVersion":"certmanager.k8s.io/v1alpha1","kind":"ClusterIssuer","metadata":{"annotations":{},"name":"cloud-native-event-mesh-demo-cert-issuer","name...
API Version:  certmanager.k8s.io/v1alpha1
Kind:         ClusterIssuer
Metadata:
  Creation Timestamp:  2020-06-29T17:11:47Z
  Generation:          2
  Resource Version:    1485153
  Self Link:           /apis/certmanager.k8s.io/v1alpha1/clusterissuers/cloud-native-event-mesh-demo-cert-issuer
  UID:                 90035a64-67c1-4440-9526-a87d1297bfa2
Spec:
  Ca:
    Secret Name:  test-key-pair
Status:
  Conditions:
    Last Transition Time:  2020-06-29T17:11:47Z
    Message:               Signing CA verified
    Reason:                KeyPairVerified
    Status:                True
    Type:                  Ready
Events:
  Type    Reason           Age              From          Message
  ----    ------           ----             ----          -------
  Normal  KeyPairVerified  8s (x2 over 8s)  cert-manager  Signing CA verified

We are now free to issue certificates in our cluster. This will be critical to setting up our next steps, the event mesh.

Installing the Event Mesh

Our event mesh will span three different namespaces in our cluster; however, our intention is to logically represent three separate clusters in our topology. As a result, we will need to create trust in each of these namespaces for the other routers in our event mesh, as we would not allow insecure communication from cluster to cluster.

Creating the namespaces

At this point we will want to create the following namespaces (the commands are sketched after the list):

  • cluster-1
  • cluster-2
  • edge - this will represent an edge cluster in our topology. While some topologies may not call for an edge cluster, we still want to use an edge router somewhere in our Openshift cluster to ensure that we have connection concentration, a single point of policy application for incoming requests from outside of the cluster, as well as a terminal leaf node for our event mesh graph.
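
The namespaces themselves are plain oc create calls:

# Create the three namespaces that make up our logical clusters
oc create namespace cluster-1
oc create namespace cluster-2
oc create namespace edge
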
Installing the Interconnect Router in cluster-1

As we have use of the Operator Hub in Openshift 4, we will simply install an Interconnect Operator into the namespace “cluster-1”: Installing the Interconnect Operator in Cluster 1

At this point, we will be able to start creating some certificates from the cluster issuer we have already established and inevitably configure our Interconnect router for trust. In the namespace “cluster-1”, we will provision a certificate for our Interconnect router which we will use to wire up the Interconnect router for trust across inter-router connections.

Please Note: The Interconnect router self-signs a certificate using the certificate manager, and the demonstrated use of certificate management here is only applicable to inter-router connections.

Initially, we’ll lay down the certificate request custom resource:

oc apply -f ./src/main/k8s/cloud-1/router/cloud1-certificate-request.yaml

This certificate request:

apiVersion: certmanager.k8s.io/v1alpha1
kind: Certificate
metadata:
  name: cluster-wide-tls
spec:
  secretName: cluster-wide-tls
  commonName: openshift.com
  issuerRef:
    name: cloud-native-event-mesh-demo-cert-issuer
    kind: ClusterIssuer
  dnsNames: 
     - openshift.com
     - eipractice.com
     - opentlc.com

Will leverage the certificate manager to create a secret in the cluster referred to as “cluster-wide-tls”, which holds the certificate, private key, and CA certificate needed to establish trust, as issued by the ClusterIssuer we created above.
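
Before applying the router, it can be worth confirming that the certificate was issued and that the secret has materialised (a quick check; output will vary):

# The Certificate resource should report a Ready condition,
# and the secret should contain tls.crt, tls.key and ca.crt
oc describe certificate cluster-wide-tls -n cluster-1
oc get secret cluster-wide-tls -n cluster-1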

Upon issuing the certificate, it is now time to apply our Interconnect router custom resource for the cluster-1 namespace:

oc apply -f ./src/main/k8s/cloud-1/router/cloud1-mesh-router.yaml 

This enables a few features as laid out by the Interconnect custom resource:

apiVersion: interconnectedcloud.github.io/v1alpha1
kind: Interconnect
metadata:
  name: cloud1-router-mesh
spec:
   sslProfiles:
   - name: inter-router
     credentials: cluster-wide-tls
     caCert: cluster-wide-tls
   deploymentPlan: 
      role: interior 
      size: 1
      placement: AntiAffinity
   interRouterListener: 
      sslProfile: cloud1-router-tls
      expose: false
      authenticatePeer: false
      port: 55671

This creates an SSL Profile based on the certificate issued to us by the cluster issuer, and establishes an interRouterListener backed by this SSL Profile. The router will also configure other things by default, secured with self-signed certificates, such as:

  • A default AMQP listener secured by a self-signed certificate
  • Metrics and management capabilities
  • A means for Prometheus or other APM technologies to observe the event mesh router as well as the link attachments that event peers make over the event mesh

At this point, in cluster-1 we will have both an event mesh router and the Interconnect operator in our “cluster-1” namespace:

[mcostell@work router]$ oc get pods -w -n cluster-1
NAME                                    READY     STATUS    RESTARTS   AGE
cloud1-router-mesh-7f698d8c65-wpnx7     1/1       Running   0          13m
interconnect-operator-56b7884d4-6j4jl   1/1       Running   0          5h

 oc get interconnects 
NAME                 AGE
cloud1-router-mesh   17m

We can also introspect the router via oc exec commands:

[mcostell@work router]$ oc exec cloud1-router-mesh-7f698d8c65-wpnx7 -i -t -- qdstat -g
2020-06-29 23:23:49.882216 UTC
cloud1-router-mesh-7f698d8c65-wpnx7

Router Statistics
  attr                             value
  ========================================================================================
  Version                          Red Hat AMQ Interconnect 1.8.0 (qpid-dispatch 1.12.0)
  Mode                             interior
  Router Id                        cloud1-router-mesh-7f698d8c65-wpnx7
  Worker Threads                   4
  Uptime                           000:00:22:17
  VmSize                           497 MiB
  Area                             0
  Link Routes                      0
  Auto Links                       0
  Links                            2
  Nodes                            0
  Addresses                        11
  Connections                      1
  Presettled Count                 0
  Dropped Presettled Count         0
  Accepted Count                   24
  Rejected Count                   0
  Released Count                   0
  Modified Count                   0
  Deliveries Delayed > 1sec        0
  Deliveries Delayed > 10sec       0
  Deliveries Stuck > 10sec         0
  Deliveries to Fallback           0
  Links Blocked                    0
  Ingress Count                    24
  Egress Count                     23
  Transit Count                    0
  Deliveries from Route Container  0
  Deliveries to Route Container    0
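
Beyond qdstat, qdmanage (also shipped in the router image) can query the configured entities directly. As a sketch, listing the listeners the operator created by default (the pod name is the one from the listing above and will differ in your cluster):

# List the listeners configured on the cluster-1 router
oc exec cloud1-router-mesh-7f698d8c65-wpnx7 -n cluster-1 -i -t -- qdmanage query --type=listener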

Installing the Interconnect Router in cluster-2

Now that we have an event mesh router in cluster-1, we will link routers together to form an event mesh between cluster-1 and cluster-2. Please Note - the intention of this logical delineation is to represent 2 separate clusters. In practice, interior event mesh routers would bind remote clusters together.

The process of enabling inter-router connections between event mesh routers in namespaces cluster-1 and cluster-2 is similar to the provisioning that was required for the cluster-1 event mesh router.

Initially, we want to install the Red Hat - Interconnect Operator in the namespace cluster-2. As cluster-2 will also issue its certificates from the cluster issuer provisioned previously in the demo, upon installation of the Interconnect Operator we will provision a certificate for cluster-2 via a certificate request to the ClusterIssuer.

oc apply -f src/main/k8s/cloud-2/router/cloud2-certificate-request.yaml

This certificate request:

apiVersion: certmanager.k8s.io/v1alpha1
kind: Certificate
metadata:
  name: cluster-wide-tls
spec:
  secretName: cluster-wide-tls
  commonName: openshift.com
  issuerRef:
    name: cloud-native-event-mesh-demo-cert-issuer
    kind: ClusterIssuer
  dnsNames: 
     - openshift.com
     - eipractice.com
     - opentlc.com

Will again provision a certificate signed by our CA into a secret named “cluster-wide-tls”.

Once the certificate manager cluster issuer has issued the certificate into cluster-2, we can lay down the Interconnect resource that will enable our cluster-1 and cluster-2 event meshing.

oc apply -f src/main/k8s/cloud-2/router/cloud2-mesh-router.yaml

The router provisioned takes mostly default values for the Interconnect configuration; however, it does create an interior router connection to the event mesh router in cluster-1:

apiVersion: interconnectedcloud.github.io/v1alpha1
kind: Interconnect
metadata:
  name: cloud2-router-mesh
spec:
  sslProfiles:
  - name: inter-cluster-tls
    credentials: cluster-wide-tls
    caCert: cluster-wide-tls
  interRouterConnectors:
  - host: cloud1-router-mesh.cluster-1.svc
    port: 55671
    verifyHostname: false
    sslProfile: inter-cluster-tls

Upon successful provisioning of the router in cluster-2, we should be able to see a successful interior router connection between the routers in cluster-1 and cluster-2:

oc exec cloud2-router-mesh-d566476ff-msdrr -i -t -- qdstat -c 
2020-06-29 23:37:04.652856 UTC
cloud2-router-mesh-d566476ff-msdrr

Connections
  id  host                                    container                             role          dir  security                                authentication  tenant  last dlv      uptime
  =================================================================================================================================================================================================
  2   cloud1-router-mesh.cluster-1.svc:55671  cloud1-router-mesh-7f698d8c65-wpnx7   inter-router  out  TLSv1/SSLv3(DHE-RSA-AES256-GCM-SHA384)  x.509                   000:00:00:00  000:00:00:54
  9   127.0.0.1:33562                         ae807937-90d0-44d7-8f7b-afdc14f5c47a  normal        in   no-security                             no-auth                 000:00:00:00  000:00:00:00

Installing the Edge Interconnect Router

At this point we have a mesh of Interconnect routers in cluster-1 and cluster-2; however, to properly be able to scale our router network and provide a connection concentrator for our eventing applications, we will want to establish a member of our event mesh to have the role of “edge” in our cluster. Edge routers act as connection concentrators for messaging applications. Each edge router maintains a single uplink connection to an interior router, and messaging applications connect to the edge routers to send and receive messages.

Initially, we will create a namespace named “edge”. Please Note: in practice, our edge router would likely live in a separate cluster meant to handle edge use cases, and would be separate from cluster-1 and cluster-2.

Upon successful creation of the namespace, it is necessary to install the Interconnect Operator into this namespace. This will allow us to provision our edge router resource.

Initially, as with our other logical clusters, we will provision our edge namespace with a certificate issued by our ClusterIssuer:

oc apply -f src/main/k8s/edge/edge-certificate-request.yaml

Upon creating our cluster-wide-tls secret in the namespace, we’ll provision our Interconnect Router which will have uplinks to both cluster-1 and cluster-2:

oc apply -f src/main/k8s/edge/edge-routers.yaml

This will provision our edge event mesh router to act as a terminal node in our event mesh.

apiVersion: interconnectedcloud.github.io/v1alpha1
kind: Interconnect
metadata:
  name: edge-routers
spec:
  sslProfiles:
   - name: edge-router-tls
     credentials: cluster-wide-tls
     caCert: cluster-wide-tls
  deploymentPlan:
    role: edge
    placement: AntiAffinity
  edgeConnectors:  
    - host: cloud1-router-mesh.cloud-1
      port: 45672
      name: cloud1-edge-connector
    - host: cloud2-router-mesh.cloud-2
      port: 45672
      name: cloud2-edge-connector
  listeners: 
    - sslProfile: cluster-wide-tls
      authenticatePeer: false
      expose: true
      http: false
      port: 5671

If we were to peruse the Interconnect console for one of our clusters, we would be able to see our two interior routers fronted by a single edge router: Initial Event Mesh Topology
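
For a CLI view of a similar picture, qdstat from one of the interior routers is enough of a sketch: the edge uplink appears as a connection with the role edge, while qdstat -n lists the interior routers known to the network (pod names are the examples used earlier and will differ in your cluster):

# Edge uplinks show up as connections with role "edge" on the interior router
oc exec cloud1-router-mesh-7f698d8c65-wpnx7 -n cluster-1 -i -t -- qdstat -c

# The interior routers that make up the routed core of the mesh
oc exec cloud1-router-mesh-7f698d8c65-wpnx7 -n cluster-1 -i -t -- qdstat -n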

Installing the Event Bus

Cloud Native Integration creates an event mesh through which event emitters and receivers are able to negotiate with each other, get guarantees around delivery, and ensure proper communication flow via Interconnect's wire-level flow control capabilities.

While the event mesh serves to provide a communications control plane for event emitters and receivers, through which event-level QoS can be applied to the relevant use cases and a reliable graph of receivers for emitted events is maintained, Cloud Native Integration introduces a complementary persistent event bus as an event sink for event stream processing and integration from the event mesh. This event bus provides a persistent source of truth for event stream processors, and allows event receivers to take advantage of platform capabilities such as elasticity and contractual communication, and to extend those capabilities into serverless behaviours such as scaling to zero and other highly elastic behaviour.

To accomplish this, Cloud Native Integration relies on Knative Eventing and Knative Serving from the Openshift Serverless Operator that we deployed earlier.

As “Knative Eventing” proposes a pub-sub architecture with per-namespace Brokers and subscriptions, as described by: Knative Broker Triggers

It will be necessary to provision a persistent source of truth for our Knative event brokers. Cloud Native Integration proposes the use of AMQ Streams as this persistent source of truth, as it offers two features that uniquely assist in our pub-sub abstraction:

  • A co-ordinated distributed log that replicates across distinct physical parts of an Openshift cluster
  • A means of providing distributed log partitions to ephemeral consumer groups via the Knative channels abstraction and a Kafka Topic implementation

As the Integrations that handle our events consume from our underlying event bus via Knative subscriptions to Knative event channels, AMQ Streams provides a cloud native means to persist these events: Knative Broker Channels

Provisioning AMQ Streams

For our use case in a single cluster, we will simply provision a single AMQ Streams cluster for both cluster-1 and cluster-2 event bus consumers; however, in practice, it would be apropos to at minimum extend the multi-tenant features of this demo to ensure the use of distinct/multi-tenant physical Kafka clusters.

As we have already installed the AMQ Streams Operator cluster wide, we will simply create a namespace for our AMQ Streams cluster to live in. Let’s call it “amq-streams”.

oc create namespace amq-streams

Upon successful creation of the namespace, we will apply our AMQ Streams custom resource to create an AMQ Streams cluster called “small-event-cluster” in this namespace:

oc apply -f src/main/k8s/cloud-1/kafka/small-event-cluster.yaml
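
The repository's small-event-cluster.yaml is not reproduced here, but a minimal Kafka custom resource consistent with the pods listed below might look roughly like the following sketch (the actual file may differ; replication settings are tuned for a single broker):

# Sketch of a minimal AMQ Streams (Strimzi) Kafka CR; the repo's file is authoritative
cat <<'EOF' | oc apply -n amq-streams -f -
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: small-event-cluster
spec:
  kafka:
    replicas: 1
    listeners:
      plain: {}
      tls: {}
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
EOF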

Upon successful creation of our resources, we should see some new pods in our “amq-streams” namespace:

oc get pods -w -n amq-streams 
NAME                                                   READY     STATUS    RESTARTS   AGE
small-event-cluster-entity-operator-7cb59948f7-r22kp   3/3       Running   0          33s
small-event-cluster-kafka-0                            2/2       Running   1          1m
small-event-cluster-zookeeper-0                        1/1       Running   0          2m
small-event-cluster-zookeeper-1                        1/1       Running   0          2m
small-event-cluster-zookeeper-2                        1/1       Running   0          2m

At this point we have provisioned AMQ Streams, but we still need to configure Knative Eventing so that AMQ Streams becomes the backbone of our enterprise service bus abstraction.
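
One wiring detail worth checking is that the KafkaChannel controller installed earlier knows where this cluster lives: it reads its bootstrap servers from the config-kafka ConfigMap created in knative-eventing. The repository's install yaml may already set this, and the exact data key can vary by version, so treat the following as a sketch (the bootstrap service name follows the usual <cluster>-kafka-bootstrap convention):

# Inspect what the KafkaChannel controller is currently pointed at
oc get configmap config-kafka -n knative-eventing -o yaml

# Sketch: point it at the AMQ Streams cluster provisioned above ("bootstrapServers" key assumed)
oc patch configmap config-kafka -n knative-eventing --type merge \
  -p '{"data":{"bootstrapServers":"small-event-cluster-kafka-bootstrap.amq-streams.svc:9092"}}'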

Provisioning Kafka Channels using the Knative Eventing Broker

As we have already installed the Openshift Serverless Operator in a previous step, we are now free to leverage the Kafka Channel configuration we created earlier to provision our Knative Eventing channel abstraction as a Kafka topic.

Let’s initially ensure we have configured our Knative Eventing webhooks for the default behaviour we want by applying several configmaps:

oc apply -f ./src/main/k8s/config-maps/knative-eventing/ -n knative-eventing 

This will wire up our broker and channel controllers to have a default broker configuration:

kind: ConfigMap
apiVersion: v1
metadata:
  name: config-br-defaults
  namespace: knative-eventing
data:
  default-br-config: |
    clusterDefault:
      brokerClass: MTChannelBasedBroker
      apiVersion: v1
      kind: ConfigMap
      name: kafka-channel
      namespace: knative-eventing
    namespaceDefaults:
      test-tenant:
        brokerClass: MTChannelBasedBroker
        apiVersion: v1
        kind: ConfigMap
        name: imc-channel
        namespace: knative-eventing
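
The kafka-channel ConfigMap referenced above is what tells the broker which channel kind to create; it is presumably supplied by the config-maps directory applied a moment ago, and typically looks like the following sketch:

# Sketch of the default channel template the broker config points at
cat <<'EOF' | oc apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-channel
  namespace: knative-eventing
data:
  channelTemplateSpec: |
    apiVersion: messaging.knative.dev/v1alpha1
    kind: KafkaChannel
EOF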

At this point, we will label our respective cluster-1 and cluster-2 namespaces for injection by our Knative Eventing Operator.

oc label namespace cluster-1 knative-eventing-injection=enabled 
oc label namespace cluster-2 knative-eventing-injection=enabled 

Upon labeling these namespaces, we should see Knative Eventing broker pods in each namespace (for instance, in namespace cluster-1):

oc get pods -w -n cluster-1
NAME                                      READY     STATUS    RESTARTS   AGE
cloud1-router-mesh-7f698d8c65-xft27       1/1       Running   0          1h
default-broker-filter-b5967fd6c-rs8vm     1/1       Running   0          13s
default-broker-ingress-6d4c6474c8-9gd4m   1/1       Running   0          13s
interconnect-operator-5684994fbf-587dk    1/1       Running   0          1h

We should also be able to interrogate our AMQ Streams cluster and see that we now have topics for the trigger-based subscriptions of the default brokers in the namespaces that have been injected:

oc exec small-event-cluster-kafka-0 -c kafka -it -- bin/kafka-topics.sh --zookeeper localhost:2181 --list 
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
knative-messaging-kafka.cluster-1.default-kne-trigger
knative-messaging-kafka.cluster-2.default-kne-trigger

At this point, our default brokers in each namespace are wired to use a default channel, a KafkaChannel, and we can begin wiring up our Knative brokers with the channels we would like to interact with.

Let’s start by creating a channel in cluster-1 that we will use during our demo:

oc apply -f ./src/main/k8s/cloud-1/eventing/testing-dbevents-channel.yaml -n cluster-1
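
The applied resource itself is small; based on the describe output below, it is roughly equivalent to the following sketch (the file in the repository may carry additional metadata):

# Sketch of the testing-dbevents channel, with its channel template spelled out explicitly
cat <<'EOF' | oc apply -n cluster-1 -f -
apiVersion: messaging.knative.dev/v1beta1
kind: Channel
metadata:
  name: testing-dbevents
spec:
  channelTemplate:
    apiVersion: messaging.knative.dev/v1alpha1
    kind: KafkaChannel
EOF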

If we interrogate K8s about this resource, we’ll see that our testing-dbevents channel is in fact a KafkaChannel:

oc describe channels testing-dbevents 
Name:         testing-dbevents
Namespace:    cluster-1
Labels:       <none>
Annotations:  kubectl.kubernetes.io/last-applied-configuration={"apiVersion":"messaging.knative.dev/v1beta1","kind":"Channel","metadata":{"annotations":{},"name":"testing-dbevents","namespace":"cluster-1"}}

              messaging.knative.dev/creator=opentlc-mgr
              messaging.knative.dev/lastModifier=opentlc-mgr
API Version:  messaging.knative.dev/v1beta1
Kind:         Channel
Metadata:
  Creation Timestamp:  2020-07-07T02:00:56Z
  Generation:          1
  Resource Version:    1628649
  Self Link:           /apis/messaging.knative.dev/v1beta1/namespaces/cluster-1/channels/testing-dbevents
  UID:                 c0f05d96-5e8d-46b5-862f-adcb2a2a8df1
Spec:
  Channel Template:
    API Version:  messaging.knative.dev/v1alpha1
    Kind:         KafkaChannel
    Spec:
Status:
  Address:
    URL:  http://testing-dbevents-kn-channel.cluster-1.svc.cluster.local
  Channel:
    API Version:  messaging.knative.dev/v1alpha1
    Kind:         KafkaChannel
    Name:         testing-dbevents
    Namespace:    cluster-1
  Conditions:
    Last Transition Time:  2020-07-07T02:00:56Z
    Status:                True
    Type:                  Ready
  Observed Generation:     1
Events:
  Type    Reason             Age              From                Message
  ----    ------             ----             ----                -------
  Normal  ChannelReconciled  1m (x4 over 1m)  channel-controller  Channel reconciled: "cluster-1/testing-dbevents"

We can also interrogate the cluster, to list our kafka channels:

 oc get kafkachannels -n cluster-1
NAME                  READY     REASON    URL                                                                 AGE
default-kne-trigger   True                http://default-kne-trigger-kn-channel.cluster-1.svc.cluster.local   53m
testing-dbevents      True                http://testing-dbevents-kn-channel.cluster-1.svc.cluster.local      5m54s

At this point, we’ll go ahead and apply the rest of our channel resources to further prepare our Openshift cluster for the demo:

oc apply -f ./src/main/k8s/cloud-1/eventing/testing-dbeventaggregate-channels.yaml -n cluster-1
channel.messaging.knative.dev/testing-dbeventaggregate created
oc apply -f ./src/main/k8s/cloud-1/eventing/testing-dbevents-channel.yaml -n cluster-2
channel.messaging.knative.dev/testing-dbevents created
 oc apply -f ./src/main/k8s/cloud-1/eventing/testing-dbeventaggregate-channels.yaml -n cluster-2
channel.messaging.knative.dev/testing-dbeventaggregate created

We should see our full complement of channels if we interrogate one of our namespaces:

oc get channels -n cluster-1
NAME                       READY     REASON    URL                                                                      AGE
testing-dbeventaggregate   True                http://testing-dbeventaggregate-kn-channel.cluster-1.svc.cluster.local   17m
testing-dbevents           True                http://testing-dbevents-kn-channel.cluster-1.svc.cluster.local           25m

Installing the Event Sink

As we have previously installed the Red Hat - Camel K Operator, we can leverage Camel-K to run Apache Camel based integrations as AMQP-based event receivers along our event mesh that ultimately act as a sink for our Knative Eventing based event bus.

Using the Kamel CLI

While this document will not describe the installation of the Kamel CLI, it is well described here: https://camel.apache.org/camel-k/latest/installation/openshift.html

Please note: this demo will not attempt to demonstrate proper CICD procedures for use of Camel-K; however, please stay tuned to this series for Cloud Native Integration: Continuous Integration and Deployment. It is also worth noting the following Apache Camel example: https://camel.apache.org/camel-k/latest/tutorials/tekton/tekton.html

In the meantime, let’s use the Kamel CLI to install our event sink for one of the channels we have installed into our namespace cluster-1 - testing-dbevents:

kamel run -n cluster-1 --trait deployer.kind=deployment --trait container.limit-cpu=500m --trait container.limit-memory=512Mi --trait container.name=cloudnative-integration ./src/main/java/io/entropic/integration/examples/eventmesh/AMQPSinkIntegration.java

Upon issuing this CLI command, the Camel-K Operator will do a few things:

  • It will create a build kit
  • It will create a builder pod
  • It will pull resources from the Camel-K catalogue that match a specific version of Apache Camel. This means our developers will not need to laboriously configure underlying Camel component resources!
  • It will create an Integration deployment based on a set of traits that we use (in our case some requests and limits as well as a deployment type)
  • It will care for and feed our integration, and provide us a path towards integration with other systems such as our Knative Eventing event bus, API Management via 3scale, as well as other Ingress controller and Service Mesh features through the use of Istio

Right off the bat, we see a few things in our namespace cluster-1:

 oc get pods -w -n cluster-1
NAME                                       READY     STATUS      RESTARTS   AGE
amqp-sink-integration-64dc6fcd4d-szl4t     1/1       Running     0          20m
camel-k-kit-bs1un3ubhuc7jr4mqb30-1-build   0/1       Completed   0          21m
camel-k-kit-bs1un3ubhuc7jr4mqb30-builder   0/1       Completed   0          22m

We notice a build kit pod as well as the actual integration deployment. When we describe our Camel-K integration, it gives us more information about what these things are and how they build out our integration:

oc describe integration amqp-sink-integration 
Name:         amqp-sink-integration
Namespace:    cluster-1
Labels:       <none>
Annotations:  <none>
API Version:  camel.apache.org/v1
Kind:         Integration
Metadata:
  Creation Timestamp:  2020-07-07T03:27:11Z
  Generation:          1
  Resource Version:    1679979
  Self Link:           /apis/camel.apache.org/v1/namespaces/cluster-1/integrations/amqp-sink-integration
  UID:                 ef4a8c8f-7cd7-4ff7-a025-00532c7087ff
Spec:
  Sources:
    Content:  package io.entropic.integration.examples.eventmesh;

import org.apache.camel.BindToRegistry;
import org.apache.camel.builder.RouteBuilder;

public class AMQPSinkIntegration extends RouteBuilder {

  @BindToRegistry
  public javax.jms.ConnectionFactory connectionFactory() {
    return new org.apache.qpid.jms.JmsConnectionFactory(
        "amqp://cloud1-router-mesh.cluster-1:5672?transport.trustAll=true&transport.verifyHost=false");
  }

  @Override
  public void configure() throws Exception {
    
    from("amqp:queue:test.db-events")
      .log("**Received message ${body}**")
      .setBody().simple("${body} processed by AMQPSinkIntegration")
      .to("knative:channel/testing-dbevents")
      .log("**sent message to knative channel ${body} **");

  }

}

    Name:  AMQPSinkIntegration.java
  Traits:
    Container:
      Configuration:
        Limit - Cpu:     500m
        Limit - Memory:  512Mi
        Name:            cloudnative-integration
    Deployer:
      Configuration:
        Kind:  deployment
Status:
  Capabilities:
    platform-http
  Conditions:
    Last Transition Time:  2020-07-07T03:27:11Z
    Last Update Time:      2020-07-07T03:27:11Z
    Message:               camel-k
    Reason:                IntegrationPlatformAvailable
    Status:                True
    Type:                  IntegrationPlatformAvailable
    Last Transition Time:  2020-07-07T03:29:33Z
    Last Update Time:      2020-07-07T03:29:33Z
    Message:               kit-bs1un3ubhuc7jr4mqb30
    Reason:                IntegrationKitAvailable
    Status:                True
    Type:                  IntegrationKitAvailable
    Last Transition Time:  2020-07-07T03:29:37Z
    Last Update Time:      2020-07-07T03:29:37Z
    Message:               different controller strategy used (deployment)
    Reason:                CronJobNotAvailableReason
    Status:                False
    Type:                  CronJobAvailable
    Last Transition Time:  2020-07-07T03:29:37Z
    Last Update Time:      2020-07-07T03:29:37Z
    Message:               deployment name is amqp-sink-integration
    Reason:                DeploymentAvailable
    Status:                True
    Type:                  DeploymentAvailable
    Last Transition Time:  2020-07-07T03:29:37Z
    Last Update Time:      2020-07-07T03:29:37Z
    Message:               different controller strategy used (deployment)
    Reason:                KnativeServiceNotAvailable
    Status:                False
    Type:                  KnativeServiceAvailable
  Dependencies:
    camel:amqp
    mvn:org.apache.camel.k/camel-k-loader-java
    mvn:org.apache.camel.k/camel-k-runtime-main
    mvn:org.apache.camel.k:camel-knative
  Digest:            v3y2DwE-7npkTmwW-3zeh8sBAn2vpJHSXHsFabGHgk3I
  Image:             image-registry.openshift-image-registry.svc:5000/cluster-1/camel-k-kit-bs1un3ubhuc7jr4mqb30@sha256:cde967f32c00d46020f7a5bd726d67145162e7abaf20a4abbf9bbd53dd191881
  Kit:               kit-bs1un3ubhuc7jr4mqb30
  Phase:             Running
  Platform:          camel-k
  Profile:           Knative
  Replicas:          1
  Runtime Provider:  main
  Runtime Version:   1.3.0.fuse-jdk11-800012-redhat-00001
  Version:           1.0
Events:
  Type    Reason                       Age   From                                Message
  ----    ------                       ----  ----                                -------
  Normal  ReasonRelatedObjectChanged   24m   camel-k-integration-kit-controller  Integration amqp-sink-integration dependent resource kit-bs1un3ubhuc7jr4mqb30 (Integration Kit) changed phase to Build Running
  Normal  IntegrationPhaseUpdated      24m   camel-k-integration-controller      Integration amqp-sink-integration in phase Waiting For Platform
  Normal  IntegrationConditionChanged  24m   camel-k-integration-controller      IntegrationPlatformAvailable for Integration amqp-sink-integration: camel-k
  Normal  IntegrationPhaseUpdated      24m   camel-k-integration-controller      Integration amqp-sink-integration in phase Initialization
  Normal  IntegrationConditionChanged  24m   camel-k-integration-controller      No IntegrationKitAvailable for Integration amqp-sink-integration: creating a new integration kit
  Normal  IntegrationPhaseUpdated      24m   camel-k-integration-controller      Integration amqp-sink-integration in phase Building Kit
  Normal  ReasonRelatedObjectChanged   24m   camel-k-integration-kit-controller  Integration amqp-sink-integration dependent resource kit-bs1un3ubhuc7jr4mqb30 (Integration Kit) changed phase to Build Submitted
  Normal  ReasonRelatedObjectChanged   24m   camel-k-build-controller            Integration amqp-sink-integration dependent resource kit-bs1un3ubhuc7jr4mqb30 (Build) changed phase to Scheduling
  Normal  ReasonRelatedObjectChanged   24m   camel-k-build-controller            Integration amqp-sink-integration dependent resource kit-bs1un3ubhuc7jr4mqb30 (Build) changed phase to Pending
  Normal  ReasonRelatedObjectChanged   24m   camel-k-build-controller            Integration amqp-sink-integration dependent resource kit-bs1un3ubhuc7jr4mqb30 (Build) changed phase to Running
  Normal  IntegrationConditionChanged  24m   camel-k-integration-controller      No IntegrationPlatformAvailable for Integration amqp-sink-integration: camel-k
  Normal  ReasonRelatedObjectChanged   22m   camel-k-build-controller            Integration amqp-sink-integration dependent resource kit-bs1un3ubhuc7jr4mqb30 (Build) changed phase to Succeeded
  Normal  ReasonRelatedObjectChanged   22m   camel-k-integration-kit-controller  Integration amqp-sink-integration dependent resource kit-bs1un3ubhuc7jr4mqb30 (Integration Kit) changed phase to Ready
  Normal  IntegrationConditionChanged  22m   camel-k-integration-controller      IntegrationKitAvailable for Integration amqp-sink-integration: kit-bs1un3ubhuc7jr4mqb30
  Normal  IntegrationPhaseUpdated      22m   camel-k-integration-controller      Integration amqp-sink-integration in phase Deploying
  Normal  IntegrationConditionChanged  22m   camel-k-integration-controller      No CronJobAvailable for Integration amqp-sink-integration: different controller strategy used (deployment)
  Normal  IntegrationConditionChanged  22m   camel-k-integration-controller      DeploymentAvailable for Integration amqp-sink-integration: deployment name is amqp-sink-integration
  Normal  IntegrationConditionChanged  22m   camel-k-integration-controller      No KnativeServiceAvailable for Integration amqp-sink-integration: different controller strategy used (deployment)
  Normal  IntegrationPhaseUpdated      22m   camel-k-integration-controller      Integration amqp-sink-integration in phase Running

We’ll notice our integration code (which can be in nearly any Java variant), as well as a number of things about the runtime environment that our build kit and builder pod have ultimately deployed.
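
To confirm the route is actually running, tailing the integration's logs is enough; the deployment name comes from the DeploymentAvailable condition above:

# Follow the Camel route logs; the "Received message" lines appear once events arrive from the mesh
oc logs deployment/amqp-sink-integration -n cluster-1 -f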

Taking a look at our builder pod, we notice some very familiar things:

  • Maven dependencies being fetched
  • Wiring up of an underlying JVM environment
  • Building of an image

A quick snippet from our builder pod log shows this in action:

{"level":"info","ts":1594092510.535882,"logger":"camel-k.builder","msg":"executing step","step":"github.com/apache/camel-k/pkg/builder/IncrementalImageContext","phase":30,"name":"kit-bs1un3ubhuc7jr4mqb30","task":"builder"}
{"level":"info","ts":1594092510.59943,"logger":"camel-k.builder","msg":"step done in 0.063516 seconds","step":"github.com/apache/camel-k/pkg/builder/IncrementalImageContext","phase":30,"name":"kit-bs1un3ubhuc7jr4mqb30","task":"builder"}
{"level":"info","ts":1594092510.5994673,"logger":"camel-k.builder","msg":"executing step","step":"github.com/apache/camel-k/pkg/builder/s2i/Publisher","phase":40,"name":"kit-bs1un3ubhuc7jr4mqb30","task":"builder"}
{"level":"info","ts":1594092572.896473,"logger":"camel-k.builder","msg":"step done in 62.296992 seconds","step":"github.com/apache/camel-k/pkg/builder/s2i/Publisher","phase":40,"name":"kit-bs1un3ubhuc7jr4mqb30","task":"builder"}
{"level":"info","ts":1594092572.8965232,"logger":"camel-k.builder","msg":"dependencies: [camel:amqp mvn:org.apache.camel.k/camel-k-loader-java mvn:org.apache.camel.k/camel-k-runtime-main mvn:org.apache.camel.k:camel-knative]"}
{"level":"info","ts":1594092572.896557,"logger":"camel-k.builder","msg":"artifacts: [org.apache.camel:camel-amqp:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-jms:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-support:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-spring:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-xml-jaxb:jar:3.1.0.fuse-jdk11-800011-redhat-00001 jakarta.xml.bind:jakarta.xml.bind-api:jar:2.3.2 jakarta.activation:jakarta.activation-api:jar:1.2.1 com.sun.xml.bind:jaxb-core:jar:2.3.0 com.sun.xml.bind:jaxb-impl:jar:2.3.0 org.apache.camel:camel-xml-jaxp:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-core-xml:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.springframework:spring-core:jar:5.2.3.RELEASE org.springframework:spring-jcl:jar:5.2.3.RELEASE org.springframework:spring-aop:jar:5.2.3.RELEASE org.springframework:spring-expression:jar:5.2.3.RELEASE org.springframework:spring-jms:jar:5.2.3.RELEASE org.springframework:spring-messaging:jar:5.2.3.RELEASE org.springframework:spring-context:jar:5.2.3.RELEASE org.springframework:spring-tx:jar:5.2.3.RELEASE org.springframework:spring-beans:jar:5.2.3.RELEASE org.apache.geronimo.specs:geronimo-jms_2.0_spec:jar:1.0.0.alpha-2-redhat-2 org.apache.qpid:qpid-jms-client:jar:0.48.0.redhat-00004 io.netty:netty-transport-native-epoll:jar:linux-x86_64:4.1.45.Final-redhat-00001 io.netty:netty-transport-native-unix-common:jar:4.1.45.Final-redhat-00001 io.netty:netty-transport-native-kqueue:jar:4.1.45.Final-redhat-00001 org.apache.qpid:proton-j:jar:0.33.3.redhat-00001 io.netty:netty-buffer:jar:4.1.45.Final-redhat-00002 io.netty:netty-common:jar:4.1.45.Final-redhat-00002 io.netty:netty-handler:jar:4.1.45.Final-redhat-00002 io.netty:netty-codec:jar:4.1.45.Final-redhat-00002 io.netty:netty-transport:jar:4.1.45.Final-redhat-00002 io.netty:netty-resolver:jar:4.1.45.Final-redhat-00002 io.netty:netty-codec-http:jar:4.1.45.Final-redhat-00002 org.apache.camel.k:camel-k-loader-java:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel.k:camel-k-runtime-core:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel:camel-core-engine:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-api:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-base:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-management-api:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-util:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-endpointdsl:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.jooq:joor:jar:0.9.12 org.apache.camel.k:camel-k-runtime-main:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel:camel-main:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-caffeine-lrucache:jar:3.1.0.fuse-jdk11-800011-redhat-00001 com.github.ben-manes.caffeine:caffeine:jar:2.8.1 org.apache.camel:camel-headersmap:jar:3.1.0.fuse-jdk11-800011-redhat-00001 com.cedarsoftware:java-util:jar:1.40.0 org.apache.camel:camel-bean:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.logging.log4j:log4j-core:jar:2.13.1.redhat-00001 org.apache.logging.log4j:log4j-api:jar:2.13.1.redhat-00001 org.apache.logging.log4j:log4j-slf4j-impl:jar:2.13.1.redhat-00001 org.apache.camel.k:camel-knative:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.slf4j:slf4j-api:jar:1.7.30 org.apache.camel:camel-cloud:jar:3.1.0.fuse-jdk11-800011-redhat-00001 com.fasterxml.jackson.core:jackson-databind:jar:2.10.3.redhat-00001 
com.fasterxml.jackson.core:jackson-annotations:jar:2.10.3.redhat-00001 com.fasterxml.jackson.core:jackson-core:jar:2.10.3.redhat-00001 com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.10.3.redhat-00001 org.apache.camel.k:camel-knative-api:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel.k:camel-knative-http:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel.k:camel-k-runtime-http:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel:camel-platform-http:jar:3.1.0.fuse-jdk11-800011-redhat-00001 io.vertx:vertx-web:jar:3.8.5.redhat-00005 io.vertx:vertx-auth-common:jar:3.8.5.redhat-00005 io.vertx:vertx-bridge-common:jar:3.8.5.redhat-00005 io.vertx:vertx-web-client:jar:3.8.5.redhat-00005 io.vertx:vertx-web-common:jar:3.8.5.redhat-00005 io.vertx:vertx-core:jar:3.8.5.redhat-00005 io.netty:netty-handler-proxy:jar:4.1.45.Final-redhat-00001 io.netty:netty-codec-socks:jar:4.1.45.Final-redhat-00001 io.netty:netty-codec-http2:jar:4.1.45.Final-redhat-00001 io.netty:netty-resolver-dns:jar:4.1.45.Final-redhat-00001 io.netty:netty-codec-dns:jar:4.1.45.Final-redhat-00001]"}
{"level":"info","ts":1594092572.8966222,"logger":"camel-k.builder","msg":"artifacts selected: [org.apache.camel:camel-amqp:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-jms:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-support:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-spring:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-xml-jaxb:jar:3.1.0.fuse-jdk11-800011-redhat-00001 jakarta.xml.bind:jakarta.xml.bind-api:jar:2.3.2 jakarta.activation:jakarta.activation-api:jar:1.2.1 com.sun.xml.bind:jaxb-core:jar:2.3.0 com.sun.xml.bind:jaxb-impl:jar:2.3.0 org.apache.camel:camel-xml-jaxp:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-core-xml:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.springframework:spring-core:jar:5.2.3.RELEASE org.springframework:spring-jcl:jar:5.2.3.RELEASE org.springframework:spring-aop:jar:5.2.3.RELEASE org.springframework:spring-expression:jar:5.2.3.RELEASE org.springframework:spring-jms:jar:5.2.3.RELEASE org.springframework:spring-messaging:jar:5.2.3.RELEASE org.springframework:spring-context:jar:5.2.3.RELEASE org.springframework:spring-tx:jar:5.2.3.RELEASE org.springframework:spring-beans:jar:5.2.3.RELEASE org.apache.geronimo.specs:geronimo-jms_2.0_spec:jar:1.0.0.alpha-2-redhat-2 org.apache.qpid:qpid-jms-client:jar:0.48.0.redhat-00004 io.netty:netty-transport-native-epoll:jar:linux-x86_64:4.1.45.Final-redhat-00001 io.netty:netty-transport-native-unix-common:jar:4.1.45.Final-redhat-00001 io.netty:netty-transport-native-kqueue:jar:4.1.45.Final-redhat-00001 org.apache.qpid:proton-j:jar:0.33.3.redhat-00001 io.netty:netty-buffer:jar:4.1.45.Final-redhat-00002 io.netty:netty-common:jar:4.1.45.Final-redhat-00002 io.netty:netty-handler:jar:4.1.45.Final-redhat-00002 io.netty:netty-codec:jar:4.1.45.Final-redhat-00002 io.netty:netty-transport:jar:4.1.45.Final-redhat-00002 io.netty:netty-resolver:jar:4.1.45.Final-redhat-00002 io.netty:netty-codec-http:jar:4.1.45.Final-redhat-00002 org.apache.camel.k:camel-k-loader-java:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel.k:camel-k-runtime-core:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel:camel-core-engine:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-api:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-base:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-management-api:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-util:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-endpointdsl:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.jooq:joor:jar:0.9.12 org.apache.camel.k:camel-k-runtime-main:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel:camel-main:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-caffeine-lrucache:jar:3.1.0.fuse-jdk11-800011-redhat-00001 com.github.ben-manes.caffeine:caffeine:jar:2.8.1 org.apache.camel:camel-headersmap:jar:3.1.0.fuse-jdk11-800011-redhat-00001 com.cedarsoftware:java-util:jar:1.40.0 org.apache.camel:camel-bean:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.logging.log4j:log4j-core:jar:2.13.1.redhat-00001 org.apache.logging.log4j:log4j-api:jar:2.13.1.redhat-00001 org.apache.logging.log4j:log4j-slf4j-impl:jar:2.13.1.redhat-00001 org.apache.camel.k:camel-knative:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.slf4j:slf4j-api:jar:1.7.30 org.apache.camel:camel-cloud:jar:3.1.0.fuse-jdk11-800011-redhat-00001 com.fasterxml.jackson.core:jackson-databind:jar:2.10.3.redhat-00001 
com.fasterxml.jackson.core:jackson-annotations:jar:2.10.3.redhat-00001 com.fasterxml.jackson.core:jackson-core:jar:2.10.3.redhat-00001 com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.10.3.redhat-00001 org.apache.camel.k:camel-knative-api:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel.k:camel-knative-http:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel.k:camel-k-runtime-http:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel:camel-platform-http:jar:3.1.0.fuse-jdk11-800011-redhat-00001 io.vertx:vertx-web:jar:3.8.5.redhat-00005 io.vertx:vertx-auth-common:jar:3.8.5.redhat-00005 io.vertx:vertx-bridge-common:jar:3.8.5.redhat-00005 io.vertx:vertx-web-client:jar:3.8.5.redhat-00005 io.vertx:vertx-web-common:jar:3.8.5.redhat-00005 io.vertx:vertx-core:jar:3.8.5.redhat-00005 io.netty:netty-handler-proxy:jar:4.1.45.Final-redhat-00001 io.netty:netty-codec-socks:jar:4.1.45.Final-redhat-00001 io.netty:netty-codec-http2:jar:4.1.45.Final-redhat-00001 io.netty:netty-resolver-dns:jar:4.1.45.Final-redhat-00001 io.netty:netty-codec-dns:jar:4.1.45.Final-redhat-00001]"}
{"level":"info","ts":1594092572.8966591,"logger":"camel-k.builder","msg":"base image: registry.access.redhat.com/ubi8/openjdk-11:1.3"}
{"level":"info","ts":1594092572.8966627,"logger":"camel-k.builder","msg":"resolved base image: registry.access.redhat.com/ubi8/openjdk-11:1.3"}
{"level":"info","ts":1594092572.8966658,"logger":"camel-k.builder","msg":"resolved image: image-registry.openshift-image-registry.svc:5000/cluster-1/camel-k-kit-bs1un3ubhuc7jr4mqb30:1678529"}

And finally, our build pod shows the creation of the final image that represents our Integration:

oc logs camel-k-kit-bs1un3ubhuc7jr4mqb30-1-build -f 
Caching blobs under "/var/cache/blobs".

Pulling image registry.access.redhat.com/ubi8/openjdk-11:1.3 ...
Getting image source signatures
Copying blob sha256:1b99828eddf5297ca28fa7eac7f47a40d36a693c628311406788a14dfe75e076
Copying blob sha256:fba81d8872a85de345b80cc69b1f23309ee284950d45dbee2bd8bf9bf17c60a7
Copying blob sha256:e96e3a1df3b2b1e01f9614725b50ea4d1d8e480980e456815ade3c7afca978d7
Copying config sha256:d43c43b348f8c2cfcbe4beff8c36dd8c73cd78cb89e3912190b0b357904f367d
Writing manifest to image destination
Storing signatures
STEP 1: FROM registry.access.redhat.com/ubi8/openjdk-11:1.3
STEP 2: ADD . /deployments
time="2020-07-07T03:29:09Z" level=info msg="Image operating system mismatch: image uses \"\", expecting \"linux\""
time="2020-07-07T03:29:09Z" level=info msg="Image architecture mismatch: image uses \"\", expecting \"amd64\""
--> 1a55bc7e5af
STEP 3: USER 1000
time="2020-07-07T03:29:10Z" level=info msg="Image operating system mismatch: image uses \"\", expecting \"linux\""
time="2020-07-07T03:29:10Z" level=info msg="Image architecture mismatch: image uses \"\", expecting \"amd64\""
--> 562ea4ed86e
STEP 4: ENV "OPENSHIFT_BUILD_NAME"="camel-k-kit-bs1un3ubhuc7jr4mqb30-1" "OPENSHIFT_BUILD_NAMESPACE"="cluster-1"
time="2020-07-07T03:29:10Z" level=info msg="Image operating system mismatch: image uses \"\", expecting \"linux\""
time="2020-07-07T03:29:10Z" level=info msg="Image architecture mismatch: image uses \"\", expecting \"amd64\""
--> 2a358bb17bf
STEP 5: LABEL "io.openshift.build.name"="camel-k-kit-bs1un3ubhuc7jr4mqb30-1" "io.openshift.build.namespace"="cluster-1"
STEP 6: COMMIT temp.builder.openshift.io/cluster-1/camel-k-kit-bs1un3ubhuc7jr4mqb30-1:bd99c4f2
time="2020-07-07T03:29:10Z" level=info msg="Image operating system mismatch: image uses \"\", expecting \"linux\""
time="2020-07-07T03:29:10Z" level=info msg="Image architecture mismatch: image uses \"\", expecting \"amd64\""
--> 59f4e2ff6c4
59f4e2ff6c47ff3cf2e065bb8b6da3a97e87c54f4f48cf30b2213e787c3bc636

Pushing image image-registry.openshift-image-registry.svc:5000/cluster-1/camel-k-kit-bs1un3ubhuc7jr4mqb30:1678529 ...
Getting image source signatures
Copying blob sha256:e96e3a1df3b2b1e01f9614725b50ea4d1d8e480980e456815ade3c7afca978d7
Copying blob sha256:fba81d8872a85de345b80cc69b1f23309ee284950d45dbee2bd8bf9bf17c60a7
Copying blob sha256:50fdb1a6f2f893727eed92d3fb141a71b9abde2b839c1d33d08265a3cd313dc7
Copying blob sha256:1b99828eddf5297ca28fa7eac7f47a40d36a693c628311406788a14dfe75e076
Copying config sha256:59f4e2ff6c47ff3cf2e065bb8b6da3a97e87c54f4f48cf30b2213e787c3bc636
Writing manifest to image destination
Storing signatures
Successfully pushed image-registry.openshift-image-registry.svc:5000/cluster-1/camel-k-kit-bs1un3ubhuc7jr4mqb30@sha256:cde967f32c00d46020f7a5bd726d67145162e7abaf20a4abbf9bbd53dd191881
Push successful

We now have our image, created by our Integration Kit build, in the local OpenShift image registry.
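
If we want to confirm this, we can list the image streams in the namespace; the kit name from the build above should appear there (a quick sanity check, not a required step):

oc get imagestreams -n cluster-1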

All with a simple invocation of the Kamel CLI.

At this point, we’ll also deploy our event sink to cluster-2, as both cluster-1 and cluster-2 will have event receivers that are participating in our event mesh:

kamel run -n cluster-2 --trait deployer.kind=deployment --trait container.limit-cpu=500m --trait container.limit-memory=512Mi --trait container.name=cloudnative-integration ./src/main/java/io/entropic/integration/examples/eventmesh/AMQPSinkIntegration.java

We’ll notice that we have a similar set of pods in cluster-2:

oc get pods -w -n cluster-2
NAME                                       READY     STATUS      RESTARTS   AGE
amqp-sink-integration-76bb7c667f-5mdm2     1/1       Running     0          11m
camel-k-kit-bs1v76ubhuc7jr4mqb3g-1-build   0/1       Completed   0          12m
camel-k-kit-bs1v76ubhuc7jr4mqb3g-builder   0/1       Completed   0          13m
cloud2-router-mesh-d566476ff-bqtrx         1/1       Running     0          3h
default-broker-filter-b5967fd6c-fs87l      1/1       Running     0          2h
default-broker-ingress-5485ff998c-k468n    1/1       Running     0          2h
interconnect-operator-745d45688b-7snf2     1/1       Running     0          3h

Taking a peek at the logs of our integration pod, we'll notice it has linked up with our event mesh, along with some familiar Apache Camel logging:

oc logs amqp-sink-integration-768c959d79-tz4zc -f 
2020-07-07 04:19:59.179 INFO  [main] LRUCacheFactory - Detected and using LURCacheFactory: camel-caffeine-lrucache
2020-07-07 04:19:59.664 INFO  [main] ApplicationRuntime - Add listener: org.apache.camel.k.listener.RuntimeConfigurer@6bf08014
2020-07-07 04:19:59.664 INFO  [main] ApplicationRuntime - Add listener: org.apache.camel.k.listener.ContextConfigurer@6ee4d9ab
2020-07-07 04:19:59.665 INFO  [main] ApplicationRuntime - Add listener: org.apache.camel.k.listener.RoutesConfigurer@302f7971
2020-07-07 04:19:59.666 INFO  [main] ApplicationRuntime - Add listener: org.apache.camel.k.listener.RoutesDumper@9573584
2020-07-07 04:19:59.666 INFO  [main] ApplicationRuntime - Add listener: org.apache.camel.k.listener.PropertiesFunctionsConfigurer@352c1b98
2020-07-07 04:19:59.670 INFO  [main] ApplicationRuntime - Listener org.apache.camel.k.listener.RuntimeConfigurer@6bf08014 executed in phase Starting
2020-07-07 04:19:59.679 INFO  [main] ApplicationRuntime - Listener org.apache.camel.k.listener.PropertiesFunctionsConfigurer@352c1b98 executed in phase Starting
2020-07-07 04:19:59.680 INFO  [main] BaseMainSupport - Using properties from: 
2020-07-07 04:19:59.763 INFO  [main] RuntimeSupport - Looking up loader for language: java
2020-07-07 04:19:59.765 INFO  [main] RuntimeSupport - Found loader org.apache.camel.k.loader.java.JavaSourceLoader@36060e for language java from service definition
2020-07-07 04:20:02.571 INFO  [main] RoutesConfigurer - Loading routes from: file:/etc/camel/sources/i-source-000/AMQPSinkIntegration.java?language=java
2020-07-07 04:20:02.571 INFO  [main] ApplicationRuntime - Listener org.apache.camel.k.listener.RoutesConfigurer@302f7971 executed in phase ConfigureRoutes
2020-07-07 04:20:02.875 INFO  [main] RuntimeSupport - Found customizer org.apache.camel.k.http.PlatformHttpServiceContextCustomizer@1b70203f with id platform-http from service definition
2020-07-07 04:20:02.876 INFO  [main] RuntimeSupport - Apply ContextCustomizer with id=platform-http and type=org.apache.camel.k.http.PlatformHttpServiceContextCustomizer
2020-07-07 04:20:03.079 INFO  [main] PlatformHttpServiceEndpoint - Creating new Vert.x instance
2020-07-07 04:20:04.168 INFO  [vert.x-eventloop-thread-1] PlatformHttpServer - Vert.x HttpServer started on 0.0.0.0:8080
2020-07-07 04:20:04.175 INFO  [main] ApplicationRuntime - Listener org.apache.camel.k.listener.ContextConfigurer@6ee4d9ab executed in phase ConfigureContext
2020-07-07 04:20:04.176 INFO  [main] AbstractCamelContext - Apache Camel 3.1.0.fuse-jdk11-800011-redhat-00001 (CamelContext: camel-k) is starting
2020-07-07 04:20:04.177 INFO  [main] DefaultManagementStrategy - JMX is disabled
2020-07-07 04:20:04.179 INFO  [main] HeadersMapFactoryResolver - Detected and using HeadersMapFactory: camel-headersmap
2020-07-07 04:20:04.566 INFO  [main] BaseMainSupport - Autowired property: connectionFactory on component: AMQPComponent as exactly one instance of type: javax.jms.ConnectionFactory found in the registry
2020-07-07 04:20:04.766 INFO  [main] KnativeComponent - found knative transport: org.apache.camel.component.knative.http.KnativeHttpTransport@68fe48d7 for protocol: http
2020-07-07 04:20:05.563 INFO  [main] AbstractCamelContext - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2020-07-07 04:20:11.665 INFO  [AmqpProvider :(1):[amqp://cloud2-router-mesh.cluster-2:5672]] SaslMechanismFinder - Best match for SASL auth was: SASL-ANONYMOUS
2020-07-07 04:20:11.767 INFO  [AmqpProvider :(1):[amqp://cloud2-router-mesh.cluster-2:5672]] JmsConnection - Connection ID:61991487-4cbc-4633-9bc4-36053a86d8f9:1 connected to remote Broker: amqp://cloud2-router-mesh.cluster-2:5672
2020-07-07 04:20:11.767 INFO  [main] AbstractCamelContext - Route: route1 started and consuming from: amqp://queue:test.db-events
2020-07-07 04:20:11.770 INFO  [main] AbstractCamelContext - Total 1 routes, of which 1 are started
2020-07-07 04:20:11.770 INFO  [main] AbstractCamelContext - Apache Camel 3.1.0.fuse-jdk11-800011-redhat-00001 (CamelContext: camel-k) started in 7.595 seconds
2020-07-07 04:20:11.771 INFO  [main] ApplicationRuntime - Listener org.apache.camel.k.listener.RoutesDumper@9573584 executed in phase Started

Deploying Serverless Integrations on the Event Bus

Now that we have an event sink to persist events onto our event bus, let’s leverage the power of Camel-K to have integrations participate on the event bus:
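
The exact kamel run invocation lives with the demo source; assuming the transformation route is defined in a source file alongside the AMQP sink (the file name below is an assumption, following the naming pattern of the sink deployment above), the invocation would look something like:

kamel run -n cluster-1 ./src/main/java/io/entropic/integration/examples/eventmesh/EventBusTransformationIntegration.java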



Upon executing this CLI command, we’ll notice something a little different than our previous Camel-K Integration deployment:

oc get pods -w -n cluster-1
NAME                                                              READY     STATUS        RESTARTS   AGE
amqp-sink-integration-64dc6fcd4d-szl4t                            1/1       Running       0          1h
camel-k-kit-bs1un3ubhuc7jr4mqb30-1-build                          0/1       Completed     0          1h
camel-k-kit-bs1un3ubhuc7jr4mqb30-builder                          0/1       Completed     0          1h
camel-k-kit-bs1vjgmbhuc7jr4mqb4g-1-build                          0/1       Completed     0          2m
camel-k-kit-bs1vjgmbhuc7jr4mqb4g-builder                          0/1       Completed     0          2m
cloud1-router-mesh-7f698d8c65-xft27                               1/1       Running       0          4h
default-broker-filter-b5967fd6c-rs8vm                             1/1       Running       0          2h
default-broker-ingress-6d4c6474c8-9gd4m                           1/1       Running       0          2h
event-bus-transformation-integration-wqksm-deployment-5bf8jkb7x   2/2       Terminating   0          1m
interconnect-operator-5684994fbf-587dk                            1/1       Running       0          4h
event-bus-transformation-integration-wqksm-deployment-5bf8jkb7x   2/2       Terminating   0         1m
event-bus-transformation-integration-wqksm-deployment-5bf8jkb7x   0/2       Terminating   0         1m
event-bus-transformation-integration-wqksm-deployment-5bf8jkb7x   0/2       Terminating   0         1m
event-bus-transformation-integration-wqksm-deployment-5bf8jkb7x   0/2       Terminating   0         1m

We'll notice that our integration runs and then scales back down to zero: because it is deployed through Knative Serving, our Integration is serverless. When we interrogate Kubernetes, we'll notice the integration is ready even though it has 0 replicas running:

oc get integrations -n cluster-1
NAME                                   PHASE     KIT                        REPLICAS
amqp-sink-integration                  Running   kit-bs1un3ubhuc7jr4mqb30   1
event-bus-transformation-integration   Running   kit-bs1vjgmbhuc7jr4mqb4g   0

And if we interrogate our Knative Serving system, we'll notice we have a viable Knative Serving revision for our integration:

oc get revisions.serving.knative.dev -n cluster-1
NAME                                         CONFIG NAME                            K8S SERVICE NAME                             GENERATION   READY     REASON
event-bus-transformation-integration-wqksm   event-bus-transformation-integration   event-bus-transformation-integration-wqksm   1            True      
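
The Knative Service behind this revision can be inspected directly as well; ksvc is the standard short name for services.serving.knative.dev:

oc get ksvc event-bus-transformation-integration -n cluster-1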

We will also notice that we have a subscription to a Knative event channel:

oc get subscriptions.messaging.knative.dev 
NAME                                                    READY     REASON    AGE
testing-dbevents-event-bus-transformation-integration   True                9m38s

Describing the subscription resource:

[mcostell@work cloud-native-event-mesh]$ oc describe subscriptions.messaging.knative.dev testing-dbevents-event-bus-transformation-integration 
Name:         testing-dbevents-event-bus-transformation-integration
Namespace:    cluster-1
Labels:       camel.apache.org/generation=1
              camel.apache.org/integration=event-bus-transformation-integration
Annotations:  messaging.knative.dev/creator=system:serviceaccount:openshift-operators:camel-k-operator
              messaging.knative.dev/lastModifier=system:serviceaccount:openshift-operators:camel-k-operator
API Version:  messaging.knative.dev/v1beta1
Kind:         Subscription
Metadata:
  Creation Timestamp:  2020-07-07T04:29:19Z
  Finalizers:
    subscriptions.messaging.knative.dev
  Generation:  1
  Owner References:
    API Version:           camel.apache.org/v1
    Block Owner Deletion:  true
    Controller:            true
    Kind:                  Integration
    Name:                  event-bus-transformation-integration
    UID:                   f21518f9-8906-4cb8-a164-fecf2df7fb6c
  Resource Version:        1714673
  Self Link:               /apis/messaging.knative.dev/v1beta1/namespaces/cluster-1/subscriptions/testing-dbevents-event-bus-transformation-integration
  UID:                     968a8794-9744-414f-91b4-d6226670fa4d
Spec:
  Channel:
    API Version:  messaging.knative.dev/v1alpha1
    Kind:         Channel
    Name:         testing-dbevents
  Subscriber:
    Ref:
      API Version:  serving.knative.dev/v1
      Kind:         Service
      Name:         event-bus-transformation-integration
      Namespace:    cluster-1
Status:
  Conditions:
    Last Transition Time:  2020-07-07T04:29:29Z
    Status:                True
    Type:                  AddedToChannel
    Last Transition Time:  2020-07-07T04:29:30Z
    Status:                True
    Type:                  ChannelReady
    Last Transition Time:  2020-07-07T04:29:30Z
    Status:                True
    Type:                  Ready
    Last Transition Time:  2020-07-07T04:29:30Z
    Status:                True
    Type:                  Resolved
  Observed Generation:     1
  Physical Subscription:
    Subscriber URI:  http://event-bus-transformation-integration.cluster-1.svc.cluster.local
Events:
  Type     Reason                               Age                From                     Message
  ----     ------                               ----               ----                     -------
  Normal   FinalizerUpdate                      10m                subscription-controller  Updated "testing-dbevents-event-bus-transformation-integration" finalizers
  Warning  SubscriberResolveFailed              10m (x5 over 10m)  subscription-controller  Failed to resolve spec.subscriber: address not set for &ObjectReference{Kind:Service,Namespace:cluster-1,Name:event-bus-transformation-integration,UID:,APIVersion:serving.knative.dev/v1,ResourceVersion:,FieldPath:,}
  Normal   SubscriberSync                       10m                subscription-controller  Subscription was synchronized to channel "testing-dbevents"
  Warning  SubscriptionNotMarkedReadyByChannel  10m                subscription-controller  channel.Status.SubscribableStatus is nil
  Normal   SubscriptionReconciled               10m (x3 over 10m)  subscription-controller  Subscription reconciled: "cluster-1/testing-dbevents-event-bus-transformation-integration"

We now have an integration that is connected to the event bus, and an event sink that acts as a source for our event bus, ultimately allowing our serverless integrations to participate in our event mesh.

Apply this same integration to cluster-2, and we will be ready to demonstrate some of the power of our event mesh and event bus: active-passive, active-active, and other event mesh topologies, along with serverless integration, in a way that makes us truly Cloud Native!


What is “Cloud Native”? And How Does it Apply to Integration?

An Enterprise Integration Practice Perspective

The terms “Cloud” and “Cloud Native” have become pivotal in the software industry, implying significant change and a flurry of new techniques aimed at providing ever-increasing efficiency and efficacy for modern compute platforms. These terms, however, are often not well defined, and it is often unclear what the use of a new “cloud” or “cloud native” model implies for the applications and architecture delivered to such a platform.

More specifically, as enterprises attempt to address the common problems traditionally solved by integration technologies and middleware at large, what difference, if any, these new models imply is often left up to implementers, who may or may not be leveraging this new-found compute platform for its efficiency, stability, and reliability gains.

The purpose of this document is to come to a common understanding of what these terms mean and what this all might mean for traditional enterprise software and its integration tiers.

In the end, we’ll find that to meet our integration needs in our brave new cloud world, we need a new architectural view: Cloud Native Integration.

Defining Terms

Here we define the “Cloud”: What is the Cloud?

Here we define the term “Cloud Native”: What is Cloud Native?

Is MicroService Architecture Cloud Native?

Great question. In short, almost. Here we discuss why: Is MicroService Architecture Cloud Native?

Cloud Native Integration

As we define the cloud and cloud native, it becomes clear that these myriad components require integration. In fact, as workloads appear in different clouds and clusters, and transition from legacy approaches to cloud native approaches, integration becomes a front-and-center feature of our enterprise architecture.

Here we explain why a container platform is the Cloud Native runtime of choice for Cloud Native Integration: Integration and the Container Platform

and

Here we define Cloud Native Integration and why it is an important evolutionary leap for our modern enterprise architecture(s): What is Cloud Native Integration?


Using Debezium To Create AMQP Based Events

Change Data Capture (CDC) is an important architectural technique as we seek to represent OLTP changes in our event-driven architectures. It describes data written to traditional relational databases as consumable events for an event streaming platform.

Debezium is a way to tail database commit logs and create events out of these database writes. We use Debezium to ensure that, as our surrounding business processes persist transactions to OLTP stores (aka databases), we stream these events to an event bus, where stream processors can provide further integration with this data.

Often this event bus is Apache Kafka, as Debezium is typically run on Kafka Connect; however, Kafka Connect has limited message transformation capabilities and only one sink/target: Kafka.

Using Apache Camel, we can use Debezium to persist events to any number of targets, adapters, or services. Given Apache Camel’s maturity as an enterprise integration framework, it provides a much more mature and feature-rich environment through which to use Debezium for Change Data Capture purposes.

In this article, we’ll discuss how to create meaningful AMQP events from change data capture events, and persist them along an event bus/mesh.

Using Camel with embedded Debezium

Without getting too far into what Apache Camel is, it is worth pointing out that Apache Camel provides an integration framework with hundreds of adapters. Apache Camel may be run as a self-managed microservice or via other runtimes such as Spring Boot, Vert.x, or OSGi, and fits well as a runtime for Debezium, providing out-of-the-box support for many Debezium sources.

In this article, we will build on the work done in a great article, Integration Scenarios With Debezium and Camel.

We’ll extend the work done in that article to do a few more/different things:

  • Transform our payload to meet the OpenAPI specification
  • Prepare our payload for use as an AMQP based event
  • Prepare an event mesh with Apache Qpid Dispatch Router
  • Consume this message as an AMQP peer to our Debezium runtime

To illustrate these examples, we’ll be using the source code found here: Camel Debezium Examples

Wiring up Camel for Debezium Usage

Initially, there are a few things we’ll need to do:

  • We’ll need to ensure we’re using Apache Camel 3.x
  • We need to wire up our Camel route to use the Camel Postgres Debezium Component (https://camel.apache.org/components/latest/debezium-postgres-component.html):
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-debezium-postgres</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>

Using the Camel Debezium component for Postgres, we can wire up a Camel route like the following:


@Override
	public void configure() throws Exception {
		
		from("debezium-postgres:localhost?"
                + "databaseHostname="
                + "&databasePort="
                + "&databaseUser="
                + "&databasePassword="
                + "&databaseDbname=postgres"
                + "&databaseServerName=localhost"
                + "&pluginName=pgoutput"
                + "&schemaWhitelist="
                + "&tableWhitelist=.making_tests"
                + "&offsetStorage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore")
                .routeId("debeziumPGRoute")
                .routeDescription("This route  consumes from a PostGres DB and persists an AMQP Event")

Debezium Configuration

It's worth noting a few things about our Debezium usage. We wire up our database connection with the usual suspects (hostname, port, etc.), and we configure Debezium to whitelist a schema and table, for which Debezium will use Postgres's logical decoding via the pgoutput plugin. Debezium will create a struct out of this of the form:

org.apache.kafka.connect.data.Struct

which is a Kafka Connect data structure that we pull in with the Camel Debezium component. It is worth noting the use of the in-memory offset backing store for Debezium. Debezium offers a FileOffsetBackingStore as well, which may make more sense for persistence needs and a closer-to-production deployment.
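
As a minimal sketch of that alternative (host, credentials, and file path below are illustrative placeholders), the same consumer endpoint could be pointed at the file-backed store via the component's offsetStorage and offsetStorageFileName options:

import org.apache.camel.builder.RouteBuilder;

// Illustrative only: the same Debezium Postgres consumer, but with a
// file-backed offset store so offsets survive a restart of the runtime.
public class FileOffsetDebeziumRoute extends RouteBuilder {

    @Override
    public void configure() {
        from("debezium-postgres:localhost"
                + "?databaseHostname=localhost"
                + "&databasePort=5432"
                + "&databaseUser=postgres"
                + "&databasePassword=postgres"
                + "&databaseDbname=postgres"
                + "&databaseServerName=localhost"
                + "&pluginName=pgoutput"
                + "&offsetStorage=org.apache.kafka.connect.storage.FileOffsetBackingStore"
                + "&offsetStorageFileName=/deployments/data/debezium-offsets.dat")
            .log("CDC event: ${body}");
    }
}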

The rest of the Camel route

                .log(LoggingLevel.INFO, "Incoming message ${body} with headers ${headers}")
                .convertBodyTo(TestEvent.class)
                .setHeader("CamelInfinispanKey", simple("${body.id}"))
                .setHeader("CamelInfinispanValue", simple("${body}"))
                .setHeader("CamelInfinispanOperation", simple(InfinispanOperation.PUTIFABSENT.toString()))
                .setHeader("eventPayload", simple("${body}"))
                .to("infinispan:pg.event")
                .setBody(simple("${header.eventPayload}"))
                .doTry()
                	.to("amqp:.dbevents?connectionFactory=connFact")
                .doCatch(IOException.class)
                	.log("** Exception thrown: ${exception.message} **")
                	.log("Body of the message: ${body}")
                	.throwException(new Exception("${exception.message}"))
                .endDoTry();
	}
		

The rest of this route simply leverages Camel's stream handling capabilities: converting the body with a Camel type converter, storing the message in Infinispan, and ultimately emitting the message via an AMQP message sender, all wired up by Camel. As Camel is an incredibly mature integration framework, nearly any number of transformation, service mediation, or other enterprise integration patterns may be applied at this point, with literally hundreds of adapters available.
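
For completeness, the convertBodyTo(TestEvent.class) call relies on a registered Camel type converter that maps the Kafka Connect Struct into our domain object. A minimal sketch of such a converter follows; the TestEvent accessors and the field names ("id", "payload") are assumptions for illustration, as the real class lives in the demo repository:

import org.apache.camel.Converter;
import org.apache.kafka.connect.data.Struct;

// Sketch of a type converter backing convertBodyTo(TestEvent.class).
// Field names and TestEvent setters are illustrative assumptions.
@Converter
public final class TestEventConverter {

    private TestEventConverter() {
    }

    @Converter
    public static TestEvent toTestEvent(Struct struct) {
        TestEvent event = new TestEvent();
        event.setId(struct.getInt64("id"));
        event.setPayload(struct.getString("payload"));
        return event;
    }
}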

While this code snippet would need a good deal of elaboration to be ready for a real CDC use case, it demonstrates that creation of events (even those data ingress events that might originate from more traditional legacy OLTP approaches) need not be coupled to Kafka as a store.
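
Similarly, the connFact connection factory referenced by the amqp: endpoint above must be available in the Camel registry. A minimal sketch, with an illustrative router URL, might look like:

import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.qpid.jms.JmsConnectionFactory;

// Sketch: bind a Qpid JMS connection factory under the name "connFact"
// so the route's amqp: endpoint can resolve it. The URL is a placeholder.
public class AmqpRegistrySetup {

    public static CamelContext createContext() {
        CamelContext context = new DefaultCamelContext();
        JmsConnectionFactory connFact =
                new JmsConnectionFactory("amqp://localhost:5672");
        context.getRegistry().bind("connFact", connFact);
        return context;
    }
}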

AMQP 1.0 as the Event Mesh Transport

By maintaining Kafka as the central bus for our event mesh, we suffer from a lack of the following capabilities:

* Event routing

While there are certainly a number of ways to route Kafka producers/consumers using proxies, load balancers, LTM/GTM approaches, and other machinations, event producers and consumers remain coupled to the underlying Kafka wire protocol and, inevitably, to the Kafka broker. This coupling implies a request/response contract with the underlying broker and bakes Kafka-specific needs into event producers.

* Built-In Settlement

While Kafka clients can certainly be written to avail themselves of various QoS levels by employing various techniques, emitting events to Kafka as a central event bus implies that a client is bound to Kafka’s In-Sync Replica (ISR) conditions. While this might work well in a QoS 0 type of situation, where a client does not block and continues processing while spinning off a thread to handle the interaction with Kafka, disposition of the event by the event receiver is still bound by Kafka’s broker-level ISR configuration. For many event types, this monolithic way of determining the disposition of an emitted event is inappropriate for the event producer’s use case.

* Multi-Tenancy

Additionally, Kafka has no real notion of multi-tenancy, making it imperfect for use as an event bus. While we can certainly use keys for our message payloads that imply multi-tenancy in our broker, by leveraging an event mesh such as Qpid Dispatch Router we can create a truly multi-tenant event mesh. For more information on how to leverage a vhost so that policy may be applied in a multi-tenant fashion to our event mesh, please check out: Configuring Authorization and Vhosts in QDR
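
As a rough illustration only (hostnames, limits, and address patterns below are placeholders; the linked documentation is the authoritative reference for the syntax), a vhost policy entry in qdrouterd.conf gives each tenant its own connection limits and allowed source/target addresses:

vhost {
    hostname: cluster-1.example.com
    maxConnections: 500
    allowUnknownUser: true
    groups: {
        $default: {
            remoteHosts: *
            sources: test.db-events
            targets: test.db-events
        }
    }
}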

* Producer Flow Control

The greatest advantage of decoupling Kafka as a store from our event mesh is that the Kafka wire protocol is immature in its flow-control capabilities. While we do have flow control provided to us by our Kafka brokers, this approach is ad hoc and happens after the fact: it is not until we have exhausted the in-memory backing queue in the broker that we trip the thresholds for flow-controlling producers.

As a result, while emitting our events directly to Kafka neatly couples our events to our store, and Kafka Connect lets us tail the database commit log to create a single topic producer, we have no way of decoupling the flow control of a particular message producer from that of all message producers. As our number of producers grows, Kafka gives us little capability to disaggregate these stream producers from the overall broker capacity.

AMQP 1.0 makes flow control a Layer 7 wire-protocol capability between peers. For instance, if the AMQP event receiver that receives the event produced by this code sample is not capable of taking in messages, it will offer us 0 credit (AMQP 1.0 employs a credit-based flow control mechanism) and block our producers. This implies that an individual CDC emitter may be flow controlled by our AMQP event sink.




Monitoring MicroServices and Prometheus

Prometheus has established itself as a mainstay for application monitoring, alerting, and safekeeping of our key operational behaviours by maintaining a time-series store of metrics from our application containers. As it is lightweight and focuses on high availability, Prometheus, when deployed to a container platform, can provide a cloud native means of monitoring the application runtimes in our containers.

For distributed integration platforms like Red Hat Fuse, this need becomes even greater, as our MicroService-based architectures may imply many pods choreographed according to the dictates of an integration that spans many integration endpoints across many pods. As a result, being able to assess the health of our applications, their operational behaviour, and longitudinal trends across our enterprise, and to act on those indicators, becomes a mission-critical factor in orchestrating MicroServices on container platforms.

The Setup

It is worth noting that the techniques described in this article do not require anything but Kubernetes (k8s), the CoreOS Prometheus Operator, and a Grafana instance.

For the purposes of this demonstration, however, we will use:

  • Minishift with the latest CDK 3.11.0 installed (OKD 3.11.0)
  • Red Hat Fuse Springboot Camel images (https://access.redhat.com/containers/?tab=images&platform=openshift#/registry.access.redhat.com/fuse7/fuse-java-openshift)
  • Productized resource templates from Red Hat for preparation of our Prometheus Operator (https://github.com/jboss-fuse/application-templates)
  • The Prometheus Operator as released by CoreOS (https://coreos.com/operators/prometheus/docs/latest/)

Minishift configuration

As our purposes are demonstration only and our resources will be fairly lightweight, we’ll simply use a default Minishift profile and allow it to size itself accordingly. Please note that to extend this example further, or to get a better feel for what many microservices monitored by Prometheus would look like in your container platform, it is advisable to allocate more resources to the Minishift VM: https://docs.okd.io/latest/minishift/using/profiles.html
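
For example, a dedicated profile with more generous resources could be created before starting the cluster; the profile name and resource sizes below are only suggestions:

minishift profile set prometheus-demo
minishift config set memory 8GB
minishift config set cpus 4
minishift start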

The Prometheus Operator

The Prometheus Operator leverages the Operator SDK to implement “a class of software that operates other software, putting operational knowledge collected by humans into software” (https://coreos.com/blog/introducing-operators.html). Through the use of the operator framework, the Prometheus Operator is able to install a Service Account (prometheus), install a replicated cluster of time-series databases, and install itself to manage the state of Prometheus over its lifecycle.

Prometheus CRDs

Custom Resource Definitions (CRDs) in Kubernetes are a means to extend the Kubernetes API with new resource types introduced into a project or cluster. The Prometheus Operator uses CRDs to perform operations such as adding service monitors for new types of applications, configuring rules to run against the metrics collected from service monitors, or configuring alerts to send to the Prometheus Alertmanager.

To use the Prometheus Operator, we’ll extend Kubernetes by installing the following CRDs (https://raw.githubusercontent.com/jboss-fuse/application-templates/master/fuse-prometheus-crd.yml):

apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: prometheusrules.monitoring.coreos.com
spec:
  group: monitoring.coreos.com
  names:
    kind: PrometheusRule
    listKind: PrometheusRuleList
    plural: prometheusrules
    singular: prometheusrule
  scope: Namespaced
  version: v1
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: servicemonitors.monitoring.coreos.com
spec:
  group: monitoring.coreos.com
  names:
    kind: ServiceMonitor
    listKind: ServiceMonitorList
    plural: servicemonitors
    singular: servicemonitor
  scope: Namespaced
  version: v1
--- 
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition 
metadata:
  name: prometheuses.monitoring.coreos.com
spec:    
  group: monitoring.coreos.com
  names:
    kind: Prometheus
    listKind: PrometheusList
    plural: prometheuses
    singular: prometheus
  scope: Namespaced
  version: v1
--- 
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: alertmanagers.monitoring.coreos.com
spec:
  group: monitoring.coreos.com
  names:
    kind: Alertmanager
    listKind: AlertmanagerList
    plural: alertmanagers
    singular: alertmanager
  scope: Namespaced
  version: v1

These resource definitions give the Prometheus Operator the Kubernetes-native resources it needs to manage Prometheus and its monitoring configuration over their lifecycle.
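
To make that concrete, the operator is ultimately driven by a Prometheus custom resource along these lines; this is a simplified sketch, not the exact resource the Red Hat template installs:

apiVersion: monitoring.coreos.com/v1
kind: Prometheus
metadata:
  name: prometheus
  namespace: prometheus-dev
spec:
  replicas: 1
  serviceAccountName: prometheus
  serviceMonitorSelector:
    matchLabels:
      team: fuse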

Installing the Prometheus CRDs

As we will need a cluster-admin role binding to install CRDs across the cluster, we will log in to our Minishift cluster as follows:

oc login -u system:admin

We’ll create a new namespace in OpenShift for our Prometheus deployment:

oc new-project prometheus-dev

We’ll use the Red Hat productized YAML resources to create the CRDs and our operator; however, it is worth noting that these YAML templates do not require Red Hat software and simply install the CoreOS operator. As a result, we will issue the following command against the cluster:

oc create -f https://raw.githubusercontent.com/jboss-fuse/application-templates/master/fuse-prometheus-crd.yml

This will install the above custom resources and enable the Prometheus Operator to install Prometheus and the other operational capabilities Prometheus requires over its lifecycle. To install the Prometheus Operator, we’ll apply another Red Hat Fuse YAML template, which introduces a Service Account for the Prometheus runtime, a Service Account for the Prometheus Operator, and everything else required to install Prometheus:

oc process -f https://raw.githubusercontent.com/jboss-fuse/application-templates/master/fuse-prometheus-operator.yml | oc create -f -

After a couple of minutes (depending on resources), we should see a few things installed into the “prometheus-dev” project in our cluster:

[mcostell@work /]$ oc get pods -w 
NAME                                  READY     STATUS    RESTARTS   AGE
prometheus-operator-8586c4688-8m2kk   1/1       Running   0          4h
prometheus-prometheus-0               3/3       Running   1          4h

We’ll also see 2 new Service Accounts installed in the “prometheus-dev” namespace:

[mcostell@work /]$ oc get serviceaccounts -n prometheus-dev
NAME                  SECRETS   AGE
builder               2         4h
default               2         4h
deployer              2         4h
prometheus            2         4h
prometheus-operator   2         4h

At this point, we’ll also notice the Prometheus Operator has installed a route for the Prometheus console. Upon issuing this command:

[mcostell@work /]$ oc get routes -n prometheus-dev
NAME         HOST/PORT                                         PATH      SERVICES     PORT      TERMINATION   WILDCARD
prometheus   prometheus-prometheus-dev.192.168.42.118.nip.io             prometheus   <all>                   None

we find the route that serves the following Prometheus console, which currently does not have anything under management.

Prometheus Console

Creating a Fuse based Application To Monitor

TODO: Complete This Section

Configuring ServiceMonitors

So at this point we have a functional Prometheus cluster launched by the Prometheus Operator; however, we’re not actually monitoring anything. Recall that in prior steps we installed a set of CRDs for the Prometheus Operator’s use. Let’s go ahead and create a Kubernetes resource based on the following CRD we installed into the cluster earlier:

apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: servicemonitors.monitoring.coreos.com
spec:
  group: monitoring.coreos.com
  names:
    kind: ServiceMonitor
    listKind: ServiceMonitorList
    plural: servicemonitors
    singular: servicemonitor
  scope: Namespaced
  version: v1

As we’re interested in the Fuse application created earlier, we’ll create a ServiceMonitor resource using the following productized Red Hat template (again, it’s worth noting that there is nothing specific to Red Hat in this template, only the CoreOS operator; templates like these should likely be extended to accommodate different use cases, some of which we’ll discuss below).

The Red Hat template (found at https://github.com/jboss-fuse/application-templates/blob/master/fuse-servicemonitor.yml) parameterizes the creation of Service and ServiceMonitor resources in OpenShift/K8s:

apiVersion: template.openshift.io/v1
kind: Template
metadata:
  name: fuse-prometheus-service-monitor
  labels:
    app: fuse-prometheus-service-monitor
  annotations:
    openshift.io/display-name: "Red Hat Fuse service-monitor install"
    openshift.io/provider-display-name: "Red Hat, Inc."
    description: "A ServiceMonitor specifies how groups of services should be monitored - this template defines how to monitor a Fuse application for Prometheus."
    tags: "fuse,prometheus,prometheus-operator,monitoring"
    iconClass: "icon-rh-integration"
    version: "1.0"
message: |-
  prometheus-operator is now deployed to ${NAMESPACE}
parameters:
- name: NAMESPACE
  displayName: Namespace
  value: fuse
  required: true
  description: Namespace in which the prometheus-operator is installed. 
- name: FUSE_SERVICE_NAME
  displayName: Fuse Service Name
  value: 'myservicename'
  required: true
  description: The service name of the Fuse application to monitor.
- name: FUSE_SERVICE_TEAM
  displayName: Fuse Service Team
  value: 'fuse'
  required: true
- name: ENDPOINT_PORT
  displayName: Endpoint port
  value: 'web'
  required: true

objects:
#
# OpenShift resources 
#
- apiVersion: monitoring.coreos.com/v1
  kind: ServiceMonitor
  metadata:
    name: ${FUSE_SERVICE_NAME}
    namespace: ${NAMESPACE}
    labels:
      team: ${FUSE_SERVICE_TEAM}
  spec:
    selector:
      matchLabels:
        app: ${FUSE_SERVICE_NAME}
    endpoints:
    - port: ${ENDPOINT_PORT}

- apiVersion: v1
  kind: Service
  metadata:
    name: ${FUSE_SERVICE_NAME}
    namespace: ${NAMESPACE}
    labels:
      app: ${FUSE_SERVICE_NAME}
  spec:
    selector:
      app: ${FUSE_SERVICE_NAME}
    ports:
    - name: ${ENDPOINT_PORT}
      port: 9779  
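
Once a Fuse service is running, the template can be instantiated with oc process, supplying the namespace and service name as parameters; the service name below is a placeholder for whatever the Fuse application's Service is called:

oc process -f https://raw.githubusercontent.com/jboss-fuse/application-templates/master/fuse-servicemonitor.yml \
  -p NAMESPACE=prometheus-dev \
  -p FUSE_SERVICE_NAME=fuse-rest-service | oc create -f -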

As we mentioned earlier when installing our Fuse REST application, the Red Hat Fuse image we used to build our REST service with Spring Boot and Apache Camel provides an underlying Prometheus configuration YAML, installs a Prometheus JMX scraper into our runtime, and provides a URI for Prometheus to interrogate to collect metrics.

As we set up our ServiceMonitor objects, we’ll notice that the Service objects representing our sample Fuse REST service should expose port 9779, along with the other ports needed for K8s to interrogate its health and whatever port the sample application serves HTTP traffic on for the REST service it provides. It is also critical that they carry the correct labels for our ServiceMonitor object to select the Service object and put it under Prometheus monitoring.

With the above configuration of our Service Monitor objects, we will be pointing our