This repository has been archived by the owner on Jan 11, 2022. It is now read-only.

lightbend/RSocketCloudflow

RSocket Cloudflow Ingress

This repository contains the companion code to the article.

This project is based on the articles RSocket Intro, Reactive Service-to-service, and Reactor, and on the Cloudflow sensor data example. See also the presentations here.

The actual RSocket code is modeled after this one and this one.

Project structure

The project contains several modules:

  • images - contains images for this Readme
  • interactions - initial experiments with RSockets, based on this code. The examples here are:
    • Fire and forget demonstrates implementation of a simple fire and forget
    • Fire and forget with lease demonstrates implementation of a fire and forget using leasing
    • Load balanced Fire and forget demonstrates implementation of a loadbalancer for fire and forget
    • Request-stream demonstrates implementation of a back pressured request stream
    • Resumable Request-stream demonstrates implementation of a resumable request stream. Running this example requires socat. Start socat by running socat -d TCP-LISTEN:7001,fork,reuseaddr TCP:localhost:7000, which creates a mapping between ports 7000 and 7001 so that the client can reach the server. To demonstrate resumability, stop and restart socat.
    • Channel demonstrates implementation of a back pressured channel
  • transports - experiments with RSocket's pluggable transports (see below)
  • support - a shared project containing Avro definitions, shared transformation code, and experimental Akka implementations
  • sensordata - a Cloudflow implementation of sensor data processing
  • client - an implementation of an RSocket-based data provider for publishing sensor data
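
The request-stream example above relies on Reactive Streams backpressure: the subscriber asks for items at its own pace instead of being flooded by the producer. The repository's examples use RSocket-Java, but the core mechanism can be sketched with the JDK's own java.util.concurrent.Flow API; all names below are illustrative and not taken from this project:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class BackpressureSketch {
    // Collects items while requesting them one at a time (explicit backpressure).
    static List<Integer> consumeOneByOne(List<Integer> items) throws InterruptedException {
        List<Integer> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;
                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1);            // ask for the first item only
                }
                public void onNext(Integer item) {
                    received.add(item);
                    subscription.request(1); // pull the next item when ready
                }
                public void onError(Throwable t) { done.countDown(); }
                public void onComplete()        { done.countDown(); }
            });
            items.forEach(publisher::submit);
        } // close() signals onComplete after all buffered items are delivered
        done.await();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(consumeOneByOne(List.of(1, 2, 3))); // prints [1, 2, 3]
    }
}
```

RSocket's request-stream works on the same request(n) contract, but carries the demand signal across the network frame by frame.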

Cloudflow implementation

The idea behind the implementation is to replace the HTTP ingress from the original implementation with an RSocket ingress. Three different ingress implementations are provided:

  • Fire and forget JSON-based ingress, implemented by a dedicated class. Here RSocket fire and forget interactions are used, and sensor data is passed as JSON text.
  • Fire and forget Avro-based ingress, implemented by a dedicated class. Here RSocket fire and forget interactions are used, and sensor data is passed as Avro-encoded binary. Alternatively, for this interaction you can use a class based on an Akka source.
  • Stream Avro-based ingress, implemented by a dedicated class. Here RSocket request-stream interactions are used, and sensor data is passed as Avro-encoded binary.

Any of the implementations can be used. To pick the one that you want to use, go to the blueprint and uncomment the implementation that you want to experiment with.
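
The selection in the blueprint might look roughly like this; the streamlet and class names below are hypothetical, the real ones are in sensordata/src/main/blueprint/blueprint.conf:

```
blueprint {
  streamlets {
    // Uncomment exactly one of the three RSocket ingress variants
    // (names here are illustrative, not the project's actual class names):
    // sensor-data = sensordata.RSocketIngressJson        // fire and forget, JSON text
    sensor-data = sensordata.RSocketIngressAvro           // fire and forget, Avro binary
    // sensor-data = sensordata.RSocketStreamIngressAvro  // request-stream, Avro binary
  }
}
```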

To support these three interactions, there are three data publishers.

A Producer runner class is provided that allows picking an individual publisher.

Running locally

To run locally:

  • Select a server configuration by uncommenting your selection in sensordata/src/main/blueprint/blueprint.conf
  • Start Cloudflow implementation
    • sbt runLocal
  • Tail the log provided by the previous command
    • tail -f ...
  • Run corresponding data provider.
    • in the application config, set the required client:
      • 1 for binary fire and forget producer
      • 2 for UTF-8 fire and forget producer
      • 3 for binary streaming producer
    • Start Producer runner either directly from Intellij (with a corresponding option) or using the following command - sbt "project client" run

Note: You can also change the data publishing interval (default is 1000 ms) by changing the producer.interval value in the application config.
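
Based on the options above, the relevant part of the client's application config presumably looks something like this (the exact key layout is inferred from the text; check the client module's application config for the real structure):

```
// 1 = binary fire and forget, 2 = UTF-8 fire and forget, 3 = binary streaming
client = 1

producer {
  interval = 1000 // publishing interval in ms (default 1000)
}
```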

Running on Kubernetes Cluster

Note: This example assumes that you already have Cloudflow deployed on a cluster. If you are not familiar with deploying Cloudflow to a cluster, it is recommended to complete this tutorial first.

To run on a cluster:

  • Select an application configuration by uncommenting your selection in sensordata/src/main/blueprint/blueprint.conf
  • Edit the file target.env
    • ensure that your cloudflowDockerRegistry is correct. The simplest way is to use [Docker Hub](https://hub.docker.com/)
  • Publish an image to the docker registry:
    • First, make sure that you have access to the cluster. Run kubectl get nodes to verify this
    • Then publish with sbt buildAndPublish
  • Deploy the application to the cluster
    • Run kubectl cloudflow deploy docker.io/lightbend/sensordata:...
    • Check the status with kubectl get pods -n sensordata
    • You can also go to the enterprise console to make sure that the application is installed. You should see a picture similar to this Console screenshot. Note that there is no throughput, because we are not publishing any data yet
  • Setup a local proxy to the ingress
    • Find the Pod name for the ingress kubectl get pods --all-namespaces | grep sensor-data-rsocket-ingress | awk '{ print $2 }'
    • Create a proxy kubectl port-forward <pod name> -n sensor-data 3000:3000
  • Run corresponding data provider
    • sbt "project client" run
    • Select the option corresponding to the option selected in the blueprint

Instead of setting up a local proxy and connecting to the cluster from the local machine, you can also build an image containing the client and then deploy it to the cluster. To do this, first execute the first four steps above.

  • Build client image
    • Run sbt docker to build an image. This should create the image lightbend/client:0.1
  • Push the image to a Docker repository. In this example Docker Hub is used: docker push lightbend/client:0.1
  • Deploy it to the cluster
    • Validate service name and port by running kubectl get service -n sensordata
    • Update deployment to set required parameters
    • Deploy the client by running kubectl apply -f <your project location>/install/client.yaml
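
A manifest along these lines is roughly what install/client.yaml might contain; the labels, environment variable names, and service address below are illustrative placeholders, not the project's actual values (only the image name lightbend/client:0.1 comes from the steps above):

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rsocket-client
spec:
  replicas: 1
  selector:
    matchLabels:
      app: rsocket-client
  template:
    metadata:
      labels:
        app: rsocket-client
    spec:
      containers:
        - name: client
          image: lightbend/client:0.1
          env:
            # Point the client at the ingress service found via
            # `kubectl get service -n sensordata` (placeholder values).
            - name: RSOCKET_HOST
              value: "sensor-data-rsocket-ingress.sensordata"
            - name: RSOCKET_PORT
              value: "3000"
```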

Protocol pluggability and comparison

The transports module shows transport pluggability in RSocket implementations. Out of the box, RSocket-Java supports three transports:

  • TCP
  • WebSockets
  • Local (in JVM)

Some interesting TCP vs. WebSockets comparisons are here, here, and here. For more information on multi-transport support in RSocket, refer to this blog post.

Kafka transport

To demonstrate protocol pluggability, there is a quick implementation of a Kafka transport based on the Reactive Kafka APIs.

To simplify local testing, the project also contains an embedded Kafka server.

An example of embedded server usage with the Reactive Kafka APIs can be found here.

The actual implementation is based on three classes:

  • Kafka Duplex connection - a duplex connection for RSocket over Kafka. This connection is based on a Kafka bootstrapServers connection string and a base topic used by the server to receive messages. By the convention used here, this topic is used by the server to receive and the client to send messages. A companion topic with the name topic-reply is used by the server to send and the client to receive messages.
  • Kafka client transport is an implementation of RSocket client transport for Kafka
  • Kafka server transport is an implementation of RSocket server transport for Kafka
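
The two-topic naming convention described above can be captured in a few lines. This is a sketch of the convention only, not the project's actual transport code; the class and method names are made up:

```java
public class KafkaTopicConvention {
    // Base topic: the server receives here, the client sends here.
    static String requestTopic(String baseTopic) {
        return baseTopic;
    }

    // Companion reply topic: the server sends here, the client receives here.
    // Per the convention above, it is the base topic with a "-reply" suffix.
    static String replyTopic(String baseTopic) {
        return baseTopic + "-reply";
    }

    public static void main(String[] args) {
        String base = "sensordata";
        // prints sensordata / sensordata-reply
        System.out.println(requestTopic(base) + " / " + replyTopic(base));
    }
}
```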

A simple example of using RSocket communications over Kafka can be found here.

IPC transport

The IPC transport implementation is based on MappedByteBuffer, which provides access to a memory-mapped region of a file. Since in many operating systems RAM can be mounted as a file system, the same MappedByteBuffer can easily be used for communication over a file or a shared-memory region. Examples leveraging the IPC transport can be found here.
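
The mechanism can be illustrated with plain JDK code: two independent mappings of the same file share the underlying memory region, so bytes written through one mapping are visible through the other. This is a minimal length-prefixed round-trip sketch, not the project's transport code:

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MappedIpcSketch {
    // Writes a message through one mapping and reads it back through a second,
    // independent mapping of the same file region.
    static String roundTrip(Path file, String message) throws IOException {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        try (FileChannel writer = FileChannel.open(file,
                 StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
             FileChannel reader = FileChannel.open(file, StandardOpenOption.READ)) {
            // "Writer" side: map the region and store a length prefix plus the payload.
            MappedByteBuffer out =
                writer.map(FileChannel.MapMode.READ_WRITE, 0, 4 + payload.length);
            out.putInt(payload.length);
            out.put(payload);
            out.force(); // flush the mapped region to the backing store

            // "Reader" side: a separate read-only mapping of the same region.
            MappedByteBuffer in =
                reader.map(FileChannel.MapMode.READ_ONLY, 0, 4 + payload.length);
            byte[] read = new byte[in.getInt()];
            in.get(read);
            return new String(read, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("ipc", ".buf");
        System.out.println(roundTrip(file, "hello over shared memory"));
        Files.deleteIfExists(file);
    }
}
```

When the backing file lives on a RAM-backed file system (e.g. tmpfs), the same code effectively communicates over shared memory rather than disk.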
