Introduction and Installation of Logstash

So far in this series on the ELK (Elasticsearch-Logstash-Kibana) stack, we have talked about Elasticsearch and Kibana (here and here). It’s about time we look at the third pillar of this stack – Logstash.

So, what is Logstash?

Logstash is an open source data collection engine with real-time pipelining capabilities. This means Logstash can dynamically pull data together from different sources, clean and filter it, and send it to whatever output destinations we need. Its pluggable architecture allows great flexibility: Logstash can take a wide variety of unstructured data, clean and transform it, and deliver near real-time insights.

What is Grok?

As the Logstash documentation puts it – “Grok is the bread and butter of Logstash filters and is used ubiquitously to derive structure out of unstructured data.” A bit more detail on Grok will help you understand Logstash better; please see:

https://www.elastic.co/guide/en/logstash/6.4/plugins-filters-grok.html
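To get a feel for what Grok does, here is a rough Python analogy (a sketch, not Logstash itself): the COMBINEDAPACHELOG pattern pulls named fields out of an Apache “combined” log line in much the same way as the named groups in the regular expression below. The sample line and the regex are illustrative only.

import re

# A rough analogy for grok: named groups derive structured fields from an
# unstructured Apache "combined" log line (sample line is illustrative).
combined = re.compile(
    r'(?P<clientip>\S+) (?P<ident>\S+) (?P<auth>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] "(?P<verb>\S+) (?P<request>\S+) (?P<httpversion>[^"]+)" '
    r'(?P<response>\d{3}) (?P<bytes>\S+) "(?P<referrer>[^"]*)" "(?P<agent>[^"]*)"'
)

line = '127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /index.html HTTP/1.0" 200 2326 "-" "Mozilla/4.08"'
match = combined.match(line)
if match:
    print(match.groupdict())   # structured fields derived from the raw line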

We will now jump directly into Logstash installation.

Run the following at the command line of your Ubuntu server:

$sudo apt-get update && sudo apt-get install logstash

The “update” command refreshes the information about available package versions, which is why it is run before every installation.

Once the installation is complete, we can test Logstash with a very basic pipeline (a Logstash pipeline is structured as inputs -> filters -> outputs).

$/usr/share/logstash/bin/logstash -e 'input { stdin { } } output { stdout {} }'

The input is stdin and the output is stdout.  Once Logstash is up and running, we can enter “hello world” on the command line and Logstash prints it back with the timestamp, version and host. This confirms that the setup is working.

If you prefer to test with a sample log file, we can do so by downloading an Apache log file and creating the configuration file required for Logstash.

The Elastic examples repository provides a sample Apache log file for testing Logstash. To download it, type the following at the command prompt:

$wget https://raw.githubusercontent.com/elastic/examples/master/Common%20Data%20Formats/apache_logs/apache_logs

The file can also be accessed from  https://github.com/elastic/examples/tree/master/Common%20Data%20Formats/apache_logs

The configuration file can be created like so

$sudo vi /etc/logstash/conf.d/logstash.conf

This will open an empty file. Press “i” to enter insert mode and paste in the following:

input {
  file {
    path => "/home/<YOUR_USER_NAME>/apache_logs"
    start_position => "beginning"
    ignore_older => 0
  }
}
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}
output {
  elasticsearch {
    hosts => [ "localhost:9200" ]
  }
  stdout {
    codec => rubydebug
  }
}

Please replace the placeholder above with the relevant value for you. Once done, press Esc, then type :wq! and press Enter. This will save the file and exit the editor.

A Logstash pipeline accepts inputs and filters of our choice and sends output to one or more destinations of our choice. This is exactly what we are defining in the config file above. In the input section, we specify the file we want Logstash to monitor and explicitly tell it to start reading from the beginning; otherwise, Logstash only checks for updates (i.e. tails the file). By setting ignore_older to 0, we are telling Logstash to consider old files too.

In the output section, we specify that we want the output sent to Elasticsearch, which is running on port 9200 on our local machine. Apart from this, we also want the events displayed on standard output, i.e. our screen. codec names the Logstash codec to use; codecs provide a way to encode and decode input and output data.

Run Logstash with the config file like so

$/usr/share/logstash/bin/logstash  -f  /etc/logstash/conf.d/logstash.conf

(note – Logstash prints a lot of messages on the screen and may take some time to start up)

A “Successfully started pipeline” message implies Logstash started correctly and is watching the log file for changes. Any new updates to the log file will be displayed here.
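As a quick sanity check (a sketch, assuming the elasticsearch output’s default index naming of logstash-*), you can ask Elasticsearch how many events have arrived:

import requests

# Count documents across the default Logstash indices; adjust the pattern if
# you configured a custom index name in the output section.
resp = requests.get('http://localhost:9200/logstash-*/_count')
print(resp.json())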

For an exhaustive list of options for the config file, please check the official Logstash documentation –  https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html

Example config files can be found at –  https://www.elastic.co/guide/en/logstash/current/config-examples.html

Troubleshooting Logstash installation:

It’s common to run into installation issues while setting up Logstash. We have described some common cases here.

Unrecognized VM option ‘UseParNewGC’

This indicates that the Java version on the system is not compatible with Logstash. At the time of writing this post, Logstash requires Java 8 and is not compatible with higher versions. So, let’s first check the version present on our system:

$java -version

To check all the java versions available on the system, you can type

$update-java-alternatives --list

This will display the available java versions. From the screenshot below, we can see that java version 8 is already present on the system.

So, we can either make version 8 the default or export the JAVA_HOME variable. We will export JAVA_HOME and then run Logstash.

$echo $JAVA_HOME

$export JAVA_HOME="/usr/lib/jvm/java-8-oracle"

echo $JAVA_HOME will display the Java home path if it’s set; blank output implies the variable is not set. The update-java-alternatives output above gives the complete path to the Java 8 installation, and we can use it in the export command to set the new JAVA_HOME.

Once the export runs successfully, you can use the echo command as shown above to check whether JAVA_HOME points to version 8. You can then start Logstash and it should run successfully.

In case Java 8 is not available on your system, you can install it either from Oracle or from OpenJDK.

$sudo apt-get install oracle-java8-installer

Installation will dump a lot of messages on the screen as shown below

You can now export this path into JAVA_HOME, run Logstash again, and it should be good to go.

With this, we are moving closer to the final post in the ELK series, where we will walk through a case study outlining a real-life use case involving the ELK stack. Stay tuned!

Introduction to Visualizations using Kibana with Elasticsearch

We talked about Kibana installation on Ubuntu in the previous blog post; let’s now look at how to set up some basic visualizations using Kibana with Elasticsearch.

Log into Kibana at http://127.0.0.1:5601. Once the Kibana page opens up, from the left-side menu click on “Management”, then choose Index Patterns -> Create Index Pattern and fill in the data as shown below (once you start typing, it should display “bank” as an option). Go ahead and click Next on step 2; the index pattern will be created and its fields will be displayed.

All the fields that are part of the accounts.json file will be displayed here (the file can be downloaded as per steps in the first blog post of this Elasticsearch series)

Let’s get a basic understanding of what Kibana offers. From the left sidebar menu, choose the first option, “Discover”. Kibana will load the data for the “bank” index. If you already have more than one index, you can choose from the dropdown as shown below.

You can use the filters on top of the screen to fetch the data of your choice

You can type a query in the field provided at the top as follows (here we are querying for all accounts where the balance is greater than 40000).

Now, let’s see how many accounts are present for different age groups. Select the age field and click on Visualize.

You will get a descending bar graph by age, depicting the number of accounts for each age with an account balance greater than 40,000.

We can also use “Visualize” option from the left side Menu to create useful visualizations for the data. Let’s create a pie chart.

Click on Visualize->Pie

Choose Split Slices, then Aggregation=Range and Field=balance; the From and To values you can set as below, or however you wish your data to be divided. The balance ranges are shown in different colors, depicting the number of accounts in each balance range.

Let’s try another visualization. Click on Visualize -> Area (under basic charts), choose the index “bank”, and then under “buckets” choose Split Chart -> Aggregation=Range, Field=balance, and enter the different balance ranges. You get to see something like this:

Hmm, not very useful. Let’s change it a bit. Choose “X-Axis” under “buckets” and then repeat the same settings for aggregation, field and balance ranges.

Well, that’s definitely more readable. It’s obvious that the number of accounts is higher for the 10000-20000 range than for 0-10000. Then there’s a very slight increase for the 20000-30000 range, and it drops again for 30000-40000. Kibana offers a rich suite of visualizations; we just need to learn how to select the right option to make sense of the data we have.

We can read more about the visualizations, fields, parameters and advanced options offered by Kibana on the official documentation page.

Now that we are familiar with the Kibana interface, let’s do some basic search operations using Elasticsearch and replicate them in Kibana. To do this, we will use the accounts.json file again and gather some insights from that data.

So, let’s say the bank wishes to know how many customers it has in each age group. We need to group the data by customer age. This kind of grouping is called aggregation and is one of the most powerful tools for data analysis.

On the command line, you can invoke this aggregation using curl (on the server running Elasticsearch)

curl -H 'Content-Type: application/json' -XGET 'localhost:9200/bank/account/_search?size=0&pretty' -d '
{
  "aggs": {
    "age": {
      "terms": {
        "field": "age"
      }
    }
  }
}'

Description of the query:

_search in the curl request says our request is about searching the data. size=0 specifies that we do not want all the matched documents printed to the screen; this makes sure that only the result of our aggregation is displayed. “pretty” indicates that the output of the query should be in a readable format; otherwise we get a compact, log-file-like output which is not user friendly.

“aggs” says it is an aggregation that we are requesting, “age” is the name we give to our aggregation, “terms” is the type of aggregation (it buckets documents by distinct values), and “field” names the field we are aggregating on.

and the output will look like

The output shows that the data has been grouped by age along with the number of customers who fall in each age group. (“key” refers to the age of the customer and “doc_count” to the total number of customers who are of the mentioned age).

Let’s visualize the same output on Kibana (you can cross check with the data on the screen above)

To sort by the age of the customer, choose Alphabetical as shown in the screen below:

Let’s say we want to know the number of customers with a bank balance greater than 10,000, for different age groups. This requires us to filter our search for balance greater than 10,000 and then aggregate the matches by age, as sketched below.
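Here is one way to express that query (a sketch using the Python client, which we also use a bit further below; the field names come from the accounts.json sample data):

from elasticsearch import Elasticsearch

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

# Filter for balance > 10000, then bucket the matching accounts by age.
body = {
    "size": 0,  # we only want the aggregation buckets, not the matching documents
    "query": {
        "range": {"balance": {"gt": 10000}}
    },
    "aggs": {
        "age": {
            "terms": {"field": "age"}
        }
    }
}
result = es.search(index='bank', doc_type='account', body=body)
for bucket in result['aggregations']['age']['buckets']:
    print(bucket['key'], bucket['doc_count'])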

If you wish to see all the documents that are being considered for aggregation, you can run the above command without the “size=0” option. It will display all the documents being considered for aggregation along with the aggregation results.

The output will look like so (each document that meets the search criteria will be displayed):

Querying data from the same index using Python

import requests
from elasticsearch import Elasticsearch

# Quick connectivity check: the root endpoint returns basic cluster information
res = requests.get('http://localhost:9200')
print(res.content)

# Connect to the local Elasticsearch instance
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

# Term query: fetch the document whose account_number is 99
body = {
    "query": {
        "term": {
            "account_number": 99
        }
    }
}

output = es.search(index='bank', doc_type='account', body=body)
for doc in output['hits']['hits']:
    print("%s) %s" % (doc['_id'], doc['_source']))

I would highly recommend that you practice these visualizations (and some more) on your local dev machine so that you can get a much deeper understanding of these concepts.

Kibana Installation and Troubleshooting Tips

In the previous blog post, we gave a bird’s eye introduction to Elasticsearch, which is a highly scalable open-source full-text search and analytics engine. But not everyone gets a kick out of typing commands to query data in Elasticsearch. Is there another way?

What is Kibana? Why is it required when I already have Elasticsearch?

Well, as they say – a picture is worth a thousand words. Kibana is an open source analytics and visualization platform that plays nicely with Elasticsearch. Setting up Kibana is easy, and you can access the underlying data without writing a single line of code. Kibana also uses the web browser as its interface, so you do not need to install any extra tools to access it.

Let’s get started with Kibana and you will see how easy it is to set it up.

Installing Kibana and troubleshooting

Command for installing Kibana on the same server as Elasticsearch:

$sudo apt-get install kibana

Once Kibana is installed, we change the host entry, just as we did for Elasticsearch, to open it up for access from anywhere. This should not be done on production systems. Type the following:

$sudo vi /etc/kibana/kibana.yml

Use the arrow keys to move down the file. We need to change the following line to open access from anywhere (again, this configuration should not be used on production systems).

server.host: “localhost” – this needs to be changed to server.host: “0.0.0.0”

Now we will enable Kibana as a service, so it starts automatically on system startup:

$sudo /bin/systemctl daemon-reload

$sudo /bin/systemctl enable kibana.service

$sudo /bin/systemctl start kibana.service

Now that the service has started, we can access Kibana from a web browser by typing 127.0.0.1:5601 (Kibana runs on port 5601). If everything goes fine, you will see the Kibana home page.

Troubleshooting:

What if the Kibana home page doesn’t open up as shown above, and you instead get a “page cannot be displayed” error?

In that case, we first need to check whether Kibana is up and running. To do this, type the following command:

$netstat -a -n | grep tcp | grep 5601

You will see something like the following (Kibana is up and running on port 5601; LISTEN implies it is listening for incoming requests and ready to process them).

What if nothing comes up? It means the Kibana service has not started for some reason. One way to check for the underlying cause:

$journalctl -u kibana.service

The journalctl command queries the contents of the journal written by the Linux service manager (systemd). Earlier, we started Kibana as a service (using ‘systemctl start kibana.service’), so its logging data can be retrieved using journalctl.

You will see a log similar to the one below. As you can see, Kibana has logged its activity here, and we can read through the log to figure out what went wrong with the service. In this case, Kibana did not start because the device is running out of space.

The above command might output a lot of information, not all of it from recent attempts to run Kibana. To narrow it down to today’s entries, the following command can be used:

$journalctl -u kibana.service --since today

To display a set number of lines (n), you can use the syntax below (here we display the last 20 lines):

$journalctl -n 20

Coming back to the issue – one way to deal with space scarcity is to check the apt package archives, which can fill up quite fast on the server if left unchecked. Type the following at the prompt:

$sudo du -sh /var/cache/apt/archives

The output will be something similar to the above (496M is the space occupied). I have cleared my archives with the command below, and as you can see, the occupied space is only 16K now.

If the space occupied is quite high, you can clear the archives using:

$sudo apt-get clean

Sometimes the journal logs (which we read with the journalctl command) also take up a lot of space. Run the following command to check the space used:

$journalctl --disk-usage

If you wish to clear them, the following command can be used to remove all journal entries older than one day (you can specify 1year instead if you wish to retain a year of logs, and so on, based on your requirement):

$sudo journalctl --vacuum-time=1day

Now rerun the disk-usage command above to check the updated space occupied by the journal.

You can try restarting kibana as shown below and see if it works:

$sudo /bin/systemctl restart kibana

To check status of kibana you can use:

$sudo /bin/systemctl status kibana.service

As we are using VirtualBox to run Ubuntu and Kibana didn’t start even after clearing the archives, we had to allocate more disk space. I have outlined the steps below.

Before making any changes to the disk, please log out of Ubuntu, shut down the VM, and then exit VirtualBox. We need the exact location of the virtual disk file.

Go to VirtualBox -> ubuntu -> Settings -> Storage and select the .vdi from the left panel as shown below.

Copy the full path shown in the Location field and exit VirtualBox.

Now, open terminal and type the following

(The general syntax of the command is

sudo VBoxManage modifyhd "<path_to_vdi>" --resize <new size in MB>)

$sudo VBoxManage modifyhd /Users/<my_name>/VirtualBox\ VMs/ubuntu-elastic/ubuntu-elastic.vdi --resize 25600

This will increase the disk to the new size. Once it completes successfully, repeat the steps above to restart the services and check whether they come up successfully.

That’s it, folks. I look forward to your comments or questions on Kibana.

Beginners Guide to Elasticsearch

What is Elasticsearch and why should I know about it?

Elasticsearch is a highly scalable open-source full-text search and analytics engine. It allows you to store, search and analyze big volumes of data quickly and in near real time (numbers, text, geo, structured, unstructured – all data types are welcome). It is generally used as the underlying engine/technology that powers applications with complex search features and requirements.

OK, so what can I do with it?

Well, you can develop a technology product using Elasticsearch that can help solve use cases like the following –

  • A Blogger – Are a lot of people talking about me on Facebook/Twitter/Linkedin? (Number of people who tagged/shared/mentioned me)
  • A Budding Star – How popular am I? Do only my fans know me, or the rest of the world as well?
  • A Musician – I would like to hold a concert outside my country, how do I know where I’m popular?
  • A Sales Manager – Are people talking about our product? Which social media platform is good for us ? Twitter? Facebook? Linkedin? (Breaking up and analyzing the data geography-wise in order to give a bird’s eye view of the scenario)
  • A B2C manager – Once in a while, we end up with a customer who is unable to complete his order. We are forced to manually step in and complete his process. Is there a better way? (Analyzing and making sense of thousands of log files to pinpoint the cause of the issue)
It is useful to look at some of the common terms in the world of Elasticsearch, and understand their definitions. If you would like, please look at the source here:
(https://www.elastic.co/guide/en/elasticsearch/reference/current/_basic_concepts.html)

Cluster

A cluster is a collection of one or more nodes (servers) that holds together your entire data and provides federated indexing and search capabilities across all nodes. A cluster is identified by a unique name which by default is “elasticsearch” for an elasticsearch cluster. Grouping together of nodes (or servers or individual computers) helps in faster querying and load balancing.

Node

A node is a single server that is part of your cluster, stores your data, and participates in the cluster’s indexing and search capabilities. Just like a cluster, a node is identified by a name which by default is a random Universally Unique Identifier (UUID) that is assigned to the node at startup.

Index

An index is a collection of documents that have somewhat similar characteristics. For example, you can have an index for customer data, another index for a product catalog, and yet another index for order data.

If you already have an idea of structured Databases, you can equate an Index to a Database.

Type

A type used to be a logical category/partition of your index that allowed you to store different kinds of documents in the same index, e.g. one type for users and another for blog posts. Starting with Elasticsearch 6, an index can contain only a single mapping type, and the concept of types is being removed altogether.

A type stands equal to a table of a database.

Why are mapping types being removed?

(taken from official elasticsearch documentation https://www.elastic.co/guide/en/elasticsearch/reference/current/removal-of-types.html)

Initially, we spoke about an “index” being similar to a “database” in an SQL database, and a “type” being equivalent to a “table”.

This was a bad analogy that led to incorrect assumptions. In an SQL database, tables are independent of each other. The columns in one table have no bearing on columns with the same name in another table. This is not the case for fields in a mapping type.

In an Elasticsearch index, fields that have the same name in different mapping types are backed by the same Lucene field internally. In other words, using the example above, the user_name field in the user type is stored in exactly the same field as the user_name field in the tweet type, and both user_name fields must have the same mapping (definition) in both types.

On top of that, storing different entities that have few or no fields in common in the same index leads to sparse data and interferes with Lucene’s ability to compress documents efficiently.

For these reasons, the concept of mapping types from Elasticsearch has been removed.

Document

A document is a basic unit of information that can be indexed. For example, you can have a document for a single customer, another document for a single product, and yet another for a single order. This document is expressed in JSON (JavaScript Object Notation) which is a ubiquitous internet data interchange format.

A document stands equal to a row of a table.

Within an index/type, you can store as many documents as you want. Note that although a document physically resides in an index, a document actually must be indexed/assigned to a type inside an index.
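As a small illustration (the field names here are made up, not taken from the sample data we use later in this post), a single document is just a JSON object:

import json

# One "document": a single customer record expressed as JSON.
customer = {"name": "Jane Doe", "age": 32, "balance": 1500, "state": "CA"}
print(json.dumps(customer, indent=2))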

Shards & Replicas

An index can potentially store a large amount of data that exceeds the hardware limits of a single node, resulting in a node that may be too slow to serve search requests on its own.

To solve this problem, Elasticsearch provides the ability to subdivide your index into multiple pieces called shards. When you create an index, you can simply define the number of shards that you want. Each shard is in itself a fully-functional and independent “index” that can be hosted on any node in the cluster.

The mechanics of how a shard is distributed, and how its documents are aggregated back into search results, are completely managed by Elasticsearch and are transparent to you as the user.

In a network/cloud environment where failures can be expected anytime, it is very useful and highly recommended to have a failover mechanism in case a shard/node somehow goes offline or disappears for whatever reason. To this end, Elasticsearch allows you to make one or more copies of your index’s shards into what are called replica shards, or replicas for short.
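As an illustration (a sketch using the Python client that we set up towards the end of this post; the index name and settings are arbitrary), the number of shards and replicas can be chosen when an index is created:

from elasticsearch import Elasticsearch

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

# Create an index with 3 primary shards, each with 1 replica copy.
es.indices.create(index='my-test-index', body={
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
    }
})
print(es.indices.get_settings(index='my-test-index'))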

How are Lucene and Elasticsearch connected?

Lucene or Apache Lucene is a high-performance, full-featured text search engine library written entirely in Java. It is a technology suitable for nearly any application that requires full-text search, especially cross-platform. Elasticsearch is built over Lucene.

For now, this glossary of terms is good enough to get up and running with a basic setup of Elasticsearch. 

Installing Elasticsearch on Ubuntu

The official Debian package installation instructions can be found at

https://www.elastic.co/guide/en/elasticsearch/reference/current/deb.html

This page talks in detail about all the steps, configurations and issues.

If you wish to fast forward to a quick installation, you can follow the steps below (to install Elasticsearch on Ubuntu):

  • Download and install the public signing key:
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
  • Install apt-transport-https so that apt can fetch packages from the HTTPS repository:
sudo apt-get install apt-transport-https
  • Save the repository definition to /etc/apt/sources.list.d/elastic-6.x.list
echo "deb https://artifacts.elastic.co/packages/6.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-6.x.list
  • Install Elasticsearch package
 sudo apt-get update && sudo apt-get install elasticsearch
  • To be able to access Elasticsearch from anywhere, make the following changes (not for production systems)
sudo vi /etc/elasticsearch/elasticsearch.yml

(this opens up the file in an editor to help us make changes)

Press the arrow keys to move down in the file and locate the following line

network.host: 0.0.0.0 (this line is commented out by default; uncomment it and update the host to all zeros as shown)

Press Esc, then type :wq and press Enter. This will save the changes and exit the editor.

  • To configure Elasticsearch to start automatically when the system boots up, run the following commands:
sudo /bin/systemctl daemon-reload

sudo /bin/systemctl enable elasticsearch.service
  • To start Elasticsearch
sudo systemctl start elasticsearch.service
  • Let’s test our Elasticsearch by typing
curl 127.0.0.1:9200

     You will see output along similar lines.

Sample data sets are available on

https://www.elastic.co/guide/en/kibana/current/tutorial-load-dataset.html

Download the accounts JSON zip file (ready to use, as it doesn’t require a custom mapping) to the server using the command:

wget https://download.elastic.co/demos/kibana/gettingstarted/accounts.zip

Unzip using

unzip accounts.zip

If unzip is not available, install it using the command:

sudo apt install unzip

(you might be asked for the sudo password – enter it and continue)

Once unzip is successfully installed, you can run the above command to extract accounts.zip. On successful completion, this creates the accounts.json file. We will use this file as the input data for our search queries.

Run the following command to bulk-insert the JSON data extracted above:

curl -H 'Content-Type: application/x-ndjson' -XPOST 'localhost:9200/bank/account/_bulk?pretty' --data-binary @accounts.json

(here, our index is ‘bank’ and our document type is ‘account’)

Accessing Elasticsearch from the Python client:

To install the Elasticsearch package for Python, run the following command in a terminal:

$pip install elasticsearch

Let’s now look at a very simple Python program that connects to our Elasticsearch engine and queries for data.

from elasticsearch import Elasticsearch

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

es.get(index='bank', doc_type='account', id=990)

The output on the console will look like so (the data is retrieved from the accounts.json file that we loaded into our Elasticsearch cluster in the previous step).
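As a further sanity check (a sketch), you can count the documents in the index to confirm the bulk load worked; the number should match the number of accounts in accounts.json:

from elasticsearch import Elasticsearch

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

# Count the documents that were bulk-loaded into the 'bank' index.
print(es.count(index='bank', doc_type='account'))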

Linear regression using Apache Spark MLlib

What is linear Regression?

Wikipedia states – in statistics, linear regression is a linear approach to modelling the relationship between a dependent variable and one or more independent variables.

Linear regression is a basic and commonly used type of predictive analysis.

Back to school math: every straight line can be represented by the equation y = mx + b, where y is the dependent variable and x is the independent variable on which y depends.

How can we use regression for a real-life use case? Let’s take an example – suppose I have data for the past 5-10 years on the quantity of wheat produced annually. With linear regression, I will be able to predict what the wheat production is likely to be this year or a year from now.

Why is prediction so important? It helps us plan, be prepared and deal with unforeseen situations. In the context of the above example, a country will know the quantity of wheat it can export or import in a year. If global demand looks much lower than the quantity we expect to produce, we can help farmers choose another crop, since low demand means rock-bottom selling prices.

Some additional use cases:

  1. A consistent increase in blood pressure and sugar levels – is the patient heading towards a heart attack?
  2. Distinctive seismic activity – are we in for a tsunami or an earthquake?
  3. Inward migration of birds increasing on a yearly basis – is a certain species of tree responsible for attracting them?
  4. Will a certain steel stock move up or down this year?

These are all basic use cases for a linear regression model. We call it regression because we are predicting continuous values (as opposed to a yes or no result). Graphically, a linear equation (with one dependent and one independent variable) looks similar to this:

So, for every x, we can derive the value of y from the equation.

What happens if y depends not just on one independent variable (x) but on a few more? The graph above would then need not just x and y axes but a z axis too, to represent the second independent variable. More than two independent variables are hard to depict graphically, but they can be represented quite easily with an equation:

y = β + β1x1 + β2x2 + … + βnxn

where β1, β2, …, βn are the coefficients of x1, x2, …, xn

and β is the y-intercept.

What this implies for our wheat production example is:

y (total wheat produced in a year) = β + β1 * total available land in acres + β2 * rainfall received that year + β3 * fertiliser availability in the market + … Here, rainfall received and fertiliser availability are the added independent variables, apart from the size of the land in acres.

Where does Machine Learning come into picture?

In the above equation, β1, β2, β3, … are coefficients whose values we need to compute to form the linear equation. This is where the learning happens: based on past data, the model learns the values of these coefficients over a number of iterations.

How?

To start with, we feed data from the past few years to the ML package; this is called training data, since it is used to train the model. Given a large chunk of data and a number of iterations, the model learns the value of each coefficient.

We can then evaluate the model; ML packages offer many functions and metrics that tell us how the model has performed. Based on these values, we decide whether the model has learnt “enough” and then use it on the data we need predictions for (test data).

What is the error people talk about in ML?

If all data points (y1, y2, y3, …) fall on a perfectly straight line as shown above, we can derive the exact output (prediction) we are looking for. But in the real world this is not the case: the data points do not exactly form a line; they are scattered across the graph. So what do we do? We draw a line in such a way that it is at the least possible overall distance from the points, as shown below:

This is where root mean square error (or another error measure) comes in. We use the chosen method to come up with the line that best fits the given data points. Based on this line we predict the outcome; this is where our final values come from.
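For reference, the root mean square error over n data points is just the square root of the average squared residual, where y_i is the observed value and \hat{y}_i the predicted one:

\text{RMSE} = \sqrt{\frac{1}{n}\sum_{i=1}^{n}\left(y_i - \hat{y}_i\right)^2}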

Let’s look at a very basic example of linear regression in PySpark

I have downloaded the dataset from  https://www.kaggle.com/leonbora/analytics-vidhya-loan-prediction

  1. creating a Spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('lr_example').getOrCreate()

2.  Importing Linear Regression packages

from pyspark.ml.regression import LinearRegression

3.  We will read the input data and see its structure

data = spark.read.csv("train.csv",inferSchema=True,header=True)
data.printSchema()
#output looks similar to this
root
 |-- Loan_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Married: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = true)
 |-- ApplicantIncome: integer (nullable = true)
 |-- CoapplicantIncome: double (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- Loan_Amount_Term: integer (nullable = true)
 |-- Credit_History: integer (nullable = true)
 |-- Property_Area: string (nullable = true)
 |-- Loan_Status: string (nullable = true)

Machine learning packages expect numerical inputs and cannot accept strings. Many utilities exist to help transform data to suit ML requirements; we will look at that aspect in another post. For now, we will just take two numerical fields from the schema above (‘ApplicantIncome’ and ‘CoapplicantIncome’) and try to predict ‘LoanAmount’ from them.

4. All our input features need to be assembled into a single vector column, which is the form Spark ML expects. So, let’s get this sorted first:

from pyspark.ml.feature import (VectorAssembler, VectorIndexer)
#lets define what inputs will go into our vector and give a name for the output of it
lassembler = VectorAssembler(
    inputCols=['ApplicantIncome','CoapplicantIncome'],
    outputCol='features')

5. We will now transform our data into the ML-ready format:

output = lassembler.transform(data)

If you have downloaded the same set, running the above command will output something similar to:

The error is pretty clear: while transforming the data, the assembler has encountered nulls and is asking what should be done about them. We have two options: clean the data of null values and feed it back in, or tell the assembler to skip rows with null values. We will take the second route by tweaking the line in step 5:

output = lassembler.setHandleInvalid("skip").transform(data)

Now the code executes. Let’s see what’s in the “features” column:

output.select("features").show()
#output looks similar to below
+-----------------+
|         features|
+-----------------+
|  [4583.0,1508.0]|
|     [3000.0,0.0]|
|  [2583.0,2358.0]|
|     [6000.0,0.0]|
|  [5417.0,4196.0]|
|  [2333.0,1516.0]|
|  [3036.0,2504.0]|
|  [4006.0,1526.0]|
|[12841.0,10968.0]|
|   [3200.0,700.0]|
|  [2500.0,1840.0]|
|  [3073.0,8106.0]|
|  [1853.0,2840.0]|
|  [1299.0,1086.0]|
|     [4950.0,0.0]|
|     [3596.0,0.0]|
|     [3510.0,0.0]|
|     [4887.0,0.0]|
|  [2600.0,3500.0]|
|     [7660.0,0.0]|
+-----------------+
only showing top 20 rows

6. Let’s now select the features vector together with our target value (y), which is ‘LoanAmount’:

loan_data = output.select('features','LoanAmount')
loan_data.show()
#and Output is
+-----------------+----------+
|         features|LoanAmount|
+-----------------+----------+
|  [4583.0,1508.0]|       128|
|     [3000.0,0.0]|        66|
|  [2583.0,2358.0]|       120|
|     [6000.0,0.0]|       141|
|  [5417.0,4196.0]|       267|
|  [2333.0,1516.0]|        95|
|  [3036.0,2504.0]|       158|
|  [4006.0,1526.0]|       168|
|[12841.0,10968.0]|       349|
|   [3200.0,700.0]|        70|
|  [2500.0,1840.0]|       109|
|  [3073.0,8106.0]|       200|
|  [1853.0,2840.0]|       114|
|  [1299.0,1086.0]|        17|
|     [4950.0,0.0]|       125|
|     [3596.0,0.0]|       100|
|     [3510.0,0.0]|        76|
|     [4887.0,0.0]|       133|
|  [2600.0,3500.0]|       115|
|     [7660.0,0.0]|       104|
+-----------------+----------+
only showing top 20 rows

7. Standard practice is to divide the data into two parts: train the ML model on the first and let it predict values for the second, so we can cross-check the performance.

#splitting train and test data into 70% and 30% of total data available
train_data,test_data = loan_data.randomSplit([0.7,0.3])

#finally, calling the linear regression package we imported
lr = LinearRegression(labelCol='LoanAmount')
#we are specifying our 'Y' by explicitly mentioned it with 'labelCol'

#fitting the model to train data set
lrModel = lr.fit(train_data)

When you run the above, you should get an output like below

The error says – “Params must be either a param map…”. The reason for this error is that our dependent variable (LoanAmount) has null values, and a model cannot be fit with nulls as output values. There are many ways to clean this kind of data; we will simply drop the null values in our example. Let’s filter out null ‘LoanAmount’ values right when we read the data from the CSV, like so:

data = spark.read.csv("train.csv",inferSchema=True,header=True)
#we will add a filter to remove the null values from our dependent variable
data = data.filter("LoanAmount is not NULL")
data.printSchema()

Repeat the steps above and the error will go away. Our linear regression model is now ready.

8. Let’s check the residuals (residual = observed value of y, i.e. the value in our input, minus the value predicted by the model). Each data point has one residual, so the number of residuals equals the number of records in the evaluation set.

test_results = lrModel.evaluate(test_data)
test_results.residuals.show()
#output looks similar to below
+--------------------+
|           residuals|
+--------------------+
|5.684341886080801...|
|-1.42108547152020...|
|-4.26325641456060...|
|-1.42108547152020...|
|-1.42108547152020...|
|-2.84217094304040...|
|-1.42108547152020...|
|-1.42108547152020...|
|-1.42108547152020...|
|-1.42108547152020...|
|                 0.0|
|-1.42108547152020...|
|-2.13162820728030...|
|-1.42108547152020...|
|-2.84217094304040...|
|                 0.0|
|2.842170943040400...|
|-1.42108547152020...|
|-1.42108547152020...|
|-3.55271367880050...|
+--------------------+
only showing top 20 rows
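The same test_results object from step 8 also exposes summary metrics (a sketch; the attribute names come from Spark’s LinearRegressionSummary), which give a quicker read on model quality than eyeballing residuals:

# Overall error and fit quality of the model on the test split.
print("RMSE:", test_results.rootMeanSquaredError)
print("r2:", test_results.r2)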

9. Voila, we can now use this model to predict the output for our test data:

predictions_data = test_data.select('features')
predictions = lrModel.transform(predictions_data)
predictions.show()
#output looks similar to below
+----------------+------------------+
|        features|        prediction|
+----------------+------------------+
|  [150.0,1800.0]| 102.2878796393209|
| [1299.0,1086.0]|104.99451369589991|
| [1378.0,1881.0]|113.43100472341747|
| [1500.0,1800.0]|113.66768427384854|
|[1600.0,20000.0]|292.40273753359924|
| [1782.0,2232.0]|120.26729293510716|
| [1800.0,1213.0]|110.45902065483665|
| [1820.0,1719.0]|115.57340183734365|
| [1820.0,1769.0]|116.06211641088294|
|[1836.0,33837.0]| 429.6389670546782|
|    [1880.0,0.0]| 99.27716389393049|
| [1907.0,2365.0]|122.62095931502981|
| [1926.0,1851.0]|117.75713371242067|
| [2031.0,1632.0]|116.50165979633736|
| [2130.0,6666.0]|166.53996206680586|
| [2132.0,1591.0]|116.95229182239609|
| [2137.0,8980.0]| 189.2166789246058|
|    [2221.0,0.0]|102.15161824976303|
|    [2237.0,0.0]|102.28649000839447|
| [2253.0,2033.0]|122.29249632713373|
+----------------+------------------+
only showing top 20 rows

That’s it folks.

Introduction to Filebeat

What is Filebeat?

Official documentation states that “Filebeat is a lightweight shipper for forwarding and centralizing log data. Installed as an agent on your servers, Filebeat monitors the log files or locations that you specify, collects log events and forwards them to either Elasticsearch or Logstash for indexing.

When you kickoff Filebeat, it starts one or more inputs that look in the locations you’ve specified for log data. For each log that Filebeat locates, Filebeat starts a harvester. Each harvester reads a single log for new content and sends the new log data to libbeat, which aggregates the events and sends the aggregated data to the output that you’ve configured for Filebeat. Here is a simple diagram to explain the working mechanism of Filebeat.”

image taken from https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-overview.html

What is a Harvester?

A harvester is responsible for reading the content of a single file. The harvester reads each file, line by line, and sends the content to the output. One harvester is started for each file. The harvester is responsible for opening and closing the file, which means that the file descriptor remains open while the harvester is running.

What is an Input?

An input is responsible for managing the harvesters and finding all sources to read from.

If the input type is log, the input finds all files on the drive that match the defined glob paths and starts a harvester for each file. Each input runs in its own Go routine.

You can read in detail from official documentation: https://www.elastic.co/guide/en/beats/filebeat/current/how-filebeat-works.html

Is Filebeat mandatory for analysing logs with Kibana-Elasticsearch?

Short answer – no. However, it is the recommended architecture for taking load off Logstash and Elasticsearch in a real-time environment. We have already read log files using Logstash in an earlier post and analysed them with Kibana and Elasticsearch.

Let’s check the installation steps and go through a basic example of working with Filebeat, Kibana and Elasticsearch. Installing Filebeat is straightforward – a single line, really.

$sudo apt-get update && sudo apt-get install filebeat

and Filebeat should install without fuss.

We also discussed earlier that with a few plugins we will be able to retrieve users’ IP addresses from the access logs. Let’s now install these plugins to unleash the potential of Kibana. These are Elasticsearch plugins and do not need Filebeat. We will use two of them.

We need to install them alongside Elasticsearch, so let’s go to the Elasticsearch home directory. We also need to restart Elasticsearch so that it picks up the newly installed plugins. Filebeat additionally ships with Kibana dashboards that we can install to unleash the power of Filebeat-Kibana-Elasticsearch.

cd /usr/share/elasticsearch

$sudo /bin/elasticsearch-plugin install ingest-geoip

$sudo /bin/elasticsearch-plugin install ingest-user-agent

$sudo /bin/systemctl stop elasticsearch.service

$sudo /bin/systemctl start elasticsearch.service

$cd /usr/share/filebeat/bin

$sudo filebeat setup --dashboards

Reference output screens are shown below:

The Filebeat installation already ships the module files for us. They are all in a disabled state so that Filebeat won’t read them at startup; we enable only the ones we want. Here, we are using the Apache access log we downloaded in the earlier post, so we will enable the apache2.yml file and make the necessary changes to read our access files.

$cd /etc/filebeat/modules.d
$ls (you will see .yml files for the different modules, all in a disabled state)
$sudo mv apache2.yml.disabled apache2.yml (enabling the yml for the apache module, since our access logs are from Apache)

Once enabled, we need to make a few changes in the module configuration file so it reads the access logs from our home directory (and the error logs from the same directory). Of course, this is configurable and it’s up to you which directories to use for access and error logs.

$sudo vi apache2.yml (press i to get into insert mode)

We need to uncomment and set var.paths: [“/home/<your name>/apache*”] in the “Access logs” section. In the previous post we downloaded the Apache access log to our home directory, which is why we set this path; our file name was “apache_logs”, hence “apache*” as the filename pattern. Similarly, set var.paths: [“/home/<your name>/error*”] in the “Error logs” section as shown (press Esc, then type :wq! and press Enter to save the changes and exit).

And we start Filebeat with:

$sudo /bin/systemctl start filebeat.service

If you now list the Elasticsearch indices at http://localhost:9200/_cat/indices, you will see a Filebeat index added to the existing list.
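You can do the same check from a terminal on the server (a sketch):

import requests

# List all indices; look for one whose name starts with "filebeat-".
print(requests.get('http://localhost:9200/_cat/indices?v').text)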

Let’s now log into Kibana and create an index pattern to access the data we just shipped into Elasticsearch using Filebeat. Log into Kibana at http://localhost:5601 and navigate to Management -> Index Patterns -> + Create Index Pattern. Type the name “filebeat” and Kibana will show the matching indices already present on the server, as below.

Follow the next steps and create the index pattern. You will come to this page where all the fields present are displayed

There are a lot of fields here; we can see that geoip fields have come in because of the ingest-geoip plugin we installed. Get familiar with the fields. Let’s now navigate to the “Discover” page and choose the “filebeat” index pattern from the dropdown, and you will see a lot of data coming in. In case the screen remains blank, change the time range from the top-right corner. All of this was discussed in detail in the last post; please refer to it in case of any issues.

Click on the “geoip” field and then on Visualize. Let’s analyse our data based on location and see what we get.

We will see something like this

That’s cool. We get to see the number of requests coming from each region. We can select by country or by IP and drill down into the data for analysis.

While setting up Filebeat, we also installed the Kibana dashboards. Let’s see what they offer. From the left-side menu, choose “Dashboard” (the Filebeat dashboards became available here after we loaded them from the command line).

Initially, when the dashboard is blank, click on “Create new dashboard” -> “Add” and a lot of ready-made visualizations show up.

Here we choose “Unique IPs map [Filebeat Apache2]” and the output is as shown below. There are a lot of options to choose from and they help us visualize the data we have in our access logs.

We can even choose some of the existing ones:

I have chosen “[Filebeat Apache2] Access and error logs” and we can see something like this

Anything you point at on the screen will show data and help us drill down. I pointed at the “Browsers breakdown [Filebeat Apache2]” panel and it shows:

That’s it for now. Play around and enjoy the powerful analytics features that Filebeat-Kibana offers.