Class ShuffleChannelHandler

java.lang.Object
io.netty.channel.ChannelHandlerAdapter
io.netty.channel.ChannelInboundHandlerAdapter
io.netty.channel.SimpleChannelInboundHandler<io.netty.handler.codec.http.FullHttpRequest>
org.apache.hadoop.mapred.ShuffleChannelHandler
All Implemented Interfaces:
io.netty.channel.ChannelHandler, io.netty.channel.ChannelInboundHandler

public class ShuffleChannelHandler extends io.netty.channel.SimpleChannelInboundHandler<io.netty.handler.codec.http.FullHttpRequest>
ShuffleChannelHandler verifies the map request then servers the attempts in a http stream. Before each attempt a serialised ShuffleHeader object is written with the details.
 Example Request
 ===================
 GET /mapOutput?job=job_1111111111111_0001&reduce=0&
     map=attempt_1111111111111_0001_m_000001_0,
     attempt_1111111111111_0002_m_000002_0,
     attempt_1111111111111_0003_m_000003_0 HTTP/1.1
 name: mapreduce
 version: 1.0.0
 UrlHash: 9zS++qE0/7/D2l1Rg0TqRoSguAk=

 Example Response
 ===================
 HTTP/1.1 200 OK
 ReplyHash: GcuojWkAxXUyhZHPnwoV/MW2tGA=
 name: mapreduce
 version: 1.0.0
 connection: close
 content-length: 138

 +--------+-------------------------------------------------+----------------+
 |00000000| 25 61 74 74 65 6d 70 74 5f 31 31 31 31 31 31 31 |%attempt_1111111|
 |00000010| 31 31 31 31 31 31 5f 30 30 30 31 5f 6d 5f 30 30 |111111_0001_m_00|
 |00000020| 30 30 30 31 5f 30 05 0a 00                      |0001_0...       |
 +--------+-------------------------------------------------+----------------+
 |00000000| 61 61 61 61 61                                  |aaaaa           |
 +--------+-------------------------------------------------+----------------+
 |00000000| 25 61 74 74 65 6d 70 74 5f 31 31 31 31 31 31 31 |%attempt_1111111|
 |00000010| 31 31 31 31 31 31 5f 30 30 30 32 5f 6d 5f 30 30 |111111_0002_m_00|
 |00000020| 30 30 30 32 5f 30 05 0a 00                      |0002_0...       |
 +--------+-------------------------------------------------+----------------+
 |00000000| 62 62 62 62 62                                  |bbbbb           |
 +--------+-------------------------------------------------+----------------+
 |00000000| 25 61 74 74 65 6d 70 74 5f 31 31 31 31 31 31 31 |%attempt_1111111|
 |00000010| 31 31 31 31 31 31 5f 30 30 30 33 5f 6d 5f 30 30 |111111_0003_m_00|
 |00000020| 30 30 30 33 5f 30 05 0a 00                      |0003_0...       |
 +--------+-------------------------------------------------+----------------+
 |00000000| 63 63 63 63 63                                  |ccccc           |
 +--------+-------------------------------------------------+----------------+
 
  • Nested Class Summary

    Nested Classes
    Modifier and Type
    Class
    Description
    static class 
    Maintain parameters per messageReceived() Netty context.

    Nested classes/interfaces inherited from interface io.netty.channel.ChannelHandler

    io.netty.channel.ChannelHandler.Sharable
  • Method Summary

    Modifier and Type
    Method
    Description
    void
    channelActive(io.netty.channel.ChannelHandlerContext ctx)
     
    void
    channelInactive(io.netty.channel.ChannelHandlerContext ctx)
     
    void
    channelRead0(io.netty.channel.ChannelHandlerContext ctx, io.netty.handler.codec.http.FullHttpRequest request)
     
    void
    exceptionCaught(io.netty.channel.ChannelHandlerContext ctx, Throwable cause)
     
    protected org.apache.hadoop.mapred.ShuffleChannelHandler.MapOutputInfo
    getMapOutputInfo(String mapId, int reduce, String jobId, String user)
     
    protected void
    populateHeaders(List<String> mapIds, String jobId, String user, int reduce, io.netty.handler.codec.http.HttpResponse response, boolean keepAliveParam, Map<String,org.apache.hadoop.mapred.ShuffleChannelHandler.MapOutputInfo> mapOutputInfoMap)
     
    protected void
    sendError(io.netty.channel.ChannelHandlerContext ctx, io.netty.handler.codec.http.HttpResponseStatus status)
     
    protected void
    sendError(io.netty.channel.ChannelHandlerContext ctx, String message, io.netty.handler.codec.http.HttpResponseStatus status)
     
    protected void
    sendError(io.netty.channel.ChannelHandlerContext ctx, String msg, io.netty.handler.codec.http.HttpResponseStatus status, Map<String,String> headers)
     
    void
    Calls sendMapOutput for the mapId pointed by ReduceContext.mapsToSend and increments it.
    protected io.netty.channel.ChannelFuture
    sendMapOutput(io.netty.channel.Channel ch, String user, String mapId, int reduce, org.apache.hadoop.mapred.ShuffleChannelHandler.MapOutputInfo mapOutputInfo)
     
    protected void
    setResponseHeaders(io.netty.handler.codec.http.HttpResponse response, boolean keepAliveParam, long contentLength)
     
    static io.netty.buffer.ByteBuf
    shuffleHeaderToBytes(org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader header)
     
    protected void
    verifyRequest(String appid, io.netty.channel.ChannelHandlerContext ctx, io.netty.handler.codec.http.HttpRequest request, io.netty.handler.codec.http.HttpResponse response, URL requestUri)
     

    Methods inherited from class io.netty.channel.SimpleChannelInboundHandler

    acceptInboundMessage, channelRead

    Methods inherited from class io.netty.channel.ChannelInboundHandlerAdapter

    channelReadComplete, channelRegistered, channelUnregistered, channelWritabilityChanged, userEventTriggered

    Methods inherited from class io.netty.channel.ChannelHandlerAdapter

    ensureNotSharable, handlerAdded, handlerRemoved, isSharable

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

    Methods inherited from interface io.netty.channel.ChannelHandler

    handlerAdded, handlerRemoved
  • Method Details

    • channelActive

      public void channelActive(io.netty.channel.ChannelHandlerContext ctx) throws Exception
      Specified by:
      channelActive in interface io.netty.channel.ChannelInboundHandler
      Overrides:
      channelActive in class io.netty.channel.ChannelInboundHandlerAdapter
      Throws:
      Exception
    • channelInactive

      public void channelInactive(io.netty.channel.ChannelHandlerContext ctx) throws Exception
      Specified by:
      channelInactive in interface io.netty.channel.ChannelInboundHandler
      Overrides:
      channelInactive in class io.netty.channel.ChannelInboundHandlerAdapter
      Throws:
      Exception
    • channelRead0

      public void channelRead0(io.netty.channel.ChannelHandlerContext ctx, io.netty.handler.codec.http.FullHttpRequest request)
      Specified by:
      channelRead0 in class io.netty.channel.SimpleChannelInboundHandler<io.netty.handler.codec.http.FullHttpRequest>
    • sendMap

      public void sendMap(ShuffleChannelHandler.ReduceContext reduceContext)
      Calls sendMapOutput for the mapId pointed by ReduceContext.mapsToSend and increments it. This method is first called by messageReceived() maxSessionOpenFiles times and then on the completion of every sendMapOutput operation. This limits the number of open files on a node, which can get really large(exhausting file descriptors on the NM) if all sendMapOutputs are called in one go, as was done previous to this change.
      Parameters:
      reduceContext - used to call sendMapOutput with correct params.
    • getMapOutputInfo

      protected org.apache.hadoop.mapred.ShuffleChannelHandler.MapOutputInfo getMapOutputInfo(String mapId, int reduce, String jobId, String user) throws IOException
      Throws:
      IOException
    • populateHeaders

      protected void populateHeaders(List<String> mapIds, String jobId, String user, int reduce, io.netty.handler.codec.http.HttpResponse response, boolean keepAliveParam, Map<String,org.apache.hadoop.mapred.ShuffleChannelHandler.MapOutputInfo> mapOutputInfoMap) throws IOException
      Throws:
      IOException
    • setResponseHeaders

      protected void setResponseHeaders(io.netty.handler.codec.http.HttpResponse response, boolean keepAliveParam, long contentLength)
    • verifyRequest

      protected void verifyRequest(String appid, io.netty.channel.ChannelHandlerContext ctx, io.netty.handler.codec.http.HttpRequest request, io.netty.handler.codec.http.HttpResponse response, URL requestUri) throws IOException
      Throws:
      IOException
    • shuffleHeaderToBytes

      public static io.netty.buffer.ByteBuf shuffleHeaderToBytes(org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader header) throws IOException
      Throws:
      IOException
    • sendMapOutput

      protected io.netty.channel.ChannelFuture sendMapOutput(io.netty.channel.Channel ch, String user, String mapId, int reduce, org.apache.hadoop.mapred.ShuffleChannelHandler.MapOutputInfo mapOutputInfo) throws IOException
      Throws:
      IOException
    • sendError

      protected void sendError(io.netty.channel.ChannelHandlerContext ctx, io.netty.handler.codec.http.HttpResponseStatus status)
    • sendError

      protected void sendError(io.netty.channel.ChannelHandlerContext ctx, String message, io.netty.handler.codec.http.HttpResponseStatus status)
    • sendError

      protected void sendError(io.netty.channel.ChannelHandlerContext ctx, String msg, io.netty.handler.codec.http.HttpResponseStatus status, Map<String,String> headers)
    • exceptionCaught

      public void exceptionCaught(io.netty.channel.ChannelHandlerContext ctx, Throwable cause) throws Exception
      Specified by:
      exceptionCaught in interface io.netty.channel.ChannelHandler
      Specified by:
      exceptionCaught in interface io.netty.channel.ChannelInboundHandler
      Overrides:
      exceptionCaught in class io.netty.channel.ChannelInboundHandlerAdapter
      Throws:
      Exception