Friday, August 10, 2018

Reset password for postgres user and load sample database

In this post, I want talk about three things, first, how to create a password for a postgres user. Second, how to load a csv into a sample database and finally how to access this table using python/psycopg package.
Setup password for Postgres user:
In my previous post, I created a user krishna in Postgres but I haven't come across during installation a password setup step. Quick search in StackOverflow showed this approach
psql (10.4 (Ubuntu 10.4-0ubuntu0.18.04))
Type "help" for help.

krishna=# \password
Enter new password: 
Enter it again: 
Load csv into a Postgres database.
Found this example of postgres site, it has details of land sales in the UK, going back several decades, and is 3.5GB as of August 2016,click here for the file
-- Executing query:
CREATE TABLE land_registry_price_paid_uk(
  transaction uuid,
  price numeric,
  transfer_date date,
  postcode text,
  property_type char(1),
  newly_built boolean,
  duration char(1),
  paon text,
  saon text,
  street text,
  locality text,
  city text,
  district text,
  county text,
  ppd_category_type char(1),
  record_status char(1));
Query returned successfully with no result in 273 msec.

-- Executing query:
COPY land_registry_price_paid_uk FROM '/home/krishna/Downloads/pp_100k.csv' 
with (format csv, encoding 'win1252', header false, null '', quote '"', force_null (postcode, saon, paon, street, locality, city, district));
Query returned successfully: 100000 rows affected, 1.1 secs execution time.

Install psycopg2 pip install psycopg2
$ pip install psycopg2
Collecting psycopg2
  Downloading (2.7MB)
    100% |████████████████████████████████| 2.7MB 316kB/s 
Installing collected packages: psycopg2
Successfully installed psycopg2-2.7.5
Example call from python
import psycopg2
import sys
import pprint
def main():
	conn_string = "host='localhost' dbname='oflc' user='krishna' password='*****'"
	# print the connection string we will use to connect
	print "Connecting to database\n	->%s" % (conn_string)
	# get a connection, if a connect cannot be made an exception will be raised here
	conn = psycopg2.connect(conn_string)
	# conn.cursor will return a cursor object, you can use this cursor to perform queries
	cursor = conn.cursor()
	# execute our Query
	cursor.execute("select * from land_registry_price_paid_uk limit 10")
	# retrieve the records from the database
	records = cursor.fetchall()
	# print out the records using pretty print
	# note that the NAMES of the columns are not shown, instead just indexes.
	# for most people this isn't very useful so we'll show you how to return
	# columns as a dictionary (hash) in the next example.
if __name__ == "__main__":

Tuesday, July 31, 2018

Binary Tree in Python - Part 1

In this post I want to write a little bit about the implementation details of Binary Tree's in my favorite language Python. Before we jump in, conceptually what is a binary tree?  A binary tree is a data structure with a node and each node having at most two child items.
Example binary tree from wikipedia
But wait isn't this a tree?

A tree in real life has trunk, and roots typically in the ground, branches, stems and leaves but in comp science we look at an inverted tree structure, we start with the root and go down to the leaves as shown in the numeric example above. Every circle is called a Node and each Node can have left and right child items.

How to materialize a Node?

In Python, we materialize a Node by defining a class as shown below

class Node(object):
  def __init__(self, data):

Why should we define a class called Node?

Think of this node as our abstract or user defined data type, this is similar to int, float etc. By defining a Node we can leverage the left and right child items which are also of type Node. Remember, everything in Python is an object.  Take for example a variable assignment statement a=1, if you check the type of a we see its of type int, int itself is a class and every class has a constructor so when you create a variable b = int(), this automatically defaults to value zero.

print type(a)

print b

print c

Going back to our Node class, we can instantiate objects of type Node this way

