{"id":8878,"date":"2021-05-20T22:44:18","date_gmt":"2021-05-20T19:44:18","guid":{"rendered":"https:\/\/kifarunix.com\/?p=8878"},"modified":"2024-03-18T22:20:40","modified_gmt":"2024-03-18T19:20:40","slug":"logstash-write-specific-events-to-specific-index","status":"publish","type":"post","link":"https:\/\/kifarunix.com\/logstash-write-specific-events-to-specific-index\/","title":{"rendered":"Logstash: Write Specific Events to Specific Index"},"content":{"rendered":"\n<p>In this blog post, you will learn how to write specific events to a specific index using Logstash. <a aria-label=\"Logstash (opens in a new tab)\" class=\"rank-math-link\" href=\"https:\/\/www.elastic.co\/logstash\" target=\"_blank\" rel=\"noreferrer noopener nofollow\">Logstash<\/a> <em>is a free and open server-side data processing pipeline that ingests data from a multitude of sources, transforms it, and then sends it to your favorite &#8220;stash&#8221;<\/em>, which in this example setup is <strong>Elasticsearch<\/strong>.<\/p>\n\n\n\n<p>Logstash can be configured to write specific events to specific indices based on conditionals. 
Follow through to learn how.<\/p>\n\n\n\n<div class=\"wp-block-rank-math-toc-block\" id=\"rank-math-toc\"><h2>Table of Contents<\/h2><nav><ul><li><a href=\"#configure-logstash-to-write-specific-events-to-specific-index\">Configure Logstash to Write Specific Events to Specific Index<\/a><ul><li><a href=\"#install-and-configure-logstash\">Install and Configure Logstash<\/a><\/li><li><a href=\"#configure-logstash-to-write-specific-events-to-specific-index-1\">Configure Logstash to Write Specific Events to Specific Index<\/a><ul><li><a href=\"#tag-the-event-logs-and-write-to-specific-elasticsearch-index\">Tag the Event Logs and Write to Specific Elasticsearch Index<\/a><\/li><li><a href=\"#logstash-write-events-to-specific-elasticsearch-index-based-on-source-host\">Logstash: Write Events to Specific Elasticsearch Index Based on Source Host<\/a><\/li><\/ul><\/li><li><a href=\"#check-logstash-configuration-syntax\">Check Logstash Configuration Syntax<\/a><\/li><li><a href=\"#verify-logstash-specific-event-index-writing-on-kibana\">Verify Logstash Specific Event Index Writing on Kibana<\/a><\/li><li><a href=\"#other-related-tutorials\">Other Related Tutorials<\/a><\/li><\/ul><\/li><\/ul><\/nav><\/div>\n\n\n\n<h2 class=\"wp-block-heading\" id=\"configure-logstash-to-write-specific-events-to-specific-index\">Configure Logstash to Write Specific Events to Specific Index<\/h2>\n\n\n\n<p>So, how can you configure Logstash to write specific events to a specific Elasticsearch index?<\/p>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"install-and-configure-logstash\">Install and Configure Logstash<\/h3>\n\n\n\n<p>Well, if you are forwarding your events to Logstash, you most likely already have some filters that parse or extract specific fields before the events are forwarded to a stash such as Elasticsearch for indexing.<\/p>\n\n\n\n<p>In our previous guides, we covered processing various logs with Logstash;<\/p>\n\n\n\n<p><a 
href=\"https:\/\/kifarunix.com\/install-logstash-7-on-fedora-30-fedora-29-centos-7\/#logstash-configuration\" target=\"_blank\" aria-label=\"How to parse SSH logs with Logstash (opens in a new tab)\" rel=\"noreferrer noopener\" class=\"rank-math-link\">How to parse SSH logs with Logstash<\/a><\/p>\n\n\n\n<p><a href=\"https:\/\/kifarunix.com\/process-and-visualize-modsecurity-logs-on-elk-stack\/\" target=\"_blank\" aria-label=\" (opens in a new tab)\" rel=\"noreferrer noopener\" class=\"rank-math-link\">Process and Visualize ModSecurity Logs on ELK Stack<\/a><\/p>\n\n\n\n<p><a aria-label=\" (opens in a new tab)\" class=\"rank-math-link\" href=\"https:\/\/kifarunix.com\/visualize-wordpress-user-activity-logs-on-elk-stack\/\" target=\"_blank\" rel=\"noreferrer noopener\">Visualize WordPress User Activity Logs on ELK Stack<\/a><\/p>\n\n\n\n<p>For the purposes of demonstrating how to write specific events to specific index using Logstash, we will consider our guide on <a rel=\"noreferrer noopener\" href=\"https:\/\/kifarunix.com\/install-logstash-7-on-fedora-30-fedora-29-centos-7\/#logstash-configuration\" target=\"_blank\" class=\"rank-math-link\">how to parse SSH logs with Logstash<\/a>.<\/p>\n\n\n\n<p>Consider the Logstash configuration file  below which parses SSH authentication events;<\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Failed login by invalid users<\/li>\n<\/ul>\n\n\n\n<pre class=\"wp-block-code\"><code>May 20 20:18:59 elk sshd&#91;1831]: Failed password for invalid user admin from 192.168.59.1 port 41150 ssh2<\/code><\/pre>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Accepted Password<\/li>\n<\/ul>\n\n\n\n<pre class=\"wp-block-code\"><code>May 20 20:20:32 elk sshd&#91;1863]: Accepted password for root from 192.168.59.1 port 41174 ssh2<\/code><\/pre>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Failed Password for valid users<\/li>\n<\/ul>\n\n\n\n<pre class=\"wp-block-code\"><code>May 20 20:22:05 elk sshd&#91;1967]: Failed password for kifarunix from 
192.168.59.1 port 41190 ssh2<\/code><\/pre>\n\n\n\n<pre class=\"scroll-box\"><code>\ninput {\n  beats {\n    port => 5044\n  }\n}\nfilter {\n  # parses Successful login events\n  grok {\n    match => { \"message\" => \"%{SYSLOGTIMESTAMP:timestamp}\\s+%{IPORHOST:dst_host}\\s+%{WORD:syslog_program}\\[\\d+\\]:\\s+(?&lt;status>Accepted\\s+password)\\s+for\\s+%{USER:auth_user}\\s+from\\s+%{SYSLOGHOST:src_host}.*\" }\n    add_field => { \"activity\" => \"SSH Logins\" }\n    }\n  # parses Failed login events for valid users\n  grok {\n    match => { \"message\" => \"%{SYSLOGTIMESTAMP:timestamp}\\s+%{IPORHOST:dst_host}\\s+%{WORD:syslog_program}\\[\\d+\\]:\\s+(?&lt;status>Failed\\s+password)\\s+for\\s+%{USER:auth_user}\\s+from\\s+%{SYSLOGHOST:src_host}.*\" }\n    add_field => { \"activity\" => \"SSH Logins\" }\n    }\n  # parses Failed login events for invalid users\n  grok {\n    match => { \"message\" => \"%{SYSLOGTIMESTAMP:timestamp}\\s+%{IPORHOST:dst_host}\\s+%{WORD:syslog_program}\\[\\d+\\]:\\s+(?&lt;status>Failed\\s+password)\\s+for\\s+invalid\\s+user\\s+%{USER:auth_user}\\s+from\\s+%{SYSLOGHOST:src_host}.*\" }\n    add_field => { \"activity\" => \"SSH Logins\" }\n    }\n}   \noutput {\n   elasticsearch {\n     hosts => [\"localhost:9200\"]\n     manage_template => false\n     index => \"ssh_auth-%{+YYYY.MM}\"\n }   \n}\n<\/code><\/pre>\n\n\n\n<p>As per the Logstash Elasticsearch output configuration section;<\/p>\n\n\n\n<pre class=\"scroll-box\"><code>\noutput {\n   elasticsearch {\n     hosts =&gt; [\"localhost:9200\"]\n     manage_template =&gt; false\n     index =&gt; \"ssh_auth-%{+YYYY.MM}\"\n }\n}\n<\/code><\/pre>\n\n\n\n<p>All the events are sent to a single index, called <strong><code>ssh_auth-%{+YYYY.MM}<\/code><\/strong>.<\/p>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"configure-logstash-to-write-specific-events-to-specific-index-1\">Configure Logstash to Write Specific Events to Specific Index<\/h3>\n\n\n\n<h4 class=\"wp-block-heading\" 
id=\"tag-the-event-logs-and-write-to-specific-elasticsearch-index\">Tag the Event Logs and Write to Specific Elasticsearch Index<\/h4>\n\n\n\n<p>Now, we would like to write the events for <strong>failed login for valid users<\/strong>, <strong>failed login events for invalid users<\/strong>, and <strong>successful login events<\/strong> to individual indices.<\/p>\n\n\n\n<p>Logstash supports the use of <a aria-label=\"conditionals (opens in a new tab)\" href=\"https:\/\/www.elastic.co\/guide\/en\/logstash\/current\/event-dependent-configuration.html#conditionals\" target=\"_blank\" rel=\"noreferrer noopener\" class=\"rank-math-link\">conditionals<\/a> that can help with this.<\/p>\n\n\n\n<p>To demonstrate how to tag events and write them to specific Elasticsearch indices, configure each grok filter to <strong>add a tag to the event<\/strong> that distinguishes it from the other events.<\/p>\n\n\n\n<p>For example, successful login events get the <strong>successful_login<\/strong> tag. Likewise, <strong>failed login for valid users<\/strong> and <strong>failed login for invalid users<\/strong> events get the <strong>failed_login<\/strong> and <strong>invalid_users<\/strong> tags respectively.<\/p>\n\n\n\n<pre class=\"scroll-box\"><code>\ninput {\n  beats {\n    port => 5044\n  }\n}\nfilter {\n  # parses Successful login events\n  grok {\n    match => { \"message\" => \"%{SYSLOGTIMESTAMP:timestamp}\\s+%{IPORHOST:dst_host}\\s+%{WORD:syslog_program}\\[\\d+\\]:\\s+(?&lt;status>Accepted\\s+password)\\s+for\\s+%{USER:auth_user}\\s+from\\s+%{SYSLOGHOST:src_host}.*\" }\n    add_field => { \"activity\" => \"SSH Logins\" }\n    add_tag => \"successful_login\"\n    }\n  # parses Failed login events for valid users\n  grok {\n    match => { \"message\" => 
\"%{SYSLOGTIMESTAMP:timestamp}\\s+%{IPORHOST:dst_host}\\s+%{WORD:syslog_program}\\[\\d+\\]:\\s+(?&lt;status>Failed\\s+password)\\s+for\\s+%{USER:auth_user}\\s+from\\s+%{SYSLOGHOST:src_host}.*\" }\n    add_field => { \"activity\" => \"SSH Logins\" }\n    add_tag => \"failed_login\"\n    }\n  # parses Failed login events for invalid users\n  grok {\n    match => { \"message\" => \"%{SYSLOGTIMESTAMP:timestamp}\\s+%{IPORHOST:dst_host}\\s+%{WORD:syslog_program}\\[\\d+\\]:\\s+(?&lt;status>Failed\\s+password)\\s+for\\s+invalid\\s+user\\s+%{USER:auth_user}\\s+from\\s+%{SYSLOGHOST:src_host}.*\" }\n    add_field => { \"activity\" => \"SSH Logins\" }\n    add_tag => \"invalid_users\"\n    }\n}\n<\/code><\/pre>\n\n\n\n<p>Now that we have tagged the events, we will then modify our Logstash output with the if statement as shown below;<\/p>\n\n\n\n<pre class=\"scroll-box\"><code>\noutput {\nif \"successful_login\" in [tags] {\n   elasticsearch {\n     hosts =&gt; [\"localhost:9200\"]\n     index =&gt; \"ssh-success-logins-%{+YYYY.MM}\"\n     }\n   }\n   else if \"failed_login\" in [tags] {\n   elasticsearch {\n     hosts =&gt; [\"localhost:9200\"]\n     index =&gt; \"ssh-failed-logins-%{+YYYY.MM}\"\n     }\n   }\n   else if \"invalid_users\" in [tags] {\n   elasticsearch {\n     hosts =&gt; [\"localhost:9200\"]\n     index =&gt; \"ssh-invalid-users-%{+YYYY.MM}\"\n     }\n   }\n   else {\n   elasticsearch {\n     hosts =&gt; [\"localhost:9200\"]\n     index =&gt; \"other-ssh-events-%{+YYYY.MM}\"\n     }\n   }\n}\n<\/code><\/pre>\n\n\n\n<p>This will cause Logstash to write <strong>successful login events<\/strong> to Elasticsearch index called <strong>ssh-success-logins-*<\/strong>, <strong>failed login events<\/strong> to index called <strong>ssh-failed-logins-*<\/strong>, <strong>invalid user login events<\/strong> to index called <strong>ssh-invalid-users-*<\/strong> and <strong>any other events<\/strong> to <strong>other-ssh-events-*<\/strong> index.<\/p>\n\n\n\n<h4 
class=\"wp-block-heading\" id=\"logstash-write-events-to-specific-elasticsearch-index-based-on-source-host\">Logstash: Write Events to Specific Elasticsearch Index Based on Source Host<\/h4>\n\n\n\n<p>What if you want to send logs from different hosts to their respective indices on Elasticsearch?<\/p>\n\n\n\n<p>Source host metadata such as the agent name (<strong>agent.name<\/strong>), host name (<strong>host.name or host.hostname<\/strong>), or host IP (<strong>host.ip<\/strong>) can be used together with Logstash conditionals to filter the logs and write them to specific indices.<\/p>\n\n\n\n<p>See example Logstash output configs below;<\/p>\n\n\n\n<p>Write logs to a specific index based on the value of the <strong>agent.name<\/strong> field in the logs.<\/p>\n\n\n\n<pre class=\"scroll-box\"><code>\noutput {\nif [agent][name] =~ \"dev-env\" {\n   elasticsearch {\n     hosts => [\"https:\/\/elk.kifarunix-demo.com:9200\"]\n     index => \"dev-env-%{+YYYY.MM}\"\n     }\n   }\n   else if [agent][name] =~ \"mx\" {\n   elasticsearch {\n     hosts => [\"https:\/\/elk.kifarunix-demo.com:9200\"]\n     index => \"mx-%{+YYYY.MM}\"\n     }\n   }\n}\n<\/code><\/pre>\n\n\n\n<p>Write logs to a specific index based on the value of the <strong>host.name<\/strong> field in the logs.<\/p>\n\n\n\n<pre class=\"scroll-box\"><code>\noutput {\nif [host][name] =~ \"dev-env\" {\n   elasticsearch {\n     hosts => [\"https:\/\/elk.kifarunix-demo.com:9200\"]\n     index => \"dev-env-%{+YYYY.MM}\"\n     }\n   }\n   else if [host][name] =~ \"mx\" {\n   elasticsearch {\n     hosts => [\"https:\/\/elk.kifarunix-demo.com:9200\"]\n     index => \"mx-%{+YYYY.MM}\"\n     }\n   }\n}\n<\/code><\/pre>\n\n\n\n<p>Write logs to a specific index based on the source host IP;<\/p>\n\n\n\n<pre class=\"scroll-box\"><code>\noutput {\nif \"192.168.56.112\" in [host][ip] {\n   elasticsearch {\n     hosts => [\"https:\/\/elk.kifarunix-demo.com:9200\"]\n     index => \"dev-env-%{+YYYY.MM}\"\n     }\n  
 }\n   else if \"192.168.56.111\" in [host][ip] {\n   elasticsearch {\n     hosts => [\"https:\/\/elk.kifarunix-demo.com:9200\"]\n     index => \"mx-%{+YYYY.MM}\"\n     }\n   }\n}\n<\/code><\/pre>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"check-logstash-configuration-syntax\">Check Logstash Configuration Syntax<\/h3>\n\n\n\n<p>Save the Logstash configuration file and run a syntax check using the command;<\/p>\n\n\n\n<pre class=\"wp-block-preformatted\">sudo -u logstash \/usr\/share\/logstash\/bin\/logstash --path.settings \/etc\/logstash -t<\/pre>\n\n\n\n<pre class=\"wp-block-code\"><code>Configuration OK\n&#91;2021-05-20T21:50:10,111]&#91;INFO ]&#91;logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash<\/code><\/pre>\n\n\n\n<p>If you get&nbsp;<strong>Configuration OK<\/strong>,&nbsp;then you are good to go.<\/p>\n\n\n\n<p>Start Logstash;<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>systemctl start logstash<\/code><\/pre>\n\n\n\n<p>You can follow the Logstash logs;<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>tail -f \/var\/log\/logstash\/logstash-plain.log<\/code><\/pre>\n\n\n\n<p>Logstash is now ready to receive events from Filebeat.<\/p>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"verify-logstash-specific-event-index-writing-on-kibana\">Verify Logstash Specific Event Index Writing on Kibana<\/h3>\n\n\n\n<p>Assuming you have already set up Filebeat and generated some SSH authentication events, navigate to Kibana and verify that the Elasticsearch indices have been created;<\/p>\n\n\n\n<figure class=\"wp-block-image size-large\"><img loading=\"lazy\" decoding=\"async\" width=\"1856\" height=\"781\" src=\"https:\/\/kifarunix.com\/wp-content\/uploads\/2021\/05\/elasticsearch-index.png\" alt=\"Logstash: Write Specific Events to Specific Index\" class=\"wp-image-8886\" title=\"\" srcset=\"https:\/\/kifarunix.com\/wp-content\/uploads\/2021\/05\/elasticsearch-index.png?v=1621538575 1856w, 
https:\/\/kifarunix.com\/wp-content\/uploads\/2021\/05\/elasticsearch-index-768x323.png?v=1621538575 768w, https:\/\/kifarunix.com\/wp-content\/uploads\/2021\/05\/elasticsearch-index-1536x646.png?v=1621538575 1536w\" sizes=\"(max-width: 1856px) 100vw, 1856px\" \/><\/figure>\n\n\n\n<p>You can now go ahead and create the Kibana index patterns under <strong>Kibana &gt; Index Patterns &gt; Create index patterns<\/strong>.<\/p>\n\n\n\n<figure class=\"wp-block-image size-large\"><img loading=\"lazy\" decoding=\"async\" width=\"1854\" height=\"652\" src=\"https:\/\/kifarunix.com\/wp-content\/uploads\/2021\/05\/kibana-indices.png\" alt=\"\" class=\"wp-image-8887\" title=\"\" srcset=\"https:\/\/kifarunix.com\/wp-content\/uploads\/2021\/05\/kibana-indices.png?v=1621539133 1854w, https:\/\/kifarunix.com\/wp-content\/uploads\/2021\/05\/kibana-indices-768x270.png?v=1621539133 768w, https:\/\/kifarunix.com\/wp-content\/uploads\/2021\/05\/kibana-indices-1536x540.png?v=1621539133 1536w\" sizes=\"(max-width: 1854px) 100vw, 1854px\" \/><\/figure>\n\n\n\n<p>From the Discover page;<\/p>\n\n\n\n<figure class=\"wp-block-image size-large\"><img loading=\"lazy\" decoding=\"async\" width=\"1882\" height=\"720\" src=\"https:\/\/kifarunix.com\/wp-content\/uploads\/2021\/05\/getting-events-from-discovery-menu.png\" alt=\"\" class=\"wp-image-8888\" title=\"\" srcset=\"https:\/\/kifarunix.com\/wp-content\/uploads\/2021\/05\/getting-events-from-discovery-menu.png?v=1621539310 1882w, https:\/\/kifarunix.com\/wp-content\/uploads\/2021\/05\/getting-events-from-discovery-menu-768x294.png?v=1621539310 768w, https:\/\/kifarunix.com\/wp-content\/uploads\/2021\/05\/getting-events-from-discovery-menu-1536x588.png?v=1621539310 1536w\" sizes=\"(max-width: 1882px) 100vw, 1882px\" \/><\/figure>\n\n\n\n<p>And there you go. You have written different events to different Elasticsearch indices.<\/p>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"other-related-tutorials\">Other Related Tutorials<\/h3>\n\n\n\n<p><a aria-label=\" (opens 
in a new tab)\" href=\"https:\/\/kifarunix.com\/easy-way-to-configure-filebeat-logstash-ssl-tls-connection\/\" target=\"_blank\" rel=\"noreferrer noopener\" class=\"rank-math-link\">Easy way to configure Filebeat-Logstash SSL\/TLS Connection<\/a><\/p>\n\n\n\n<p><a href=\"https:\/\/kifarunix.com\/how-to-debug-logstash-grok-filters\/\" target=\"_blank\" aria-label=\" (opens in a new tab)\" rel=\"noreferrer noopener\" class=\"rank-math-link\">How to Debug Logstash Grok Filters<\/a><\/p>\n\n\n\n<p><a href=\"https:\/\/kifarunix.com\/integrate-wazuh-manager-with-elk-stack\/\" target=\"_blank\" aria-label=\" (opens in a new tab)\" rel=\"noreferrer noopener\" class=\"rank-math-link\">Integrate Wazuh Manager with ELK Stack<\/a><\/p>\n","protected":false},"excerpt":{"rendered":"<p>In this blog post, you will learn how to write specific events to specific index using Logstash. Logstash is a free and open server-side data<\/p>\n","protected":false},"author":1,"featured_media":8886,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"rank_math_lock_modified_date":false,"footnotes":""},"categories":[910,121,72],"tags":[3567,3566,3564,3568,3565],"class_list":["post-8878","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-elastic-stack","category-howtos","category-monitoring","tag-elasticsearch-index","tag-logstash-write-to-specific-index","tag-logstash-write-specific-events-to-specific-index","tag-send-specific-events-to-specific-index","tag-write-data-to-specific-index","generate-columns","tablet-grid-50","mobile-grid-100","grid-parent","grid-50","resize-featured-image"],"_links":{"self":[{"href":"https:\/\/kifarunix.com\/wp-json\/wp\/v2\/posts\/8878"}],"collection":[{"href":"https:\/\/kifarunix.com\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/kifarunix.com\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/kifarunix.com\/wp-json\/wp\/v2\/users\/1"}]
,"replies":[{"embeddable":true,"href":"https:\/\/kifarunix.com\/wp-json\/wp\/v2\/comments?post=8878"}],"version-history":[{"count":5,"href":"https:\/\/kifarunix.com\/wp-json\/wp\/v2\/posts\/8878\/revisions"}],"predecessor-version":[{"id":21808,"href":"https:\/\/kifarunix.com\/wp-json\/wp\/v2\/posts\/8878\/revisions\/21808"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/kifarunix.com\/wp-json\/wp\/v2\/media\/8886"}],"wp:attachment":[{"href":"https:\/\/kifarunix.com\/wp-json\/wp\/v2\/media?parent=8878"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/kifarunix.com\/wp-json\/wp\/v2\/categories?post=8878"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/kifarunix.com\/wp-json\/wp\/v2\/tags?post=8878"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}