Hadoop and Pig for Large-Scale Web Log Analysis


The World Wide Web has an estimated 2 billion users and contains anywhere from 15 to 45 billion Web pages, with around 10 million pages added each day. With numbers this large, almost every website owner and developer with a decent presence on the Internet faces a complex problem: how to make sense of their Web pages and of all the users who visit their websites.

Every Web server worth its salt logs user activity for the websites it supports and the Web pages it serves. These Web logs are mostly used for debugging or for gaining insight into details that matter from a business or performance point of view. Over time the logs keep growing, particularly for busy websites, until it becomes very difficult to extract important information from them manually. The Hadoop framework does a good job of tackling this challenge in a timely, reliable, and cost-efficient manner.

This article explains how to tackle the formidable task of Web log analysis using the Hadoop framework and the Pig scripting language, which are well suited to handling large amounts of unstructured data. We propose a solution based on the Pig framework that aggregates data at an hourly, daily, or yearly granularity. The proposed architecture includes a data-collection layer and a database layer to form an end-to-end solution, but in this article we focus on the analysis layer, which is implemented in the Pig Latin language.

Figure 1 shows an illustration of the layered architecture.

Analyzing Logs Generated by Web Servers

The challenge for the proposed solution is to analyze Web logs generated by the Apache Web server. Apache Web logs follow a standard, customizable pattern. The format is documented, and sample logs are easy to find online. These logs contain information in various fields, such as timestamp, IP address, page visited, referrer, and browser. Each row in the Web log corresponds to a visit or event on a Web page.
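The following minimal Java sketch makes the field layout concrete. It is not part of the article's Pig solution. It splits one line in Apache's "combined" log format into its fields using the same regular expression as the Pig REGEX_EXTRACT_ALL statement shown later, rewritten with Java string escaping. The class ApacheLogLine and its field names are hypothetical and were chosen to mirror the Pig schema.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: one parsed line of an Apache "combined" access log. */
final class ApacheLogLine {
    // Same pattern as the Pig REGEX_EXTRACT_ALL call, with Java string escaping.
    // Groups: 1 remoteAddr, 2 remoteLogname, 3 user, 4 time, 5 request,
    //         6 status, 7 bytes, 8 referrer, 9 browser (user agent).
    private static final Pattern COMBINED = Pattern.compile(
            "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "
            + "\"(.+?)\" (\\S+) (\\S+) \"([^\"]*)\" \"([^\"]*)\"");

    final String remoteAddr, remoteLogname, user, time, request;
    final int status;
    final String bytes, referrer, browser;

    private ApacheLogLine(Matcher m) {
        remoteAddr = m.group(1);
        remoteLogname = m.group(2);
        user = m.group(3);
        time = m.group(4);
        request = m.group(5);
        status = Integer.parseInt(m.group(6));
        bytes = m.group(7);      // kept as text: Apache writes "-" when no body was sent
        referrer = m.group(8);
        browser = m.group(9);
    }

    /** Returns the parsed fields, or null if the line is not in combined format. */
    static ApacheLogLine parse(String line) {
        Matcher m = COMBINED.matcher(line);
        return m.find() ? new ApacheLogLine(m) : null;
    }
}
```

Consider a complete line such as 192.0.2.1 - Joe [21/Jul/2009:13:14:17 -0700] "GET / HTTP/1.1" 200 35942 "-" "Mozilla/4.0", where the IP address is an illustrative documentation address. For that line, parse returns an object whose remoteLogname is "-" and whose user is "Joe". Lines that do not match return null.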

The size of Web logs can range from a few KB to hundreds of GB. We need solutions that can extract patterns and information from these logs along different dimensions, such as timestamp, browser, and country. These solutions should provide vital information such as the number of hits for a particular website or Web page, the number of unique users, and so on. Each potential problem can be framed as a particular use case and then solved. For our purpose here, we will take two use cases:

  • Find number of hits and number of unique users for Year dimension.
  • Find number of hits and number of unique users for Time, Country, and City dimensions.

    These use cases demonstrate the basic approach and steps required to solve such problems. Other problems can be tackled easily by making slight modifications to the implementations or to the approach used for these two use cases.

    The technologies used are the Apache Hadoop framework, Apache Pig, the Java programming language, and regular expressions (regex). Although this article focuses on Apache Web logs, the approach is generic enough to be applied to logs generated by any other server or system.

    Hadoop Solution Architecture

    The proposed architecture is layered, and each layer is made up of components. It scales with the volume of logs generated by your Web servers, lets you harness the data to gain key insights, and runs on an economical, scalable platform. You can keep adding data and retain it for longer periods to enrich your knowledge base.

    The Log Analysis Software Stack

    • Hadoop is an open source framework that allows users to process very large data sets in parallel. It is based on the design Google published for the infrastructure behind its search engine (the Google File System and MapReduce). The Hadoop core is mainly divided into two modules:
      1. HDFS is the Hadoop Distributed File System. It allows you to store large amounts of data using multiple commodity servers connected in a cluster.
      2. Map-Reduce (MR) is a framework for parallel processing of large data sets. The default implementation is tightly coupled with HDFS.
    • The database can be a NoSQL database such as HBase. A NoSQL database also gives the reporting module room to scale, because we can keep historical processed data for reporting purposes. HBase is an open source, column-oriented NoSQL database built on top of HDFS. It can also use MR jobs to process data. It provides real-time, random read/write access to very large data sets and can store tables with billions of rows. It is a distributed database and can keep multiple versions of each cell.
    • The Pig framework is an open source platform for analyzing large data sets. It is implemented as a higher-level language layer over the Hadoop Map-Reduce framework. It was built to ease the work of developers who would otherwise write Map-Reduce code, which is typically written in Java. Pig instead lets users write their processing logic in a scripting language, Pig Latin.
    • Flume is a distributed, reliable, and available service for collecting, aggregating, and moving large amounts of log data (source: Flume wiki). It was built to push large logs into HDFS for further processing. It is a data-flow solution in which each node has a source and a destination. It is divided into Agent and Collector tiers that collect logs and push them to destination storage.
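    Pig scripts are compiled into Map-Reduce jobs, so it helps to see the three phases of that model in action. The following Java sketch runs them in a single process to count hits per year. The map phase emits a (year, 1) pair per log record, the shuffle phase groups the pairs by key, and the reduce phase sums each group. This is an illustration only. The class MiniMapReduce is hypothetical and does not use the Hadoop API. Real jobs run each phase in parallel across the cluster and read their input from HDFS.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical single-process illustration of the map, shuffle, and reduce phases. */
final class MiniMapReduce {
    // Map phase: each input record (here, the year of one log line) becomes a (key, 1) pair.
    static List<Map.Entry<String, Integer>> map(List<String> years) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String year : years) {
            pairs.add(Map.entry(year, 1));
        }
        return pairs;
    }

    // Shuffle/sort phase: pairs with the same key are brought together; TreeMap sorts the keys.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            groups.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return groups;
    }

    // Reduce phase: the values of each key are combined into one result (a sum, i.e. the hit count).
    static Map<String, Integer> reduce(Map<String, List<Integer>> groups) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : groups.entrySet()) {
            int sum = 0;
            for (int one : group.getValue()) {
                sum += one;
            }
            totals.put(group.getKey(), sum);
        }
        return totals;
    }

    static Map<String, Integer> hitsPerYear(List<String> years) {
        return reduce(shuffle(map(years)));
    }
}
```

    For example, hitsPerYear(List.of("2009", "2010", "2009")) returns {2009=2, 2010=1}.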

    Data Flow and Components

    • Multiple Web servers generate log content and write it to their local disks. This content is then pushed to HDFS using the Flume framework. Flume agents run on the Web servers and send the data to collectors, which aggregate it and finally push it to HDFS.
    • Pig scripts are scheduled to run using a job scheduler, such as cron or a more sophisticated batch-job solution. These scripts analyze the logs along various dimensions and extract the results. By default, Pig stores its results in HDFS, but other storage implementations can write to repositories such as HBase or MongoDB. We have also tried the solution with HBase (please see the implementation section). Pig scripts can write the results to HDFS, after which MR jobs read them and push them into HBase. Alternatively, the scripts can write the results into HBase directly. In this article, the scripts write to HDFS, because we are showcasing how well the Pig framework suits large-scale log analysis.
    • The HBase database holds the data processed by the Pig scripts, ready for reporting and further slicing and dicing.
    • The data-access Web service is a REST-based service that simplifies access and integration for data clients. Because the API is REST-based, clients can be written in any language. They could be BI tools or UI-based clients.

    Hadoop Solution Implementation

    For our Pig layer implementation, the Pig scripts push data in CSV format so that the results can be verified using shell scripts or Web UIs (that can render the CSV data in tables or graphs).

    Use Case 1: Find number of hits and number of unique users for Year dimension

    We use Pig scripts to sift through the data and extract useful information from the Web logs. We load the log file into Pig using the LOAD command:

    raw_logs = LOAD 'apacheLog.log' USING TextLoader AS (line:chararray);

    Using the above line, we load the apacheLog.log text file with Pig's TextLoader, which reads each line as a single chararray. We then assign the resulting records to the alias raw_logs. As discussed, Apache Web logs consist of records, with each row corresponding to one record. A sample record looks something like this:

    - Joe [21/Jul/2009:13:14:17 -0700] "GET / HTTP/1.1" 200 35942 "-" "IE/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Trident/4.0; SLCC1; .NET CLR 2.0.50727; .NET CLR 3.5.21022; InfoPath.2; .NET CLR 3.5.30729; .NET CLR 3.0.30618; OfficeLiveConnector.1.3; OfficeLivePatch.1.3; MSOffice 12)"

    In this record, values are separated by spaces, and each value corresponds to a specific column. The first task in the analysis is to assign the fields of each record to separate variables. Pig processes a data set one row at a time and can assign the columns (fields) of a row to different variables. For example, the IP address field can be assigned to a chararray variable, the number 200 (the status code) can be assigned to an int variable, and so on. To split each line into these fields, we first apply a regular expression to it:

    logs_base = FOREACH raw_logs GENERATE FLATTEN(
        REGEX_EXTRACT_ALL(line, '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(.+?)" (\S+) (\S+) "([^"]*)" "([^"]*)"')
    ) AS (remoteAddr: chararray, remoteLogname: chararray, user: chararray,
          time: chararray, request: chararray, status: int, bytes_string: chararray,
          referrer: chararray, browser: chararray);

    In the above statement, we use FOREACH to traverse the records in the apacheLog file one at a time. On each record, REGEX_EXTRACT_ALL applies the regular expression to the whole line and extracts the individual fields. These fields are then assigned to individual variables. For example, the IP address at the start of the line is assigned to remoteAddr, which is of type chararray. In the sample record, the hyphen is assigned to remoteLogname and Joe is assigned to user, and so on. The benefit of assigning each field to a variable is that we can now work with individual variables rather than with the whole row. For example, the 'time' variable contains a timestamp in the format dd/MMM/yyyy:HH:mm:ss Z. We need to decompose this variable into its individual components (day, month, year, and so on), which lets us work at the finest level of granularity. We do this with functions from PiggyBank, a library of user-defined functions for Pig. We declare the function we want to use by giving it an alias and its fully qualified class name. For example:

    'dd/MMM/yyyy:HH:mm:ss Z','yyyy');

    The above line declares the DateExtractor function for use in our Pig script. Its two arguments are the incoming date format of the 'time' field and the output format, which here is a four-digit year. To avoid repeating the full class name, we define an alias, DATE_EXTRACT_YY. We can then extract the year from the 'time' variable by applying the following expression to each record:

    DATE_EXTRACT_YY(time) AS year

    PiggyBank is a repository of user-defined functions (UDFs) contributed by the Pig community, and it contains many commonly used functions for Pig scripts. We can also write our own UDFs to implement solution-specific logic that PiggyBank does not provide.
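    The date decomposition is easy to reproduce outside Pig. The Java sketch below is a hypothetical stand-in for the DATE_EXTRACT_* aliases. It uses java.time rather than PiggyBank's own code. It parses the Apache timestamp with the pattern dd/MMM/yyyy:HH:mm:ss Z and re-formats it with an output pattern such as yyyy, MM, or dd. The sketch keeps the offset written in the log line. PiggyBank's DateExtractor may apply its own time-zone handling, so hour and day values can differ near midnight.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

/** Hypothetical stand-in for DATE_EXTRACT_YY / _MM / _DD, written with java.time. */
final class LogDates {
    // Incoming Apache timestamp format. Locale.ENGLISH makes "Jul" parse
    // regardless of the JVM's default locale.
    private static final DateTimeFormatter APACHE_TIME =
            DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);

    /** Parses an Apache timestamp and re-formats it with outputPattern (e.g. "yyyy"). */
    static String extract(String apacheTime, String outputPattern) {
        OffsetDateTime t = OffsetDateTime.parse(apacheTime, APACHE_TIME);
        // Formatting keeps the offset from the log line; no time-zone conversion is done.
        return t.format(DateTimeFormatter.ofPattern(outputPattern, Locale.ENGLISH));
    }
}
```

    For example, LogDates.extract("21/Jul/2009:13:14:17 -0700", "yyyy") returns "2009", and the same call with "MM" returns "07".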

    We do not need all the variables in the record. We need only the year, the IP address (remoteAddr), and the user (remoteLogname). So we keep just these three values for each record and assign the result to a new relation:

    logs = FOREACH logs_base GENERATE remoteAddr, remoteLogname, DATE_EXTRACT_YY(time) AS year;

    Now we need to find the number of hits and the number of unique users per year. In Pig, we do this by grouping all the records on a variable or a combination of variables. In our case, that variable is the year:

    group_year = GROUP logs BY (year);

    Within each group, we need to count the number of hits and the number of unique users. To find the unique users, we use the DISTINCT command, which removes duplicate names. To find the number of hits, we count the records in each year's group using Pig's COUNT function. Putting it all together, the following statement gives the number of hits and the number of unique users for each year:

    X = FOREACH group_year {
        unique_users = DISTINCT logs.remoteLogname;
        GENERATE FLATTEN(group), COUNT(unique_users) AS UniqueUsers, COUNT(logs) AS counts;
    }

    This gives us the number of hits and the number of unique users for each year in tabular form. We can then either print the results to the screen with DUMP or save them with STORE for further processing. Below are the results for our use case (results are in the form Year, Unique Users, No. of Hits):

    This use case shows how the complex task of grouping and processing the data can be completed with the use of a few commands in Pig.
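    For readers more at home in Java, here is a hypothetical, in-memory sketch of what the group_year and X statements compute. It assumes every log line was parsed, so no fields are null. Pig's COUNT also skips tuples whose first field is null, and this sketch does not model that behavior. The record types LogRow and YearSummary are illustrative names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical in-memory equivalent of GROUP logs BY year plus DISTINCT/COUNT. */
final class YearStats {
    /** One row of the logs relation: remoteAddr, remoteLogname, year. */
    record LogRow(String remoteAddr, String remoteLogname, String year) {}

    /** One output row: year, number of distinct user names, number of hits. */
    record YearSummary(String year, long uniqueUsers, long hits) {}

    static List<YearSummary> summarize(List<LogRow> logs) {
        // GROUP logs BY year: a TreeMap collects each year's rows and keeps the years sorted.
        Map<String, List<LogRow>> byYear = new TreeMap<>();
        for (LogRow row : logs) {
            byYear.computeIfAbsent(row.year(), y -> new ArrayList<>()).add(row);
        }

        List<YearSummary> result = new ArrayList<>();
        for (Map.Entry<String, List<LogRow>> group : byYear.entrySet()) {
            // DISTINCT logs.remoteLogname: a set drops duplicate names.
            Set<String> names = new HashSet<>();
            for (LogRow row : group.getValue()) {
                names.add(row.remoteLogname());
            }
            // COUNT(unique_users) is the set size; COUNT(logs) treats every row as one hit.
            result.add(new YearSummary(group.getKey(), names.size(), group.getValue().size()));
        }
        return result;
    }
}
```

    Given three 2009 rows with the names alice, alice, and bob, plus one 2010 row with the name bob, summarize returns one row for 2009 with 2 unique users and 3 hits, and one row for 2010 with 1 unique user and 1 hit.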

    Use Case 2: Find number of hits and number of unique users for Time, Country, and City dimensions

    In this use case, we add more complexity. Earlier, we found the number of hits and unique users only per year. Now we want them per time, country, and city combined, which raises the question of how to group the data to obtain these results. The first steps are the same as before. We load the apacheLog data, extract the individual fields of each record using regex, and decompose the time field into year, month, day, and hour. We then keep only the variables we need (remoteAddr, user, year, month, day):

    year_month_separate = FOREACH logs GENERATE $0 as ipAddress, 
    DATE_EXTRACT_YY(datetime) as year, DATE_EXTRACT_MM(datetime) as month,
    DATE_EXTRACT_DD(datetime) as day, user as users;

    We need to group on multiple fields. Pig makes this easy, because the GROUP command accepts several fields separated by commas.

    group_fields = GROUP year_month_separate BY (ipAddress, year, month, day);

    The above statement groups records by IP address, year, month, and day. We then map each IP address to its corresponding city and country using a separate mapping file. After that mapping, the final records show the number of users and hits for each day of each month and year, per country and city.

    X = FOREACH group_fields {
        unique_users = DISTINCT year_month_separate.users;
        GENERATE FLATTEN(group), COUNT(unique_users) AS UniqueUsers, COUNT(year_month_separate) AS counts;
    }

    Once the records are grouped, we count the hits and users using the same mechanism as in the first use case.
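    The new idea here is the composite grouping key. In Java, a record makes a natural composite key because it derives equals and hashCode from all of its components. Two visits therefore land in the same group exactly when the IP address, year, month, and day all match. The sketch below is hypothetical and assumes no null user names. The types Visit, Key, and Counts are illustrative names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory equivalent of GROUP ... BY (ipAddress, year, month, day). */
final class DailyIpStats {
    /** One row of year_month_separate. */
    record Visit(String ipAddress, String year, String month, String day, String user) {}

    /** Composite grouping key; records supply equals/hashCode over all four fields. */
    record Key(String ipAddress, String year, String month, String day) {}

    /** Per-group result: distinct users and total hits. */
    record Counts(long uniqueUsers, long hits) {}

    static Map<Key, Counts> summarize(List<Visit> visits) {
        // Build the groups, keeping the order in which keys are first seen.
        Map<Key, List<Visit>> groups = new LinkedHashMap<>();
        for (Visit v : visits) {
            Key key = new Key(v.ipAddress(), v.year(), v.month(), v.day());
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(v);
        }

        Map<Key, Counts> result = new LinkedHashMap<>();
        for (Map.Entry<Key, List<Visit>> group : groups.entrySet()) {
            // DISTINCT users within the group, then count the group's rows as hits.
            Set<String> users = new HashSet<>();
            for (Visit v : group.getValue()) {
                users.add(v.user());
            }
            result.put(group.getKey(), new Counts(users.size(), group.getValue().size()));
        }
        return result;
    }
}
```

    Suppose one IP address has three visits on 2009-07-21, by alice, bob, and alice again. Those visits yield Counts[uniqueUsers=2, hits=3] for that key. A fourth visit on the 22nd forms a separate group.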

    We now have the results grouped by IP address and time, but we need them grouped by country, city, and time. We therefore need to map each IP address to a country and city and then replace each IP address with that country and city. We keep the IP-address-to-country-and-city mapping in a simple comma-separated file, which we load using PigStorage(',').

    ip_country = LOAD 'ip_country_mapping.dat'  USING PigStorage(',') as 

    We then combine these two results using the COGROUP statement. COGROUP groups the records of two data sets by a common key. For each key value, it produces one tuple containing the key and a bag of the matching records from each data set. Here, the common key is the IP address.

    join_ip_country_records = COGROUP concat_timestamp by ipaddress, ip_country by ip;

    Now all that remains is to retrieve the variables we need from join_ip_country_records and display them. But there is a potential problem: what happens if the list of IP addresses is not complete? In that case, the ip_country bag for an unmatched IP address is empty, and flattening an empty bag removes the whole record from the final data set. This results in incomplete data sets and a loss of information. So we use a rule: if the IP address matches a country and city, display that country and city. Otherwise, display a placeholder string such as 'unknown'. We can do this in Pig with the ternary (bincond) operator:

    ( (Condition) ? Expression1 : Expression2 )

    The ternary operator evaluates a condition. If the condition is true, it evaluates Expression1; otherwise, it evaluates Expression2. In our case, an IP address may have no city and country mapping, which means its ip_country bag is empty. We then replace the empty bag with a bag holding a single placeholder tuple of 'unknown' values. Otherwise, we keep the city and country specified in the mapping.

    display_bag = FOREACH join_ip_country_records GENERATE $0, $1, 
    ((IsEmpty($2)) ? {('unknown','unknown','unknown')} : $2);
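    The combined effect of COGROUP, the IsEmpty fallback, and the later FLATTEN can be sketched in Java as follows. This is a hypothetical, in-memory model, not Pig's implementation. Stat stands for a row of per-IP statistics. Location assumes a three-field mapping row (IP address, country, city) to match the three-field placeholder tuple, but the real layout of ip_country_mapping.dat may differ. Flattening two bags in the same FOREACH produces their cross product, which is why an empty bag on either side yields no output rows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model of COGROUP + (IsEmpty($2) ? placeholder : $2) + FLATTEN. */
final class CogroupSketch {
    record Stat(String ip, String timestamp, long uniqueUsers, long hits) {}
    // Assumed mapping-row layout, mirroring the three-field ('unknown','unknown','unknown') tuple.
    record Location(String ip, String country, String city) {}
    record Output(String timestamp, String country, String city, long uniqueUsers, long hits) {}

    private static final Location UNKNOWN = new Location("unknown", "unknown", "unknown");

    static List<Output> cogroupAndFlatten(List<Stat> stats, List<Location> locations) {
        // Bucket each input by IP address (one "bag" per key and per input).
        Map<String, List<Stat>> statBags = new TreeMap<>();
        for (Stat s : stats) {
            statBags.computeIfAbsent(s.ip(), k -> new ArrayList<>()).add(s);
        }
        Map<String, List<Location>> locationBags = new TreeMap<>();
        for (Location l : locations) {
            locationBags.computeIfAbsent(l.ip(), k -> new ArrayList<>()).add(l);
        }

        // COGROUP emits one group for every key seen in EITHER input.
        TreeSet<String> keys = new TreeSet<>(statBags.keySet());
        keys.addAll(locationBags.keySet());

        List<Output> result = new ArrayList<>();
        for (String ip : keys) {
            List<Stat> statBag = statBags.getOrDefault(ip, List.of());
            List<Location> locationBag = locationBags.getOrDefault(ip, List.of());
            // The bincond: substitute a single placeholder tuple for an empty mapping bag.
            if (locationBag.isEmpty()) {
                locationBag = List.of(UNKNOWN);
            }
            // FLATTEN of both bags is a cross product; an empty statBag contributes no rows.
            for (Stat s : statBag) {
                for (Location l : locationBag) {
                    result.add(new Output(s.timestamp(), l.country(), l.city(), s.uniqueUsers(), s.hits()));
                }
            }
        }
        return result;
    }
}
```

    An IP address that has statistics but no mapping comes out with its country and city set to "unknown". An IP address that appears only in the mapping file produces no row, because its statistics bag is empty.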

    Then, we simply flatten the records and either display them or store them using the STORE command.

    final_records = FOREACH flattened_records GENERATE $1 AS TimeStamp, $5 AS countryName,
        $6 AS cityName, $2 AS unique_user, $3 AS total_hits;
    STORE final_records INTO 'countryhourlyrecords';

    The results for our use case are shown below:

    This enables us to do complex data processing on individual records and fields in an efficient and concise manner and to produce results with minimal code.

    When working with large data sets with many fields and dimensions, Pig proves to be a very helpful language, because it is a higher-level language layered over Map-Reduce. It brings an agile style of development to MR use cases. Developers can build solutions in less time than it would take to write the equivalent MR jobs.


    This article attempts to show how large amounts of unstructured data can be easily analyzed and how important information can be extracted from the data using the Apache Hadoop framework and its related technologies. The approach described here can be used to effectively address data analysis challenges such as traffic log analysis, user consumption patterns, best-selling products, and so on. Information gleaned from the analysis can have important economic and societal value, and can help organizations and people to operate more efficiently. The limit to implementations is set only by human imagination.

    For now, Hadoop does a good job of pushing the limits on the size of data that can be analyzed and of extracting valuable information from seemingly arbitrary data. However, much work remains to reduce the latency of Hadoop solutions before Hadoop can be applied to scenarios that require near-real-time responses.