print, root.left, root.right
# 5 None None
We can manually create a Binary tree structure by assigning left and right child items of root node with a variable of type Node
print, root.left, root.right
# 5 None None
# 4 None None 
# 3 None None
# 5 3 4
If we examine the type of root.left or root.right we see its an object of class '__main__.Node'. But why should we assign root.left=n3, why can't we say root.left=3? The latter assignment although assigns value three to root's left child its of type int and cannot attach any other Nodes to root.left. Also if you notice, and are same. Originally root.left was None, later we assigned root.left = n3 so root.left is of type Node and hence we can access the data attribute through the dot notation.
#3 3 
In part-2, I will cover how to automatically insert a new Node and other interesting methods and some common interview questions around binary tree

Saturday, July 28, 2018

Setup/Install Postgres 10 in Ubuntu

Just to make sure we have a clean install, check and purge any existing  postgres tools.

krishna@dev:~$ dpkg -l | grep postgres
ii  postgresql-client                          10+190                                     all          front-end programs for PostgreSQL (supported version)
ii  postgresql-client-10                       10.4-0ubuntu0.18.04                        amd64        front-end programs for PostgreSQL 10
ii  postgresql-client-common                   190                                        all          manager for multiple PostgreSQL client versions

Run purge

krishna@dev:~$ sudo apt-get --purge remove postgresql-client postgresql-client-10 postgresql-client-common
Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following packages will be REMOVED:
  postgresql-client* postgresql-client-10* postgresql-client-common*
0 upgraded, 0 newly installed, 3 to remove and 2 not upgraded.
After this operation, 3,423 kB disk space will be freed.
Do you want to continue? [Y/n] y
(Reading database ... 203926 files and directories currently installed.)
Removing postgresql-client (10+190) ...
Removing postgresql-client-10 (10.4-0ubuntu0.18.04) ...
Removing postgresql-client-common (190) ...
Processing triggers for man-db (2.8.3-2) ...
(Reading database ... 203671 files and directories currently installed.)
Purging configuration files for postgresql-client-common (190) ...

Install postgres10 using apt-get

krishna@dev:~$ sudo apt-get install postgresql-10
Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following additional packages will be installed:
  postgresql-client-10 postgresql-client-common postgresql-common sysstat
Suggested packages:
  locales-all postgresql-doc-10 isag
The following NEW packages will be installed:
  postgresql-10 postgresql-client-10 postgresql-client-common postgresql-common sysstat
0 upgraded, 5 newly installed, 0 to remove and 2 not upgraded.
Need to get 4,204 kB/5,167 kB of archives.
After this operation, 20.3 MB of additional disk space will be used.
Do you want to continue? [Y/n] y
Get:1 bionic/main amd64 postgresql-common all 190 [157 kB]
Get:2 bionic-updates/main amd64 postgresql-10 amd64 10.4-0ubuntu0.18.04 [3,752 kB]
Get:3 bionic/main amd64 sysstat amd64 11.6.1-1 [295 kB]
Fetched 4,204 kB in 3s (1,497 kB/s)
Preconfiguring packages ...
Selecting previously unselected package postgresql-client-common.
(Reading database ... 203669 files and directories currently installed.)
Preparing to unpack .../postgresql-client-common_190_all.deb ...
Unpacking postgresql-client-common (190) ...
Selecting previously unselected package postgresql-client-10.
Preparing to unpack .../postgresql-client-10_10.4-0ubuntu0.18.04_amd64.deb ...
Unpacking postgresql-client-10 (10.4-0ubuntu0.18.04) ...
Selecting previously unselected package postgresql-common.
Preparing to unpack .../postgresql-common_190_all.deb ...
Adding 'diversion of /usr/bin/pg_config to /usr/bin/pg_config.libpq-dev by postgresql-common'
Unpacking postgresql-common (190) ...
Selecting previously unselected package postgresql-10.
Preparing to unpack .../postgresql-10_10.4-0ubuntu0.18.04_amd64.deb ...
Unpacking postgresql-10 (10.4-0ubuntu0.18.04) ...
Selecting previously unselected package sysstat.
Preparing to unpack .../sysstat_11.6.1-1_amd64.deb ...
Unpacking sysstat (11.6.1-1) ...
Setting up sysstat (11.6.1-1) ...

