Cloud Native Integration: A working Demo
This repository contains a working demonstration of how to quickly implement a Cloud Native architecture approach (with Enterprise Integration patterns as a central concern) as distilled in What is Cloud Native Integration.
Demo Description
As we distill the high level system architecture as described by the Cloud Native Integration document
We notice a few distinct architectural layers:
Event Mesh The event mesh intends to handle peer to peer event communication in a fashion that allows for several Cloud Native characteristics such as high availability, reliability, and location agnostic behaviour between peers. The event mesh acts as a rendezvous point between eventing peers (event emitters and event receivers), and provides event emitters a graph of event receivers that may span clusters or even clouds.
Event Sink The event sink provides a port to the underlying event bus our integrations process events on.
Event Bus The event bus provides a service bus so that event processors, that often need to integrate multiple different data sources to provide event aggregate level output, are able to communicate with each other in an asynchronous fashion. This decouples event emitters and receivers, as well as decoupling event aggregate behaviour that distills our series of events into meaningful business level data. As a result, integrations are bound to their domain, and are likely decomposed along their bounded context. It is along the event bus that features that constitute our enterprise perform the stream processing and integration work that satisfies enterprise features. Inevitably, these stream processors and integration components aggregate events into sources of truth to maintain consistency and state across the enterprise.
Event Store The event store represents a persistent source of truth for events and event aggregates. We define aggregate event as events that attempt to provide consistency boundaries for transactions, distributions and concurrency. In our view, while events may be represented in a transitive store such as Kafka, event aggregates define consistency across our data plane and represent a conceptual whole for state at a point in time for our domain entitties. As a result, our event store should be viewed as a conceptual whole that can offer capabilities as a source of truth for volatile, short term events, and a source of truth for long standing event aggregates.
Representing these Ports, Adapters and Layers
This demo seeks to display these architectural techniques by leveraging the Red Hat Integration platform.
Event Mesh To provide event mesh capabilities, this demo leverages Apache Qpid Dispatch Router to ensure a service and event communication control plane that provides capabilities for high availability, resilience, multi and hybrid cloud as well as policy to apply over the mesh to ensure governance is properly applied as our event emitters and event receivers communicate in a peer to peer fashion
Event Sink To provide an event sink as a port into our underlying business logic and sources of truth, we leverage Apache Camel and a set of complementary cloud native tooling
Event Bus The event bus provides a normalized means of asynchronous behaviour for event consumers and emmitters. As Cloud Native Integration depends on and features cloud native capabilities, we leverage Knative Eventing to provide a cloud native service bus abstraction with Apache Kafka as the underlying persistence engine for our communication over our service bus channels.
Event Store As Apache Kafka is the persistence store for our volatile events and event aggregates that travel along our service bus, this demonstration relies on a traditional OLTP store as the inevitable source of truth for event aggregates. While an OLTP store is not required as a source of truth for event aggregates in our view of the world, it describes the complementary nature of Cloud Native Integration to traditional legacy enterprise deployments.
Getting Started
Quick Note: This demonstration uses Openshift 4.x which relies on Kubernetes 1.16 and above. As a result, despite the use of Operator Hub and Red Hat disitributed Operators, the steps outlined in this document may be followed in any Kubernetes distro that is 1.16 or higher and a matching community version of the operators being deployed
Installing Operators
From the Openshift Operator Hub, install the following operators (in our case, we’ll install these operators with cluster admin credentials, and will allow these operators to observe all namespaces).
- Red Hat Integration - AMQ Streams
- Red Hat Integration - AMQ Certificate Manager
- Red Hat Integration - AMQ Interconnect (this operator will need to be installed in multiple namespaces due to its limitation of watching a single K8s namespace)
- Openshift Serverless Operator
- Camel K Operator
We should now find Operators running in the OpenshiftOperators namespace:
Preparing For Deployment
At this point, we have installed the required operators; however, our environment still isn’t ready to start laying down deployments.
Install Knative Serving
To install Knative Serving which provides our serverless framework by the OpenshiftServerless Operator, there are a few more steps.
- Create the knative-serving namespace with a user that posseses the cluster-admin role:
oc create namespace knative-serving
- Apply the knative serving yaml to the cluster as described in the following
apiVersion: operator.knative.dev/v1alpha1
kind: KnativeServing
metadata:
name: knative-serving
namespace: knative-serving
Upon succesfful installation, there should be a similar result to the following:
oc get pods -w -n knative-serving
NAME READY STATUS RESTARTS AGE
activator-7db4dc788c-spsxb 1/1 Running 0 1m
activator-7db4dc788c-zqnnd 1/1 Running 0 45s
autoscaler-659dc48d89-swtmn 1/1 Running 0 59s
autoscaler-hpa-57fdfbb45c-hsd54 1/1 Running 0 49s
autoscaler-hpa-57fdfbb45c-nqz5c 1/1 Running 0 49s
controller-856b4bd96d-29xlv 1/1 Running 0 54s
controller-856b4bd96d-4c7nn 1/1 Running 0 46s
kn-cli-downloads-7558874f44-qdr99 1/1 Running 0 1m
webhook-7d9644cb4-8xkzt 1/1 Running 0 57s
Install Knative Eventing
Knative Eventing provides functionality around our Cloud Event abstraction and forms the operational basis of our cloud native service bus.
To install Knative Eventing we need to perform the following steps (with a cluster admin user):
- Create the knative-eventing namespace
oc create namespace knative-eventing
Upon creation of the namespace, we will want to create the knative eventing operators by applying the following cr
In our case, as we’ll have need later, we’ll install the Multi Tenant Channel Based Broker as our default Knative Eventing Broker implementation:
apiVersion: operator.knative.dev/v1alpha1
kind: KnativeEventing
metadata:
name: knative-eventing
namespace: knative-eventing
spec:
defaultBrokerClass: MTChannelBasedBroker
Upon applying this yaml, something similar to this should be true:
oc get pods -w -n knative-eventing
NAME READY STATUS RESTARTS AGE
broker-controller-67b56668bd-sgxwg 1/1 Running 0 1m
eventing-controller-544dc9945d-pz2cl 1/1 Running 0 1m
eventing-webhook-6c774678b5-lzfkn 1/1 Running 0 1m
imc-controller-78b8566465-smpb7 1/1 Running 0 1m
imc-dispatcher-57869b44c5-s7t92 1/1 Running 0 1m
At this point, we have installed the knative eventing controllers, dispatchers and webhooks; however, we only have support for the in memory channel, which means we will not be able to use a persistent approach to knative eventing brokers and their respective channels in our cluster.
Installing Kafka Channels
In our above Cloud Native Integration schematic, what lies at the heart of our event bus is Apache Kafka so for knative eventing to follow this architectural construct, we need our broker channels to be persisted by Kafka. While this component is not generally available, it has reached a considerable maturity level and relies largely on its surrounding ecosystem that is generally available.
In this demonstration, we will deploy a KafkaChannel deployment that creates a set of CR’s, controllers for our channels, requisite service accounts, and webhooks to instrument creation, removal and discovery of KafkaChannels that may be associated with a Knative eventing Broker.
By issuing the following command (this assumes cluster admin permissions for the user issuing commands and that the previous knative-eventing steps have taken place):
oc apply -f ./src/main/install/kafka-channel/kafka-channel-install.yaml
clusterrole.rbac.authorization.k8s.io/kafka-addressable-resolver created
clusterrole.rbac.authorization.k8s.io/kafka-channelable-manipulator created
clusterrole.rbac.authorization.k8s.io/kafka-ch-controller created
serviceaccount/kafka-ch-controller created
clusterrole.rbac.authorization.k8s.io/kafka-ch-dispatcher created
serviceaccount/kafka-ch-dispatcher created
clusterrole.rbac.authorization.k8s.io/kafka-webhook created
serviceaccount/kafka-webhook created
clusterrolebinding.rbac.authorization.k8s.io/kafka-ch-controller created
clusterrolebinding.rbac.authorization.k8s.io/kafka-ch-dispatcher created
clusterrolebinding.rbac.authorization.k8s.io/kafka-webhook created
customresourcedefinition.apiextensions.k8s.io/kafkachannels.messaging.knative.dev created
configmap/config-kafka created
configmap/config-leader-election-kafka created
service/kafka-webhook created
deployment.apps/kafka-ch-controller created
mutatingwebhookconfiguration.admissionregistration.k8s.io/defaulting.webhook.kafka.messaging.knative.dev created
validatingwebhookconfiguration.admissionregistration.k8s.io/validation.webhook.kafka.messaging.knative.dev created
secret/messaging-webhook-certs created
deployment.apps/kafka-webhook created
We should now see new contoller, dispatcher, and webhook pods in our knative-eventing namespace:
NAME READY STATUS RESTARTS AGE
broker-controller-67b56668bd-sgxwg 1/1 Running 0 1h
eventing-controller-544dc9945d-pz2cl 1/1 Running 0 1h
eventing-webhook-6c774678b5-lzfkn 1/1 Running 0 1h
imc-controller-78b8566465-smpb7 1/1 Running 0 1h
imc-dispatcher-57869b44c5-s7t92 1/1 Running 0 1h
kafka-ch-controller-7f88b8c776-crhm4 1/1 Running 0 1m
kafka-webhook-b47dc9767-hmnm4 1/1 Running 0 1m
We should also now have a resource that describes resources that will inevitably describe how channels are bound to Kafka topics in our Knative Eventing framework.
oc api-resources | grep messaging.knative.dev
channels ch messaging.knative.dev true Channel
inmemorychannels imc messaging.knative.dev true InMemoryChannel
kafkachannels kc messaging.knative.dev true KafkaChannel
subscriptions sub messaging.knative.dev true Subscription
If we have gotten this far we have most of our infrastructure assembled from an operator perspective and now its time to being installing our concrete implementation.
Installing the Demo
At this point our operators are installed and we have mostly configured our environment; however, we still have a few house keeping tasks to take care of:
- We need to create and install a trust store so that we may communicate to and between clusters in a secure fashion
- We need to ensure proper trust and authority is distributed to the correct places in our cluster
Using the AMQ Certificate Manager Operator
We will use the cert-manager operator that we’ve previously provisioned to issue certificates that we’ll need to wire secure connections across the cluster.
Creating a CA to use across the cluster
For our puroposes we will create a Certificate Authority using OpenSSL. Initially, the following command should be issued to generate the private key for our CA
openssl genrsa -des3 -out cloudEventMeshDemoCA.key 2048
OpenSSL will prompt for a passphrase. It is recommended to use a passphrase even in a development environment, and especially when cluster resources may be accessible from outside of the cluster.
We’ll use the key to create a root certificate that will act as our certificate authority:
openssl req -x509 -new -nodes -key cloudEventMeshDemoCA.key -sha256 -days 1825 -out cloudEventMeshDemoCA.crt
At this point, you will be asked for a passphrase for again, and as always, it is apropos to use one and not skip this step. While creating this CA OpenSSL will ask for OU’s, DN’s, etc., and it may be important to use meaningful values for these as it may be required during later configuration.
Upon completing this process, we will now have an available CA to sign with, and we’ll use the AMQ distribution of the certificate manager to establish our CA as a certificate issuer across the cluster.
For convenience sake, we have included a secret in this demo that we should apply to where our certificate manager operator lives (in our case the project openshift-operators)
oc apply -f ./src/main/k8s/CA/cloud/cloud-native-event-mesh-ca-secret.yaml
This could also be accomplished by creating a secret from the the CA private and public key created above.
Now that we have created our CA keypair secret, let’s create a certificate manager issuer that uses our CA:
apiVersion: certmanager.k8s.io/v1alpha1
kind: ClusterIssuer
metadata:
name: cloud-native-event-mesh-demo-cert-issuer
spec:
ca:
secretName: cloud-native-event-mesh-demo-ca-pair
Upon completing our application of the clusterissuer resource we should have a cluster issuer.
oc describe clusterissuer cloud-native-event-mesh-demo-cert-issuer
Name: cloud-native-event-mesh-demo-cert-issuer
Namespace:
Labels: <none>
Annotations: kubectl.kubernetes.io/last-applied-configuration={"apiVersion":"certmanager.k8s.io/v1alpha1","kind":"ClusterIssuer","metadata":{"annotations":{},"name":"cloud-native-event-mesh-demo-cert-issuer","name...
API Version: certmanager.k8s.io/v1alpha1
Kind: ClusterIssuer
Metadata:
Creation Timestamp: 2020-06-29T17:11:47Z
Generation: 2
Resource Version: 1485153
Self Link: /apis/certmanager.k8s.io/v1alpha1/clusterissuers/cloud-native-event-mesh-demo-cert-issuer
UID: 90035a64-67c1-4440-9526-a87d1297bfa2
Spec:
Ca:
Secret Name: test-key-pair
Status:
Conditions:
Last Transition Time: 2020-06-29T17:11:47Z
Message: Signing CA verified
Reason: KeyPairVerified
Status: True
Type: Ready
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal KeyPairVerified 8s (x2 over 8s) cert-manager Signing CA verified
We are now free to issue certificates in our cluster. This will be critical to setting up our next steps, the event mesh.
Installing the Event Mesh
Our event mesh will span three different namespaces in our cluster; however, our intention is to logically represent three seperate clusters in our topology. As a result, we will need to create trust in each of these namespaces for the other routers in our event mesh, as, we would not allow insecure communication from cluster to cluster.
Creating the namespaces
At this point we will wand to create the following namespaces:
- cluster-1
- cluster-2
- edge - this will represent an edge cluster in our topology. While some topologies may not call for an edge cluster, we still want to ensure that we use an edge router somewhere in our openshift cluster to ensure that we have connection concentration, a single source of policy application to incoming requests from outside of the cluster, as well as a terminal leaf node for our event mesh graph.
Installing the Interconnect Router in cluster-1
As we have use of the operator hub in Openshift 4, we will simply install an Interconnect Operator to the namespace “cluster-1”.
At this point, we will be able to start creating some certificates from the cluster issuer we have already established and inevitably configure our Interconnect router for trust. In the namespace “cluster-1”, we will provision a certificate for our Interconnect router which we will use to wire up the Interconnect router for trust across inter-router connections.
Please Note: The interconnect router has self-signed a certificate using the certificate manager, and the demonstrated use of certificate management here is only applicable to inter-router conncetions
Initially, we’ll lay down the certificate request custom resource:
oc apply -f ./src/main/k8s/cloud-1/router/cloud1-certificate-request.yaml
This certificate request:
apiVersion: certmanager.k8s.io/v1alpha1
kind: Certificate
metadata:
name: cluster-wide-tls
spec:
secretName: cluster-wide-tls
commonName: openshift.com
issuerRef:
name: cloud-native-event-mesh-demo-cert-issuer
kind: ClusterIssuer
dnsNames:
- openshift.com
- eipractice.com
- opentlc.com
Will leverage the certificate manager to create a secret in the cluter referred to as “cluster-wide-tls” which holds the CA certificate, private key, and other things to ensure trust as issued by the ClusterIssuer we have created above.
Upon issuing the certificate, it is now time to apply our Interconnect router custom resource for the cluster-1 namespace:
oc apply -f ./src/main/k8s/cloud-1/router/cloud1-mesh-router.yaml
This enables a few features as laid out by the Interconnect custom resource:
apiVersion: interconnectedcloud.github.io/v1alpha1
kind: Interconnect
metadata:
name: cloud1-router-mesh
spec:
sslProfiles:
- name: inter-router
credentials: cluster-wide-tls
caCert: cluster-wide-tls
deploymentPlan:
role: interior
size: 1
placement: AntiAffinity
interRouterListener:
sslProfile: cloud1-router-tls
expose: false
authenticatePeer: false
port: 55671
This creates an SSL Profile based on our CA certificate as issued to us by the cluster issuer as well as establishes an interRouterListener backed by this SSL Profile. The router will also configure other things by default, and with self-signed security, such as:
- A default AMQP listener secured by a self-signed certificate
- Metrics and management capabilities
- A means for Prometheus or other AMP technologies to observe the event mesh router as well as the link attachments that event peers make over the event mesh
At this point, in cluster-1 we will have both an event mesh router and the Interconnect operator in our “cluster-1” namespace:
[mcostell@work router]$ oc get pods -w -n cluster-1
NAME READY STATUS RESTARTS AGE
cloud1-router-mesh-7f698d8c65-wpnx7 1/1 Running 0 13m
interconnect-operator-56b7884d4-6j4jl 1/1 Running 0 5h
oc get interconnects
NAME AGE
cloud1-router-mesh 17m
We can also introspect the router via oc exec commands:
[mcostell@work router]$ oc exec cloud1-router-mesh-7f698d8c65-wpnx7 -i -t -- qdstat -g
2020-06-29 23:23:49.882216 UTC
cloud1-router-mesh-7f698d8c65-wpnx7
Router Statistics
attr value
========================================================================================
Version Red Hat AMQ Interconnect 1.8.0 (qpid-dispatch 1.12.0)
Mode interior
Router Id cloud1-router-mesh-7f698d8c65-wpnx7
Worker Threads 4
Uptime 000:00:22:17
VmSize 497 MiB
Area 0
Link Routes 0
Auto Links 0
Links 2
Nodes 0
Addresses 11
Connections 1
Presettled Count 0
Dropped Presettled Count 0
Accepted Count 24
Rejected Count 0
Released Count 0
Modified Count 0
Deliveries Delayed > 1sec 0
Deliveries Delayed > 10sec 0
Deliveries Stuck > 10sec 0
Deliveries to Fallback 0
Links Blocked 0
Ingress Count 24
Egress Count 23
Transit Count 0
Deliveries from Route Container 0
Deliveries to Route Container 0
Installing the Interconnect Router in cluster-2
Now that we have an event mesh router in cluster-1, we will link routers together to form an event mesh between cluster-1 and cluster-2. Please Note - the intention of this logical delineation is to represent 2 seperate clusters. In practice, interior event mesh routers would bind remote clusters together
The process of enabling inter-router connections between event mesh routers in namespaces cluster-1 and cluster-2 is similar to the provisioning that was required for the cluster-1 event mesh router.
Initially, we want to install the Red Hat - Interconnect Operator in the namespace cluster-2. As cluster-2 will also issue its certificates from the cluster-issuer provisioned previously in the demo, upon installation of the Interconnect Operator, we will provision a CA certificate for cluster-2 via a certificate request from the ClusterIssuer.
oc apply -f src/main/k8s/cloud-2/router/cloud2-certificate-request.yaml
This certificate request:
apiVersion: certmanager.k8s.io/v1alpha1
kind: Certificate
metadata:
name: cluster-wide-tls
spec:
secretName: cluster-wide-tls
commonName: openshift.com
issuerRef:
name: cloud-native-event-mesh-demo-cert-issuer
kind: ClusterIssuer
dnsNames:
- openshift.com
- eipractice.com
- opentlc.com
Will again provision our CA into a secret into the cluster named “cluster-wide-tls”.
Upon the certificate manager cluster issuer issuing a CA into cluster-2, we can lay down the Interconnect resource that will enable our cluster-1 and cluster-2 event meshing.
oc apply -f src/main/k8s/cloud-2/router/cloud2-certificate-request.yaml
The router provisioned takes mostly default values for the Interconnect configuration; however, does create an interior router connection to the event mesh router in cluster-1:
apiVersion: interconnectedcloud.github.io/v1alpha1
kind: Interconnect
metadata:
name: cloud2-router-mesh
spec:
sslProfiles:
- name: inter-cluster-tls
credentials: cluster-wide-tls
caCert: cluster-wide-tls
interRouterConnectors:
- host: cloud1-router-mesh.cluster-1.svc
port: 55671
verifyHostname: false
sslProfile: inter-cluster-tls
Upon succesful provisioning of the router in cluster-2 we should be able to see a successfull interior router connection between the routers in cluster-1 and cluster-2:
oc exec cloud2-router-mesh-d566476ff-msdrr -i -t -- qdstat -c
2020-06-29 23:37:04.652856 UTC
cloud2-router-mesh-d566476ff-msdrr
Connections
id host container role dir security authentication tenant last dlv uptime
=================================================================================================================================================================================================
2 cloud1-router-mesh.cluster-1.svc:55671 cloud1-router-mesh-7f698d8c65-wpnx7 inter-router out TLSv1/SSLv3(DHE-RSA-AES256-GCM-SHA384) x.509 000:00:00:00 000:00:00:54
9 127.0.0.1:33562 ae807937-90d0-44d7-8f7b-afdc14f5c47a normal in no-security no-auth 000:00:00:00 000:00:00:00
Installing the Edge Interconnect Router
At this point we have a mesh of Interconnect routers in cluster-1 and cluster-2; however, to properly be able to scale our router network and provide a connection concentrator for our eventing applications, we will want to establish a member of our event mesh to have the role of “edge” in our cluster. Edge routers act as connection concentrators for messaging applications. Each edge router maintains a single uplink connection to an interior router, and messaging applications connect to the edge routers to send and receive messages.
Initially, we will to create a namespace named “edge”. Please Note: in practice, our edge router would likely live in a seperate cluster meant to handle edge use cases, and would be seperate from cluster-1 and cluster-2.
Upon succesfull creation of the namespace, it is neccessarry to install the Interconnect Operator into this namespace. This will allow us to provision our edge router resource.
Initially, as with our other logical clusters we will provision our edge namespace with a CA certificate:
oc apply -f src/main/k8s/edge/edge-certificate-request.yaml
Upon creating our cluster-wide-tls secret in the namespace, we’ll provision our Interconnect Router which will have uplinks to both cluster-1 and cluster-2:
oc apply -f src/main/k8s/edge/edge-routers.yaml
This will provision our edge event mesh router, to perform a role as a terminal node in our event mesh.
apiVersion: interconnectedcloud.github.io/v1alpha1
kind: Interconnect
metadata:
name: edge-routers
spec:
sslProfiles:
- name: edge-router-tls
credentials: cluster-wide-tls
caCert: cluster-wide-tls
deploymentPlan:
role: edge
placement: AntiAffinity
edgeConnectors:
- host: cloud1-router-mesh.cloud-1
port: 45672
name: cloud1-edge-connector
- host: cloud2-router-mesh.cloud-2
port: 45672
name: cloud2-edge-connector
listeners:
- sslProfile: cluster-wide-tls
authenticatePeer: false
expose: true
http: false
port: 5671
If we were to peruse the Interconnect console for one of our clusters, we would be able to see our two interior routers fronted by a single edge router:
Installing the Event Bus
Cloud Native Integration creates an event mesh through which event emitters and receivers are able to negotiate with each other, get guarantees around delivery, and ensure proper communication flow via Interconnects wire level flow control capabilities.
While the event mesh serves to provide a communications control plane for event emitters and receivers, through which event level qos can be applied to its relevant use cases, and a reliable graph of receivers for emitted events, Cloud Native Integration introduces a complementary persistent event bus, as an event sink for event stream processing and integration from the event mesh. This event bus provides a persistent source of truth for event stream processors, and allows event receivers to take advantage of platform capabilities such as elasticity, contractual communication, and extend those capabilities into serverless capabilities such as scaling to zero and other highly elastic behaviour.
To accomplish this Cloud Native Integration relies on Knative Eventing, and Knative Serving from the Openshift Serverless Operator that we deployed earlier.
As “Knative Eventing” proposes a pub-sub architecture with per namespace Brokers and subscriptions as described by:
It will be neccessarry to provision a persistent source of truth for our Knative event brokers. Cloud Native Integration proposes the use of AMQ Streams as this persistent source of truth as it offers two features that uniquely assist in our pub-sub abstraction:
- A co-ordinated distributed log that replicates across distinct physical parts of an Openshift cluster
- A means of providing distributed log partitions to ephemeral consumer groups via the Knative channels abstraction and a Kafka Topic implementation
As the Integrations that handle our events consume from our underlying event bus via Knative subscriptions to Knative event channels, AMQ Streams provides a cloud native means to persist these events:
Provisioning AMQ Streams
For our use case in a single cluster, we will simply provision a single AMQ Streams cluster for both cluster-1 and cluster-2 event bus consumers; however, in practice, it would be apropos to at minimum extend the mult-tenant features of this demo to ensure the use of distinct/multi-tenant physical Kafka clusters.
As we have already installed the AMQ Streams Operator cluster wide, we will simply create a namespace for our AMQ Streams cluster to live in. Let’s call it “amq-streams”.
oc create namespace amq-streams
Upon succesful creation of the namespace, we will apply our AMQ Streams custom resource to create an AMQ Streams cluster in the namespace called “small-event-cluster”:
oc apply -f src/main/k8s/cloud-1/kafka/small-event-cluster.yaml
Upon succesfful creation of our resources, we should see our “amq-streams” cluster with some new pods:
oc get pods -w -n amq-streams
NAME READY STATUS RESTARTS AGE
small-event-cluster-entity-operator-7cb59948f7-r22kp 3/3 Running 0 33s
small-event-cluster-kafka-0 2/2 Running 1 1m
small-event-cluster-zookeeper-0 1/1 Running 0 2m
small-event-cluster-zookeeper-1 1/1 Running 0 2m
small-event-cluster-zookeeper-2 1/1 Running 0 2m
At this point we have provisioned AMQ Streams but we need to install Knative Eventing and Knative Serving so that AMQ Streams is the backbone of our enterprise service bus abstraction.
Provisioning Kafka Channels using the Knative Eventing Broker
As we have already installed the Openshift Serverless Operator in a previous step, we are now free to leverage the Kafka Channel configuration we created earlier to provision our Knative evening abstraction channel as a Kafka topic.
Let’s initially ensure we have configured our knative eventing operator’s webhooks to wire up for default behaviour by applying several configmaps:
oc apply -f ./src/main/k8s/config-maps/knative-eventing/ -n knative-eventing
This will wire up our broker and channel controllers to have a default broker configuration:
kind: ConfigMap
apiVersion: v1
metadata:
name: config-br-defaults
namespace: knative-eventing
data:
default-br-config: |
clusterDefault:
brokerClass: MTChannelBasedBroker
apiVersion: v1
kind: ConfigMap
name: kafka-channel
namespace: knative-eventing
namespaceDefaults:
test-tenant:
brokerClass: MTChannelBasedBroker
apiVersion: v1
kind: ConfigMap
name: imc-channel
namespace: knative-eventing
At this point, we will label our respective cluster-1 and cluster-2 namespaces for injection by our Knative Eventing Operator.
oc label namespace cluster-1 knative-eventing-injection=enabled
oc label namespace cluster-2 knative-eventing-inection=enabled
Initially, in the namespace *cluster-1* we'll provision Knative eventing *Channel* custom resources:
Upon labeling these namespaces, we should see broker knative-eventing broker pods in our namespace (for instance in namespace cluster-1):
oc get pods -w -n cluster-1
NAME READY STATUS RESTARTS AGE
cloud1-router-mesh-7f698d8c65-xft27 1/1 Running 0 1h
default-broker-filter-b5967fd6c-rs8vm 1/1 Running 0 13s
default-broker-ingress-6d4c6474c8-9gd4m 1/1 Running 0 13s
interconnect-operator-5684994fbf-587dk 1/1 Running 0 1h
We should also be able to interrogate our AMQ Streams broker and see that we now have topics for our trigger based subscriptions in our default brokers in namespaces that have been injected:
oc exec small-event-cluster-kafka-0 -c kafka -it -- bin/kafka-topics.sh --zookeeper localhost:2181 --list
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
knative-messaging-kafka.cluster-1.default-kne-trigger
knative-messaging-kafka.cluster-2.default-kne-trigger
At this point, our default brokers in namespace are wired for use of a default channel, a Kafka Channel, and we can begin wiring up our knative brokers for the channels we would like to interact with:
Let’s start by creating a channel in cluster-1 that we will use during our demo:
oc apply -f ./src/main/k8s/cloud-1/eventing/testing-dbevents-channel.yaml -n cluster-1
If we interrogate K8s about this resource, we’ll see that our testing-dbevents channel is in fact a kafka channel:
oc describe channels testing-dbevents
Name: testing-dbevents
Namespace: cluster-1
Labels: <none>
Annotations: kubectl.kubernetes.io/last-applied-configuration={"apiVersion":"messaging.knative.dev/v1beta1","kind":"Channel","metadata":{"annotations":{},"name":"testing-dbevents","namespace":"cluster-1"}}
messaging.knative.dev/creator=opentlc-mgr
messaging.knative.dev/lastModifier=opentlc-mgr
API Version: messaging.knative.dev/v1beta1
Kind: Channel
Metadata:
Creation Timestamp: 2020-07-07T02:00:56Z
Generation: 1
Resource Version: 1628649
Self Link: /apis/messaging.knative.dev/v1beta1/namespaces/cluster-1/channels/testing-dbevents
UID: c0f05d96-5e8d-46b5-862f-adcb2a2a8df1
Spec:
Channel Template:
API Version: messaging.knative.dev/v1alpha1
Kind: KafkaChannel
Spec:
Status:
Address:
URL: http://testing-dbevents-kn-channel.cluster-1.svc.cluster.local
Channel:
API Version: messaging.knative.dev/v1alpha1
Kind: KafkaChannel
Name: testing-dbevents
Namespace: cluster-1
Conditions:
Last Transition Time: 2020-07-07T02:00:56Z
Status: True
Type: Ready
Observed Generation: 1
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal ChannelReconciled 1m (x4 over 1m) channel-controller Channel reconciled: "cluster-1/testing-dbevents"
We can also interrogate the cluster, to list our kafka channels:
oc get kafkachannels -n cluster-1
NAME READY REASON URL AGE
default-kne-trigger True http://default-kne-trigger-kn-channel.cluster-1.svc.cluster.local 53m
testing-dbevents True http://testing-dbevents-kn-channel.cluster-1.svc.cluster.local 5m54s
At this point, we’ll go ahead and apply the rest of our channel resources to further prepare our Openshift cluster for the demo:
oc apply -f ./src/main/k8s/cloud-1/eventing/testing-dbeventaggregate-channels.yaml -n cluster-1
channel.messaging.knative.dev/testing-dbeventaggregate created
oc apply -f ./src/main/k8s/cloud-1/eventing/testing-dbevents-channel.yaml -n cluster-2
channel.messaging.knative.dev/testing-dbevents created
oc apply -f ./src/main/k8s/cloud-1/eventing/testing-dbeventaggregate-channels.yaml -n cluster-2
channel.messaging.knative.dev/testing-dbeventaggregate created
We should see our full complement of channels if we interrogate one of our namespaces:
oc get channels -n cluster-1
NAME READY REASON URL AGE
testing-dbeventaggregate True http://testing-dbeventaggregate-kn-channel.cluster-1.svc.cluster.local 17m
testing-dbevents True http://testing-dbevents-kn-channel.cluster-1.svc.cluster.local 25m
Installing the Event Sink
As we have previously installed the Red Hat - Camel K Operator, we can leverage Camel-K to run Apache Camel based integrations as AMQP based event receivers along our event mesh, and ultimately, act as a sink for our Knative Eventing based event bus.
Using the Kamel CLI
While this document will not describe the installation of the Kamel CLI, it is well described here: https://camel.apache.org/camel-k/latest/installation/openshift.html
Please note: this demo will not attempt to demonstrate proper CICD procedures for use of Camel-K; however, please stay tuned to this series for Cloud Native Integration: Continuous Integration and Deployment. It is also worth noting the following Apache Camel example: https://camel.apache.org/camel-k/latest/tutorials/tekton/tekton.html
In the meantime, let’s use the Kamel CLI to install our event sink for one of the channels we have installed into our namespace cluster-1 - testing-dbevents:
kamel run -n cluster-1 --trait deployer.kind=deployment --trait container.limit-cpu=500m --trait container.limit-memory=512Mi --trait container.name=cloudnative-integration ./src/main/java/io/entropic/integration/examples/eventmesh/AMQPSinkIntegration.java
Upon issuing this CLI command, the Camel-K Operator will do a few things:
- It will create a build kit
- It will create a builder pod
- It will pull resources, from the Camel-K catalogue that match a specific version of Apache Camel. This means our developers will not need to labouriously configure underlying camel component resources!
- It will create an Integration deployment based on a set of traits that we use (in our case some requests and limits as well as a deployment type)
- It will care and feed for our integration, and provide us a path towards integration with other systems such as our knative eventing event bus, API Management via 3Scale, as well as other Ingress controller and Service Mesh features through the use of Istio
Right off the bat, we see a few things in our namespace cluster-1:
oc get pods -w -n cluster-1
NAME READY STATUS RESTARTS AGE
amqp-sink-integration-64dc6fcd4d-szl4t 1/1 Running 0 20m
camel-k-kit-bs1un3ubhuc7jr4mqb30-1-build 0/1 Completed 0 21m
camel-k-kit-bs1un3ubhuc7jr4mqb30-builder 0/1 Completed 0 22m
We notice a build kit pod as well as the actual integration deployment, when we describe our Camel-K integration it gives us more information in regards to what these things are and how they build out our integration:
oc describe integration amqp-sink-integration
Name: amqp-sink-integration
Namespace: cluster-1
Labels: <none>
Annotations: <none>
API Version: camel.apache.org/v1
Kind: Integration
Metadata:
Creation Timestamp: 2020-07-07T03:27:11Z
Generation: 1
Resource Version: 1679979
Self Link: /apis/camel.apache.org/v1/namespaces/cluster-1/integrations/amqp-sink-integration
UID: ef4a8c8f-7cd7-4ff7-a025-00532c7087ff
Spec:
Sources:
Content: package io.entropic.integration.examples.eventmesh;
import org.apache.camel.BindToRegistry;
import org.apache.camel.builder.RouteBuilder;
public class AMQPSinkIntegration extends RouteBuilder {
@BindToRegistry
public javax.jms.ConnectionFactory connectionFactory() {
return new org.apache.qpid.jms.JmsConnectionFactory(
"amqp://cloud1-router-mesh.cluster-1:5672?transport.trustAll=true&transport.verifyHost=false");
}
@Override
public void configure() throws Exception {
from("amqp:queue:test.db-events")
.log("**Received message ${body}**")
.setBody().simple("${body} processed by AMQPSinkIntegration")
.to("knative:channel/testing-dbevents")
.log("**sent message to knative channel ${body} **");
}
}
Name: AMQPSinkIntegration.java
Traits:
Container:
Configuration:
Limit - Cpu: 500m
Limit - Memory: 512Mi
Name: cloudnative-integration
Deployer:
Configuration:
Kind: deployment
Status:
Capabilities:
platform-http
Conditions:
Last Transition Time: 2020-07-07T03:27:11Z
Last Update Time: 2020-07-07T03:27:11Z
Message: camel-k
Reason: IntegrationPlatformAvailable
Status: True
Type: IntegrationPlatformAvailable
Last Transition Time: 2020-07-07T03:29:33Z
Last Update Time: 2020-07-07T03:29:33Z
Message: kit-bs1un3ubhuc7jr4mqb30
Reason: IntegrationKitAvailable
Status: True
Type: IntegrationKitAvailable
Last Transition Time: 2020-07-07T03:29:37Z
Last Update Time: 2020-07-07T03:29:37Z
Message: different controller strategy used (deployment)
Reason: CronJobNotAvailableReason
Status: False
Type: CronJobAvailable
Last Transition Time: 2020-07-07T03:29:37Z
Last Update Time: 2020-07-07T03:29:37Z
Message: deployment name is amqp-sink-integration
Reason: DeploymentAvailable
Status: True
Type: DeploymentAvailable
Last Transition Time: 2020-07-07T03:29:37Z
Last Update Time: 2020-07-07T03:29:37Z
Message: different controller strategy used (deployment)
Reason: KnativeServiceNotAvailable
Status: False
Type: KnativeServiceAvailable
Dependencies:
camel:amqp
mvn:org.apache.camel.k/camel-k-loader-java
mvn:org.apache.camel.k/camel-k-runtime-main
mvn:org.apache.camel.k:camel-knative
Digest: v3y2DwE-7npkTmwW-3zeh8sBAn2vpJHSXHsFabGHgk3I
Image: image-registry.openshift-image-registry.svc:5000/cluster-1/camel-k-kit-bs1un3ubhuc7jr4mqb30@sha256:cde967f32c00d46020f7a5bd726d67145162e7abaf20a4abbf9bbd53dd191881
Kit: kit-bs1un3ubhuc7jr4mqb30
Phase: Running
Platform: camel-k
Profile: Knative
Replicas: 1
Runtime Provider: main
Runtime Version: 1.3.0.fuse-jdk11-800012-redhat-00001
Version: 1.0
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal ReasonRelatedObjectChanged 24m camel-k-integration-kit-controller Integration amqp-sink-integration dependent resource kit-bs1un3ubhuc7jr4mqb30 (Integration Kit) changed phase to Build Running
Normal IntegrationPhaseUpdated 24m camel-k-integration-controller Integration amqp-sink-integration in phase Waiting For Platform
Normal IntegrationConditionChanged 24m camel-k-integration-controller IntegrationPlatformAvailable for Integration amqp-sink-integration: camel-k
Normal IntegrationPhaseUpdated 24m camel-k-integration-controller Integration amqp-sink-integration in phase Initialization
Normal IntegrationConditionChanged 24m camel-k-integration-controller No IntegrationKitAvailable for Integration amqp-sink-integration: creating a new integration kit
Normal IntegrationPhaseUpdated 24m camel-k-integration-controller Integration amqp-sink-integration in phase Building Kit
Normal ReasonRelatedObjectChanged 24m camel-k-integration-kit-controller Integration amqp-sink-integration dependent resource kit-bs1un3ubhuc7jr4mqb30 (Integration Kit) changed phase to Build Submitted
Normal ReasonRelatedObjectChanged 24m camel-k-build-controller Integration amqp-sink-integration dependent resource kit-bs1un3ubhuc7jr4mqb30 (Build) changed phase to Scheduling
Normal ReasonRelatedObjectChanged 24m camel-k-build-controller Integration amqp-sink-integration dependent resource kit-bs1un3ubhuc7jr4mqb30 (Build) changed phase to Pending
Normal ReasonRelatedObjectChanged 24m camel-k-build-controller Integration amqp-sink-integration dependent resource kit-bs1un3ubhuc7jr4mqb30 (Build) changed phase to Running
Normal IntegrationConditionChanged 24m camel-k-integration-controller No IntegrationPlatformAvailable for Integration amqp-sink-integration: camel-k
Normal ReasonRelatedObjectChanged 22m camel-k-build-controller Integration amqp-sink-integration dependent resource kit-bs1un3ubhuc7jr4mqb30 (Build) changed phase to Succeeded
Normal ReasonRelatedObjectChanged 22m camel-k-integration-kit-controller Integration amqp-sink-integration dependent resource kit-bs1un3ubhuc7jr4mqb30 (Integration Kit) changed phase to Ready
Normal IntegrationConditionChanged 22m camel-k-integration-controller IntegrationKitAvailable for Integration amqp-sink-integration: kit-bs1un3ubhuc7jr4mqb30
Normal IntegrationPhaseUpdated 22m camel-k-integration-controller Integration amqp-sink-integration in phase Deploying
Normal IntegrationConditionChanged 22m camel-k-integration-controller No CronJobAvailable for Integration amqp-sink-integration: different controller strategy used (deployment)
Normal IntegrationConditionChanged 22m camel-k-integration-controller DeploymentAvailable for Integration amqp-sink-integration: deployment name is amqp-sink-integration
Normal IntegrationConditionChanged 22m camel-k-integration-controller No KnativeServiceAvailable for Integration amqp-sink-integration: different controller strategy used (deployment)
Normal IntegrationPhaseUpdated 22m camel-k-integration-controller Integration amqp-sink-integration in phase Running
We’ll notice our integration code (which can be in nearly any java variant), as well as a number of things about our inevitable run time environment that our build kit and builder pod ultimately have deployed.
Taking a look at our builder pod, we notice some very familiar things:
- Maven dependencies being fetched
- Wiring up of an underlying JVM environment
- Building of an image
A quick snippet from our buil kit pod shows some very familiar things:
{"level":"info","ts":1594092510.535882,"logger":"camel-k.builder","msg":"executing step","step":"github.com/apache/camel-k/pkg/builder/IncrementalImageContext","phase":30,"name":"kit-bs1un3ubhuc7jr4mqb30","task":"builder"}
{"level":"info","ts":1594092510.59943,"logger":"camel-k.builder","msg":"step done in 0.063516 seconds","step":"github.com/apache/camel-k/pkg/builder/IncrementalImageContext","phase":30,"name":"kit-bs1un3ubhuc7jr4mqb30","task":"builder"}
{"level":"info","ts":1594092510.5994673,"logger":"camel-k.builder","msg":"executing step","step":"github.com/apache/camel-k/pkg/builder/s2i/Publisher","phase":40,"name":"kit-bs1un3ubhuc7jr4mqb30","task":"builder"}
{"level":"info","ts":1594092572.896473,"logger":"camel-k.builder","msg":"step done in 62.296992 seconds","step":"github.com/apache/camel-k/pkg/builder/s2i/Publisher","phase":40,"name":"kit-bs1un3ubhuc7jr4mqb30","task":"builder"}
{"level":"info","ts":1594092572.8965232,"logger":"camel-k.builder","msg":"dependencies: [camel:amqp mvn:org.apache.camel.k/camel-k-loader-java mvn:org.apache.camel.k/camel-k-runtime-main mvn:org.apache.camel.k:camel-knative]"}
{"level":"info","ts":1594092572.896557,"logger":"camel-k.builder","msg":"artifacts: [org.apache.camel:camel-amqp:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-jms:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-support:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-spring:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-xml-jaxb:jar:3.1.0.fuse-jdk11-800011-redhat-00001 jakarta.xml.bind:jakarta.xml.bind-api:jar:2.3.2 jakarta.activation:jakarta.activation-api:jar:1.2.1 com.sun.xml.bind:jaxb-core:jar:2.3.0 com.sun.xml.bind:jaxb-impl:jar:2.3.0 org.apache.camel:camel-xml-jaxp:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-core-xml:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.springframework:spring-core:jar:5.2.3.RELEASE org.springframework:spring-jcl:jar:5.2.3.RELEASE org.springframework:spring-aop:jar:5.2.3.RELEASE org.springframework:spring-expression:jar:5.2.3.RELEASE org.springframework:spring-jms:jar:5.2.3.RELEASE org.springframework:spring-messaging:jar:5.2.3.RELEASE org.springframework:spring-context:jar:5.2.3.RELEASE org.springframework:spring-tx:jar:5.2.3.RELEASE org.springframework:spring-beans:jar:5.2.3.RELEASE org.apache.geronimo.specs:geronimo-jms_2.0_spec:jar:1.0.0.alpha-2-redhat-2 org.apache.qpid:qpid-jms-client:jar:0.48.0.redhat-00004 io.netty:netty-transport-native-epoll:jar:linux-x86_64:4.1.45.Final-redhat-00001 io.netty:netty-transport-native-unix-common:jar:4.1.45.Final-redhat-00001 io.netty:netty-transport-native-kqueue:jar:4.1.45.Final-redhat-00001 org.apache.qpid:proton-j:jar:0.33.3.redhat-00001 io.netty:netty-buffer:jar:4.1.45.Final-redhat-00002 io.netty:netty-common:jar:4.1.45.Final-redhat-00002 io.netty:netty-handler:jar:4.1.45.Final-redhat-00002 io.netty:netty-codec:jar:4.1.45.Final-redhat-00002 io.netty:netty-transport:jar:4.1.45.Final-redhat-00002 io.netty:netty-resolver:jar:4.1.45.Final-redhat-00002 io.netty:netty-codec-http:jar:4.1.45.Final-redhat-00002 org.apache.camel.k:camel-k-loader-java:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel.k:camel-k-runtime-core:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel:camel-core-engine:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-api:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-base:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-management-api:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-util:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-endpointdsl:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.jooq:joor:jar:0.9.12 org.apache.camel.k:camel-k-runtime-main:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel:camel-main:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-caffeine-lrucache:jar:3.1.0.fuse-jdk11-800011-redhat-00001 com.github.ben-manes.caffeine:caffeine:jar:2.8.1 org.apache.camel:camel-headersmap:jar:3.1.0.fuse-jdk11-800011-redhat-00001 com.cedarsoftware:java-util:jar:1.40.0 org.apache.camel:camel-bean:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.logging.log4j:log4j-core:jar:2.13.1.redhat-00001 org.apache.logging.log4j:log4j-api:jar:2.13.1.redhat-00001 org.apache.logging.log4j:log4j-slf4j-impl:jar:2.13.1.redhat-00001 org.apache.camel.k:camel-knative:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.slf4j:slf4j-api:jar:1.7.30 org.apache.camel:camel-cloud:jar:3.1.0.fuse-jdk11-800011-redhat-00001 com.fasterxml.jackson.core:jackson-databind:jar:2.10.3.redhat-00001 com.fasterxml.jackson.core:jackson-annotations:jar:2.10.3.redhat-00001 com.fasterxml.jackson.core:jackson-core:jar:2.10.3.redhat-00001 com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.10.3.redhat-00001 org.apache.camel.k:camel-knative-api:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel.k:camel-knative-http:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel.k:camel-k-runtime-http:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel:camel-platform-http:jar:3.1.0.fuse-jdk11-800011-redhat-00001 io.vertx:vertx-web:jar:3.8.5.redhat-00005 io.vertx:vertx-auth-common:jar:3.8.5.redhat-00005 io.vertx:vertx-bridge-common:jar:3.8.5.redhat-00005 io.vertx:vertx-web-client:jar:3.8.5.redhat-00005 io.vertx:vertx-web-common:jar:3.8.5.redhat-00005 io.vertx:vertx-core:jar:3.8.5.redhat-00005 io.netty:netty-handler-proxy:jar:4.1.45.Final-redhat-00001 io.netty:netty-codec-socks:jar:4.1.45.Final-redhat-00001 io.netty:netty-codec-http2:jar:4.1.45.Final-redhat-00001 io.netty:netty-resolver-dns:jar:4.1.45.Final-redhat-00001 io.netty:netty-codec-dns:jar:4.1.45.Final-redhat-00001]"}
{"level":"info","ts":1594092572.8966222,"logger":"camel-k.builder","msg":"artifacts selected: [org.apache.camel:camel-amqp:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-jms:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-support:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-spring:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-xml-jaxb:jar:3.1.0.fuse-jdk11-800011-redhat-00001 jakarta.xml.bind:jakarta.xml.bind-api:jar:2.3.2 jakarta.activation:jakarta.activation-api:jar:1.2.1 com.sun.xml.bind:jaxb-core:jar:2.3.0 com.sun.xml.bind:jaxb-impl:jar:2.3.0 org.apache.camel:camel-xml-jaxp:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-core-xml:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.springframework:spring-core:jar:5.2.3.RELEASE org.springframework:spring-jcl:jar:5.2.3.RELEASE org.springframework:spring-aop:jar:5.2.3.RELEASE org.springframework:spring-expression:jar:5.2.3.RELEASE org.springframework:spring-jms:jar:5.2.3.RELEASE org.springframework:spring-messaging:jar:5.2.3.RELEASE org.springframework:spring-context:jar:5.2.3.RELEASE org.springframework:spring-tx:jar:5.2.3.RELEASE org.springframework:spring-beans:jar:5.2.3.RELEASE org.apache.geronimo.specs:geronimo-jms_2.0_spec:jar:1.0.0.alpha-2-redhat-2 org.apache.qpid:qpid-jms-client:jar:0.48.0.redhat-00004 io.netty:netty-transport-native-epoll:jar:linux-x86_64:4.1.45.Final-redhat-00001 io.netty:netty-transport-native-unix-common:jar:4.1.45.Final-redhat-00001 io.netty:netty-transport-native-kqueue:jar:4.1.45.Final-redhat-00001 org.apache.qpid:proton-j:jar:0.33.3.redhat-00001 io.netty:netty-buffer:jar:4.1.45.Final-redhat-00002 io.netty:netty-common:jar:4.1.45.Final-redhat-00002 io.netty:netty-handler:jar:4.1.45.Final-redhat-00002 io.netty:netty-codec:jar:4.1.45.Final-redhat-00002 io.netty:netty-transport:jar:4.1.45.Final-redhat-00002 io.netty:netty-resolver:jar:4.1.45.Final-redhat-00002 io.netty:netty-codec-http:jar:4.1.45.Final-redhat-00002 org.apache.camel.k:camel-k-loader-java:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel.k:camel-k-runtime-core:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel:camel-core-engine:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-api:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-base:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-management-api:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-util:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-endpointdsl:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.jooq:joor:jar:0.9.12 org.apache.camel.k:camel-k-runtime-main:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel:camel-main:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.camel:camel-caffeine-lrucache:jar:3.1.0.fuse-jdk11-800011-redhat-00001 com.github.ben-manes.caffeine:caffeine:jar:2.8.1 org.apache.camel:camel-headersmap:jar:3.1.0.fuse-jdk11-800011-redhat-00001 com.cedarsoftware:java-util:jar:1.40.0 org.apache.camel:camel-bean:jar:3.1.0.fuse-jdk11-800011-redhat-00001 org.apache.logging.log4j:log4j-core:jar:2.13.1.redhat-00001 org.apache.logging.log4j:log4j-api:jar:2.13.1.redhat-00001 org.apache.logging.log4j:log4j-slf4j-impl:jar:2.13.1.redhat-00001 org.apache.camel.k:camel-knative:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.slf4j:slf4j-api:jar:1.7.30 org.apache.camel:camel-cloud:jar:3.1.0.fuse-jdk11-800011-redhat-00001 com.fasterxml.jackson.core:jackson-databind:jar:2.10.3.redhat-00001 com.fasterxml.jackson.core:jackson-annotations:jar:2.10.3.redhat-00001 com.fasterxml.jackson.core:jackson-core:jar:2.10.3.redhat-00001 com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.10.3.redhat-00001 org.apache.camel.k:camel-knative-api:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel.k:camel-knative-http:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel.k:camel-k-runtime-http:jar:1.3.0.fuse-jdk11-800012-redhat-00001 org.apache.camel:camel-platform-http:jar:3.1.0.fuse-jdk11-800011-redhat-00001 io.vertx:vertx-web:jar:3.8.5.redhat-00005 io.vertx:vertx-auth-common:jar:3.8.5.redhat-00005 io.vertx:vertx-bridge-common:jar:3.8.5.redhat-00005 io.vertx:vertx-web-client:jar:3.8.5.redhat-00005 io.vertx:vertx-web-common:jar:3.8.5.redhat-00005 io.vertx:vertx-core:jar:3.8.5.redhat-00005 io.netty:netty-handler-proxy:jar:4.1.45.Final-redhat-00001 io.netty:netty-codec-socks:jar:4.1.45.Final-redhat-00001 io.netty:netty-codec-http2:jar:4.1.45.Final-redhat-00001 io.netty:netty-resolver-dns:jar:4.1.45.Final-redhat-00001 io.netty:netty-codec-dns:jar:4.1.45.Final-redhat-00001]"}
{"level":"info","ts":1594092572.8966591,"logger":"camel-k.builder","msg":"base image: registry.access.redhat.com/ubi8/openjdk-11:1.3"}
{"level":"info","ts":1594092572.8966627,"logger":"camel-k.builder","msg":"resolved base image: registry.access.redhat.com/ubi8/openjdk-11:1.3"}
{"level":"info","ts":1594092572.8966658,"logger":"camel-k.builder","msg":"resolved image: image-registry.openshift-image-registry.svc:5000/cluster-1/camel-k-kit-bs1un3ubhuc7jr4mqb30:1678529"}
And ultimately, our build pod shows the creation of the ultimate image that represents our Integration:
oc logs camel-k-kit-bs1un3ubhuc7jr4mqb30-1-build -f
Caching blobs under "/var/cache/blobs".
Pulling image registry.access.redhat.com/ubi8/openjdk-11:1.3 ...
Getting image source signatures
Copying blob sha256:1b99828eddf5297ca28fa7eac7f47a40d36a693c628311406788a14dfe75e076
Copying blob sha256:fba81d8872a85de345b80cc69b1f23309ee284950d45dbee2bd8bf9bf17c60a7
Copying blob sha256:e96e3a1df3b2b1e01f9614725b50ea4d1d8e480980e456815ade3c7afca978d7
Copying config sha256:d43c43b348f8c2cfcbe4beff8c36dd8c73cd78cb89e3912190b0b357904f367d
Writing manifest to image destination
Storing signatures
STEP 1: FROM registry.access.redhat.com/ubi8/openjdk-11:1.3
STEP 2: ADD . /deployments
time="2020-07-07T03:29:09Z" level=info msg="Image operating system mismatch: image uses \"\", expecting \"linux\""
time="2020-07-07T03:29:09Z" level=info msg="Image architecture mismatch: image uses \"\", expecting \"amd64\""
--> 1a55bc7e5af
STEP 3: USER 1000
time="2020-07-07T03:29:10Z" level=info msg="Image operating system mismatch: image uses \"\", expecting \"linux\""
time="2020-07-07T03:29:10Z" level=info msg="Image architecture mismatch: image uses \"\", expecting \"amd64\""
--> 562ea4ed86e
STEP 4: ENV "OPENSHIFT_BUILD_NAME"="camel-k-kit-bs1un3ubhuc7jr4mqb30-1" "OPENSHIFT_BUILD_NAMESPACE"="cluster-1"
time="2020-07-07T03:29:10Z" level=info msg="Image operating system mismatch: image uses \"\", expecting \"linux\""
time="2020-07-07T03:29:10Z" level=info msg="Image architecture mismatch: image uses \"\", expecting \"amd64\""
--> 2a358bb17bf
STEP 5: LABEL "io.openshift.build.name"="camel-k-kit-bs1un3ubhuc7jr4mqb30-1" "io.openshift.build.namespace"="cluster-1"
STEP 6: COMMIT temp.builder.openshift.io/cluster-1/camel-k-kit-bs1un3ubhuc7jr4mqb30-1:bd99c4f2
time="2020-07-07T03:29:10Z" level=info msg="Image operating system mismatch: image uses \"\", expecting \"linux\""
time="2020-07-07T03:29:10Z" level=info msg="Image architecture mismatch: image uses \"\", expecting \"amd64\""
--> 59f4e2ff6c4
59f4e2ff6c47ff3cf2e065bb8b6da3a97e87c54f4f48cf30b2213e787c3bc636
Pushing image image-registry.openshift-image-registry.svc:5000/cluster-1/camel-k-kit-bs1un3ubhuc7jr4mqb30:1678529 ...
Getting image source signatures
Copying blob sha256:e96e3a1df3b2b1e01f9614725b50ea4d1d8e480980e456815ade3c7afca978d7
Copying blob sha256:fba81d8872a85de345b80cc69b1f23309ee284950d45dbee2bd8bf9bf17c60a7
Copying blob sha256:50fdb1a6f2f893727eed92d3fb141a71b9abde2b839c1d33d08265a3cd313dc7
Copying blob sha256:1b99828eddf5297ca28fa7eac7f47a40d36a693c628311406788a14dfe75e076
Copying config sha256:59f4e2ff6c47ff3cf2e065bb8b6da3a97e87c54f4f48cf30b2213e787c3bc636
Writing manifest to image destination
Storing signatures
Successfully pushed image-registry.openshift-image-registry.svc:5000/cluster-1/camel-k-kit-bs1un3ubhuc7jr4mqb30@sha256:cde967f32c00d46020f7a5bd726d67145162e7abaf20a4abbf9bbd53dd191881
Push successful
We now have our image created by our Integration Build Kit in the local Openshift Image registry.
All with a simple invocation of the Kamel CLI.
At this point, we’ll also deploy our event sink to cluster-2, as both cluster-1 and cluster-2 will have event receivers that are participating in our event mesh:
kamel run -n cluster-2 --trait deployer.kind=deployment --trait container.limit-cpu=500m --trait container.limit-memory=512Mi --trait container.name=cloudnative-integration ./src/main/java/io/entropic/integration/examples/eventmesh/AMQPSinkIntegration.java
We’ll notice that we have a similar set of pods in cluster-2:
oc get pods -w -n cluster-2
NAME READY STATUS RESTARTS AGE
amqp-sink-integration-76bb7c667f-5mdm2 1/1 Running 0 11m
camel-k-kit-bs1v76ubhuc7jr4mqb3g-1-build 0/1 Completed 0 12m
camel-k-kit-bs1v76ubhuc7jr4mqb3g-builder 0/1 Completed 0 13m
cloud2-router-mesh-d566476ff-bqtrx 1/1 Running 0 3h
default-broker-filter-b5967fd6c-fs87l 1/1 Running 0 2h
default-broker-ingress-5485ff998c-k468n 1/1 Running 0 2h
interconnect-operator-745d45688b-7snf2 1/1 Running 0 3h
Taking a peak at the logs of our integration pod, we’ll notice it has linked up with our event mesh and some familar Apache Camel logging:
oc logs amqp-sink-integration-768c959d79-tz4zc -f
2020-07-07 04:19:59.179 INFO [main] LRUCacheFactory - Detected and using LURCacheFactory: camel-caffeine-lrucache
2020-07-07 04:19:59.664 INFO [main] ApplicationRuntime - Add listener: org.apache.camel.k.listener.RuntimeConfigurer@6bf08014
2020-07-07 04:19:59.664 INFO [main] ApplicationRuntime - Add listener: org.apache.camel.k.listener.ContextConfigurer@6ee4d9ab
2020-07-07 04:19:59.665 INFO [main] ApplicationRuntime - Add listener: org.apache.camel.k.listener.RoutesConfigurer@302f7971
2020-07-07 04:19:59.666 INFO [main] ApplicationRuntime - Add listener: org.apache.camel.k.listener.RoutesDumper@9573584
2020-07-07 04:19:59.666 INFO [main] ApplicationRuntime - Add listener: org.apache.camel.k.listener.PropertiesFunctionsConfigurer@352c1b98
2020-07-07 04:19:59.670 INFO [main] ApplicationRuntime - Listener org.apache.camel.k.listener.RuntimeConfigurer@6bf08014 executed in phase Starting
2020-07-07 04:19:59.679 INFO [main] ApplicationRuntime - Listener org.apache.camel.k.listener.PropertiesFunctionsConfigurer@352c1b98 executed in phase Starting
2020-07-07 04:19:59.680 INFO [main] BaseMainSupport - Using properties from:
2020-07-07 04:19:59.763 INFO [main] RuntimeSupport - Looking up loader for language: java
2020-07-07 04:19:59.765 INFO [main] RuntimeSupport - Found loader org.apache.camel.k.loader.java.JavaSourceLoader@36060e for language java from service definition
2020-07-07 04:20:02.571 INFO [main] RoutesConfigurer - Loading routes from: file:/etc/camel/sources/i-source-000/AMQPSinkIntegration.java?language=java
2020-07-07 04:20:02.571 INFO [main] ApplicationRuntime - Listener org.apache.camel.k.listener.RoutesConfigurer@302f7971 executed in phase ConfigureRoutes
2020-07-07 04:20:02.875 INFO [main] RuntimeSupport - Found customizer org.apache.camel.k.http.PlatformHttpServiceContextCustomizer@1b70203f with id platform-http from service definition
2020-07-07 04:20:02.876 INFO [main] RuntimeSupport - Apply ContextCustomizer with id=platform-http and type=org.apache.camel.k.http.PlatformHttpServiceContextCustomizer
2020-07-07 04:20:03.079 INFO [main] PlatformHttpServiceEndpoint - Creating new Vert.x instance
2020-07-07 04:20:04.168 INFO [vert.x-eventloop-thread-1] PlatformHttpServer - Vert.x HttpServer started on 0.0.0.0:8080
2020-07-07 04:20:04.175 INFO [main] ApplicationRuntime - Listener org.apache.camel.k.listener.ContextConfigurer@6ee4d9ab executed in phase ConfigureContext
2020-07-07 04:20:04.176 INFO [main] AbstractCamelContext - Apache Camel 3.1.0.fuse-jdk11-800011-redhat-00001 (CamelContext: camel-k) is starting
2020-07-07 04:20:04.177 INFO [main] DefaultManagementStrategy - JMX is disabled
2020-07-07 04:20:04.179 INFO [main] HeadersMapFactoryResolver - Detected and using HeadersMapFactory: camel-headersmap
2020-07-07 04:20:04.566 INFO [main] BaseMainSupport - Autowired property: connectionFactory on component: AMQPComponent as exactly one instance of type: javax.jms.ConnectionFactory found in the registry
2020-07-07 04:20:04.766 INFO [main] KnativeComponent - found knative transport: org.apache.camel.component.knative.http.KnativeHttpTransport@68fe48d7 for protocol: http
2020-07-07 04:20:05.563 INFO [main] AbstractCamelContext - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2020-07-07 04:20:11.665 INFO [AmqpProvider :(1):[amqp://cloud2-router-mesh.cluster-2:5672]] SaslMechanismFinder - Best match for SASL auth was: SASL-ANONYMOUS
2020-07-07 04:20:11.767 INFO [AmqpProvider :(1):[amqp://cloud2-router-mesh.cluster-2:5672]] JmsConnection - Connection ID:61991487-4cbc-4633-9bc4-36053a86d8f9:1 connected to remote Broker: amqp://cloud2-router-mesh.cluster-2:5672
2020-07-07 04:20:11.767 INFO [main] AbstractCamelContext - Route: route1 started and consuming from: amqp://queue:test.db-events
2020-07-07 04:20:11.770 INFO [main] AbstractCamelContext - Total 1 routes, of which 1 are started
2020-07-07 04:20:11.770 INFO [main] AbstractCamelContext - Apache Camel 3.1.0.fuse-jdk11-800011-redhat-00001 (CamelContext: camel-k) started in 7.595 seconds
2020-07-07 04:20:11.771 INFO [main] ApplicationRuntime - Listener org.apache.camel.k.listener.RoutesDumper@9573584 executed in phase Started
Deploying Serverless Integrations on the Event Bus
Now that we have an event sink to persist events onto our event bus, let’s leverage the power of Camel-K to have integrations participate on the event bus:
Upon executing this CLI command, we’ll notice something a little different than our previous Camel-K Integration deployment:
oc get pods -w -n cluster-1
NAME READY STATUS RESTARTS AGE
amqp-sink-integration-64dc6fcd4d-szl4t 1/1 Running 0 1h
camel-k-kit-bs1un3ubhuc7jr4mqb30-1-build 0/1 Completed 0 1h
camel-k-kit-bs1un3ubhuc7jr4mqb30-builder 0/1 Completed 0 1h
camel-k-kit-bs1vjgmbhuc7jr4mqb4g-1-build 0/1 Completed 0 2m
camel-k-kit-bs1vjgmbhuc7jr4mqb4g-builder 0/1 Completed 0 2m
cloud1-router-mesh-7f698d8c65-xft27 1/1 Running 0 4h
default-broker-filter-b5967fd6c-rs8vm 1/1 Running 0 2h
default-broker-ingress-6d4c6474c8-9gd4m 1/1 Running 0 2h
event-bus-transformation-integration-wqksm-deployment-5bf8jkb7x 2/2 Terminating 0 1m
interconnect-operator-5684994fbf-587dk 1/1 Running 0 4h
event-bus-transformation-integration-wqksm-deployment-5bf8jkb7x 2/2 Terminating 0 1m
event-bus-transformation-integration-wqksm-deployment-5bf8jkb7x 0/2 Terminating 0 1m
event-bus-transformation-integration-wqksm-deployment-5bf8jkb7x 0/2 Terminating 0 1m
event-bus-transformation-integration-wqksm-deployment-5bf8jkb7x 0/2 Terminating 0 1m
We’ll notice that our integration runs and then scales back down to zero. This is because we are using Knative Serving and our Integration is now serverless and scaled to 0. When we interrogate K8s, we’ll notice our integration is ready; however, we have 0 replicas running:
oc get integrations -n cluster-1
NAME PHASE KIT REPLICAS
amqp-sink-integration Running kit-bs1un3ubhuc7jr4mqb30 1
event-bus-transformation-integration Running kit-bs1vjgmbhuc7jr4mqb4g 0
And if we interrogate our knative serving system, we’ll notice we have a viable knative serving revision for our integration:
oc get revisions.serving.knative.dev -n cluster-1
NAME CONFIG NAME K8S SERVICE NAME GENERATION READY REASON
event-bus-transformation-integration-wqksm event-bus-transformation-integration event-bus-transformation-integration-wqksm 1 True
We will also notice that we have a subscription to a knative event channel:
oc get subscriptions.messaging.knative.dev
NAME READY REASON AGE
testing-dbevents-event-bus-transformation-integration True 9m38s
Describing the subscription resource:
[mcostell@work cloud-native-event-mesh]$ oc describe subscriptions.messaging.knative.dev testing-dbevents-event-bus-transformation-integration
Name: testing-dbevents-event-bus-transformation-integration
Namespace: cluster-1
Labels: camel.apache.org/generation=1
camel.apache.org/integration=event-bus-transformation-integration
Annotations: messaging.knative.dev/creator=system:serviceaccount:openshift-operators:camel-k-operator
messaging.knative.dev/lastModifier=system:serviceaccount:openshift-operators:camel-k-operator
API Version: messaging.knative.dev/v1beta1
Kind: Subscription
Metadata:
Creation Timestamp: 2020-07-07T04:29:19Z
Finalizers:
subscriptions.messaging.knative.dev
Generation: 1
Owner References:
API Version: camel.apache.org/v1
Block Owner Deletion: true
Controller: true
Kind: Integration
Name: event-bus-transformation-integration
UID: f21518f9-8906-4cb8-a164-fecf2df7fb6c
Resource Version: 1714673
Self Link: /apis/messaging.knative.dev/v1beta1/namespaces/cluster-1/subscriptions/testing-dbevents-event-bus-transformation-integration
UID: 968a8794-9744-414f-91b4-d6226670fa4d
Spec:
Channel:
API Version: messaging.knative.dev/v1alpha1
Kind: Channel
Name: testing-dbevents
Subscriber:
Ref:
API Version: serving.knative.dev/v1
Kind: Service
Name: event-bus-transformation-integration
Namespace: cluster-1
Status:
Conditions:
Last Transition Time: 2020-07-07T04:29:29Z
Status: True
Type: AddedToChannel
Last Transition Time: 2020-07-07T04:29:30Z
Status: True
Type: ChannelReady
Last Transition Time: 2020-07-07T04:29:30Z
Status: True
Type: Ready
Last Transition Time: 2020-07-07T04:29:30Z
Status: True
Type: Resolved
Observed Generation: 1
Physical Subscription:
Subscriber URI: http://event-bus-transformation-integration.cluster-1.svc.cluster.local
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal FinalizerUpdate 10m subscription-controller Updated "testing-dbevents-event-bus-transformation-integration" finalizers
Warning SubscriberResolveFailed 10m (x5 over 10m) subscription-controller Failed to resolve spec.subscriber: address not set for &ObjectReference{Kind:Service,Namespace:cluster-1,Name:event-bus-transformation-integration,UID:,APIVersion:serving.knative.dev/v1,ResourceVersion:,FieldPath:,}
Normal SubscriberSync 10m subscription-controller Subscription was synchronized to channel "testing-dbevents"
Warning SubscriptionNotMarkedReadyByChannel 10m subscription-controller channel.Status.SubscribableStatus is nil
Normal SubscriptionReconciled 10m (x3 over 10m) subscription-controller Subscription reconciled: "cluster-1/testing-dbevents-event-bus-transformation-integration"
We now have an integration that is connected to the event bus and an event sink that acts as a source for our event bus and ultimately allows our serverless integrations to participate with our event mesh.
Apply this same integration to cluster-2 and then we will be ready to demonstrate some of the incredible power or our event mesh and event bus as we demonstrate active-passive, active-active and other event mesh topologies along with serverless integration in a way that makes us truly Cloud Native!!!