Complete Guide: Configuring Filebeat to Send Logs to Kafka

How can I configure Filebeat to send logs to Kafka? This guide walks through the complete setup. Filebeat is one of the Elastic Stack Beats; it collects log data and sends it to Elasticsearch, to Logstash, or to a distributed event streaming platform built for high-volume data streams, such as Kafka.

Configure Filebeat to Send Logs to Kafka

Install and Configure Kafka

You can check our guides on the links below to learn how to install and configure Kafka.

Easy Steps: Install Apache Kafka on Debian 12

Quick Guide: Configure Apache Kafka SSL/TLS Encryption for Enhanced Security

Install Filebeat

You need Filebeat to be able to collect system logs from various Linux systems and push them to Kafka.

You can check our guides on how to install Filebeat;

How to install Filebeat on Linux

Configure Filebeat to Send Logs to Kafka without SSL/TLS (PLAINTEXT)

Configure Filebeat to Connect to Plaintext Kafka

By default, Filebeat is set to send event data to Elasticsearch as you can see in the output section of the filebeat.yml configuration file.


# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["localhost:9200"]

  # Protocol - either `http` (default) or `https`.
  #protocol: "https"

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  #username: "elastic"
  #password: "changeme"

In order to configure Filebeat to send logs to Kafka, edit the Filebeat configuration file and update the output section by configuring the Apache Kafka connection and other details.

vim /etc/filebeat/filebeat.yml

Start by commenting out the Elasticsearch output configs, since Filebeat allows only one output to be enabled at a time;


# ---------------------------- Elasticsearch Output ----------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
  #hosts: ["localhost:9200"]

  # Protocol - either `http` (default) or `https`.
  #protocol: "https"

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  #username: "elastic"
  #password: "changeme"

Next, add a section for the Apache Kafka output. It is placed beneath the Elasticsearch output here, but its position in the file does not matter. If you are not sending any sensitive information to Kafka, or the connection stays within your local network, the configuration below sets up a plaintext Kafka connection, where traffic is sent unencrypted.


# ------------------------------ Kafka Output -------------------------------
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["192.168.57.32:9092"]

  # message topic selection + partitioning
  topic: "filebeat"
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

So, what do the options used here mean?

  • output.kafka: This specifies the output plugin to be used by Filebeat.
  • hosts: Defines the addresses and ports of the Kafka broker(s) to connect to for writing event data. In this case, Filebeat is configured to connect to a Kafka broker running on the IP address 192.168.57.32 and port 9092. If you have a Kafka cluster, you can use an HA approach;
    hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
  • topic: It specifies the Kafka topic to which Filebeat will send the messages. In this case, the topic is set to filebeat. It can be anything, including dynamic names.
  • partition.round_robin: This setting determines how Filebeat selects the partition to write each message to. round_robin cycles through the topic's partitions in turn. With reachable_only set to false, Filebeat distributes events across all partitions, even those that are not currently reachable, so the output may block while a partition is unavailable.
  • required_acks: It indicates the number of acknowledgments required from Kafka before considering a message as successfully written. A value of 1 means that only the leader of the partition needs to acknowledge the message.
  • compression: This parameter specifies the compression codec to use for compressing messages sent to Kafka. Here, ‘gzip’ compression is enabled, which can reduce the message size and network bandwidth usage.
  • max_message_bytes: It sets the maximum size of a JSON-encoded message that Filebeat sends to Kafka; larger messages are dropped. Here the limit is 1,000,000 bytes (1 MB), and it should not exceed the broker's message.max.bytes setting.
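To make the effect of reachable_only more concrete, here is a small conceptual sketch in Python. It is not Filebeat's implementation; the function name round_robin and the three-partition topic are assumptions made up for illustration. It only shows which partitions successive events would be assigned to in each mode.

```python
from itertools import cycle, islice


def round_robin(partitions, reachable, reachable_only=False):
    """Return an endless iterator over the partitions picked for successive events.

    Conceptual illustration only; this is not how Filebeat is implemented.
    """
    if reachable_only:
        # Skip partitions whose leader cannot be reached right now.
        candidates = [p for p in partitions if p in reachable]
    else:
        # Use every partition, reachable or not.
        candidates = list(partitions)
    if not candidates:
        raise ValueError("no partition available")
    return cycle(candidates)


# Hypothetical topic with three partitions; partition 1 is unreachable.
partitions = [0, 1, 2]
reachable = {0, 2}

print(list(islice(round_robin(partitions, reachable, reachable_only=False), 5)))
print(list(islice(round_robin(partitions, reachable, reachable_only=True), 5)))
```

With reachable_only set to false, events are still assigned to partition 1, which is where the output can stall. With true, the load is spread over partitions 0 and 2 only, at the cost of an uneven distribution.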

Read more on Filebeat Kafka output configuration options.

Also check how to change the output codec. Note that the output itself remains output.kafka.

Save the changes made to the Filebeat configuration and exit.

Next, test the configuration for any syntax error;

filebeat test config

Test the connection to Kafka broker;

filebeat test output

If everything is okay, you should see an output similar to;


Kafka: 192.168.57.32:9092...
  parse host... OK
  dns lookup... OK
  addresses: 192.168.57.32
  dial up... OK

Start Filebeat.

You can begin by running it in the foreground;

filebeat -e

If everything is fine, you should see a message about the connection to Kafka;

Connection to kafka(192.168.57.32:9092) established

Press Ctrl+C to stop the foreground process. You can then start Filebeat as a service;

systemctl enable --now filebeat

Confirm Creation of Kafka Topics on Kafka Broker

On the Kafka broker/server terminal, execute this command to check if your topic has been created;

/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092

My sample output;

filebeat
kafka-topic-test

Reading Logs from Kafka Topic

To read the logs on the topic, use the command;

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server <broker_host>:<broker_port> --topic <topic_name> [--from-beginning]

For example;

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic filebeat

Sample output;


{"@timestamp":"2023-07-16T19:42:06.697Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.8.2","pipeline":"filebeat-8.8.2-system-auth-pipeline"},"log":{"offset":2384006,"file":{"path":"/var/log/auth.log"}},"input":{"type":"log"},"fileset":{"name":"auth"},"service":{"type":"system"},"ecs":{"version":"8.0.0"},"message":"Jul 16 19:42:01 mx CRON[18368]: pam_unix(cron:session): session opened for user root by (uid=0)","host":{"hostname":"mx.kifarunix-demo.com","architecture":"x86_64","os":{"family":"debian","name":"Ubuntu","kernel":"5.4.0-153-generic","codename":"focal","type":"linux","platform":"ubuntu","version":"20.04.6 LTS (Focal Fossa)"},"id":"e46f0b2ecf5841be8df054ae93c48214","name":"mx.kifarunix-demo.com","containerized":false,"ip":["10.0.2.15","fe80::a00:27ff:fee4:5139","192.168.57.33","fe80::a00:27ff:fe89:7e5f"],"mac":["08-00-27-89-7E-5F","08-00-27-E4-51-39"]},"agent":{"ephemeral_id":"27ebb032-f900-4d05-88d8-51de9b9c1c6d","id":"08b49b73-3585-41ba-9ef5-c0c731203047","name":"mx.kifarunix-demo.com","type":"filebeat","version":"8.8.2"},"event":{"module":"system","dataset":"system.auth","timezone":"+00:00"}}
{"@timestamp":"2023-07-16T19:42:06.697Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.8.2","pipeline":"filebeat-8.8.2-system-auth-pipeline"},"input":{"type":"log"},"event":{"module":"system","dataset":"system.auth","timezone":"+00:00"},"fileset":{"name":"auth"},"ecs":{"version":"8.0.0"},"message":"Jul 16 19:42:01 mx CRON[18369]: pam_unix(cron:session): session opened for user sogo by (uid=0)","log":{"offset":2384102,"file":{"path":"/var/log/auth.log"}},"service":{"type":"system"},"host":{"id":"e46f0b2ecf5841be8df054ae93c48214","containerized":false,"name":"mx.kifarunix-demo.com","ip":["10.0.2.15","fe80::a00:27ff:fee4:5139","192.168.57.33","fe80::a00:27ff:fe89:7e5f"],"mac":["08-00-27-89-7E-5F","08-00-27-E4-51-39"],"hostname":"mx.kifarunix-demo.com","architecture":"x86_64","os":{"name":"Ubuntu","kernel":"5.4.0-153-generic","codename":"focal","type":"linux","platform":"ubuntu","version":"20.04.6 LTS (Focal Fossa)","family":"debian"}},"agent":{"version":"8.8.2","ephemeral_id":"27ebb032-f900-4d05-88d8-51de9b9c1c6d","id":"08b49b73-3585-41ba-9ef5-c0c731203047","name":"mx.kifarunix-demo.com","type":"filebeat"}}
{"@timestamp":"2023-07-16T19:42:06.697Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.8.2","pipeline":"filebeat-8.8.2-system-auth-pipeline"},"message":"Jul 16 19:42:01 mx CRON[18368]: pam_unix(cron:session): session closed for user root","log":{"offset":2384198,"file":{"path":"/var/log/auth.log"}},"service":{"type":"system"},"ecs":{"version":"8.0.0"},"fileset":{"name":"auth"},"input":{"type":"log"},"event":{"module":"system","dataset":"system.auth","timezone":"+00:00"},"host":{"ip":["10.0.2.15","fe80::a00:27ff:fee4:5139","192.168.57.33","fe80::a00:27ff:fe89:7e5f"],"mac":["08-00-27-89-7E-5F","08-00-27-E4-51-39"],"name":"mx.kifarunix-demo.com","hostname":"mx.kifarunix-demo.com","architecture":"x86_64","os":{"version":"20.04.6 LTS (Focal Fossa)","family":"debian","name":"Ubuntu","kernel":"5.4.0-153-generic","codename":"focal","type":"linux","platform":"ubuntu"},"id":"e46f0b2ecf5841be8df054ae93c48214","containerized":false},"agent":{"version":"8.8.2","ephemeral_id":"27ebb032-f900-4d05-88d8-51de9b9c1c6d","id":"08b49b73-3585-41ba-9ef5-c0c731203047","name":"mx.kifarunix-demo.com","type":"filebeat"}}
{"@timestamp":"2023-07-16T19:42:09.055Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.8.2","pipeline":"filebeat-8.8.2-system-syslog-pipeline"},"log":{"offset":549530,"file":{"path":"/var/log/syslog"}},"message":"Jul 16 19:42:01 mx CRON[18370]: (sogo) CMD (/usr/sbin/sogo-tool expire-sessions 30 >/dev/null 2>&1; /usr/sbin/sogo-ealarms-notify >/dev/null 2>&1)","event":{"module":"system","timezone":"+00:00","dataset":"system.syslog"},"fileset":{"name":"syslog"},"agent":{"name":"mx.kifarunix-demo.com","type":"filebeat","version":"8.8.2","ephemeral_id":"27ebb032-f900-4d05-88d8-51de9b9c1c6d","id":"08b49b73-3585-41ba-9ef5-c0c731203047"},"input":{"type":"log"},"service":{"type":"system"},"ecs":{"version":"1.12.0"},"host":{"hostname":"mx.kifarunix-demo.com","architecture":"x86_64","os":{"family":"debian","name":"Ubuntu","kernel":"5.4.0-153-generic","codename":"focal","type":"linux","platform":"ubuntu","version":"20.04.6 LTS (Focal Fossa)"},"id":"e46f0b2ecf5841be8df054ae93c48214","containerized":false,"ip":["10.0.2.15","fe80::a00:27ff:fee4:5139","192.168.57.33","fe80::a00:27ff:fe89:7e5f"],"mac":["08-00-27-89-7E-5F","08-00-27-E4-51-39"],"name":"mx.kifarunix-demo.com"}}
^CProcessed a total of 4 messages

This reads only the messages that arrive in real time, after the consumer has started.

You can read the logs from the beginning and in realtime at the same time;

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic filebeat --from-beginning
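The raw JSON events are hard to scan by eye. As a rough sketch, the Python snippet below reduces one event to a single readable line. The function name summarize_event is hypothetical, and the field names (@timestamp, host.name, log.file.path, message) are taken from the sample events above; events from other inputs or modules may be shaped differently.

```python
import json


def summarize_event(line):
    """Turn one Filebeat JSON event into 'timestamp host path: message'."""
    event = json.loads(line)
    timestamp = event.get("@timestamp", "-")
    host = event.get("host", {}).get("name", "-")
    path = event.get("log", {}).get("file", {}).get("path", "-")
    message = event.get("message", "")
    return f"{timestamp} {host} {path}: {message}"


# Shortened copy of the first sample event above (most fields omitted).
sample = (
    '{"@timestamp":"2023-07-16T19:42:06.697Z",'
    '"log":{"offset":2384006,"file":{"path":"/var/log/auth.log"}},'
    '"message":"Jul 16 19:42:01 mx CRON[18368]: pam_unix(cron:session): '
    'session opened for user root by (uid=0)",'
    '"host":{"name":"mx.kifarunix-demo.com"}}'
)
print(summarize_event(sample))
```

To use it on live data, one option is to pipe the console consumer's output into a script that calls summarize_event on every line read from standard input.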

Configure Filebeat to Send Logs to Kafka with SSL/TLS

Configure Filebeat-Kafka Output SSL/TLS Connection

What if your Kafka broker is configured with SSL/TLS encryption? How do you configure Filebeat to send logs to Kafka over SSL/TLS?

Simply update the SSL connection settings;


# ------------------------------ Kafka Output -------------------------------
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["kafka.kifarunix-demo.com:9092"]
  ssl:
    enabled: true
    certificate_authorities: ["/etc/filebeat/kafka.ca.crt"]
    certificate: "/etc/filebeat/kafka/server.crt"
    key: "/etc/filebeat/kafka/server.key"

  # message topic selection + partitioning
  topic: "filebeat-ssl"
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
  

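You need to copy the CA certificate, server certificate and key from the Kafka server. Note that the certificate and key settings are used for TLS client authentication, so they are only needed if the broker requires clients to present a certificate.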

Ensure the Kafka hostname is resolvable.
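One way to check name resolution from the Filebeat host is a short Python sketch like the one below. The function name resolve_broker is made up for this example, and "localhost" is only a placeholder so the snippet runs anywhere; replace it with your broker hostname.

```python
import socket


def resolve_broker(host, port):
    """Return the sorted, de-duplicated IP addresses that host resolves to."""
    try:
        infos = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    except socket.gaierror:
        # Name resolution failed: no DNS record and no /etc/hosts entry.
        return []
    return sorted({info[4][0] for info in infos})


if __name__ == "__main__":
    host = "localhost"  # placeholder; replace with your broker hostname
    addresses = resolve_broker(host, 9092)
    print(f"{host} -> {', '.join(addresses) if addresses else 'NOT RESOLVABLE'}")
```

If the result is empty for your broker hostname, add a DNS record or an /etc/hosts entry on the Filebeat host before continuing.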

Save and exit the file after making changes.

Test Filebeat configuration;

filebeat test config

Unfortunately, the Filebeat Kafka output doesn't support TLS testing with filebeat test output.
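As a separate sanity check, you can attempt a TLS handshake against the broker yourself. The sketch below uses only Python's standard ssl and socket modules; the function name check_tls and its parameters are assumptions for this example and are unrelated to Filebeat. It verifies the broker certificate against the given CA file and optionally presents a client certificate.

```python
import socket
import ssl


def check_tls(host, port, cafile=None, certfile=None, keyfile=None, timeout=5.0):
    """Try a TLS handshake; return (True, protocol version) or (False, error)."""
    try:
        # Verify the server certificate against cafile (system CAs if None).
        context = ssl.create_default_context(cafile=cafile)
        if certfile:
            # Present a client certificate, for brokers that require one.
            context.load_cert_chain(certfile=certfile, keyfile=keyfile)
        with socket.create_connection((host, port), timeout=timeout) as sock:
            with context.wrap_socket(sock, server_hostname=host) as tls:
                return True, tls.version()
    except OSError as err:
        # Covers DNS failures, refused connections, missing files, and
        # certificate or handshake errors (ssl.SSLError subclasses OSError).
        return False, f"{type(err).__name__}: {err}"
```

For the configuration above, the call would look like check_tls("kafka.kifarunix-demo.com", 9092, cafile="/etc/filebeat/kafka.ca.crt", certfile="/etc/filebeat/kafka/server.crt", keyfile="/etc/filebeat/kafka/server.key"). A result of (True, ...) only shows that the handshake succeeded; it does not prove that Filebeat can publish to the topic.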

Start Filebeat, in the foreground, just to verify the connection;

filebeat -e

If you see such an output, then you are good.

"Connection to kafka(kafka.kifarunix-demo.com:9092) established"

You can start Filebeat as a service;

systemctl start filebeat

Confirm Creation of Kafka Topics on Kafka Broker

On the Kafka broker/server terminal, execute this command to check if your topic has been created;

/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9094

(Note that we have configured port 9094 on the Kafka server for local Kafka connections without SSL)

My sample output;

__consumer_offsets
filebeat
filebeat-ssl
kafka-topic-test

Reading Logs from Kafka Topic

To read the logs on the topic, use the command;

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic filebeat-ssl

Sample events;


{"@timestamp":"2023-07-17T04:51:02.761Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.8.2","pipeline":"filebeat-8.8.2-system-auth-pipeline"},"service":{"type":"system"},"input":{"type":"log"},"ecs":{"version":"8.0.0"},"host":{"mac":["08-00-27-89-7E-5F","08-00-27-E4-51-39"],"hostname":"mx.kifarunix-demo.com","name":"mx.kifarunix-demo.com","architecture":"x86_64","os":{"codename":"focal","type":"linux","platform":"ubuntu","version":"20.04.6 LTS (Focal Fossa)","family":"debian","name":"Ubuntu","kernel":"5.4.0-153-generic"},"id":"e46f0b2ecf5841be8df054ae93c48214","containerized":false,"ip":["10.0.2.15","fe80::a00:27ff:fee4:5139","192.168.57.33","fe80::a00:27ff:fe89:7e5f"]},"agent":{"ephemeral_id":"02d93da9-5449-4333-9e6e-26250b28456d","id":"08b49b73-3585-41ba-9ef5-c0c731203047","name":"mx.kifarunix-demo.com","type":"filebeat","version":"8.8.2"},"log":{"offset":2602826,"file":{"path":"/var/log/auth.log"}},"message":"Jul 17 04:51:01 mx CRON[35424]: pam_unix(cron:session): session opened for user root by (uid=0)","fileset":{"name":"auth"},"event":{"dataset":"system.auth","timezone":"+00:00","module":"system"}}
{"@timestamp":"2023-07-17T04:51:02.761Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.8.2","pipeline":"filebeat-8.8.2-system-auth-pipeline"},"fileset":{"name":"auth"},"input":{"type":"log"},"ecs":{"version":"8.0.0"},"service":{"type":"system"},"host":{"ip":["10.0.2.15","fe80::a00:27ff:fee4:5139","192.168.57.33","fe80::a00:27ff:fe89:7e5f"],"mac":["08-00-27-89-7E-5F","08-00-27-E4-51-39"],"hostname":"mx.kifarunix-demo.com","architecture":"x86_64","os":{"platform":"ubuntu","version":"20.04.6 LTS (Focal Fossa)","family":"debian","name":"Ubuntu","kernel":"5.4.0-153-generic","codename":"focal","type":"linux"},"name":"mx.kifarunix-demo.com","id":"e46f0b2ecf5841be8df054ae93c48214","containerized":false},"agent":{"type":"filebeat","version":"8.8.2","ephemeral_id":"02d93da9-5449-4333-9e6e-26250b28456d","id":"08b49b73-3585-41ba-9ef5-c0c731203047","name":"mx.kifarunix-demo.com"},"message":"Jul 17 04:51:01 mx CRON[35425]: pam_unix(cron:session): session opened for user sogo by (uid=0)","log":{"offset":2602922,"file":{"path":"/var/log/auth.log"}},"event":{"module":"system","dataset":"system.auth","timezone":"+00:00"}}
{"@timestamp":"2023-07-17T04:51:02.761Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.8.2","pipeline":"filebeat-8.8.2-system-auth-pipeline"},"event":{"module":"system","dataset":"system.auth","timezone":"+00:00"},"fileset":{"name":"auth"},"ecs":{"version":"8.0.0"},"message":"Jul 17 04:51:01 mx CRON[35424]: pam_unix(cron:session): session closed for user root","log":{"offset":2603018,"file":{"path":"/var/log/auth.log"}},"service":{"type":"system"},"input":{"type":"log"},"host":{"id":"e46f0b2ecf5841be8df054ae93c48214","containerized":false,"ip":["10.0.2.15","fe80::a00:27ff:fee4:5139","192.168.57.33","fe80::a00:27ff:fe89:7e5f"],"mac":["08-00-27-89-7E-5F","08-00-27-E4-51-39"],"hostname":"mx.kifarunix-demo.com","architecture":"x86_64","os":{"version":"20.04.6 LTS (Focal Fossa)","family":"debian","name":"Ubuntu","kernel":"5.4.0-153-generic","codename":"focal","type":"linux","platform":"ubuntu"},"name":"mx.kifarunix-demo.com"},"agent":{"name":"mx.kifarunix-demo.com","type":"filebeat","version":"8.8.2","ephemeral_id":"02d93da9-5449-4333-9e6e-26250b28456d","id":"08b49b73-3585-41ba-9ef5-c0c731203047"}}
{"@timestamp":"2023-07-17T04:51:02.761Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.8.2","pipeline":"filebeat-8.8.2-system-auth-pipeline"},"host":{"hostname":"mx.kifarunix-demo.com","architecture":"x86_64","os":{"type":"linux","platform":"ubuntu","version":"20.04.6 LTS (Focal Fossa)","family":"debian","name":"Ubuntu","kernel":"5.4.0-153-generic","codename":"focal"},"id":"e46f0b2ecf5841be8df054ae93c48214","containerized":false,"ip":["10.0.2.15","fe80::a00:27ff:fee4:5139","192.168.57.33","fe80::a00:27ff:fe89:7e5f"],"mac":["08-00-27-89-7E-5F","08-00-27-E4-51-39"],"name":"mx.kifarunix-demo.com"},"message":"Jul 17 04:51:02 mx CRON[35425]: pam_unix(cron:session): session closed for user sogo","log":{"offset":2603103,"file":{"path":"/var/log/auth.log"}},"event":{"dataset":"system.auth","module":"system","timezone":"+00:00"},"fileset":{"name":"auth"},"ecs":{"version":"8.0.0"},"input":{"type":"log"},"service":{"type":"system"},"agent":{"type":"filebeat","version":"8.8.2","ephemeral_id":"02d93da9-5449-4333-9e6e-26250b28456d","id":"08b49b73-3585-41ba-9ef5-c0c731203047","name":"mx.kifarunix-demo.com"}}
{"@timestamp":"2023-07-17T04:51:09.813Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.8.2","pipeline":"filebeat-8.8.2-system-syslog-pipeline"},"fileset":{"name":"syslog"},"ecs":{"version":"1.12.0"},"host":{"architecture":"x86_64","os":{"kernel":"5.4.0-153-generic","codename":"focal","type":"linux","platform":"ubuntu","version":"20.04.6 LTS (Focal Fossa)","family":"debian","name":"Ubuntu"},"id":"e46f0b2ecf5841be8df054ae93c48214","containerized":false,"name":"mx.kifarunix-demo.com","ip":["10.0.2.15","fe80::a00:27ff:fee4:5139","192.168.57.33","fe80::a00:27ff:fe89:7e5f"],"mac":["08-00-27-89-7E-5F","08-00-27-E4-51-39"],"hostname":"mx.kifarunix-demo.com"},"log":{"file":{"path":"/var/log/syslog"},"offset":87749},"event":{"module":"system","dataset":"system.syslog","timezone":"+00:00"},"input":{"type":"log"},"agent":{"type":"filebeat","version":"8.8.2","ephemeral_id":"02d93da9-5449-4333-9e6e-26250b28456d","id":"08b49b73-3585-41ba-9ef5-c0c731203047","name":"mx.kifarunix-demo.com"},"message":"Jul 17 04:51:01 mx CRON[35426]: (sogo) CMD (/usr/sbin/sogo-tool expire-sessions 30 >/dev/null 2>&1; /usr/sbin/sogo-ealarms-notify >/dev/null 2>&1)","service":{"type":"system"}}
{"@timestamp":"2023-07-17T04:51:09.813Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.8.2","pipeline":"filebeat-8.8.2-system-syslog-pipeline"},"fileset":{"name":"syslog"},"ecs":{"version":"1.12.0"},"agent":{"id":"08b49b73-3585-41ba-9ef5-c0c731203047","name":"mx.kifarunix-demo.com","type":"filebeat","version":"8.8.2","ephemeral_id":"02d93da9-5449-4333-9e6e-26250b28456d"},"host":{"name":"mx.kifarunix-demo.com","architecture":"x86_64","os":{"version":"20.04.6 LTS (Focal Fossa)","family":"debian","name":"Ubuntu","kernel":"5.4.0-153-generic","codename":"focal","type":"linux","platform":"ubuntu"},"id":"e46f0b2ecf5841be8df054ae93c48214","containerized":false,"ip":["10.0.2.15","fe80::a00:27ff:fee4:5139","192.168.57.33","fe80::a00:27ff:fe89:7e5f"],"mac":["08-00-27-89-7E-5F","08-00-27-E4-51-39"],"hostname":"mx.kifarunix-demo.com"},"input":{"type":"log"},"event":{"module":"system","dataset":"system.syslog","timezone":"+00:00"},"service":{"type":"system"},"log":{"offset":87896,"file":{"path":"/var/log/syslog"}},"message":"Jul 17 04:51:01 mx CRON[35427]: (root) CMD (/bin/bash /usr/local/bin/fail2ban_banned_db unban_db)"}
^CProcessed a total of 6 messages

And that is it! You should now have an encrypted connection between Filebeat and your Kafka broker, for enhanced security.

Configure Consumers

The logs are now hitting your Kafka topics. Similarly, configure your consumers to use an SSL/TLS connection when reading log data from the Kafka topics.

That closes our guide on configuring Filebeat to send logs to Kafka via unencrypted and encrypted channels.

Other Tutorials

Check SSL Certificate Expiry Date from Certificate File

Monitor SSL/TLS Certificate Expiry with Prometheus and Grafana

Kifarunix
Linux Certified Engineer, with a passion for open-source technology and a strong understanding of Linux systems. With experience in system administration, troubleshooting, and automation, I am skilled in maintaining and optimizing Linux infrastructure.