Creating config file /etc/default/sysstat with new version
update-alternatives: using /usr/bin/sar.sysstat to provide /usr/bin/sar (sar) in auto mode
Created symlink /etc/systemd/system/ → /lib/systemd/system/sysstat.service.
Processing triggers for ureadahead (0.100.0-20) ...
Setting up postgresql-client-common (190) ...
Processing triggers for systemd (237-3ubuntu10.3) ...
Setting up postgresql-common (190) ...
Adding user postgres to group ssl-cert

Creating config file /etc/postgresql-common/createcluster.conf with new version
Building PostgreSQL dictionaries from installed myspell/hunspell packages...
Removing obsolete dictionary files:
Created symlink /etc/systemd/system/ → /lib/systemd/system/postgresql.service.
Processing triggers for man-db (2.8.3-2) ...
Setting up postgresql-client-10 (10.4-0ubuntu0.18.04) ...
update-alternatives: using /usr/share/postgresql/10/man/man1/psql.1.gz to provide /usr/share/man/man1/psql.1.gz (psql.1.gz) in auto mode
Setting up postgresql-10 (10.4-0ubuntu0.18.04) ...
Creating new PostgreSQL cluster 10/main ...
/usr/lib/postgresql/10/bin/initdb -D /var/lib/postgresql/10/main --auth-local peer --auth-host md5
The files belonging to this database system will be owned by user "postgres".
This user must also own the server process.

The database cluster will be initialized with locale "en_US.UTF-8".
The default database encoding has accordingly been set to "UTF8".
The default text search configuration will be set to "english".

Data page checksums are disabled.

fixing permissions on existing directory /var/lib/postgresql/10/main ... ok
creating subdirectories ... ok
selecting default max_connections ... 100
selecting default shared_buffers ... 128MB
selecting dynamic shared memory implementation ... posix
creating configuration files ... ok
running bootstrap script ... ok
performing post-bootstrap initialization ... ok
syncing data to disk ... ok

Success. You can now start the database server using:

    /usr/lib/postgresql/10/bin/pg_ctl -D /var/lib/postgresql/10/main -l logfile start

Ver Cluster Port Status Owner    Data directory              Log file
10  main    5432 down   postgres /var/lib/postgresql/10/main /var/log/postgresql/postgresql-10-main.log
update-alternatives: using /usr/share/postgresql/10/man/man1/postmaster.1.gz to provide /usr/share/man/man1/postmaster.1.gz (postmaster.1.gz) in auto mode
Processing triggers for systemd (237-3ubuntu10.3) ...
Processing triggers for ureadahead (0.100.0-20) ...

Postgres is installed at this point

Data file is stored here/var/lib/postgresql/10/main
Transaction logs are here: /var/log/postgresql/postgresql-10-main.log
Postgres application userid and group:
id postgres

uid=126(postgres) gid=133(postgres) groups=133(postgres),117(ssl-cert)
Postgres is listening on port: 5432
Database encoding is set to "UTF8" and cluster locale is "en_US.UTF-8"

Start script is 
/usr/lib/postgresql/10/bin/pg_ctl -D /var/lib/postgresql/10/main -l logfile start

For some reason, when I kick off the instance I kept getting this error 

postgres@dev:~$ /usr/lib/postgresql/10/bin/pg_ctl -D /var/lib/postgresql/10/main -l logfile start
pg_ctl: another server might be running; trying to start server anyway
waiting for server to start.... stopped waiting
pg_ctl: could not start server

Examine the log output.

Ended up restarting the server 

postgres@dev:~$ /usr/lib/postgresql/10/bin/pg_ctl -D /var/lib/postgresql/10/main -l logfile status
pg_ctl: no server running
postgres@dev:~$ /usr/lib/postgresql/10/bin/pg_ctl -D /var/lib/postgresql/10/main -l logfile restart
pg_ctl: PID file "/var/lib/postgresql/10/main/" does not exist
Is server running?
starting server anyway
waiting for server to start.... done
server started
postgres@dev:~$ /usr/lib/postgresql/10/bin/pg_ctl -D /var/lib/postgresql/10/main -l logfile status
pg_ctl: server is running (PID: 15574)
/usr/lib/postgresql/10/bin/postgres "-D" "/var/lib/postgresql/10/main" "-c" "config_file=/etc/postgresql/10/main/postgresql.conf"

