Introduction to Hadoop:)


Madhav Ganesan

Posted on November 24, 2024


Hadoop is an open-source software framework for storing and processing large volumes of data across distributed computing environments. It is designed to be scalable, fault-tolerant, and capable of handling vast amounts of data efficiently.

Cluster

It refers to a group of machines running HDFS and YARN.

Nodes

They are the individual machines within the cluster.

Types of Nodes

Master Nodes (MN)

Master nodes handle coordination tasks and manage the cluster's overall operations. They serve as the primary interfaces for users to interact with the Hadoop cluster.

Worker Nodes (WN)

Worker nodes perform the actual data storage and processing operations based on instructions received from the master nodes.


Key Components of Hadoop

Hadoop Distributed File System (HDFS):

It is the storage layer of Hadoop. It is a distributed file system that provides high-throughput access to data and is designed to store large files across multiple machines.
It breaks down large files into smaller blocks (typically 128 MB or 256 MB) and distributes these blocks across a cluster of machines. Each block is replicated multiple times to ensure fault tolerance. It provides high availability and reliability, as data is replicated across different nodes.
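To make the block arithmetic concrete, here is a small back-of-the-envelope sketch; the 1 GB file size is an assumed example, and 128 MB blocks with a replication factor of 3 are common defaults that a cluster may override:

import math

# Assumed example values: block size and replication factor are configurable.
file_size_mb = 1024        # a 1 GB file
block_size_mb = 128
replication_factor = 3

num_blocks = math.ceil(file_size_mb / block_size_mb)
total_replicas = num_blocks * replication_factor

print(f"Logical blocks in HDFS: {num_blocks}")                         # 8
print(f"Block replicas stored across the cluster: {total_replicas}")   # 24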

Components:


NameNode: (MN)
Manages the metadata and directory structure of the file system.

Process:

When a client wants to access a file, it first contacts the NameNode.
The NameNode provides a list of DataNodes that hold the data blocks of the file.
The client then communicates directly with these DataNodes to read or write the data blocks.
(Note: The NameNode acts only as a traffic cop, directing clients to the appropriate DataNodes. It does not store or transfer data itself)

DataNode: (WN)
Stores the actual data blocks of files.

Secondary NameNode: (MN)
Periodically merges the edit log with the file system image (fsimage) to keep the NameNode's edit log from growing too large.

MapReduce

It is the processing layer of Hadoop. It is a programming model used for processing and generating large data sets with a parallel, distributed algorithm on a cluster. It allows for parallel processing of data across a cluster, which speeds up the data processing tasks.
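Conceptually, the model can be sketched in plain Python with no cluster involved; this is only an illustration of the map → shuffle/sort → reduce flow, not how Hadoop actually executes it:

from itertools import groupby
from operator import itemgetter

def map_phase(lines):
    # Emit (key, value) pairs: here, (word, 1) for a word count.
    for line in lines:
        for word in line.split():
            yield (word, 1)

def reduce_phase(sorted_pairs):
    # Group the sorted pairs by key and aggregate the values per key.
    for key, group in groupby(sorted_pairs, key=itemgetter(0)):
        yield (key, sum(value for _, value in group))

lines = ["big data big cluster", "big data"]
intermediate = sorted(map_phase(lines))   # the "shuffle and sort" step
print(list(reduce_phase(intermediate)))   # [('big', 3), ('cluster', 1), ('data', 2)]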

Yet Another Resource Negotiator (YARN):

It is the resource management and job scheduling layer of Hadoop. It manages and schedules resources and job execution across the Hadoop cluster.

Components:
ResourceManager: (MN)
It is the master daemon responsible for managing the resources of the Hadoop cluster. It allocates system resources (CPU, memory, etc.) to the various applications running on the cluster.

NodeManager: (WN)
Each node in the Hadoop cluster runs a NodeManager daemon. It monitors the resource usage (CPU, disk, network, memory) on its node and reports this information back to the ResourceManager.

ApplicationMaster:
A per-application master that manages the application's lifecycle and negotiates resources from the ResourceManager. It runs inside a container on a worker node rather than as a permanent master daemon.

Resource Container
Containers are isolated units in which tasks are executed on a node.
Each container is an allocation of resources (CPU, memory, disk) granted to an application based on its requirements.

Process:
1) A client submits a job to the ResourceManager
2) The ResourceManager allocates a container based on availability and the job's requirements and launches the job's ApplicationMaster in it
3) The ApplicationMaster splits the job into multiple tasks and requests containers for them from the ResourceManager
4) The ResourceManager grants containers on specific nodes (each node in the cluster runs a NodeManager)
5) The ApplicationMaster contacts those NodeManagers to launch the granted containers
6) The NodeManagers run the tasks inside the containers and report their status back to the ApplicationMaster and the ResourceManager


Working of YARN:

Job Submission: Jobs are submitted to the ResourceManager.
Resource Allocation: The ResourceManager allocates resources and starts an ApplicationMaster for each job.
Resource Requests: The ApplicationMaster requests containers from the ResourceManager.
Task Execution: NodeManagers launch the containers, execute the tasks, and report status.
Completion and Cleanup: The ApplicationMaster reports completion, and resources are released.

Hadoop Ecosystem

In addition to these core components, the Hadoop ecosystem includes various tools and frameworks that enhance its capabilities:

HBase: A distributed, scalable, NoSQL database that runs on top of HDFS.
Hive: A data warehouse infrastructure that provides SQL-like querying capabilities on Hadoop.
Pig: A high-level platform for creating MapReduce programs using a scripting language called Pig Latin.
Sqoop: A tool for transferring data between Hadoop and relational databases.
Flume: A service for collecting, aggregating, and moving large amounts of log data into HDFS.
Oozie: A workflow scheduler system to manage Hadoop jobs.

A distributed system must meet the following requirements:

Fault tolerance

If a component fails, it should not result in the failure of the entire system. The system should degrade gracefully to a lower-performing state, and if a failed component recovers, it should be able to rejoin the system.

Recoverability

In the event of failure, no data should be lost.

Consistency

The failure of one job or task should not affect the final result.

Scalability

Adding load (more data, more computation) should lead to a graceful decline in performance, not failure, and increasing resources should result in a proportional increase in capacity.

Commands for Hadoop

List Files:

hadoop fs -ls <path>

Create Directory:

hadoop fs -mkdir <path>

Copy From Local:

hadoop fs -copyFromLocal <local-file> <hdfs-path>

Remove Files:

hadoop fs -rm <path>

Read File:

hadoop fs -cat <file-path>

Move To Local:

hadoop fs -moveToLocal <hdfs-file> <local-path>

Get Merge:

hadoop fs -getmerge <source-path> <local-file>

Change Mode:

hadoop fs -chmod <permissions> <file-or-directory>

Change Group:

hadoop fs -chgrp <group> <file-or-directory>

Change Owner:

hadoop fs -chown <user>:<group> <file-or-directory>

Complete process (simulating a MapReduce job locally by piping data through the mapper, a sort, and the reducer):

cat file_name.csv | ./mapper.py | sort | ./reducer.py

MapReduce Components/Functions

1) Mapper
The mapper reads input data, processes it, and emits intermediate key-value pairs. This is the first stage of the MapReduce process.

2) Reducer
The reducer takes intermediate key-value pairs from the mapper, aggregates them based on the key, and outputs the final results.

3) Combiners
It performs partial aggregation of data on the map side before sending it to the reducers. This can reduce the amount of data shuffled across the network.
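The same effect can also be approximated inside the mapper itself (often called in-mapper combining): aggregate counts locally before emitting so that fewer key-value pairs are shuffled. A minimal word-count sketch, assuming the usual tab-separated streaming output; Hadoop Streaming additionally accepts a -combiner script, which for word count can simply be the reducer:

import sys
from collections import defaultdict

def mapper_with_local_aggregation():
    counts = defaultdict(int)
    for line in sys.stdin:
        for word in line.strip().split():
            counts[word] += 1          # partial aggregation on the map side

    # Emit one pair per distinct word instead of one per occurrence.
    for word, count in counts.items():
        print(f"{word}\t{count}")

if __name__ == "__main__":
    mapper_with_local_aggregation()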

4) Partitioners
The partitioner determines how the intermediate key-value pairs are distributed among the reducers. It ensures that data is evenly distributed and processed efficiently by the reducers.

The default partitioner, HashPartitioner, uses the hash value of the key to determine the reducer. The hash value of the key is computed and then modulo the number of reducers is taken to assign the key-value pair to a specific reducer.
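A rough Python analogue of this behavior (the real HashPartitioner is Java and uses key.hashCode() & Integer.MAX_VALUE before the modulo; Python's built-in hash is salted per run, so this is purely illustrative):

def hash_partition(key, num_reducers):
    # Hash the key, force the result non-negative, then take the modulo
    # of the number of reducers to pick a partition.
    return (hash(key) & 0x7FFFFFFF) % num_reducers

# Every occurrence of the same key is routed to the same reducer.
for key in ["apple", "banana", "apple", "cherry"]:
    print(key, "-> reducer", hash_partition(key, 4))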

5) Job Chaining
Job chaining is a technique used in Hadoop MapReduce to handle complex workflows by linking multiple MapReduce jobs in sequence.

This approach is particularly useful when the desired computation cannot be accomplished in a single MapReduce job but can be broken down into several smaller tasks.

The flow of data between jobs forms a directed acyclic graph (DAG).


Ex. Pearson Correlation

First Job (Mean and Standard Deviation):

import sys

def mapper():
    # Emit intermediate key-value pairs for mean and standard deviation
    for line in sys.stdin:
        # Process input line
        # Emit key-value pairs for mean and standard deviation calculation
        pass

if __name__ == "__main__":
    mapper()

############################################
import sys

def reducer():
    # Aggregate results to compute mean and standard deviation
    pass

if __name__ == "__main__":
    reducer()

Second Job (Covariance and Correlation):

import sys

def mapper():
    # Take mean and std deviation from the first job and compute covariance
    pass

if __name__ == "__main__":
    mapper()

########################################
import sys

def reducer():
    # Compute Pearson correlation coefficient
    pass

if __name__ == "__main__":
    reducer()
# First Job
hadoop jar /path/to/hadoop-streaming.jar \
  -input /path/to/input \
  -output /path/to/intermediate_output \
  -mapper mean_std_mapper.py \
  -reducer mean_std_reducer.py

# Second Job
hadoop jar /path/to/hadoop-streaming.jar \
  -input /path/to/intermediate_output \
  -output /path/to/final_output \
  -mapper cov_corr_mapper.py \
  -reducer cov_corr_reducer.py

Examples for mapper and reducer:

1) Word Counting:

mapper.py

#!/usr/bin/env python 
import sys

def mapper():
    for line in sys.stdin:
        line = line.strip()
        words = line.split()
        for word in words:
            print(f"{word}\t1")

if __name__ == "__main__":
    mapper()

reducer.py

#!/usr/bin/env python
import sys
from collections import defaultdict

def reducer():
    word_count = defaultdict(int)
    for line in sys.stdin:
        line = line.strip()
        word, count = line.split('\t')
        word_count[word] += int(count)

    for word, count in word_count.items():
        print(f"{word}\t{count}")

if __name__ == "__main__":
    reducer()


2) Shared Friendships

mapper.py

import sys

def mapper():
    for line in sys.stdin:
        line = line.strip()
        if ':' in line:
            user, friends_str = line.split(':', 1)
            friends = friends_str.split(',')
            friends = [friend.strip() for friend in friends]

            # Emit all pairs of friends
            for i in range(len(friends)):
                for j in range(i + 1, len(friends)):
                    friend_pair = tuple(sorted([friends[i], friends[j]]))
                    print(f"{friend_pair}\t1")

if __name__ == "__main__":
    mapper()

reducer.py

import sys
from collections import defaultdict

def reducer():
    pair_counts = defaultdict(int)

    for line in sys.stdin:
        line = line.strip()
        if '\t' in line:
            pair, count = line.split('\t', 1)
            count = int(count)
            pair_counts[pair] += count

    # Output pairs and their counts
    for pair, count in pair_counts.items():
        print(f"{pair}\t{count}")

if __name__ == "__main__":
    reducer()

3) Counting bigrams

mapper.py

import sys
import string

# Define a set of stopwords (this can be expanded as needed)
STOPWORDS = set([
    "a", "an", "the", "this", "that", "my", "his", "her", "they", "on", "for", "with", "in", "at"
])

def clean_token(token):
    """Normalize token by lowercasing and removing punctuation."""
    return token.lower().strip(string.punctuation)

def mapper():

    for line in sys.stdin:
        line = line.strip()
        words = line.split()

        # Normalize and filter stopwords
        # Normalize tokens, then drop empty tokens and stopwords
        words = [clean_token(word) for word in words]
        words = [word for word in words if word and word not in STOPWORDS]

        # Generate bigrams and output them
        for i in range(len(words) - 1):
            bigram = (words[i], words[i + 1])
            sys.stdout.write(f"{bigram[0]}\t{bigram[1]}\n")

if __name__ == "__main__":
    mapper()

reducer.py

import sys
from collections import defaultdict

def reducer():
    bigram_counts = defaultdict(int)

    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue

        word1, word2 = line.split('\t')

        # Increment the count of the bigram
        bigram_counts[(word1, word2)] += 1

    # Output all bigram counts
    for bigram, count in bigram_counts.items():
        sys.stdout.write(f"{bigram[0]}\t{bigram[1]}\t{count}\n")

if __name__ == "__main__":
    reducer()

Hadoop Streaming JAR

Java is the primary language used in Hadoop. The core components of Hadoop, including the Hadoop Distributed File System (HDFS) and the MapReduce framework, are written in Java.

The Hadoop Streaming JAR file is a key component in Hadoop’s MapReduce ecosystem that facilitates the use of non-Java languages for writing MapReduce jobs.

It is a utility that allows you to create and run MapReduce jobs using any executable programs or scripts, such as those written in Python, Perl, Ruby, or even shell scripts.

It facilitates the process of converting the standard input/output streams used by these programs into the format expected by Hadoop.

The Streaming JAR file manages the streaming of data between Hadoop and the user-provided scripts. It handles reading from HDFS, feeding the data to the mapper scripts via stdin, collecting the output from the mapper scripts via stdout, shuffling and sorting the intermediate data, and then passing it to the reducer scripts.

hadoop jar /path/to/hadoop-streaming.jar \
    -input /path/to/input \
    -output /path/to/output \
    -mapper /path/to/mapper.py \
    -reducer /path/to/reducer.py


Debugging

In Hadoop Streaming, leveraging standard error (stderr) to communicate with the Hadoop framework is an advanced technique that provides enhanced monitoring and debugging capabilities.

Messages printed to stderr are captured by Hadoop and can be viewed in the task logs through the JobTracker or ResourceManager web UI. Hadoop Streaming also interprets stderr lines of the form reporter:status:<message> and reporter:counter:<group>,<counter>,<amount> as status updates and counter increments, which is what the example below relies on.

import sys

def mapper():
    # Initialize a counter for progress updates
    line_count = 0
    progress_interval = 1000  # Update progress every 1000 lines (adjust as needed)

    for line in sys.stdin:
        line = line.strip()
        words = line.split()
        for word in words:
            print(f"{word}\t1")

        # Increment the line counter
        line_count += 1

        # Print progress updates to stderr
        if line_count % progress_interval == 0:
            sys.stderr.write(f"reporter:status:Processed {line_count} lines...\n")

        # Optionally, you can also add custom counters or additional logging
        # Example: Increment a custom counter
        sys.stderr.write(f"reporter:counter:MapperCounters,LinesProcessed,1\n")

if __name__ == "__main__":
    mapper()

Data stream

It refers to a continuous flow of data elements that are generated and transmitted in real-time. These streams are typically large-scale, high-speed sequences of data that need to be processed and analyzed as they arrive.

Characteristics of data stream

Continuous and unbounded
Sequential access (elements are seen once, not accessed randomly)
Limited time to process each element
Real-time processing
High volume and velocity
Dynamic, variable content

Types of Queries on Data Streams

1) Standing Queries
Queries that are stored and run continuously, monitoring the stream in real time and producing output as new data arrives.
Ex.
Alert whenever the temperature exceeds a certain value
Produce the average of the last 24 readings whenever new data arrives
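The second example can be kept up to date with a fixed-size window; a minimal, cluster-free sketch using collections.deque (the window size of 24 just mirrors the example above):

from collections import deque

window = deque(maxlen=24)   # automatically discards the oldest reading

def on_new_reading(value):
    # Standing query: recompute the average of the last 24 readings
    # every time a new element arrives on the stream.
    window.append(value)
    return sum(window) / len(window)

for reading in [20.5, 21.0, 19.8, 22.3]:
    print(f"new reading {reading} -> rolling average {on_new_reading(reading):.2f}")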

2) Ad-hoc Queries
Queries that are executed on demand to answer specific questions about the data stream at a particular point in time.
The query is run only once to get a snapshot or specific piece of information.
Ex.
Construct a random sample
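Maintaining a uniform random sample of an unbounded stream is a classic case; one common technique is reservoir sampling, sketched below (the sample size of 5 is arbitrary):

import random

def reservoir_sample(stream, k):
    # Keep a uniform random sample of k elements from a stream of
    # unknown length, using only O(k) memory.
    sample = []
    for i, item in enumerate(stream):
        if i < k:
            sample.append(item)
        else:
            # Replace an existing element with probability k / (i + 1).
            j = random.randint(0, i)
            if j < k:
                sample[j] = item
    return sample

print(reservoir_sample(range(1_000_000), 5))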

Examples

Network Traffic
Sensor Data
Social Media Feeds
Financial Transactions

Management of Data Streams

Bloom Filter
It is a space-efficient probabilistic data structure used to test whether an element is a member of a set.

It can produce false positives but never false negatives.
A standard Bloom filter does not support deletion, because clearing a bit could also affect other elements that hash to the same position.
Its accuracy depends on the size of the bit array and the number of hash functions used.

Example
A website handles thousands of sign-ups per minute. Storing every user ID in a database and checking each new sign-up against it can become slow and resource-intensive, so a Bloom filter is used as a fast first check for whether a username is already taken.

Steps
1) Initialization
Create an array of n bits, all initialized to 0, and choose k hash functions. Each hash function maps a key (e.g., a username) to an index in the bit array.

2) Hashing
When a username is registered, it is hashed with each of the k hash functions and the corresponding bits in the array are set to 1.

3) Checking
For a new username, compute the same k indexes. If all of those bits are already set, the username has probably been taken (a false positive is possible); if any bit is 0, it is definitely new, and the bits are then set to record it.

(Note: Using more hash functions reduces the false positive rate but increases the complexity and may lead to more bits being set, impacting the Bloom Filter's efficiency. Balancing the number of hash functions with the desired false positive rate and available memory is crucial for optimizing the Bloom Filter's performance)
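A minimal Bloom filter sketch in pure Python; the bit-array size, the number of hash functions, and the salted-MD5 hashing are arbitrary illustrative choices, and a real deployment would size the array and pick the number of hash functions from the expected element count and the target false-positive rate:

import hashlib

class BloomFilter:
    def __init__(self, size=1024, num_hashes=3):
        self.size = size
        self.num_hashes = num_hashes
        self.bits = [0] * size

    def _indexes(self, key):
        # Derive num_hashes indexes from salted MD5 digests of the key.
        for i in range(self.num_hashes):
            digest = hashlib.md5(f"{i}:{key}".encode()).hexdigest()
            yield int(digest, 16) % self.size

    def add(self, key):
        for idx in self._indexes(key):
            self.bits[idx] = 1

    def might_contain(self, key):
        # False positives are possible; false negatives are not.
        return all(self.bits[idx] for idx in self._indexes(key))

bf = BloomFilter()
bf.add("madhav")
print(bf.might_contain("madhav"))        # True
print(bf.might_contain("someone_else"))  # almost certainly False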

Stay Connected!
If you enjoyed this post, don’t forget to follow me on social media for more updates and insights:

Twitter: madhavganesan
Instagram: madhavganesan
LinkedIn: madhavganesan
