MapReduce Cluster (MRCluster)
A single-node, multi-core pseudo-MapReduce implementation for Node.js. Input files are automatically broken into blocks and distributed to the Mappers and Reducers.
Examples of implementations can be found in the README.
```sh
npm install mrcluster
```
- `.file`: input file or array of files for the MR task.
- `.lineDelimiter`: delimiter for line breaks in the data.
- `.blockSize`: size in MB of each block.
- `.sample`: number of sample chunks to run (to test your code).
- `.fn`: pre-load custom functions into the Mappers and Reducers.
- `.mapOnly`: perform mapping only.
- `.numMappers`: number of Mappers to use.
- `.numReducers`: number of Reducers to use.
- `.partition`: custom function to control how the Mapper outputs are distributed to the Reducers. The function takes in a key and returns an integer corresponding to the respective Reducer.
- `.map`: Map function. Takes in a line and returns a key-value pair array (1-1 mapping) or a hashtable of key-value pairs (1-n mapping).
- `.mapCSV`: alternative Map function. Takes in an array and returns a key-value pair array (1-1 mapping) or a hashtable of key-value pairs (1-n mapping).
- `.combine`: Combine function applied after the Map task in the Mapper. Takes in 2 values with the same key and returns a value for the key.
- `.reduce`: Reduce function. Takes in 2 values with the same key and returns a value for the key.
- `.drain`: Drain function. Takes in a hashtable of keys and values and returns a new hashtable of keys and values. Used to free up memory in the Reducer.
- `.post_reduce`: aggregate function applied after all the Reduce tasks are completed for each Reducer. Takes in a hashtable of keys and values and returns a value to the master node.
- `.aggregate`: aggregate function at the master node. Aggregates all the values returned by all the Reducers. Takes in an array of values (one per Reducer).
- Added `.fn` function, which allows the user to send custom functions to all the Mappers and Reducers.
- Fixed bug when `.combine` is not defined.
- Added `.mapCSV`, functionally equivalent to `.map` except that the input is an array instead of a line. This array is extracted from a single line of CSV using the method described here.
- Added function to pre-load an Object (or function) into all the Reducers.
- Added `.partition` function: a custom function to control how the Mapper outputs are distributed to the Reducers.
- Added `.blockSize` function to allow the user to define the size of each block in MB.
- Updated `.map` function to allow 1-n mapping: mapping 1 line of data into multiple key-value pairs in the form of a hashtable.
- Added `.combine` function to allow the user to define the `.reduce` function to run at the Mapper.
- Added `.drain` function to allow the user to clear the memory of the Reducer after each reduce task.
- Added example on how to rehash "long" user ids into unique integers.
Create a new instance
```js
var mrcluster = require('mrcluster');
```
The module is written to be chainable. All settings are set via function call chains.
Specify the csv file or files to read in.
If an array of files is given, each file will be broken into its respective blocks, and the blocks will be pushed to the Mappers in FIFO order.
Specify the delimiter that indicates a new line.
Specify the size of each block (in MB) to break the file into.
As each Node.js process (i.e. each Mapper or Reducer) is limited to ~1 GB of RAM (x64), you might want to break the file into sufficiently small blocks.

```js
mrcluster.blockSize(64); // each block will be ~64 MB
```
Specify the number of blocks to sample. The number of samples must be >= the number of Mappers. Default is -1 (do not sample; run everything).
This option is useful for a quick test of your code before running through the entire dataset.
Pre-load an Object or variable to all Reducers, e.g. an array of weights. This Object can be accessed in any of the callbacks (e.g. `.reduce`) via the `ctx._cache` variable.
The `ctx._cache` variable of each Reducer is mutable and persistent. Each `ctx._cache` variable starts off identical but is independent of the others in subsequent operations.
Pre-load a function to all Reducers, so that you can call it within the `.reduce` or `.drain` functions. This function call can be made multiple times; each call appends a new custom function to the environment.
The custom functions can be accessed via the `_fn` variable, as shown in the example below.
Specify whether to run only Mappers.
Note that you still need to specify your Reduce function, as the Reduce step is also performed in the Mapper.
Specify the number of Mappers to create.
Specify the number of Reducers to create.
The underlying code hashes all key-value pairs produced by the Mappers into the respective Reducers; hence, the chunk of key-value pairs in each Reducer is independent of the others. This reduces memory usage during the reduce operation.
Specify a custom hash function to distribute the Mapper outputs to the respective Reducers. Takes in `numReducers` as the 1st input and the custom hash function as the 2nd input.
The custom hash function takes in a key and returns an integer (representing which Reducer to send that key-value pair to). Note that the number of Reducers must match the output range of the hash function. A custom hash function allows you to perform some ordering in map-reduce, but you have to take care that the hash function distributes the load evenly among the Reducers.
For example, if most key-value pairs have keys starting with "A", then the hash function in the example below will allocate all of those key-value pairs to the same Reducer (as the hash function distributes the key-value pairs by the 1st character of the key).
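As a sketch, a partition hash that buckets keys by their first character might look like this (the function names are illustrative, not part of the mrcluster API):

```js
// Illustrative partition hash: routes a key to one of numReducers bins
// by the character code of its first character. Keys sharing a first
// character always land in the same Reducer.
function firstCharPartition(numReducers) {
    return function (key) {
        return key.charCodeAt(0) % numReducers;
    };
}

var partition = firstCharPartition(3);
console.log(partition('Apple'));   // 2  ('A' is char code 65; 65 % 3 = 2)
console.log(partition('Avocado')); // 2  (same first character, same Reducer)
```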
The first input specifies the mapping function to be applied to each line of data.
The second input (optional) is a flag specifying whether to write the output of each Mapper to disk. This is often used with the `.mapOnly` option when you are only doing Map tasks (e.g. remapping data).
The function should take in a String representing a line of data and return an Array representing the resultant key-value pair.
Alternatively, the function can return a Hashtable of key-value pairs; that is, instead of mapping a line of data into a single key-value pair, the map function can map a line of data into multiple key-value pairs represented as a hashtable.
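Both shapes can be sketched as plain callbacks (the function names and sample data are illustrative, not from the mrcluster source):

```js
// 1-1 mapping: one line -> one [key, value] pair.
function mapToPair(line) {
    var cols = line.split(',');
    return [cols[0], 1]; // key = 1st column, value = 1
}

// 1-n mapping: one line -> a hashtable of key-value pairs.
function mapToHashtable(line) {
    var counts = {};
    line.split(' ').forEach(function (word) {
        counts[word] = (counts[word] || 0) + 1;
    });
    return counts;
}

console.log(mapToPair('gmail.com,alice'));         // [ 'gmail.com', 1 ]
console.log(mapToHashtable('to be or not to be')); // { to: 2, be: 2, or: 1, not: 1 }
```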
This is a CSV replacement for the `.map` function, where the input variable is an array instead of a line. This array is automatically extracted from the line using the method described here. The main advantage is that quotes and double quotes in the CSV are automatically handled and converted into an array. However, this comes at the cost of extra computation time, as a regex is used to extract the values. For simple CSVs, it may be better to do a simple line split in the `.map` function instead.
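For example, a plain split inside `.map` (callback name illustrative) trades robustness for speed:

```js
// Fast but naive: breaks on quoted fields containing commas,
// which `.mapCSV` would handle correctly.
function mapSimpleCsv(line) {
    var cols = line.split(',');
    return [cols[1], cols[2]]; // key = 2nd column, value = 3rd column
}

console.log(mapSimpleCsv('1,gmail.com,42')); // [ 'gmail.com', '42' ]
```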
The combine function is essentially the reduce operation performed at the Mapper, as some reduce jobs can be done at the Mapper instead of the Reducer.
By default, the combine function is the same as the reduce function. However, you can use this function call to specify a different function to run at the Mapper.
The first input specifies the reduce function to be applied. The second input (optional) specifies whether to write the result of each Reduce job to disk.
This function is applied once in the Mapper and once in the Reducer. In the Mapper, it is applied at the end of the Mapper execution, just before the mapped results are returned to the master node.
The function should take in 2 variables representing the values of two key-value pairs with the same key, and return the resultant value for that key. E.g. summing the values of 2 key-value pairs: ['A',1] + ['A',1] = ['A',2].
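The summing reduce described above can be sketched as (function name illustrative):

```js
// Minimal reduce callback: sums the values of two pairs sharing a key,
// so ['A', 1] + ['A', 1] -> ['A', 2].
function sumReduce(a, b) {
    return a + b;
}

console.log(sumReduce(1, 1)); // 2
```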
In the MR task, each Reducer holds in memory the hashtable of key-value pairs it has received so far. For some reduce tasks (e.g. concat or append tasks), the size of the value in each key-value pair grows after every reduce pass, which can lead to out-of-memory problems.
The `.drain` function can be used to free up memory in these situations. It takes in the hashtable of the current key-value pairs held in memory by the Reducer, and returns the new hashtable to hold in its place.
For example, suppose you wish to rehash long user ids into unique integers. You can set the key as the user id and the value as the remaining data at the Mapper, and your `.combine` function can concat the data by keys. The memory usage of the Reducers will monotonically increase with each reduce pass if you keep appending new data by keys, so you can specify a `.drain` function that writes the current data to file and returns a new hashtable with the keys but without the data. You can then continue appending new data in your `.reduce` function.
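A minimal sketch of such a drain callback (the flush target is simulated with an array; a real implementation would write to disk):

```js
var flushed = []; // stand-in for a file on disk

// Drain callback: flush each accumulated value, then return a new
// hashtable that keeps the keys but drops the bulky data.
function drain(hashtable) {
    var next = {};
    Object.keys(hashtable).forEach(function (key) {
        flushed.push(hashtable[key]); // "write to file"
        next[key] = '';               // keep the key, free the data
    });
    return next;
}

console.log(drain({ u1: 'aaaa', u2: 'bbbb' })); // { u1: '', u2: '' }
console.log(flushed);                           // [ 'aaaa', 'bbbb' ]
```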
Specify the function to be applied at the end of all the Reduce tasks for each Reducer.
The function should take in a hashtable holding all the key-values produced by the Reducer, and can return any value to the master node for further collation (e.g. a sum).
Specify the function to be applied at the end of all tasks.
The function should take in an Array (representing the hash bins) holding all the values returned by the `.post_reduce` function of each Reducer (e.g. you can sum all the returned sums of all the Reducers).
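A sketch of a matching pair of callbacks (names illustrative): `postReduce` collapses one Reducer's hashtable into a number, and `aggregate` combines the per-Reducer results at the master node.

```js
// Runs once per Reducer: sum all values in its hashtable.
function postReduce(hashtable) {
    var sum = 0;
    Object.keys(hashtable).forEach(function (key) {
        sum += hashtable[key];
    });
    return sum;
}

// Runs once at the master node: sum the per-Reducer values.
function aggregate(results) {
    return results.reduce(function (a, b) { return a + b; }, 0);
}

var r1 = postReduce({ 'gmail.com': 1, 'yahoo.com': 1 }); // 2
var r2 = postReduce({ 'aol.com': 1 });                   // 1
console.log(aggregate([r1, r2]));                        // 3
```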
Counting Unique Ids
A simple count of the number of unique domains in an email list.
```js
var mrcluster = require('mrcluster');
mrcluster
    .file('emails.txt')          // input file (name assumed for illustration)
    .lineDelimiter('\n')         // line delimiter is \n
    .blockSize(1)                // each block is 1 MB
    .numMappers(2)               // 2 mappers
    .numReducers(3)              // 3 reducers
    .map(function (line) {       // function to map a line of data to a key-value pair
        return [line.split('@')[1], 1];
    })
    .reduce(function (a, b) {    // simple reduce function which returns a value of 1
        return 1;
    })
    .post_reduce(function (ht) { // sum the values of all key-value pairs in the Reducer
        var sum = 0;
        for (var key in ht) { sum += ht[key]; }
        return sum;
    })
    .aggregate(function (res) {  // sum the results returned by all the Reducers
        return res.reduce(function (a, b) { return a + b; }, 0);
    })
    .start();                    // start the MapReduce job
```
Finding similar users
Finding users who share the same domain in their emails.
```js
var mrcluster = require('mrcluster');
mrcluster
    // …
    .start();
```
Rehashing long user ids
Assuming you have very long user ids (e.g. md5 hashes) and wish to replace these ids with unique integers, you can set the key to be the id and the value to be the data in the Mapper, then concat the data by keys in the Reducer.
As you concat data, memory usage increases monotonically, so you will want to free up memory by writing out data that you have already grouped by id. And as the keys in each Reducer are independent of the other Reducers, you can replace each key with an integer with a base of the number of Reducers (which needs to be prime).
E.g. assuming 7 Reducers: the 1st key in Reducer 1 = 1, the 2nd key in Reducer 1 = 8, and in general the n-th key in Reducer m = (n - 1) * (number of Reducers) + m.
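The numbering scheme above can be checked with a one-line helper (name illustrative):

```js
// n-th key handled by Reducer m, with numReducers Reducers in total.
// Ids assigned by different Reducers can never collide because they
// fall in distinct residue classes modulo numReducers.
function rehash(n, m, numReducers) {
    return (n - 1) * numReducers + m;
}

console.log(rehash(1, 1, 7)); // 1  (1st key in Reducer 1)
console.log(rehash(2, 1, 7)); // 8  (2nd key in Reducer 1)
console.log(rehash(1, 3, 7)); // 3  (1st key in Reducer 3)
```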
```js
var mrcluster = require('mrcluster');
mrcluster
    // …
    .start();
```