let's create a role/user, so you are not using the application user id 

postgres@dev:~$ psql

psql (10.4 (Ubuntu 10.4-0ubuntu0.18.04))

Type "help" for help.

postgres=# CREATE DATABASE krishna OWNER krishna;
postgres=# \q
postgres@dev:~$ exit

Create database

krishna=# create database oflc;

List all available databases

krishna=# \l 
                                  List of databases
   Name    |  Owner   | Encoding |   Collate   |    Ctype    |   Access privileges   
 krishna   | krishna  | UTF8     | en_US.UTF-8 | en_US.UTF-8 | 
 oflc      | krishna  | UTF8     | en_US.UTF-8 | en_US.UTF-8 | 
 postgres  | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 | 
 template0 | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 | =c/postgres          +
           |          |          |             |             | postgres=CTc/postgres
 template1 | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 | =c/postgres          +
           |          |          |             |             | postgres=CTc/postgres
(5 rows)

Connect to oflc database and create test table 

krishna=# \c oflc
You are now connected to database "oflc" as user "krishna".

oflc=# create table test (c1 int);

oflc=# insert into test values (1);

oflc=# select * from test;
(1 row)

oflc=# drop table test;


Other useful commands

oflc=# \c
You are now connected to database "oflc" as user "krishna".
oflc=# \l
                                  List of databases
   Name    |  Owner   | Encoding |   Collate   |    Ctype    |   Access privileges   
 krishna   | krishna  | UTF8     | en_US.UTF-8 | en_US.UTF-8 | 
 oflc      | krishna  | UTF8     | en_US.UTF-8 | en_US.UTF-8 | 
 postgres  | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 | 
 template0 | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 | =c/postgres          +
           |          |          |             |             | postgres=CTc/postgres
 template1 | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 | =c/postgres          +
           |          |          |             |             | postgres=CTc/postgres
(5 rows)

oflc=# \conninfo
You are connected to database "oflc" as user "krishna" via socket in "/var/run/postgresql" at port "5432".
oflc=# select * from information_schema.tables where table_schema='public';
oflc=# \q

Finally, shut down server

postgres@dev:~$ /usr/lib/postgresql/10/bin/pg_ctl -D /var/lib/postgresql/10/main -l logfile stop
waiting for server to shut down.... done
server stopped

 To make life easier, wrap start|stop commands into a shell script 

#set -x

pg_start() {
sudo su - postgres -c '/usr/lib/postgresql/10/bin/pg_ctl -D /var/lib/postgresql/10/main -l logfile start'

sudo su - postgres -c '/usr/lib/postgresql/10/bin/pg_ctl -D /var/lib/postgresql/10/main -l logfile status' 

sudo su - postgres -c '/usr/lib/postgresql/10/bin/pg_ctl -D /var/lib/postgresql/10/main -l logfile restart'

sudo su - postgres -c '/usr/lib/postgresql/10/bin/pg_ctl -D /var/lib/postgresql/10/main -l logfile restart'

while getopts ":srtk" OPTION; do
echo ${OPTION}
    case ${OPTION} in
                echo "Usage: [-s | START] [-r | RESTART] [-t | STATUS] [-k | STOP]"
                exit 1

Sunday, June 18, 2017

How to read HBase table from Scala Spark

Step 1: Create a dummy table called customers in HBase, please refer this link on how to populate this table

hbase(main):004:0> scan '/user/user01/customer'
ROW                                  COLUMN+CELL                                                                                              
 amiller                             column=addr:state, timestamp=1497809527266, value=TX                                                     
 jsmith                              column=addr:city, timestamp=1497809526053, value=denver                                                  
 jsmith                              column=addr:state, timestamp=1497809526080, value=CO                                                     
 jsmith                              column=order:date, timestamp=1497809490021, value=10-18-2014                                             
 jsmith                              column=order:numb, timestamp=1497809526118, value=6666                                                   
 njones                              column=addr:city, timestamp=1497809525860, value=miami                                                   
 njones                              column=addr:state, timestamp=1497809526151, value=TX                                                     
 njones                              column=order:numb, timestamp=1497809525941, value=5555                                                   
 tsimmons                            column=addr:city, timestamp=1497809525998, value=dallas                                                  
 tsimmons                            column=addr:state, timestamp=1497809526023, value=TX                                                     
4 row(s) in 0.0310 seconds

Step 2:  Next is reading this table in Spark, I used spark-shell to read the table and keyValueRDD is what we are looking for 

[mapr@maprdemo ~]$ /opt/mapr/spark/spark-2.1.0/bin/spark-shell 
Spark context Web UI available at
Spark context available as 'sc' (master = local[*], app id = local-1497831718510).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-mapr-1703
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.


scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.rdd.NewHadoopRDD

scala> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}

