Skip to content

Commit f6d7dac

Browse files
committed
add configurable response code
1 parent 7ed8691 commit f6d7dac

File tree

6 files changed

+45
-14
lines changed

6 files changed

+45
-14
lines changed

lib/logstash/inputs/http.rb

+2-1
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ class LogStash::Inputs::Http < LogStash::Inputs::Base
115115

116116
config :max_content_length, :validate => :number, :required => false, :default => 100 * 1024 * 1024
117117

118+
config :response_code, :validate => [200, 202, 204], :default => 200
118119
# Deprecated options
119120

120121
# The JKS keystore to validate the client's certificates
@@ -210,7 +211,7 @@ def validate_ssl_settings!
210211

211212
def create_http_server(message_handler)
212213
org.logstash.plugins.inputs.http.NettyHttpServer.new(
213-
@host, @port, message_handler, build_ssl_params(), @threads, @max_pending_requests, @max_content_length)
214+
@host, @port, message_handler, build_ssl_params(), @threads, @max_pending_requests, @max_content_length, @response_code)
214215
end
215216

216217
def build_ssl_params

spec/inputs/http_spec.rb

+16
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,22 @@
341341
end
342342
end
343343
end
344+
describe "return code" do
345+
it "responds with a 200" do
346+
response = client.post("http://127.0.0.1:#{port}", :body => "hello")
347+
response.call
348+
expect(response.code).to eq(200)
349+
end
350+
context "when response_code is configured" do
351+
let(:code) { 202 }
352+
subject { LogStash::Inputs::Http.new("port" => port, "response_code" => code) }
353+
it "responds with the configured code" do
354+
response = client.post("http://127.0.0.1:#{port}", :body => "hello")
355+
response.call
356+
expect(response.code).to eq(202)
357+
end
358+
end
359+
end
344360
end
345361

346362
context "with :ssl => false" do

