

    0.0.4 • Public • Published


    This allows a very simple node.js endpoint to be set up that receives messages from flume. It is mostly useful for interoperability between flume and some other node.js-based process.

    There is also a matching source that can be used to send data into flume. It was developed mostly to test the sink, and is therefore less well supported.

    It's a very, very thin wrapper around thrift's output for the IDL files included in flume.

    Be warned: it's highly alpha at the moment.


    npm install flume-rpc

    Please note that there are bugs in the latest version of node-thrift (0.7.0) that cause the sink to throw exceptions under load and/or to suddenly jam up and eventually run out of memory. See https://github.com/wadey/node-thrift/pull/13 for details. I have created a fork of that project with the appropriate fixes at https://github.com/recoset/node-thrift. To install the patched version, run:

    npm install http://github.com/recoset/node-thrift/tarball/v0.7.0-recoset

    I'll update this readme once the fixes have been merged and a new release made.

    Synopsis (Sink)

    var flume = require('flume-rpc');
    var Sink = flume.Sink;
    var sink = new Sink();
    sink.on('message', function(msg) { console.log(msg.body); });
    // 'rpcClose' is the RPC close request described in the Sink Reference
    sink.on('rpcClose', function(success) { this.close();  success(); });
    sink.listen(35861);  // this is the default flume RPC port

    To test (assuming there's a properly set up flume instance running):

    echo "hello" | flume sink 'rpcSink("localhost")'

    Synopsis (Source)

    var flume = require('flume-rpc');
    var Source = flume.Source;
    var source = new Source('localhost', 35861);
    source.on('connect', function () {
        // send one message, then close the source once it has gone out
        source.log('hello', flume.Priority.INFO, function () {
            console.log('send done');
            source.close();
        });
    });

    To test (assuming there's a properly set up flume instance running):

    flume dump 'rpcSource("localhost")'

    and then run the script above.
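
To send several messages in order, one option is to chain the log() callbacks. The following is only a sketch: sendAll is a hypothetical helper (not part of flume-rpc), it assumes the log() callback signals completion and takes no arguments as in the synopsis above, and it uses a stand-in object instead of a real Source so that it runs without Flume.

```javascript
// Hypothetical helper, not part of flume-rpc: send messages one at a time,
// waiting for each log() callback before sending the next.
function sendAll(source, bodies, priority, done) {
  let index = 0;
  function next() {
    if (index === bodies.length) {
      return done();
    }
    source.log(bodies[index++], priority, next);
  }
  next();
}

// Stand-in for a connected Source so the sketch runs without Flume.
// It records what was "sent" and calls back immediately.
const fakeSource = {
  sent: [],
  log(body, priority, callback) {
    this.sent.push({ body: body, priority: priority });
    callback();
  }
};

let finished = false;
// With the real module the priority would be something like flume.Priority.INFO;
// the literal 3 here is just a placeholder value.
sendAll(fakeSource, ['one', 'two', 'three'], 3, function () {
  finished = true;
});
console.log('messages sent:', fakeSource.sent.length, 'finished:', finished);
```

With a real Source, sendAll would be called from inside the 'connect' handler, with source.close() in the final callback.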

    Sink Reference


    var flume = require('flume-rpc');
    var Sink = flume.Sink;

    Creating a Sink

    var sink = new Sink;

    There are no constructor arguments; the configuration is done later on.

    Listening for messages

    sink.listen(port, [hostname], [callback]);

    This method listens on the given port, binding to the given hostname. For now, for reasons not yet understood, the callback argument is not called on a successful bind; use the 'listening' event instead. On an error, the 'error' event is emitted.
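
Since the outcome of listen() is reported through events, it can be convenient to fold the 'listening' and 'error' events documented in this reference into one Node-style callback. This is a sketch under assumptions: listenWithEvents is a hypothetical helper (not part of flume-rpc), and FakeSink is a stand-in that only emits 'listening' so the snippet runs without Flume.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper, not part of flume-rpc: start listening and report the
// outcome through a single callback, relying on events rather than on the
// callback argument of listen().
function listenWithEvents(sink, port, hostname, done) {
  function onListening() {
    sink.removeListener('error', onError);
    done(null);
  }
  function onError(err) {
    sink.removeListener('listening', onListening);
    done(err);
  }
  // Register both handlers before calling listen() so no event is missed.
  sink.once('listening', onListening);
  sink.once('error', onError);
  sink.listen(port, hostname);
}

// Stand-in for a Sink; a real Sink would bind a socket here.
class FakeSink extends EventEmitter {
  listen(port, hostname) {
    this.emit('listening');
  }
}

let outcome = 'pending';
listenWithEvents(new FakeSink(), 35861, 'localhost', function (err) {
  outcome = err ? 'failed: ' + err.message : 'listening';
});
console.log(outcome);
```

Only the first outcome is reported; errors that occur after a successful bind still need their own 'error' handler.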

    Closing down the sink

    sink.close();


    This will close down the sink, asynchronously. The 'close' event will be emitted once it's finished shutdown.

    Getting log messages

    sink.on('message', function (msg) { ... });

    Registers a handler to be called whenever a message is received.

    Responding to an RPC close request

    As part of the protocol, a source can ask its sink to close via RPC. Personally, I haven't found a use for this but it's exposed nonetheless.

    sink.on('rpcClose', function (onSuccess) { ... ; onSuccess(); });

    The onSuccess() function should be called back once the close has succeeded. TODO: errors?

    Message format

    The sink receives messages that look like this:

    { timestamp: 1529023563,     // Timestamp in seconds
      nanos: 2506809501,         // nanosecond part of timestamp
      priority: 3,               // see flume.Priority for values
      body: 'hello',             // string or Buffer containing the data from the body
      host: 'host.name.com',     // host that it came from
      fields: {}                 // metadata associated with the event
    }

    The fields structure may contain more information if the flume flow that produced the message is more complicated.
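
Because body can be either a string or a Buffer, a 'message' handler may want to normalize each message before using it. The sketch below is illustrative only: normalizeMessage is a hypothetical helper (not part of flume-rpc), it assumes UTF-8 for Buffer bodies, and it takes timestamp to be in seconds as described above. The nanos field is left out.

```javascript
// Hypothetical helper, not part of flume-rpc: turn a received message into
// a plain object that is easier to log or store.
function normalizeMessage(msg) {
  // body may be a string or a Buffer; decoding Buffers as UTF-8 is an assumption
  const body = Buffer.isBuffer(msg.body)
    ? msg.body.toString('utf8')
    : String(msg.body);
  return {
    // assumes timestamp is in seconds, as in the format shown above
    time: new Date(msg.timestamp * 1000),
    priority: msg.priority,
    host: msg.host,
    body: body,
    fields: msg.fields || {}
  };
}

// Sample message shaped like the one above, with a Buffer body.
const example = normalizeMessage({
  timestamp: 1529023563,
  nanos: 2506809501,
  priority: 3,
  body: Buffer.from('hello'),
  host: 'host.name.com',
  fields: {}
});
console.log(example.time.toISOString(), example.host, example.body);
```

In a real sink this would be called as sink.on('message', function (msg) { var m = normalizeMessage(msg); ... });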

    Other Events

    sink.on('error', function (err) { ... });

    Called with the details of an error when one occurs.

    sink.on('connection', function (sock) { ... });

    Called with the created socket once a connection is made (something connects to the sink). See http://nodejs.org/docs/v0.4.9/api/net.html#event_connection_

    sink.on('listening', function () { ... });

    Called once the socket is bound and has started listening.

    sink.on('close', function () { ... });

    Called when the server closes. See http://nodejs.org/docs/v0.4.9/api/net.html#event_close_

    Accessing the underlying server

    These are not part of the API, but are exposed.


    This is the server created by Thrift. It's derived from net.Server.

    Source Reference


    The RPC messages are sent with Thrift, so node-thrift version 0.7.0 or later is required. (Earlier versions don't allow the transport to be set.)


    • TODO: list commands used to regenerate thrift bindings
    • TODO: discussion of selection of different transport