scala> import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HBaseAdmin

scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

scala> import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Path

scala> import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.HColumnDescriptor

scala> import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.util.Bytes

scala> import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Put

scala> import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTable



scala> val tableName="/user/user01/customer"
tableName: String = /user/user01/customer



scala> val hconf = HBaseConfiguration.create()
hconf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, org.apache.hadoop.conf.CoreDefaultProperties, core-site.xml, mapred-default.xml, org.apache.hadoop.mapreduce.conf.MapReduceDefaultProperties, mapred-site.xml, yarn-default.xml, org.apache.hadoop.yarn.conf.YarnDefaultProperties, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, hbase-default.xml, hbase-site.xml


scala> hconf.set(TableInputFormat.INPUT_TABLE, tableName)




scala> val admin = new HBaseAdmin(hconf)
warning: there was one deprecation warning; re-run with -deprecation for details
admin: org.apache.hadoop.hbase.client.HBaseAdmin = org.apache.hadoop.hbase.client.HBaseAdmin@2093bb6c


scala> val hBaseRDD = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat], classOf[], classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD: org.apache.spark.rdd.RDD[(, org.apache.hadoop.hbase.client.Result)] = NewHadoopRDD[0] at newAPIHadoopRDD at :38

scala> val result = hBaseRDD.count()
result: Long = 4


scala> val resultRDD = => tuple._2) 
resultRDD: org.apache.spark.rdd.RDD[org.apache.hadoop.hbase.client.Result] = MapPartitionsRDD[1] at map at :40

scala> resultRDD
res1: org.apache.spark.rdd.RDD[org.apache.hadoop.hbase.client.Result] = MapPartitionsRDD[1] at map at :40

scala> val keyValueRDD = =>
     | (Bytes.toString(result.getRow()).split(" ")(0),
     | Bytes.toString(result.value)))
keyValueRDD: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at :44

scala> keyValueRDD.collect()
res2: Array[(String, String)] = Array((amiller,TX), (jsmith,denver), (njones,miami), (tsimmons,dallas))

Friday, April 3, 2015

Bill Gates 2030 Vision

I recently watched this video, Verge interviewed Bill Gates about his vision for 2030,  this is the man who predicted every home will have a PC, which turned out to be true, what is Mr.Gates vision fifteen years from would be..

Four key areas for improvement health care, farming, banking and education.

Key take aways:
  • This man is serious about his goals/visions, each sector has very specific goals, its very hard to come up with goals/vision
  • Health
    • Upstream: inventing new vaccines specifically for kids less than five years
    • Downstream: How do you get them out to kids around the worls
    • Goal: Currently one out of twenty kids dies before age of 5, this should increase to one in forty 
  • Farming
    • Better seeds with resistance to heat & low water, which hints GMO stuff but at least educating farmers about the benefits
    • Improved credit & loan systems for farmers
    • Increase world food productivity 
  • Education 
    • Emphasis on online learning 
    • Improve critical programming skills
    • Basics of reading & writing 
    • Tablet computing should have a sandbox to test new code  
  • Banking
    • In my view banking vision is radical and I see it coming soon, for small transactions banks are loosing money
    • Banking digital infrastructure should be revised to create a utility type service that lets you move money to pay someone else or switch from one bank to another. This calls in for a new regulatory infrastructure where money transfer system is licensed like phone numbers with banks (switching bank accounts should be like switching phone services)  

