gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[libeufin] 02/05: Logging and sending HTTP chunks via library's handlers


From: gnunet
Subject: [libeufin] 02/05: Logging and sending HTTP chunks via library's handlers.
Date: Mon, 11 Oct 2021 09:51:02 +0200

This is an automated email from the git hooks/post-receive script.

ms pushed a commit to branch master
in repository libeufin.

commit b2423b33b921c9ac95ed40d95ec9fbe9d7811d2b
Author: ms <ms@taler.net>
AuthorDate: Mon Oct 11 09:24:35 2021 +0200

    Logging and sending HTTP chunks via library's handlers.
---
 util/src/main/kotlin/UnixDomainSocket.kt | 110 +++++++++++++++++--------------
 1 file changed, 61 insertions(+), 49 deletions(-)

diff --git a/util/src/main/kotlin/UnixDomainSocket.kt b/util/src/main/kotlin/UnixDomainSocket.kt
index b2713f2..448f1d5 100644
--- a/util/src/main/kotlin/UnixDomainSocket.kt
+++ b/util/src/main/kotlin/UnixDomainSocket.kt
@@ -1,75 +1,75 @@
 import io.ktor.application.*
 import io.ktor.client.statement.*
 import io.ktor.http.*
+import io.ktor.http.HttpHeaders
 import io.ktor.http.HttpMethod
 import io.ktor.server.engine.*
 import io.ktor.server.testing.*
+import io.ktor.utils.io.pool.*
 import io.netty.bootstrap.ServerBootstrap
+import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufInputStream
 import io.netty.buffer.Unpooled
+import io.netty.buffer.UnpooledDirectByteBuf
 import io.netty.channel.*
 import io.netty.channel.epoll.EpollEventLoopGroup
 import io.netty.channel.epoll.EpollServerDomainSocketChannel
 import io.netty.channel.unix.DomainSocketAddress
+import io.netty.handler.codec.LengthFieldPrepender
 import io.netty.handler.codec.http.*
 import io.netty.handler.codec.http.DefaultHttpResponse
+import io.netty.handler.codec.http.HttpMessage
+import io.netty.handler.logging.LogLevel
+import io.netty.handler.logging.LoggingHandler
+import io.netty.handler.stream.ChunkedInput
+import io.netty.handler.stream.ChunkedStream
+import io.netty.handler.stream.ChunkedWriteHandler
 import io.netty.util.AttributeKey
+import io.netty.util.ReferenceCountUtil
+import org.slf4j.LoggerFactory
+import java.io.ByteArrayInputStream
+import java.io.InputStream
+import java.nio.charset.Charset
 
 fun startServer(unixSocketPath: String, app: Application.() -> Unit) {
-
     val boss = EpollEventLoopGroup()
     val worker = EpollEventLoopGroup()
-    val serverBootstrap = ServerBootstrap()
-    serverBootstrap.group(boss, worker).channel(
-        EpollServerDomainSocketChannel::class.java
-    ).childHandler(LibeufinHttpInit(app))
-
-    val socketPath = DomainSocketAddress(unixSocketPath)
-    serverBootstrap.bind(socketPath).sync().channel().closeFuture().sync()
+    try {
+        val serverBootstrap = ServerBootstrap()
+        serverBootstrap.group(boss, worker).channel(
+            EpollServerDomainSocketChannel::class.java
+        ).childHandler(LibeufinHttpInit(app))
+        val socketPath = DomainSocketAddress(unixSocketPath)
+        logger.debug("Listening on $unixSocketPath ..")
+        serverBootstrap.bind(socketPath).sync().channel().closeFuture().sync()
+    } finally {
+        boss.shutdownGracefully()
+        worker.shutdownGracefully()
+    }
 }
 