src/main/java/org/logstash/plugins/inputs/http/HttpInitializer.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.netty.channel.socket.SocketChannel;
66
import io.netty.handler.codec.http.HttpContentDecompressor;
77
import io.netty.handler.codec.http.HttpObjectAggregator;
8+
import io.netty.handler.codec.http.HttpResponseStatus;
89
import io.netty.handler.codec.http.HttpServerCodec;
910
import io.netty.handler.ssl.SslHandler;
1011
import org.logstash.plugins.inputs.http.util.SslHandlerProvider;
@@ -18,13 +19,15 @@ public class HttpInitializer extends ChannelInitializer<SocketChannel> {
1819
private final IMessageHandler messageHandler;
1920
private SslHandlerProvider sslHandlerProvider;
2021
private final int maxContentLength;
22+
private final HttpResponseStatus responseStatus;
2123
private final ThreadPoolExecutor executorGroup;
2224

2325
public HttpInitializer(IMessageHandler messageHandler, ThreadPoolExecutor executorGroup,
24-
int maxContentLength) {
26+
int maxContentLength, HttpResponseStatus responseStatus) {
2527
this.messageHandler = messageHandler;
2628
this.executorGroup = executorGroup;
2729
this.maxContentLength = maxContentLength;
30+
this.responseStatus = responseStatus;
2831
}
2932

3033
protected void initChannel(SocketChannel socketChannel) throws Exception {
@@ -37,7 +40,7 @@ protected void initChannel(SocketChannel socketChannel) throws Exception {
3740
pipeline.addLast(new HttpServerCodec());
3841
pipeline.addLast(new HttpContentDecompressor());
3942
pipeline.addLast(new HttpObjectAggregator(maxContentLength));
40-
pipeline.addLast(new HttpServerHandler(messageHandler.copy(), executorGroup));
43+
pipeline.addLast(new HttpServerHandler(messageHandler.copy(), executorGroup, responseStatus));
4144
}
4245

4346
public void enableSSL(SslHandlerProvider sslHandlerProvider) {

src/main/java/org/logstash/plugins/inputs/http/HttpServerHandler.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,20 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpReque
2222

2323
private final IMessageHandler messageHandler;
2424
private final ThreadPoolExecutor executorGroup;
25+
private final HttpResponseStatus responseStatus;
2526

26-
public HttpServerHandler(IMessageHandler messageHandler, ThreadPoolExecutor executorGroup) {
27+
public HttpServerHandler(IMessageHandler messageHandler, ThreadPoolExecutor executorGroup,
28+
HttpResponseStatus responseStatus) {
2729
this.messageHandler = messageHandler;
2830
this.executorGroup = executorGroup;
31+
this.responseStatus = responseStatus;
2932
}
3033

3134
@Override
3235
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) {
3336
final String remoteAddress = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
3437
msg.retain();
35-
final MessageProcessor messageProcessor = new MessageProcessor(ctx, msg, remoteAddress, messageHandler);
38+
final MessageProcessor messageProcessor = new MessageProcessor(ctx, msg, remoteAddress, messageHandler, responseStatus);
3639
executorGroup.execute(messageProcessor);
3740
}
3841

src/main/java/org/logstash/plugins/inputs/http/MessageProcessor.java

+11-7
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,16 @@ public class MessageProcessor implements RejectableRunnable {
2222
private final FullHttpRequest req;
2323
private final String remoteAddress;
2424
private final IMessageHandler messageHandler;
25+
private final HttpResponseStatus responseStatus;
2526
private static final Charset charset = Charset.forName("UTF-8");
2627

2728
MessageProcessor(ChannelHandlerContext ctx, FullHttpRequest req, String remoteAddress,
28-
IMessageHandler messageHandler) {
29+
IMessageHandler messageHandler, HttpResponseStatus responseStatus) {
2930
this.ctx = ctx;
3031
this.req = req;
3132
this.remoteAddress = remoteAddress;
3233
this.messageHandler = messageHandler;
34+
this.responseStatus = responseStatus;
3335
}
3436

3537
public void onRejection() {
@@ -76,19 +78,21 @@ private FullHttpResponse generateFailedResponse(HttpResponseStatus status) {
7678

7779
private FullHttpResponse generateResponse(Map<String, String> stringHeaders) {
7880

79-
final ByteBuf payload = Unpooled.wrappedBuffer("ok".getBytes(charset));
8081
final FullHttpResponse response = new DefaultFullHttpResponse(
8182
req.protocolVersion(),
82-
HttpResponseStatus.OK,
83-
payload);
84-
83+
responseStatus);
8584
final DefaultHttpHeaders headers = new DefaultHttpHeaders();
8685
for(String key : stringHeaders.keySet()) {
8786
headers.set(key, stringHeaders.get(key));
8887
}
8988
response.headers().set(headers);
90-
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, payload.readableBytes());
91-
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
89+
90+
if (responseStatus != HttpResponseStatus.NO_CONTENT) {
91+
final ByteBuf payload = Unpooled.wrappedBuffer("ok".getBytes(charset));
92+
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, payload.readableBytes());
93+
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
94+
response.content().writeBytes(payload);
95+
}
9296

9397
return response;
9498
}

src/main/java/org/logstash/plugins/inputs/http/NettyHttpServer.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.netty.channel.EventLoopGroup;
77
import io.netty.channel.nio.NioEventLoopGroup;
88
import io.netty.channel.socket.nio.NioServerSocketChannel;
9+
import io.netty.handler.codec.http.HttpResponseStatus;
910
import org.logstash.plugins.inputs.http.util.CustomRejectedExecutionHandler;
1011
import org.logstash.plugins.inputs.http.util.SslHandlerProvider;
1112

@@ -27,20 +28,23 @@ public class NettyHttpServer implements Runnable, Closeable {
2728

2829
private final EventLoopGroup processorGroup;
2930
private final ThreadPoolExecutor executorGroup;
31+
private final HttpResponseStatus responseStatus;
3032

3133
public NettyHttpServer(String host, int port, IMessageHandler messageHandler,
3234
SslHandlerProvider sslHandlerProvider, int threads,
33-
int maxPendingRequests, int maxContentLength)
35+
int maxPendingRequests, int maxContentLength, int responseCode)
3436
{
3537
this.host = host;
3638
this.port = port;
39+
this.responseStatus = HttpResponseStatus.valueOf(responseCode);
3740
processorGroup = new NioEventLoopGroup(threads, daemonThreadFactory("http-input-processor"));
3841

3942
executorGroup = new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
4043
new ArrayBlockingQueue<>(maxPendingRequests), daemonThreadFactory("http-input-handler-executor"),
4144
new CustomRejectedExecutionHandler());
4245

43-
final HttpInitializer httpInitializer = new HttpInitializer(messageHandler, executorGroup, maxContentLength);
46+
final HttpInitializer httpInitializer = new HttpInitializer(messageHandler, executorGroup,
47+
maxContentLength, responseStatus);
4448

4549
if (sslHandlerProvider != null) {
4650
httpInitializer.enableSSL(sslHandlerProvider);

0 commit comments

Comments
 (0)