Monday, December 8, 2014

Hadoop Definitive Guide Byte Sized Notes - 6

Chapter 7 MapReduce Types and Formats

General form of map & reduce 
  • map: (k1,v1) -> list(k2,v2)
  • reduce: (k2,list(v2)) ->list(k3,v3)
  • Keys emitted during the map phase must implement WritableComparable so that the keys can be sorted during the shuffle and sort phase.
  • Reduce input must have the same types as the map output
  • The number of map tasks are not set, the number of map tasks are equal to the number of splits that the input is turned 
Input formats 
  • Data is passed to the mapper by InputFormat, Input Format is a factory of RecordReader object to extract the (key, value) pairs from input source
  • Each mapper deals with single input split no matter the file size
  • Input Split is the chunk of the input processed by a single map, represented by InputSplit
  • InputSplits created by InputFormat, FileInputFormat default for file types 
  • Can process Text, XML, Binary 
  • Ensure key, value pair is processed
  • Ensure (k, v) not processed more than once
  • Handle (k,v) which get split
Output formats: Text output, binary output 

Chapter 8 MapReduce Features

  • Useful for QC, problem diagnostics
  • Two counter groups task & job
    • Task Counter
      • gather information about task level statistics over execution time
      • counts info like, input records, skipped records, output bytes etc
      • they are sent across the network
    • Job Counter
      • measure job level statistics
    • User defined counters 
      • Counters are defined by a Java enum
      • Counters are global
  • SortOrder controlled by RawComparator
  • Partial sort, secondary sort (Lab)

To run map-side join use CompositeInputFormat from the org.apache.hadoop.mapreduce.joinpackage

Side Data: Read only data required to process main data set, ex skip words for wordcount

Distributed Cache: To save network bandwidth, side data files are normally copied to any particular node once per job.

Map only Jobs: Image Processing, ETL, Select * from table, distcp, I/p data sampling 


Map/reduce job will contains most of the time more than  1 reducer.  So basically, when a mapper emits a key value pair, it has to be sent to one of the reducers. Which one? The mechanism sending specific key-value pairs to specific reducers is called partitioning (the key-value pairs space is partitioned among the reducers). A Partitioner is responsible to perform the partitioning.

For more info this article has detailed explanation

Zero reducers only map tasks
  • One reducer when the amount of data is small enough to be processed comfortably by one reducer
  • Used for reducing intermediate data
  • Runs locally on single mapper’s  o/p
  • Best suited for commutative and associative operations
  • Code identical to Reducer 
  • conf.setCombinerClass(Reducer.class);
  • May or may not run on the output from some or all mappers

Hadoop Definitive Guide Byte Sized Notes - 5

Chapter 6 Anatomy of a MapReduce Job Run

Frameworks used for execution is set by property

  • local -> local job runner
  • classic -> MR1
  • yarn -> MR2

MR1 - Classic

  • Client submits MR job
  • JT coordinates the job run
  • JT resides on master node
  • TT is actually responsible for instantiating map/reduce tasks

Map Task

  • May completely choose to ignore the input key
  • Map outputs zero or more K/V pairs
  • Typical mappers 
    • Convert to Upper Case
    • Explode Mapper 
    • Filter Mapper
    • Changing Keyspace

Reduce Task

  • All values associated with a particular intermediate key are guaranteed to go the same reducer
  • Intermediate keys and values are passed to reducer in sorted key order  
  • Typical Reducers
    • Sum Reducer
    • Identity Reducer 

Progress in MR1 means

  • Reading an input record (in a mapper or reducer)
  • Writing an output record (in a mapper or reducer)
  • Setting the status description on a reporter (using Reporter’s setStatus()method)
  • Incrementing a counter (using Reporter’s incrCounter()method)
  • Calling Reporter’s progress()method
