Distributed log analytics using Apache Kafka, Kafka Connect and Fluentd

At Cloudbox Labs, we think logs are an incredibly interesting dataset. They are the heartbeat of our tech stack. They give us insight into how users interact with us. They provide real time application intelligence. For that reason we built a robust set of data infrastructure that can handle large volumes of logs from all our applications and allows for real time analytics as well as batch processing.

For data processing architectures that run distributed services, collecting and aggregating logs from production services can be challenging. For some of our clients we deploy Docker-based distributed services, and the task of capturing logs from all the containers and routing them to various downstream analytics engines becomes very interesting.

Our log processing pipeline uses Fluentd for unified logging inside Docker containers, Apache Kafka as a persistent store and streaming pipe, and Kafka Connect to route logs to both Elasticsearch for real time indexing and search and S3 for batch analytics and archival.

Fluentd is an open source unified logging application that can collect logs intelligently from many different types of systems, from app logs and nginx logs to database and system logs. It comes with various plugins that connect Fluentd with external systems. We are going to use its Apache Kafka plugin to forward logs to a Kafka topic in JSON format.

Apache Kafka almost needs no introduction. It’s a popular distributed pub-sub messaging platform that offers persistent storage and high scalability. Aggregating all our Docker container logs on Kafka allows us to handle high message throughput and from there route them to any number of downstream systems using Kafka Connect.

Kafka Connect is a collective name for a set of connectors that connect Kafka with external systems, e.g. JDBC databases, AWS S3, Google Cloud BigQuery, etc. Each connector can be installed separately on the Connect platform, and users can interact with connectors via a REST interface on Kafka Connect. We will use the S3 and Elasticsearch connectors in our log processing.

Elasticsearch is a popular open-source indexing and search engine. For our purposes we will sink our logs into Elasticsearch and index them there so that they can be analyzed in real time.

The overall architecture looks like this.

[Figure: log_arch.png, diagram of the overall architecture]

In this post, for demo purposes, we are going to spin up the entire log processing pipeline using Docker Compose, including the web app, Fluentd, Kafka, ZooKeeper, Kafka Connect and Elasticsearch. Docker Compose allows us to easily run multi-container Docker apps in an isolated environment and iterate quickly during development.

The entire stack can be created by using one YAML file. Here is the full code.

A bit about networking amongst the containers so that they can talk to each other. Docker Compose sets up a single default network that each container joins. Each container is discoverable on that network by its service name. For example, the Kafka bootstrap server can be reached at kafka:9092 and ZooKeeper at zookeeper:2181. Docker Compose takes care of all the low-level networking setup.
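The name-based discovery described above can be checked with a few lines of Python. This is only a sketch and not part of the original stack: the helper names `can_connect` and `check_endpoints` are made up for illustration, and the hostnames resolve only when the script runs inside a container attached to the Compose network.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections and timeouts alike.
        return False


# Endpoints named in the text; they resolve only on the Compose network.
ENDPOINTS = {"kafka": ("kafka", 9092), "zookeeper": ("zookeeper", 2181)}


def check_endpoints(endpoints=ENDPOINTS) -> dict:
    """Map each label to whether its host:port currently accepts connections."""
    return {name: can_connect(host, port) for name, (host, port) in endpoints.items()}
```

Calling `check_endpoints()` from inside one of the containers gives a quick view of which services are reachable by name. From the host, the same check would have to use localhost and the published ports instead.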

Now let’s walk through our Docker containers.

1. To simulate our web app, we simply create a Docker container that runs an Apache HTTP server using the existing httpd image. It produces an access log line every time it receives a GET request. We map host port 8080 to container port 80, where the Apache server listens, so we can curl the web app at http://localhost:8080/. The container also links to the fluentd container so that logs can be forwarded across containers. Lastly we set the logging driver to fluentd with a tag. The fluentd-address is localhost:24224 rather than fluentd:24224 because the logging driver runs in the Docker daemon on the host and reaches Fluentd through its published port.

    web:
      image: httpd
      ports:
        - "8080:80"
      links:
        - fluentd
      logging:
        driver: "fluentd"
        options:
          fluentd-address: localhost:24224
          tag: httpd.access
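To see logs flowing, the web app needs some traffic. The sketch below is one way to generate it from the host with Python's standard library. It assumes the port mapping shown above (http://localhost:8080/), and the function names `hit` and `generate_traffic` are illustrative only.

```python
import urllib.error
import urllib.request

# Bypass any configured HTTP proxy, since the target is a local container.
_OPENER = urllib.request.build_opener(urllib.request.ProxyHandler({}))


def hit(url: str, timeout: float = 5.0) -> int:
    """Send one GET request and return the HTTP status code."""
    try:
        with _OPENER.open(url, timeout=timeout) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        # 4xx/5xx responses are still logged by the web server.
        return err.code


def generate_traffic(base_url: str, paths, timeout: float = 5.0):
    """Request each path once and return a list of (path, status) pairs."""
    base = base_url.rstrip("/")
    return [(path, hit(base + path, timeout)) for path in paths]
```

With the stack running, `generate_traffic("http://localhost:8080", ["/", "/missing"])` should produce one access log line per request, which the fluentd logging driver then forwards.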

2. Next up we set up a container that runs Fluentd. Instead of using an existing Docker image as is, we opted to build from our own Dockerfile so that we can install the Fluentd Kafka plugin and have logs forwarded to our Kafka container.

    fluentd:
      build:
        context: ./
        dockerfile: Dockerfile-fluentd
      volumes:
        - ./fluentd/conf:/fluentd/etc
      depends_on:
        - kafka
      links:
        - kafka
      ports:
        - "24224:24224"
        - "24224:24224/udp"

Our Dockerfile looks like this. It builds on the fluent/fluentd:v0.12-debian image and installs the Kafka plugin.

FROM fluent/fluentd:v0.12-debian

ENV FLUENT_UID=0
RUN mkdir /buffer
RUN ["gem", "install", "fluent-plugin-kafka", "--no-rdoc", "--no-ri", "--version", "0.7.9"]

The Kafka plugin configuration file is kept in ./fluentd/conf on the host, which the volume mapping above mounts into the container at /fluentd/etc.

<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>
<match *.**>
    @type kafka_buffered

    # list of seed brokers
    brokers kafka:9092

    # buffer settings
    buffer_type file
    buffer_path /buffer/td
    flush_interval 3s

    # topic settings
    default_topic log-messages

    # data type settings
    output_data_type json
    compression_codec gzip

    # producer settings
    max_send_retries 1
    required_acks -1
</match>

It’s pretty straightforward. It specifies the plugin type kafka_buffered, which buffers logs in a local file at /buffer/td and flushes the contents to Kafka every 3 seconds. The Kafka broker is reached at kafka:9092 on the default network. The logs are published to the Kafka topic “log-messages” as JSON, with gzip compression.
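To make the payload shape concrete, here is a small Python sketch of a JSON record being gzipped and restored. The field names (`container_id`, `container_name`, `source`, `log`) are the ones Docker's fluentd logging driver attaches to each event. The sample values and the helper names `encode_record` and `decode_record` are hypothetical. In the real pipeline the Kafka producer applies compression and consumers decompress transparently, so this only illustrates what travels over the wire.

```python
import gzip
import json


def encode_record(record: dict) -> bytes:
    """Serialize a log record to JSON and gzip the result."""
    return gzip.compress(json.dumps(record).encode("utf-8"))


def decode_record(blob: bytes) -> dict:
    """Reverse encode_record: gunzip, then parse the JSON."""
    return json.loads(gzip.decompress(blob).decode("utf-8"))


# Hypothetical example of one httpd access log event.
sample = {
    "container_id": "0123456789ab",
    "container_name": "/web_1",
    "source": "stdout",
    "log": '172.18.0.1 - - [01/Jan/2018:00:00:00 +0000] "GET / HTTP/1.1" 200 45',
}
```

Running `decode_record(encode_record(sample))` gives back the original dictionary, which is the form a downstream consumer of the topic would work with.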

3. Next we set up an Apache Kafka and ZooKeeper pair as our main pub-sub backbone using existing Docker images.

    zookeeper:
      image: wurstmeister/zookeeper
      ports:
        - "2181:2181"
    kafka:
      image: wurstmeister/kafka
      hostname: kafka
      depends_on:
        - zookeeper
      ports:
        - "9092:9092"
      environment:
        KAFKA_BROKER_ID: 1
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        KAFKA_CREATE_TOPICS: log-messages:1:1,connect-config:1:1:compact,connect-offset:1:1:compact,connect-status:1:1:compact
        KAFKA_LISTENERS: PLAINTEXT://kafka:9092

The interesting bit here is that we specify that Kafka should create 4 topics for processing.

  • log-messages: that’s where all our container logs will go. For demo purposes it’s a 1 partition, non-compacted topic.

  • connect-config/offset/status: these 3 topics are used by Kafka Connect, and they have to be compacted topics as required by Connect.
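The KAFKA_CREATE_TOPICS value packs each topic as name:partitions:replicas with an optional cleanup policy. The following Python sketch parses that string to make the format explicit. The `TopicSpec` type and `parse_create_topics` function are illustrative names, and falling back to "delete" when no policy is given reflects Kafka's default cleanup policy.

```python
from typing import List, NamedTuple


class TopicSpec(NamedTuple):
    name: str
    partitions: int
    replicas: int
    cleanup_policy: str  # "delete" is Kafka's default when none is given


def parse_create_topics(value: str) -> List[TopicSpec]:
    """Parse a comma-separated list of name:partitions:replicas[:policy] entries."""
    specs = []
    for entry in value.split(","):
        parts = entry.strip().split(":")
        if len(parts) not in (3, 4):
            raise ValueError(f"expected name:partitions:replicas[:policy], got {entry!r}")
        policy = parts[3] if len(parts) == 4 else "delete"
        specs.append(TopicSpec(parts[0], int(parts[1]), int(parts[2]), policy))
    return specs
```

Applied to the value in the Compose file, this yields four specs: log-messages with the default policy and the three Connect topics marked as compact.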

4. Now comes the most interesting container of the whole stack: the Kafka Connect container.

    kafka_connect:
      build:
        context: ./
        dockerfile: Dockerfile-kafka-connect
      depends_on:
        - kafka
        - elasticsearch
      links:
        - kafka
        - elasticsearch
      environment:
        CONNECT_BOOTSTRAP_SERVERS: kafka:9092
        CONNECT_GROUP_ID: 1
        CONNECT_CONFIG_STORAGE_TOPIC: connect-config
        CONNECT_OFFSET_STORAGE_TOPIC: connect-offset
        CONNECT_STATUS_STORAGE_TOPIC: connect-status
        CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
        CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
        CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
        CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
        CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
        CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
        CONNECT_REST_ADVERTISED_HOST_NAME: localhost
        CONNECT_PLUGIN_PATH: "/usr/share/java,/opt/connectors"

Here we run Kafka Connect in distributed mode. Distributed mode offers more scalability and fault tolerance because the workers coordinate amongst themselves. Distributed workers are stateless (connector configs, offsets and statuses live in the three Connect topics) and can be controlled via a REST API running in the container. Kafka Connect has a wealth of worker configuration options. Here we use the schemaless JSON converter for message de/serialization.
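The CONNECT_* environment variables above correspond to Kafka Connect worker properties. The Confluent image derives each property name by dropping the CONNECT_ prefix, lowercasing, and turning underscores into dots. The Python sketch below mirrors that simple case only (the image has extra rules for double and triple underscores that are not needed here), and the function names are illustrative.

```python
def env_to_property(name: str, prefix: str = "CONNECT_") -> str:
    """Translate e.g. CONNECT_BOOTSTRAP_SERVERS into bootstrap.servers."""
    if not name.startswith(prefix):
        raise ValueError(f"{name!r} does not start with {prefix!r}")
    return name[len(prefix):].lower().replace("_", ".")


def worker_properties(env: dict, prefix: str = "CONNECT_") -> dict:
    """Build a worker-properties dict from the prefixed entries of env."""
    return {
        env_to_property(key, prefix): str(value)
        for key, value in env.items()
        if key.startswith(prefix)
    }
```

For example, CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE maps to value.converter.schemas.enable, which is the worker setting that makes the JSON converter schemaless.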

We also use a Dockerfile so we can install both the Elasticsearch and S3 connectors and mount connector configurations onto the container filesystem.

FROM confluentinc/cp-kafka-connect:4.0.0

RUN apt-get update && apt-get install -y vim

RUN mkdir -p /opt
RUN mkdir -p /opt/connectors
ADD connector_jars /opt/connectors/connector_jars
WORKDIR /opt

ADD connector_conf/ /opt/connector_conf
RUN wget https://github.com/confluentinc/kafka-connect-elasticsearch/archive/v3.3.3-rc1.tar.gz && \
    tar xzf v3.3.3-rc1.tar.gz -C /opt/connectors

Confluent Inc. releases a tarball of the Elasticsearch connector, but I was not able to find a pre-packaged S3 connector, so we packaged our own and added the jars to /opt/connectors/connector_jars in the image, where Kafka Connect can load them.

5. Lastly we set up an Elasticsearch container based on the official Docker image, so logs can be indexed and searched.

Now let’s follow these simple steps to run the log processing stack.

1. Build the Docker images from the Compose file

docker-compose -f docker-compose-dist-logging.yaml build

2. Start the stack, which sets up all the containers

docker-compose -f docker-compose-dist-logging.yaml up

3. Exec into the kafka_connect container and use the REST API to add both the Elasticsearch and S3 connectors

docker-compose -f docker-compose-dist-logging.yaml exec kafka_connect bash
>> curl -X POST -H "Content-Type: application/json" --data @/opt/connector_conf/connector_elasticsearch.json http://localhost:8083/connectors
>> curl -X POST -H "Content-Type: application/json" --data @/opt/connector_conf/connector_s3.json http://localhost:8083/connectors
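The connector JSON files themselves are not shown in this post, so the following Python sketch is an assumption about what the Elasticsearch one might contain. The configuration keys are standard options of Confluent's Elasticsearch sink connector, while the connector name, the `type.name` value and the Elasticsearch URL are placeholders. The second function builds the same POST request that the curl commands send, without actually sending it.

```python
import json
import urllib.request


def elasticsearch_sink(name: str, topic: str, es_url: str) -> dict:
    """Build a request body for creating an Elasticsearch sink connector."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
            "tasks.max": "1",
            "topics": topic,
            "connection.url": es_url,
            "type.name": "log",       # placeholder document type
            "key.ignore": "true",     # log messages carry no meaningful key
            "schema.ignore": "true",  # values are schemaless JSON
        },
    }


def create_connector_request(connect_url: str, payload: dict) -> urllib.request.Request:
    """Prepare (but do not send) a POST to the Connect /connectors endpoint."""
    return urllib.request.Request(
        connect_url.rstrip("/") + "/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Passing the prepared request to `urllib.request.urlopen` from inside the kafka_connect container would register the connector. An S3 sink would follow the same pattern with its own connector class and bucket settings.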

That’s it! We now have a full data pipeline that can collect and aggregate logs from distributed Docker containers and sink them to both Elasticsearch for real time indexing and search and S3 for batch processing and long term archival.
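One way to confirm that logs are arriving is to query Elasticsearch. The Python sketch below builds a search URL and pulls the log lines out of a search response. It assumes the index is named after the topic (the Elasticsearch sink connector's default) and that Elasticsearch is reachable on port 9200, which the Compose snippets in this post do not show. The function names are illustrative.

```python
import urllib.parse


def search_url(es_base: str, index: str, size: int = 5) -> str:
    """Build a _search URL that returns up to `size` documents from index."""
    query = urllib.parse.urlencode({"size": size})
    return f"{es_base.rstrip('/')}/{index}/_search?{query}"


def extract_logs(response: dict) -> list:
    """Return the `log` field of every hit in a parsed search response."""
    hits = response.get("hits", {}).get("hits", [])
    return [hit["_source"].get("log") for hit in hits]
```

Fetching `search_url("http://localhost:9200", "log-messages")`, parsing the JSON body and passing it to `extract_logs` should list recent access log lines once the web app has received some requests.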

As always, you can find the full code discussed in this post on the Cloudbox Labs GitHub.