Madhav Ganesan
Posted on November 24, 2024
Hadoop is an open-source software framework designed to handle and process large volumes of data across distributed computing environments. It is designed to be scalable, fault-tolerant, and capable of handling vast amounts of data efficiently.
Cluster
It refers to a group of machines running HDFS and YARN.
Nodes
It refers to individual machines within the cluster.
Types of Nodes
Master Nodes (MN)
It handles coordination tasks and manage the cluster’s overall operations. They serve as the primary interfaces for users to interact with the Hadoop cluster.
Worker Nodes (WN)
It performs the actual data processing and storage operations based on instructions received from master nodes.
Key Components of Hadoop
Hadoop Distributed File System (HDFS):
It is the storage layer of Hadoop. It is a distributed file system that provides high-throughput access to data and is designed to store large files across multiple machines.
It breaks down large files into smaller blocks (typically 128 MB or 256 MB) and distributes these blocks across a cluster of machines. Each block is replicated multiple times to ensure fault tolerance. It provides high availability and reliability, as data is replicated across different nodes.
Components:
NameNode: (MN)
Manages the metadata and directory structure of the file system.
Process:
When a client wants to access a file, it first contacts the NameNode.
The NameNode provides a list of DataNodes that hold the data blocks of the file.
The client then communicates directly with these DataNodes to read or write the data blocks.
(Note: The NameNode acts only as a traffic cop, directing clients to the appropriate DataNodes. It does not store or transfer data itself)
DataNode: (WN)
Stores the actual data blocks of files.
Secondary NameNode: (MN)
Periodically merges the edits logs with the file system image to prevent the NameNode’s metadata from growing too large.
MapReduce
It is the processing layer of Hadoop. It is a programming model used for processing and generating large data sets with a parallel, distributed algorithm on a cluster. It allows for parallel processing of data across a cluster, which speeds up the data processing tasks.
Yet Another Resource Negotiator (YARN):
It is the resource management and job scheduling layer of Hadoop. It manages and schedules resources and job execution across the Hadoop cluster.
Components:
ResourceManager: (MN)
It is the master daemon responsible for managing the resources of the Hadoop cluster. It allocates system resources (CPU, memory, etc.) to various applications running on the cluster
NodeManager: (WN)
Each node in the Hadoop cluster runs a NodeManager daemon. It monitors the resource usage (CPU, disk, network, memory) on its node and reports this information back to the ResourceManager.
ApplicationMaster: (MN)
Manages the lifecycle of applications and coordinates with ResourceManager for resource allocation.
Resource Container
They are isolated units in which tasks are executed on the node.
It allocates resources such as CPU, memory, and disk space based on the application's requirements.
Process:
1) A client application submits a job and requests resources from the ResourceManager
2) It allocates resources based on availability and the job's requirements.
3) The ResourceManager assigns an ApplicationMaster for the job
4) The ApplicationMaster splits the job into multiple tasks and submits tasks to the ResourceManager
5) Each NodeManager on the cluster receives task assignments from the ResourceManager and each node in the cluster runs a NodeManager.
6) The NodeManager creates containers for running the tasks assigned by the ApplicationMaster
Working of YARN:
Job Submission: Jobs are submitted to the ResourceManager.
Resource Allocation: ResourceManager allocates resources and starts ApplicationMasters for each job.
Resource Requests: ApplicationMasters request containers from NodeManagers.
Task Execution: NodeManagers execute tasks in containers and report status.
Completion and Cleanup: ApplicationMasters notify completion, and resources are released.
Hadoop Ecosystem
In addition to these core components, the Hadoop ecosystem includes various tools and frameworks that enhance its capabilities:
HBase: A distributed, scalable, NoSQL database that runs on top of HDFS.
Hive: A data warehouse infrastructure that provides SQL-like querying capabilities on Hadoop.
Pig: A high-level platform for creating MapReduce programs using a scripting language called Pig Latin.
Sqoop: A tool for transferring data between Hadoop and relational databases.
Flume: A service for collecting, aggregating, and moving large amounts of log data into HDFS.
Oozie: A workflow scheduler system to manage Hadoop jobs.
Distributed system must meet the following requirements:
Fault tolerance
If a component fails, it should not result in the failure of the entire system. The system should gracefully degrade into a lower performing state. If a failed component recovers, it should be able to rejoin the system.
Recoverability
In the event of failure, no data should be lost.
Consistency
The failure of one job or task should not affect the final result.
Scalability
Adding load (more data, more computation) leads to a decline in performance, not failure; increasing resources should result in a proportional increase in capacity.
Commands for Hadoop
List Files:
hadoop fs -ls <path>
Create Directory:
hadoop fs -mkdir <path>
Copy From Local:
hadoop fs -copyFromLocal <local-file> <hdfs-path>
Remove Files:
hadoop fs -rm <path>
Read File:
hadoop fs -cat <file-path>
Move To Local:
hadoop fs -moveToLocal <hdfs-file> <local-path>
Get Merge:
hadoop fs -getmerge <source-path> <local-file>
Change Mode:
hadoop fs -chmod <permissions> <file-or-directory>
Change Group:
hadoop fs -chgrp <group> <file-or-directory>
Change Owner:
hadoop fs -chown <user>:<group> <file-or-directory>
Complete process
cat file_name.csv | ./mapper.py | sort | ./reducer.py
Mapreduce Components/Functions
1) Mapper
The mapper reads input data, processes it, and emits intermediate key-value pairs. This is the first stage of the MapReduce process.
2) Reducer
The reducer takes intermediate key-value pairs from the mapper, aggregates them based on the key, and outputs the final results.
3) Combiners
It performs partial aggregation of data on the map side before sending it to the reducers. This can reduce the amount of data shuffled across the network.
4) Partitioners
The partitioner determines how the intermediate key-value pairs are distributed among the reducers. It ensures that data is evenly distributed and processed efficiently by the reducers.
The default partitioner, HashPartitioner, uses the hash value of the key to determine the reducer. The hash value of the key is computed and then modulo the number of reducers is taken to assign the key-value pair to a specific reducer.
5) Job Chaining
Job chaining is a technique used in Hadoop MapReduce to handle complex workflows by linking multiple MapReduce jobs in sequence.
This approach is particularly useful when the desired computation cannot be accomplished in a single MapReduce job but can be broken down into several smaller tasks
Represents the flow of data between jobs in a directed, acyclic manner (DAG).
Ex. Pearson Correlation
First Job (Mean and Standard Deviation):
import sys
def mapper():
# Emit intermediate key-value pairs for mean and standard deviation
for line in sys.stdin:
# Process input line
# Emit key-value pairs for mean and standard deviation calculation
pass
if __name__ == "__main__":
mapper()
############################################
import sys
def reducer():
# Aggregate results to compute mean and standard deviation
pass
if __name__ == "__main__":
reducer()
Second Job (Covariance and Correlation):
import sys
def mapper():
# Take mean and std deviation from the first job and compute covariance
pass
if __name__ == "__main__":
mapper()
########################################
import sys
def reducer():
# Compute Pearson correlation coefficient
pass
if __name__ == "__main__":
reducer()
# First Job
hadoop jar /path/to/hadoop-streaming.jar \
-input /path/to/input \
-output /path/to/intermediate_output \
-mapper mean_std_mapper.py \
-reducer mean_std_reducer.py
# Second Job
hadoop jar /path/to/hadoop-streaming.jar \
-input /path/to/intermediate_output \
-output /path/to/final_output \
-mapper cov_corr_mapper.py \
-reducer cov_corr_reducer.py
Examples for mapper and reducer:
1) Word Counting:
mapper.py
#!/usr/bin/env python
import sys
def mapper():
for line in sys.stdin:
line = line.strip()
words = line.split()
for word in words:
print(f"{word}\t1")
reducer.py
#!/usr/bin/env python
import sys
from collections import defaultdict
def reducer():
word_count = defaultdict(int)
for line in sys.stdin:
line = line.strip()
word, count = line.split('\t')
word_count[word] += int(count)
for word, count in word_count.items():
print(f"{word}\t{count}")
2) Shared Friendships
mapper.py
import sys
def mapper():
for line in sys.stdin:
line = line.strip()
if ':' in line:
user, friends_str = line.split(':', 1)
friends = friends_str.split(',')
friends = [friend.strip() for friend in friends]
# Emit all pairs of friends
for i in range(len(friends)):
for j in range(i + 1, len(friends)):
friend_pair = tuple(sorted([friends[i], friends[j]]))
print(f"{friend_pair}\t1")
if __name__ == "__main__":
mapper()
reducer.py
import sys
from collections import defaultdict
def reducer():
pair_counts = defaultdict(int)
for line in sys.stdin:
line = line.strip()
if '\t' in line:
pair, count = line.split('\t', 1)
count = int(count)
pair_counts[pair] += count
# Output pairs and their counts
for pair, count in pair_counts.items():
print(f"{pair}\t{count}")
if __name__ == "__main__":
reducer()
3) Counting bigrams
mapper.py
import sys
import string
# Define a set of stopwords (this can be expanded as needed)
STOPWORDS = set([
"a", "an", "the", "this", "that", "my", "his", "her", "they", "on", "for", "with", "in", "at"
])
def clean_token(token):
"""Normalize token by lowercasing and removing punctuation."""
return token.lower().strip(string.punctuation)
def mapper():
previous_word = None
for line in sys.stdin:
line = line.strip()
words = line.split()
# Normalize and filter stopwords
words = [clean_token(word) for word in words if clean_token(word) not in STOPWORDS]
# Generate bigrams and output them
for i in range(len(words) - 1):
bigram = (words[i], words[i + 1])
sys.stdout.write(f"{bigram[0]}\t{bigram[1]}\n")
if __name__ == "__main__":
mapper()
reducer.py
import sys
from collections import defaultdict
def reducer():
bigram_counts = defaultdict(int)
for line in sys.stdin:
line = line.strip()
if not line:
continue
word1, word2 = line.split('\t')
# Increment the count of the bigram
bigram_counts[(word1, word2)] += 1
# Output all bigram counts
for bigram, count in bigram_counts.items():
sys.stdout.write(f"{bigram[0]}\t{bigram[1]}\t{count}\n")
if __name__ == "__main__":
reducer()
Hadoop Streaming file
Java is the primary language used in Hadoop. The core components of Hadoop, including the Hadoop Distributed File System (HDFS) and the MapReduce framework, are written in Java.
The Hadoop Streaming JAR file is a key component in Hadoop’s MapReduce ecosystem that facilitates the use of non-Java languages for writing MapReduce jobs.
It is a utility that allows you to create and run MapReduce jobs using any executable programs or scripts, such as those written in Python, Perl, Ruby, or even shell scripts.
It facilitates the process of converting the standard input/output streams used by these programs into the format expected by Hadoop.
The Streaming JAR file manages the streaming of data between Hadoop and the user-provided scripts. It handles reading from HDFS, feeding the data to the mapper scripts via stdin, collecting the output from the mapper scripts via stdout, shuffling and sorting the intermediate data, and then passing it to the reducer scripts.
hadoop jar /path/to/hadoop-streaming.jar \
-input /path/to/input \
-output /path/to/output \
-mapper /path/to/mapper.py \
-reducer /path/to/reducer.py
Debugging
In Hadoop Streaming, leveraging standard error (stderr) to communicate with the Hadoop framework is an advanced technique that provides enhanced monitoring and debugging capabilities.
Messages printed to stderr are captured by Hadoop and can be viewed in the logs of the Hadoop JobTracker or ResourceManager
import sys
def mapper():
# Initialize a counter for progress updates
line_count = 0
progress_interval = 1000 # Update progress every 1000 lines (adjust as needed)
for line in sys.stdin:
line = line.strip()
words = line.split()
for word in words:
print(f"{word}\t1")
# Increment the line counter
line_count += 1
# Print progress updates to stderr
if line_count % progress_interval == 0:
sys.stderr.write(f"reporter:status:Processed {line_count} lines...\n")
# Optionally, you can also add custom counters or additional logging
# Example: Increment a custom counter
sys.stderr.write(f"reporter:counter:MapperCounters,LinesProcessed,1\n")
if __name__ == "__main__":
mapper()
Data stream
It refers to a continuous flow of data elements that are generated and transmitted in real-time. These streams are typically large-scale, high-speed sequences of data that need to be processed and analyzed as they arrive.
Characteristics of data stream
Continuous and Unbounded
Sequential Access not randomly
Limited Time to Process Each Element
Real-Time Processing
High Volume and Speed
Dynamic and Variable content handled effectively
Types of Queries on Data Streams
1) Standard Queries
Queries that run continuously always, monitoring the stream in real-time and producing outputs as new data arrives.
Ex.
Alert whenever the temperature exceed a certain value
Produce the average of last 24 reading when new data arrives
2) Ad-hoc Queries
Queries that are executed on demand to answer specific questions about the data stream at a particular point in time.
The query is run only once to get a snapshot or specific piece of information.
Ex.
Construct a random sample
Examples
Network Traffic
Sensor Data
Social Media Feeds
Financial Transactions
Management of Data Streams
Bloom Filter
It is a space-efficient probabilistic data structure that is used to test whether an element is a member of a set
They can produce false positives but never false negatives.
They do not support deletion due to their probabilistic nature.
The filter’s accuracy depends on the size of the bit array and the number of hash functions used.
Example
Website handles thousands of sign-ups per minute. Storing all user IDs in a database and checking each new sign-up against this database can become slow and resource-intensive.
Steps
1) Initialization
An array of n bits, all initialized to 0.
A set of k hash functions. Each hash function maps a key (e.g., a URL) to an index in the bit array
2) Hashing
For each URL, it is hashed using hash function and the corresponding indexes are set in the array
3) Checking
If for other URL, the indexes are set already, it denotes that the URL has been taken already. Else it is set in the array
(Note: Using more hash functions reduces the false positive rate but increases the complexity and may lead to more bits being set, impacting the Bloom Filter's efficiency. Balancing the number of hash functions with the desired false positive rate and available memory is crucial for optimizing the Bloom Filter's performance)
Stay Connected!
If you enjoyed this post, don’t forget to follow me on social media for more updates and insights:
Twitter: madhavganesan
Instagram: madhavganesan
LinkedIn: madhavganesan
Posted on November 24, 2024
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.