Handling Job/Failures in MR1

Mainly three failure modes

  • Failure to run task
    • If task (map/reduce) produces run time error then TT sends signal to JT to notify failed task attempt, JT will retry for 4 times
    • The # of attempts is set by
    • JT will abort task and report failure 
    • A task attempt may also be killed, which is different from failing, during speculative execution task attempt is killed
    • Killed task attempts do not count against the number of attempts to run the task
  • Failure of task tracker
    • if TT is running slow or crashes, TT wont send heartbeat signal to JT
    • If JT did not receive heartbeat signal for 10 min then JT will respawn a diff TT to complete the failed TT map/reduce task
    • A TT is blacklisted if the number of faults is over some minimum threshold (the default is four)
  • Failure of JT
    • Restarting a jobtracker, any jobs that were running at the time it was stopped will need to be resubmitted.
Limitations of MR1

  • JT single point of failure
  • JT bottleneck; Hive, PIG, MR all get converted to MR jobs and are submitted to JT
  • JT does job scheduling & task progress, restarting failed nodes


  • YARN splits JT into two daemons Resource Manager RM & Application Master AM, Node Managers, yarn-site.xml
  • RM negotiates AM for cluster resources and allocates resources. Node Managers run on data nodes to ensure the application is using only the allocated resources
  • MR has dedicated AM which runs for duration of app

Failure in YARN

Mainly four failure modes

  • Failure to run task
    • hanging/failed tasks reported to AM 
  • Failure in Application Master
    • Apps in YARN are tried multiple times in the event of failure
    • AM sends periodic heartbeats to the RM, if RM detects failure, a new AM is started
    • The client polls the application master for progress reports
  • Failure in Node Manager
    • It will stop sending heartbeats to the resource manager
    • Node manager will be removed from the resource manager’s pool (default 10 min)
    • Node managers may be blacklisted if the number of failures for the application is high. Blacklisting is done by the application master. resource manager does not do blacklisting
  • Failure in Resource Manager
    • SPOF, may be recovered from saved state. The state consists of the node managers in the system as well as the running applications

Job Scheduling

  • Scheduling users jobs, FIFO queue based scheduler
  • Multi user -Fair & Capacity

Fair Scheduler

  • Every user has a fair share of the cluster
  • Supports preemption
  • Scheduler will kill tasks in pools running over capacity in order to give more slots to the pool running under capacity

Capacity Scheduler 
Simulate a separate MapReduce cluster with FIFO scheduling for each user or organization. Within each queue, jobs are scheduled using FIFO scheduling

Shuffle and Sort

Shuffle (Map): The process by which the system performs the sort—and transfers the map outputs to the reducers as inputs in a sorted order is known as shuffle.

Map Side 

  • Map outputs are written to a circular memory buffer, a background thread sorts the keys in memory, when the buffer reaches 80% then a thread will spill the contents to disk.
  • Before the spill combiner can be run
  • Its better to compress the output of the map file
  • The output partitions are transferred over the HTTP to reducer

Reduce Side 

  • How do reducers know which machines to fetch map output from?
    • After map output is completed the TT notifies JT (AM)
    • JT(AM) keeps a log of host:map_output
    • Thread in reducer asks JT(AM) for host:map_output
    • Once we have host:map_output the copy phase begins by the reducer thread node, map outputs are copied over HTTP to reducer
    • Map outputs are deleted only when they are told to by JT(AM)

Sort (Reduce)

  • Merges the map outputs, maintaining their sort ordering
  • Ex: 50 map outputs and the merge factor is 10, then we have 5 intermediate files
  • Finally reduce function is invoked for each key in the sorted output.

Configuration Tuning

  • Avoiding multiple spills to disk during shuffle
  • On Reduce try to write intermediate data entirely in memory, See Table 6-1

Speculative Execution: Detect when a task is running slower than expected and launches another equivalent task as a backup. This is termed speculative execution of tasks.


  • I/P received through stdin
  • O/P written to stdout
  • All key values in reducer are iterators so user must write logic to keep track of key change