Friday, January 11, 2019

Hadoop quick look

Comparison with other systems :

RDBMS : Structured data, better for writing/querying specific rows. Can't scale linearly, since horizontal scaling is not easy, possibly due to ACID constraints.

MapReduce : Better for batch processing of entire contents. Can be unstructured data. Can scale linearly. Can't work iteratively on changed data, i.e. each job starts from scratch every time. Spark, by contrast, can keep data in memory between iterations.

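HDD seek times are not improving as fast as transfer rates, so reading a large part of a data set via many seeks performs
worse than a full scan. This is similar to an RDBMS, where index lookups beat a scan only when a small fraction of the rows is touched.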
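
A rough back-of-the-envelope calculation illustrates the seek vs. scan trade-off. All figures below (10 ms per seek, 100 MB/s transfer rate, 100-byte records, 1 TB data set) are assumed round numbers, not measurements, and the function names are made up for this sketch.

```python
# Illustrative figures only: every constant below is an assumption.
SEEK_TIME_S = 0.010          # assumed average seek time: 10 ms
TRANSFER_RATE_BPS = 100e6    # assumed sustained transfer rate: 100 MB/s
RECORD_SIZE_B = 100          # assumed record size in bytes
DATASET_SIZE_B = 1e12        # assumed data set size: 1 TB


def full_scan_seconds(dataset_bytes=DATASET_SIZE_B):
    """Time to stream the whole data set sequentially (one initial seek)."""
    return SEEK_TIME_S + dataset_bytes / TRANSFER_RATE_BPS


def seek_read_seconds(fraction, dataset_bytes=DATASET_SIZE_B):
    """Time to read a fraction of the records, paying one seek per record."""
    records = fraction * dataset_bytes / RECORD_SIZE_B
    return records * (SEEK_TIME_S + RECORD_SIZE_B / TRANSFER_RATE_BPS)


def break_even_fraction():
    """Fraction of records above which a full scan beats per-record seeks."""
    per_record = SEEK_TIME_S + RECORD_SIZE_B / TRANSFER_RATE_BPS
    total_records = DATASET_SIZE_B / RECORD_SIZE_B
    return full_scan_seconds() / (per_record * total_records)
```

Under these assumptions the break-even point is tiny: once more than roughly 0.01% of the records are read via seeks, streaming the whole data set is faster.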

SANs provide block-based network access to storage for servers. Earlier grid/HPC computing used the Message Passing Interface (MPI) and SANs to distribute tasks to
    nodes, but when reading large amounts of data for processing, the SAN becomes a bandwidth bottleneck.

Hadoop shines here, as it co-locates data and processing on nodes.
*** This feature, known as data locality, is at the heart of data processing in Hadoop and is the reason for its good performance.
Also, Hadoop manages execution of the MapReduce jobs, leaving only the business logic to the programmer.
By contrast, MPI programs have to explicitly manage their own communication, checkpointing and recovery,
which gives more control to the programmer but makes them more difficult to write.

HDFS architecture:

HDFS has a master, the name-node, and multiple slaves, the data-nodes. The data-nodes store the actual data as blocks.
The name-node holds the HDFS file-system tree and the metadata for the files and dirs in it. It also knows which data-nodes hold the blocks of each file.
Usually, the name-node's writes are also copied to another NFS-mounted location as backup. Also, a secondary name-node periodically merges
the edit log into the name-node's namespace image. Since the file-system metadata can grow too large for a single name-node to handle,
a Federation facility provides multiple name-nodes, each managing a portion of the name-space, e.g. /user. A High Availability configuration
is also available, which allows a pair of name-nodes in active/standby mode.
There are command-line tools as well as APIs to access HDFS.
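
The kind of block map a name-node keeps can be pictured with a toy model. This is only a sketch: the 128 MB block size and replication factor of 3 are common defaults that the notes above do not state, the round-robin placement is a simplification (real HDFS placement is rack-aware), and plan_blocks is a made-up helper, not an HDFS API.

```python
import math
from itertools import cycle

BLOCK_SIZE = 128 * 1024 * 1024   # assumed block size: 128 MB (configurable in HDFS)
REPLICATION = 3                  # assumed replication factor (configurable in HDFS)


def plan_blocks(file_size, datanodes, block_size=BLOCK_SIZE, replication=REPLICATION):
    """Return a toy block map: block index -> data-nodes holding a replica.

    Replicas are handed out round-robin, so each block's replicas land on
    distinct nodes as long as replication <= number of data-nodes.
    """
    n_blocks = math.ceil(file_size / block_size)
    replicas = min(replication, len(datanodes))
    ring = cycle(datanodes)
    return {b: [next(ring) for _ in range(replicas)] for b in range(n_blocks)}
```

For example, a 300 MB file over four data-nodes yields three blocks (128 MB, 128 MB and 44 MB), each with three replicas on different nodes.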

YARN architecture

YARN comprises a single cluster-level process called the resource manager, which manages the use of cluster resources,
and node managers running on individual nodes, which launch and monitor containers on the node.
A container executes an application-specific process with a constrained set of resources (memory, CPU etc).
A client contacts the resource manager and asks it to run an application master process, supplying the binaries needed.
Hadoop has its own application master to run map-reduce jobs, Spark has its own to run DAGs etc.
YARN does not itself provide any means of communication between client, master and process. It's done by the application.
YARN can use different types of schedulers : FIFO, Capacity, which reserves a dedicated queue with a share of cluster capacity (e.g. per organization or job type), and Fair, which allocates resources
evenly between running jobs.
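
The difference between FIFO and Fair scheduling can be sketched with a toy allocator. This is not YARN code: the functions, the idea of counting resources in whole containers, and the max-min style redistribution are simplifying assumptions made for illustration.

```python
def fifo_allocate(total, demands):
    """Serve jobs in submission order; each takes what it asks for until nothing is left."""
    allocation = []
    remaining = total
    for demand in demands:
        granted = min(demand, remaining)
        allocation.append(granted)
        remaining -= granted
    return allocation


def fair_allocate(total, demands):
    """Split resources evenly, handing back what small jobs do not need."""
    allocation = [0] * len(demands)
    remaining = total
    active = [i for i, d in enumerate(demands) if d > 0]
    while remaining > 0 and active:
        share = remaining // len(active)
        if share == 0:
            break  # fewer containers left than unsatisfied jobs
        for i in list(active):
            granted = min(share, demands[i] - allocation[i])
            allocation[i] += granted
            remaining -= granted
            if allocation[i] == demands[i]:
                active.remove(i)
    return allocation
```

With 10 containers and demands of 8, 4 and 4, the FIFO sketch starves the last job while the fair sketch gives every job 3 containers.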

Map Reduce Jobs

The Map step processes input data across the cluster. The Reduce step collects the map output to create the results.
The Map step, implemented by a Mapper, creates data with key-value fields. Input data to be processed is divided into (ideally) equal parts called splits, and that many mapper instances are created to process the splits,
usually on different nodes. The input path is InputFormat -> RecordReader -> deserialize into key, value pairs. TextInputFormat is the default input format; it provides the byte offset of each line within the file as the key and the line contents as a Text value. It's also possible to use a combiner to pre-aggregate the map output locally before it is sent on. The output key-values of the mapper are the intermediate results. These correspond to the input for the Reducer. They are stored on the local disks of the nodes that ran the map tasks, not in HDFS. There is a shuffle stage, which sorts the intermediate data by key and, depending on the size and partitioning, sends it off to one or multiple reducers.
Each reducer is guaranteed to get sorted data for one or more keys. The partitioner controls which keys go to which reducer. Further, there is a grouping control too, to decide which values are sent to one invocation of reduce(). The reducer then produces the output specific to that key, producing a (potentially new) set of key-value pairs. The default way for map and reduce tasks to create output is using context.write(key, value).
Both map and reduce methods get the input params (key, value, context). In case of reduce, it is multiple values against a key. They can also write directly to the file-system for needs that do not exactly fit the key-value paradigm.
On the output side, the key and value are serialized using OutputFormat -> RecordWriter.
It's possible to have a job with only a Map class and no Reduce class. The reducer then defaults to the identity reducer, so the output of the map stage becomes the final output. It's even possible in this case to specify the number of reducers, and that many output files are created. Shuffle/sort will still happen in this case. However, if we specify the number of reducers as 0, no shuffle/sort will take place and each mapper writes its output directly. It's also possible to plug in a custom shuffle/sort implementation in Hadoop 2.x releases such as 2.9.2. This can be useful, e.g., when you don't need a sort, or need a different type of sort.
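
The whole flow (map, combine, partition, shuffle/sort, reduce) can be mimicked in memory with a word count. This is a plain-Python sketch of the data flow only, not Hadoop's Java API: run_job, group_sorted and the byte-sum partition function are invented for the example.

```python
from collections import defaultdict
from itertools import groupby


def mapper(key, line):
    """Emit (word, 1) per word; the input key (line position) is ignored here."""
    for word in line.split():
        yield word.lower(), 1


def combiner(key, values):
    """Pre-aggregate one mapper's output locally before the shuffle."""
    yield key, sum(values)


def reducer(key, values):
    """Sum all counts that arrived for one key."""
    yield key, sum(values)


def partition(key, num_reducers):
    """Toy stand-in for a hash partitioner, deterministic across runs."""
    return sum(key.encode()) % num_reducers


def group_sorted(pairs):
    """Sort (key, value) pairs by key and collect the values of each key."""
    for key, group in groupby(sorted(pairs), key=lambda kv: kv[0]):
        yield key, [v for _, v in group]


def run_job(splits, num_reducers=2):
    partitions = defaultdict(list)
    # Map phase: one mapper per split, followed by a local combine.
    for split in splits:
        map_out = []
        for position, line in enumerate(split):
            map_out.extend(mapper(position, line))
        for key, values in group_sorted(map_out):
            for k, v in combiner(key, values):
                partitions[partition(k, num_reducers)].append((k, v))
    # Shuffle/sort and reduce: each reducer sees its keys in sorted order.
    return {
        r: [kv for key, values in group_sorted(partitions[r])
            for kv in reducer(key, values)]
        for r in range(num_reducers)
    }
```

Calling run_job([["a b a"], ["b c"]]) returns one sorted output list per reducer, which mirrors the one-output-file-per-reducer behaviour described above.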

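Can reducers start running when some but not all maps have run ? -Not the reduce() calls themselves, because all values for a key are guaranteed to go to a single reducer, and the full set of values can't be known till all maps finish generating the key-value pairs. Reducers can, however, start copying the output of finished maps early.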

There is an OutputCommitter API to handle custom setup, commit and abort actions for jobs and tasks.
There are built-in counters to track the job execution, tasks, file-system, and input-output formats.
It's possible to create user-defined counters as well.

There is a distributed cache, where frequently used files and jars can be stored.

The Hadoop Streaming API allows us to use any executable or script, e.g. Python or shell, as the mapper and reducer of a map-reduce job.
We can pass data from/to our Map and Reduce code via STDIN (standard input) and STDOUT (standard output).
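
A streaming word count might look roughly like the sketch below. The function names and the "map"/"reduce" argument convention are assumptions made for this example. The tab character is the usual key/value separator in streaming, and the reducer relies on its input arriving sorted by key.

```python
import sys
from itertools import groupby


def map_lines(lines):
    """Mapper: emit 'word<TAB>1' for every word on every input line."""
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"


def reduce_lines(lines):
    """Reducer: lines arrive sorted by key, so equal keys are adjacent."""
    parsed = (line.rstrip("\n").split("\t", 1) for line in lines)
    for word, group in groupby(parsed, key=lambda kv: kv[0]):
        yield f"{word}\t{sum(int(count) for _, count in group)}"


def main(argv):
    """Hypothetical entry point: 'map' or 'reduce' selects the step to run."""
    step = map_lines if argv[1:2] == ["map"] else reduce_lines
    for out in step(sys.stdin):
        print(out)
```

In a real script, main(sys.argv) would be called at the bottom of the file. The same file can be tried locally by piping text through the map step, a sort, and the reduce step.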

Questions
Related tools :

Avro

Language-neutral data-serialization system. Data is described using a language-independent schema, usually written in JSON. The spec describes the binary format all implementations must support. Similar to SequenceFile, but portable.
A data file contains a header with the schema and metadata. Sync markers make the file splittable. A file can be read with a different schema version than the one it was written with, making the schema easy to evolve. This can also be useful to read a few fields from a large number of fields. Record classes can be generated from an Avro schema, or the GenericRecord type can be used.
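
Reading with a different schema than the one used for writing can be pictured with a toy resolver. The User schemas and the resolve function below are hypothetical and use only the standard library, not the Avro library; they mimic two of the resolution rules (writer-only fields are dropped, reader-only fields need a default).

```python
import json

# Hypothetical writer schema (version 1) in Avro's JSON schema notation.
WRITER_SCHEMA = json.loads("""
{"type": "record", "name": "User", "fields": [
  {"name": "name", "type": "string"},
  {"name": "age",  "type": "int"},
  {"name": "city", "type": "string"}
]}
""")

# Hypothetical reader schema: drops "age" and "city", adds "email" with a default.
READER_SCHEMA = json.loads("""
{"type": "record", "name": "User", "fields": [
  {"name": "name",  "type": "string"},
  {"name": "email", "type": "string", "default": ""}
]}
""")


def resolve(record, writer_schema, reader_schema):
    """Toy schema resolution: project a written record onto the reader schema."""
    written = {f["name"] for f in writer_schema["fields"]}
    result = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in written:
            result[name] = record[name]        # field present in both schemas
        elif "default" in field:
            result[name] = field["default"]    # new field: fall back to default
        else:
            raise ValueError(f"no value or default for field {name!r}")
    return result
```

The same projection is what makes it cheap to read only a few fields out of many: the reader schema simply lists the fields of interest.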

Parquet

Columnar storage format that can efficiently store nested data, i.e. objects within objects. Can reduce file size, since the values of a single column compress better together. Can improve query performance if only a small subset of columns is read, since data is stored in columnar fashion. Supported by a large number of tools. Uses a schema with a small set of pre-defined types. A Parquet file consists of a header with a magic number and a footer which has the meta-data along with block boundaries. Hence splittable. Organized as blocks (each storing a row group) -> column chunks -> pages. Each column chunk has data only for a single column. Compression is achieved by using encodings like delta, run-length, dictionary etc. To write object data, we need a schema, a writer and the object. For reading, it's a reader. Classes like AvroParquetReader/Writer are available to interoperate between Avro and Parquet.
Parquet-tools are available to work with these files, e.g. to dump contents.
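
Why a columnar layout compresses well can be seen with a toy version of two of the encodings mentioned. The helper functions below are invented for illustration and do not reflect Parquet's actual on-disk format.

```python
def to_columns(rows):
    """Pivot a list of row dicts into one list of values per column."""
    return {name: [row[name] for row in rows] for name in rows[0]}


def dictionary_encode(values):
    """Replace each value with a small integer index into a dictionary."""
    dictionary = []
    index = {}
    codes = []
    for v in values:
        if v not in index:
            index[v] = len(dictionary)
            dictionary.append(v)
        codes.append(index[v])
    return dictionary, codes


def run_length_encode(values):
    """Collapse runs of equal values into [value, run_length] pairs."""
    runs = []
    for v in values:
        if runs and runs[-1][0] == v:
            runs[-1][1] += 1
        else:
            runs.append([v, 1])
    return runs
```

Stored row by row, repeated country codes would be interleaved with ids. Stored as a column they sit next to each other, so dictionary codes followed by run-length encoding shrink them to a handful of pairs.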

Flume
Event processing framework with sources and sinks for Hadoop.

Sqoop
Tool to import/export data from/to databases. Supports text and binary files, and formats like Avro, Parquet, SequenceFiles etc. Can work with RDBMSs, as well as others like HBase and Hive. Sqoop 2 has REST and Java APIs, a Web UI etc. Sqoop uses map-reduce tasks to execute the imports/exports in parallel. The split-by option splits the data to be imported into Hadoop based on ranges of values in the specified column. The check-column option, together with an incremental mode (append or lastmodified), supports incremental imports of new or updated rows. Similar options are available for exporting from Hadoop into a DB, along with an update-key option for updating existing rows. It can also generate Java classes from tables to hold row data. It also allows a direct mode for databases that support it, to import/export rows faster. Supports storing LOBs in separate files.
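
The range-based splitting of an import can be sketched as follows. This is an illustration of the idea only, assuming an integer column. split_ranges and range_query are made-up helpers, and the SQL that Sqoop actually generates may differ.

```python
def split_ranges(min_value, max_value, num_mappers):
    """Divide the inclusive range [min_value, max_value] into contiguous sub-ranges."""
    total = max_value - min_value + 1
    num_splits = max(1, min(num_mappers, total))
    base, extra = divmod(total, num_splits)
    ranges = []
    lo = min_value
    for i in range(num_splits):
        size = base + (1 if i < extra else 0)   # spread the remainder over the first splits
        hi = lo + size - 1
        ranges.append((lo, hi))
        lo = hi + 1
    return ranges


def range_query(table, column, lo, hi):
    """Build the bounded query one parallel task might run (illustrative only)."""
    return f"SELECT * FROM {table} WHERE {column} >= {lo} AND {column} <= {hi}"
```

Each parallel task then reads only its own slice of the table, which works best when the values of the chosen column are spread fairly evenly.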