Package org.apache.hadoop.mapred
Class ShuffleChannelHandler
java.lang.Object
io.netty.channel.ChannelHandlerAdapter
io.netty.channel.ChannelInboundHandlerAdapter
io.netty.channel.SimpleChannelInboundHandler<io.netty.handler.codec.http.FullHttpRequest>
org.apache.hadoop.mapred.ShuffleChannelHandler
- All Implemented Interfaces:
io.netty.channel.ChannelHandler,io.netty.channel.ChannelInboundHandler
public class ShuffleChannelHandler
extends io.netty.channel.SimpleChannelInboundHandler<io.netty.handler.codec.http.FullHttpRequest>
ShuffleChannelHandler verifies the map request then servers the attempts in a http stream.
Before each attempt a serialised ShuffleHeader object is written with the details.
Example Request
===================
GET /mapOutput?job=job_1111111111111_0001&reduce=0&
map=attempt_1111111111111_0001_m_000001_0,
attempt_1111111111111_0002_m_000002_0,
attempt_1111111111111_0003_m_000003_0 HTTP/1.1
name: mapreduce
version: 1.0.0
UrlHash: 9zS++qE0/7/D2l1Rg0TqRoSguAk=
Example Response
===================
HTTP/1.1 200 OK
ReplyHash: GcuojWkAxXUyhZHPnwoV/MW2tGA=
name: mapreduce
version: 1.0.0
connection: close
content-length: 138
+--------+-------------------------------------------------+----------------+
|00000000| 25 61 74 74 65 6d 70 74 5f 31 31 31 31 31 31 31 |%attempt_1111111|
|00000010| 31 31 31 31 31 31 5f 30 30 30 31 5f 6d 5f 30 30 |111111_0001_m_00|
|00000020| 30 30 30 31 5f 30 05 0a 00 |0001_0... |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 61 |aaaaa |
+--------+-------------------------------------------------+----------------+
|00000000| 25 61 74 74 65 6d 70 74 5f 31 31 31 31 31 31 31 |%attempt_1111111|
|00000010| 31 31 31 31 31 31 5f 30 30 30 32 5f 6d 5f 30 30 |111111_0002_m_00|
|00000020| 30 30 30 32 5f 30 05 0a 00 |0002_0... |
+--------+-------------------------------------------------+----------------+
|00000000| 62 62 62 62 62 |bbbbb |
+--------+-------------------------------------------------+----------------+
|00000000| 25 61 74 74 65 6d 70 74 5f 31 31 31 31 31 31 31 |%attempt_1111111|
|00000010| 31 31 31 31 31 31 5f 30 30 30 33 5f 6d 5f 30 30 |111111_0003_m_00|
|00000020| 30 30 30 33 5f 30 05 0a 00 |0003_0... |
+--------+-------------------------------------------------+----------------+
|00000000| 63 63 63 63 63 |ccccc |
+--------+-------------------------------------------------+----------------+
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic classMaintain parameters per messageReceived() Netty context.Nested classes/interfaces inherited from interface io.netty.channel.ChannelHandler
io.netty.channel.ChannelHandler.Sharable -
Method Summary
Modifier and TypeMethodDescriptionvoidchannelActive(io.netty.channel.ChannelHandlerContext ctx) voidchannelInactive(io.netty.channel.ChannelHandlerContext ctx) voidchannelRead0(io.netty.channel.ChannelHandlerContext ctx, io.netty.handler.codec.http.FullHttpRequest request) voidexceptionCaught(io.netty.channel.ChannelHandlerContext ctx, Throwable cause) protected org.apache.hadoop.mapred.ShuffleChannelHandler.MapOutputInfogetMapOutputInfo(String mapId, int reduce, String jobId, String user) protected voidpopulateHeaders(List<String> mapIds, String jobId, String user, int reduce, io.netty.handler.codec.http.HttpResponse response, boolean keepAliveParam, Map<String, org.apache.hadoop.mapred.ShuffleChannelHandler.MapOutputInfo> mapOutputInfoMap) protected voidsendError(io.netty.channel.ChannelHandlerContext ctx, io.netty.handler.codec.http.HttpResponseStatus status) protected voidsendError(io.netty.channel.ChannelHandlerContext ctx, String message, io.netty.handler.codec.http.HttpResponseStatus status) protected voidsendError(io.netty.channel.ChannelHandlerContext ctx, String msg, io.netty.handler.codec.http.HttpResponseStatus status, Map<String, String> headers) voidsendMap(ShuffleChannelHandler.ReduceContext reduceContext) Calls sendMapOutput for the mapId pointed by ReduceContext.mapsToSend and increments it.protected io.netty.channel.ChannelFuturesendMapOutput(io.netty.channel.Channel ch, String user, String mapId, int reduce, org.apache.hadoop.mapred.ShuffleChannelHandler.MapOutputInfo mapOutputInfo) protected voidsetResponseHeaders(io.netty.handler.codec.http.HttpResponse response, boolean keepAliveParam, long contentLength) static io.netty.buffer.ByteBufshuffleHeaderToBytes(org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader header) protected voidverifyRequest(String appid, io.netty.channel.ChannelHandlerContext ctx, io.netty.handler.codec.http.HttpRequest request, io.netty.handler.codec.http.HttpResponse response, URL requestUri) Methods inherited from class io.netty.channel.SimpleChannelInboundHandler
acceptInboundMessage, channelReadMethods inherited from class io.netty.channel.ChannelInboundHandlerAdapter
channelReadComplete, channelRegistered, channelUnregistered, channelWritabilityChanged, userEventTriggeredMethods inherited from class io.netty.channel.ChannelHandlerAdapter
ensureNotSharable, handlerAdded, handlerRemoved, isSharableMethods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface io.netty.channel.ChannelHandler
handlerAdded, handlerRemoved
-
Method Details
-
channelActive
- Specified by:
channelActivein interfaceio.netty.channel.ChannelInboundHandler- Overrides:
channelActivein classio.netty.channel.ChannelInboundHandlerAdapter- Throws:
Exception
-
channelInactive
- Specified by:
channelInactivein interfaceio.netty.channel.ChannelInboundHandler- Overrides:
channelInactivein classio.netty.channel.ChannelInboundHandlerAdapter- Throws:
Exception
-
channelRead0
public void channelRead0(io.netty.channel.ChannelHandlerContext ctx, io.netty.handler.codec.http.FullHttpRequest request) - Specified by:
channelRead0in classio.netty.channel.SimpleChannelInboundHandler<io.netty.handler.codec.http.FullHttpRequest>
-
sendMap
Calls sendMapOutput for the mapId pointed by ReduceContext.mapsToSend and increments it. This method is first called by messageReceived() maxSessionOpenFiles times and then on the completion of every sendMapOutput operation. This limits the number of open files on a node, which can get really large(exhausting file descriptors on the NM) if all sendMapOutputs are called in one go, as was done previous to this change.- Parameters:
reduceContext- used to call sendMapOutput with correct params.
-
getMapOutputInfo
protected org.apache.hadoop.mapred.ShuffleChannelHandler.MapOutputInfo getMapOutputInfo(String mapId, int reduce, String jobId, String user) throws IOException - Throws:
IOException
-
populateHeaders
protected void populateHeaders(List<String> mapIds, String jobId, String user, int reduce, io.netty.handler.codec.http.HttpResponse response, boolean keepAliveParam, Map<String, org.apache.hadoop.mapred.ShuffleChannelHandler.MapOutputInfo> mapOutputInfoMap) throws IOException- Throws:
IOException
-
setResponseHeaders
protected void setResponseHeaders(io.netty.handler.codec.http.HttpResponse response, boolean keepAliveParam, long contentLength) -
verifyRequest
protected void verifyRequest(String appid, io.netty.channel.ChannelHandlerContext ctx, io.netty.handler.codec.http.HttpRequest request, io.netty.handler.codec.http.HttpResponse response, URL requestUri) throws IOException - Throws:
IOException
-
shuffleHeaderToBytes
public static io.netty.buffer.ByteBuf shuffleHeaderToBytes(org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader header) throws IOException - Throws:
IOException
-
sendMapOutput
protected io.netty.channel.ChannelFuture sendMapOutput(io.netty.channel.Channel ch, String user, String mapId, int reduce, org.apache.hadoop.mapred.ShuffleChannelHandler.MapOutputInfo mapOutputInfo) throws IOException - Throws:
IOException
-
sendError
protected void sendError(io.netty.channel.ChannelHandlerContext ctx, io.netty.handler.codec.http.HttpResponseStatus status) -
sendError
protected void sendError(io.netty.channel.ChannelHandlerContext ctx, String message, io.netty.handler.codec.http.HttpResponseStatus status) -
sendError
-
exceptionCaught
public void exceptionCaught(io.netty.channel.ChannelHandlerContext ctx, Throwable cause) throws Exception - Specified by:
exceptionCaughtin interfaceio.netty.channel.ChannelHandler- Specified by:
exceptionCaughtin interfaceio.netty.channel.ChannelInboundHandler- Overrides:
exceptionCaughtin classio.netty.channel.ChannelInboundHandlerAdapter- Throws:
Exception
-