[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[libeufin] 02/05: Logging and sending HTTP chunks via library's handlers
From: |
gnunet |
Subject: |
[libeufin] 02/05: Logging and sending HTTP chunks via library's handlers. |
Date: |
Mon, 11 Oct 2021 09:51:02 +0200 |
This is an automated email from the git hooks/post-receive script.
ms pushed a commit to branch master
in repository libeufin.
commit b2423b33b921c9ac95ed40d95ec9fbe9d7811d2b
Author: ms <ms@taler.net>
AuthorDate: Mon Oct 11 09:24:35 2021 +0200
Logging and sending HTTP chunks via library's handlers.
---
util/src/main/kotlin/UnixDomainSocket.kt | 110 +++++++++++++++++--------------
1 file changed, 61 insertions(+), 49 deletions(-)
diff --git a/util/src/main/kotlin/UnixDomainSocket.kt
b/util/src/main/kotlin/UnixDomainSocket.kt
index b2713f2..448f1d5 100644
--- a/util/src/main/kotlin/UnixDomainSocket.kt
+++ b/util/src/main/kotlin/UnixDomainSocket.kt
@@ -1,75 +1,75 @@
import io.ktor.application.*
import io.ktor.client.statement.*
import io.ktor.http.*
+import io.ktor.http.HttpHeaders
import io.ktor.http.HttpMethod
import io.ktor.server.engine.*
import io.ktor.server.testing.*
+import io.ktor.utils.io.pool.*
import io.netty.bootstrap.ServerBootstrap
+import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufInputStream
import io.netty.buffer.Unpooled
+import io.netty.buffer.UnpooledDirectByteBuf
import io.netty.channel.*
import io.netty.channel.epoll.EpollEventLoopGroup
import io.netty.channel.epoll.EpollServerDomainSocketChannel
import io.netty.channel.unix.DomainSocketAddress
+import io.netty.handler.codec.LengthFieldPrepender
import io.netty.handler.codec.http.*
import io.netty.handler.codec.http.DefaultHttpResponse
+import io.netty.handler.codec.http.HttpMessage
+import io.netty.handler.logging.LogLevel
+import io.netty.handler.logging.LoggingHandler
+import io.netty.handler.stream.ChunkedInput
+import io.netty.handler.stream.ChunkedStream
+import io.netty.handler.stream.ChunkedWriteHandler
import io.netty.util.AttributeKey
+import io.netty.util.ReferenceCountUtil
+import org.slf4j.LoggerFactory
+import java.io.ByteArrayInputStream
+import java.io.InputStream
+import java.nio.charset.Charset
fun startServer(unixSocketPath: String, app: Application.() -> Unit) {
-
val boss = EpollEventLoopGroup()
val worker = EpollEventLoopGroup()
- val serverBootstrap = ServerBootstrap()
- serverBootstrap.group(boss, worker).channel(
- EpollServerDomainSocketChannel::class.java
- ).childHandler(LibeufinHttpInit(app))
-
- val socketPath = DomainSocketAddress(unixSocketPath)
- serverBootstrap.bind(socketPath).sync().channel().closeFuture().sync()
+ try {
+ val serverBootstrap = ServerBootstrap()
+ serverBootstrap.group(boss, worker).channel(
+ EpollServerDomainSocketChannel::class.java
+ ).childHandler(LibeufinHttpInit(app))
+ val socketPath = DomainSocketAddress(unixSocketPath)
+ logger.debug("Listening on $unixSocketPath ..")
+ serverBootstrap.bind(socketPath).sync().channel().closeFuture().sync()
+ } finally {
+ boss.shutdownGracefully()
+ worker.shutdownGracefully()
+ }
}
-private val ktorApplicationKey = AttributeKey.newInstance<Application.() ->
Unit>("KtorApplicationCall")
-
-class LibeufinHttpInit(private val app: Application.() -> Unit) :
ChannelInitializer<Channel>() {
+class LibeufinHttpInit(
+ private val app: Application.() -> Unit
+) : ChannelInitializer<Channel>() {
override fun initChannel(ch: Channel) {
- val libeufinHandler = LibeufinHttpHandler()
ch.pipeline(
- ).addLast(
- HttpServerCodec()
- ).addLast(
- HttpObjectAggregator(Int.MAX_VALUE)
- ).addLast(
- libeufinHandler
- )
- val libeufinCtx: ChannelHandlerContext =
ch.pipeline().context(libeufinHandler)
- libeufinCtx.attr(ktorApplicationKey).set(app)
+ ).addLast(LoggingHandler("tech.libeufin.util")
+ ).addLast(HttpServerCodec() // in- and out- bound
+ ).addLast(HttpObjectAggregator(Int.MAX_VALUE) // only in- bound
+ ).addLast(ChunkedWriteHandler()
+ ).addLast(LibeufinHttpHandler(app)) // in- bound, and triggers out-
bound.
}
}
-class LibeufinHttpHandler : SimpleChannelInboundHandler<FullHttpRequest>() {
-
+class LibeufinHttpHandler(
+ private val app: Application.() -> Unit
+) : SimpleChannelInboundHandler<FullHttpRequest>() {
@OptIn(EngineAPI::class)
- override fun channelRead0(ctx: ChannelHandlerContext?, msg:
FullHttpRequest) {
- val app = ctx?.attr(ktorApplicationKey)?.get()
- if (app == null) throw UtilError(
- HttpStatusCode.InternalServerError,
- "custom libEufin Unix-domain-socket+HTTP handler lost its Web app",
- null
- )
- /**
- * Below is only a echo of what euFin gets from the network. All
- * the checks should then occur at the Web app + Ktor level. Hence,
- * a HTTP call of GET with a non-empty body is not to be blocked /
warned
- * at this level.
- *
- * The only exception is the HTTP version value in the response, as the
- * response returned by the Web app does not set it. Therefore, this
- * proxy echoes back the HTTP version that was read in the request.
- */
+ override fun channelRead0(ctx: ChannelHandlerContext, msg:
FullHttpRequest) {
withTestApplication(app) {
val httpVersion = msg.protocolVersion()
- // Proxying the request with Ktor API.
- val call = handleRequest(closeRequest = false) {
+ // Proxying the request to Ktor API.
+ val call = handleRequest {
msg.headers().forEach { addHeader(it.key, it.value) }
method = HttpMethod(msg.method().name())
uri = msg.uri()
@@ -81,17 +81,29 @@ class LibeufinHttpHandler :
SimpleChannelInboundHandler<FullHttpRequest>() {
"app proxied via Unix domain socket did not include a response
status code",
ec = null // FIXME: to be defined.
)
- // Responding with Netty API.
- val response = DefaultFullHttpResponse(
+ // Responding to Netty API.
+ val response = DefaultHttpResponse(
httpVersion,
- HttpResponseStatus.valueOf(statusCode),
- Unpooled.wrappedBuffer(call.response.byteContent ?:
ByteArray(0))
+ HttpResponseStatus.valueOf(statusCode)
)
+ var chunked = false
call.response.headers.allValues().forEach { s, list ->
- response.headers().set(s, list.joinToString()) //
joinToString() separates with ", " by default.
+ if (s == HttpHeaders.TransferEncoding &&
list.contains("chunked"))
+ chunked = true
+ response.headers().set(s, list.joinToString())
+ }
+ ctx.writeAndFlush(response)
+ if (chunked) {
+ ctx.writeAndFlush(
+ HttpChunkedInput(
+ ChunkedStream(
+ ByteArrayInputStream(call.response.byteContent)
+ )
+ )
+ )
+ } else {
+
ctx.writeAndFlush(Unpooled.wrappedBuffer(call.response.byteContent))
}
- ctx.write(response)
- ctx.flush()
}
}
}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.