-private val ktorApplicationKey = AttributeKey.newInstance<Application.() -> Unit>("KtorApplicationCall")
-
-class LibeufinHttpInit(private val app: Application.() -> Unit) : ChannelInitializer<Channel>() {
+class LibeufinHttpInit(
+    private val app: Application.() -> Unit
+) : ChannelInitializer<Channel>() {
     override fun initChannel(ch: Channel) {
-        val libeufinHandler = LibeufinHttpHandler()
         ch.pipeline(
-        ).addLast(
-            HttpServerCodec()
-        ).addLast(
-            HttpObjectAggregator(Int.MAX_VALUE)
-        ).addLast(
-            libeufinHandler
-        )
-        val libeufinCtx: ChannelHandlerContext = ch.pipeline().context(libeufinHandler)
-        libeufinCtx.attr(ktorApplicationKey).set(app)
+        ).addLast(LoggingHandler("tech.libeufin.util")
+        ).addLast(HttpServerCodec() // in- and out- bound
+        ).addLast(HttpObjectAggregator(Int.MAX_VALUE) // only in- bound
+        ).addLast(ChunkedWriteHandler()
+        ).addLast(LibeufinHttpHandler(app)) // in- bound, and triggers out- bound.
     }
 }
 
-class LibeufinHttpHandler : SimpleChannelInboundHandler<FullHttpRequest>() {
-
+class LibeufinHttpHandler(
+    private val app: Application.() -> Unit
+) : SimpleChannelInboundHandler<FullHttpRequest>() {
     @OptIn(EngineAPI::class)
-    override fun channelRead0(ctx: ChannelHandlerContext?, msg: FullHttpRequest) {
-        val app = ctx?.attr(ktorApplicationKey)?.get()
-        if (app == null) throw UtilError(
-            HttpStatusCode.InternalServerError,
-            "custom libEufin Unix-domain-socket+HTTP handler lost its Web app",
-            null
-        )
-        /**
-         * Below is only a echo of what euFin gets from the network.  All
-         * the checks should then occur at the Web app + Ktor level.  Hence,
-         * a HTTP call of GET with a non-empty body is not to be blocked / warned
-         * at this level.
-         *
-         * The only exception is the HTTP version value in the response, as the
-         * response returned by the Web app does not set it.  Therefore, this
-         * proxy echoes back the HTTP version that was read in the request.
-         */
+    override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) {
         withTestApplication(app) {
             val httpVersion = msg.protocolVersion()
-            // Proxying the request with Ktor API.
-            val call = handleRequest(closeRequest = false) {
+            // Proxying the request to Ktor API.
+            val call = handleRequest {
                 msg.headers().forEach { addHeader(it.key, it.value) }
                 method = HttpMethod(msg.method().name())
                 uri = msg.uri()
@@ -81,17 +81,29 @@ class LibeufinHttpHandler : SimpleChannelInboundHandler<FullHttpRequest>() {
                 "app proxied via Unix domain socket did not include a response 
status code",
                 ec = null // FIXME: to be defined.
             )
-            // Responding with Netty API.
-            val response = DefaultFullHttpResponse(
+            // Responding to Netty API.
+            val response = DefaultHttpResponse(
                 httpVersion,
-                HttpResponseStatus.valueOf(statusCode),
-                Unpooled.wrappedBuffer(call.response.byteContent ?: ByteArray(0))
+                HttpResponseStatus.valueOf(statusCode)
             )
+            var chunked = false
             call.response.headers.allValues().forEach { s, list ->
-                response.headers().set(s, list.joinToString()) // joinToString() separates with ", " by default.
+                if (s == HttpHeaders.TransferEncoding && list.contains("chunked"))
+                    chunked = true
+                response.headers().set(s, list.joinToString())
+            }
+            ctx.writeAndFlush(response)
+            if (chunked) {
+                ctx.writeAndFlush(
+                    HttpChunkedInput(
+                        ChunkedStream(
+                            ByteArrayInputStream(call.response.byteContent)
+                        )
+                    )
+                )
+            } else {
+                ctx.writeAndFlush(Unpooled.wrappedBuffer(call.response.byteContent))
             }
-            ctx.write(response)
-            ctx.flush()
         }
     }
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]