Skip to content

add configurable response code #103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/logstash/inputs/http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class LogStash::Inputs::Http < LogStash::Inputs::Base

config :max_content_length, :validate => :number, :required => false, :default => 100 * 1024 * 1024

config :response_code, :validate => [200, 202, 204], :default => 200
# Deprecated options

# The JKS keystore to validate the client's certificates
Expand Down Expand Up @@ -210,7 +211,7 @@ def validate_ssl_settings!

def create_http_server(message_handler)
org.logstash.plugins.inputs.http.NettyHttpServer.new(
@host, @port, message_handler, build_ssl_params(), @threads, @max_pending_requests, @max_content_length)
@host, @port, message_handler, build_ssl_params(), @threads, @max_pending_requests, @max_content_length, @response_code)
end

def build_ssl_params
Expand Down
16 changes: 16 additions & 0 deletions spec/inputs/http_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,22 @@
end
end
end
describe "return code" do
it "responds with a 200" do
response = client.post("http://127.0.0.1:#{port}", :body => "hello")
response.call
expect(response.code).to eq(200)
end
context "when response_code is configured" do
let(:code) { 202 }
subject { LogStash::Inputs::Http.new("port" => port, "response_code" => code) }
it "responds with the configured code" do
response = client.post("http://127.0.0.1:#{port}", :body => "hello")
response.call
expect(response.code).to eq(202)
end
end
end
end

context "with :ssl => false" do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.ssl.SslHandler;
import org.logstash.plugins.inputs.http.util.SslHandlerProvider;
Expand All @@ -18,13 +19,15 @@ public class HttpInitializer extends ChannelInitializer<SocketChannel> {
private final IMessageHandler messageHandler;
private SslHandlerProvider sslHandlerProvider;
private final int maxContentLength;
private final HttpResponseStatus responseStatus;
private final ThreadPoolExecutor executorGroup;

public HttpInitializer(IMessageHandler messageHandler, ThreadPoolExecutor executorGroup,
int maxContentLength) {
int maxContentLength, HttpResponseStatus responseStatus) {
this.messageHandler = messageHandler;
this.executorGroup = executorGroup;
this.maxContentLength = maxContentLength;
this.responseStatus = responseStatus;
}

protected void initChannel(SocketChannel socketChannel) throws Exception {
Expand All @@ -37,7 +40,7 @@ protected void initChannel(SocketChannel socketChannel) throws Exception {
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpContentDecompressor());
pipeline.addLast(new HttpObjectAggregator(maxContentLength));
pipeline.addLast(new HttpServerHandler(messageHandler.copy(), executorGroup));
pipeline.addLast(new HttpServerHandler(messageHandler.copy(), executorGroup, responseStatus));
}

public void enableSSL(SslHandlerProvider sslHandlerProvider) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,20 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpReque

private final IMessageHandler messageHandler;
private final ThreadPoolExecutor executorGroup;
private final HttpResponseStatus responseStatus;

public HttpServerHandler(IMessageHandler messageHandler, ThreadPoolExecutor executorGroup) {
public HttpServerHandler(IMessageHandler messageHandler, ThreadPoolExecutor executorGroup,
HttpResponseStatus responseStatus) {
this.messageHandler = messageHandler;
this.executorGroup = executorGroup;
this.responseStatus = responseStatus;
}

@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) {
final String remoteAddress = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
msg.retain();
final MessageProcessor messageProcessor = new MessageProcessor(ctx, msg, remoteAddress, messageHandler);
final MessageProcessor messageProcessor = new MessageProcessor(ctx, msg, remoteAddress, messageHandler, responseStatus);
executorGroup.execute(messageProcessor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ public class MessageProcessor implements RejectableRunnable {
private final FullHttpRequest req;
private final String remoteAddress;
private final IMessageHandler messageHandler;
private final HttpResponseStatus responseStatus;
private static final Charset charset = Charset.forName("UTF-8");

MessageProcessor(ChannelHandlerContext ctx, FullHttpRequest req, String remoteAddress,
IMessageHandler messageHandler) {
IMessageHandler messageHandler, HttpResponseStatus responseStatus) {
this.ctx = ctx;
this.req = req;
this.remoteAddress = remoteAddress;
this.messageHandler = messageHandler;
this.responseStatus = responseStatus;
}

public void onRejection() {
Expand Down Expand Up @@ -76,19 +78,21 @@ private FullHttpResponse generateFailedResponse(HttpResponseStatus status) {

private FullHttpResponse generateResponse(Map<String, String> stringHeaders) {

final ByteBuf payload = Unpooled.wrappedBuffer("ok".getBytes(charset));
final FullHttpResponse response = new DefaultFullHttpResponse(
req.protocolVersion(),
HttpResponseStatus.OK,
payload);

responseStatus);
final DefaultHttpHeaders headers = new DefaultHttpHeaders();
for(String key : stringHeaders.keySet()) {
headers.set(key, stringHeaders.get(key));
}
response.headers().set(headers);
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, payload.readableBytes());
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");

if (responseStatus != HttpResponseStatus.NO_CONTENT) {
final ByteBuf payload = Unpooled.wrappedBuffer("ok".getBytes(charset));
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, payload.readableBytes());
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.content().writeBytes(payload);
}

return response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.logstash.plugins.inputs.http.util.CustomRejectedExecutionHandler;
import org.logstash.plugins.inputs.http.util.SslHandlerProvider;

Expand All @@ -27,20 +28,23 @@ public class NettyHttpServer implements Runnable, Closeable {

private final EventLoopGroup processorGroup;
private final ThreadPoolExecutor executorGroup;
private final HttpResponseStatus responseStatus;

public NettyHttpServer(String host, int port, IMessageHandler messageHandler,
SslHandlerProvider sslHandlerProvider, int threads,
int maxPendingRequests, int maxContentLength)
int maxPendingRequests, int maxContentLength, int responseCode)
{
this.host = host;
this.port = port;
this.responseStatus = HttpResponseStatus.valueOf(responseCode);
processorGroup = new NioEventLoopGroup(threads, daemonThreadFactory("http-input-processor"));

executorGroup = new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(maxPendingRequests), daemonThreadFactory("http-input-handler-executor"),
new CustomRejectedExecutionHandler());

final HttpInitializer httpInitializer = new HttpInitializer(messageHandler, executorGroup, maxContentLength);
final HttpInitializer httpInitializer = new HttpInitializer(messageHandler, executorGroup,
maxContentLength, responseStatus);

if (sslHandlerProvider != null) {
httpInitializer.enableSSL(sslHandlerProvider);
Expand Down