Ich entwickle einen Netty-Server, der als Backend für die Android-App verwendet werden kann. In meiner aktuellen Implementierung wurde der Zugriff auf die Datenbank im Logik-Handler realisiert, der von einem speziellen Netty-Threadpool (nicht von E / A-Threads) ausgeführt wird, indem eine DB-Verbindung pro Netty-Kanal wie folgt verwendet wird:

Initialisieren:

EventExecutorGroup logicExecutor = new DefaultEventExecutorGroup(4);
EventLoopGroup acceptGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try
{
    ServerBootstrap b = new ServerBootstrap();
    b.group(acceptGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 50)
        .childOption(ChannelOption.SO_KEEPALIVE, false)
        .childHandler(new ChannelInitializer<SocketChannel>()
        {
        @Override
        public void initChannel(SocketChannel ch) throws Exception
        {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new IdleStateHandler(5*60, 0, 0));
            pipeline.addLast(new ProtobufDelimitedFrameDecoder(65536));
            pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
            pipeline.addLast(new ProtobufDecoder(NetMsg.ClientMsg.getDefaultInstance()));
            pipeline.addLast(new ProtobufEncoder());
            pipeline.addLast(logicExecutor, "logic", new ChannelLogicHandler());
        }
        });

Offene Verbindung zur DB bei Kanalaktivierung:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
    dbConnection = DriverManager.getConnection(dbConnectParams[0], dbConnectParams[1], dbConnectParams[2]);
    if (dbConnection == null)
    throw new SQLException("Connection to database failed");

    super.channelActive(ctx);
}

... und Verbindung auf Kanal inaktiv schließen.

Soweit ich weiß, ordnet Netty jedem Thread in seinem eigenen Thread-Pool Kanäle für den gesamten Lebenszyklus zu. In meinem Fall bedeutet die Verwendung von DefaultEventExecutorGroup (4) für den Logik-Handler, dass alle Kanäle mit vier Threads und für jeden Thread bedient werden Bei einem bestimmten Kanal wird nur ein Thread aus dem Threadpool verwendet. Die Aufrechterhaltung einer DB-Verbindung pro Executor-Thread reicht daher aus, um die Datenintegrität ohne Sperren (mit der entsprechenden Transaktionsisolationsstufe) bereitzustellen. Meine Frage ist also, ob es möglich ist, eine DB-Verbindung pro Thread im Threadpool zuzuordnen, sodass jede Verbindung beim Starten des Threads (oder beim Zuordnen des ersten Kanals zu ihm) hergestellt wurde, und wie sie implementiert werden kann.

1
jetc 4 Jän. 2016 im 22:27

2 Antworten

Beste Antwort

Ich schätze, ich habe selbst eine Lösung gefunden - jeder ChannelHandlerContext (ctx) hat seinen eigenen EventExecutor, der im Wesentlichen ein Thread ist. Also benutze ich Hashmap, um Datenbankverbindungen mit Executoren herzustellen. In Code:

//Declare hashmap in main server class
private final HashMap<EventExecutor,java.sql.Connection> execConsMap = new HashMap<>(4);

// ........................

    public class ChannelLogicHandler extends ChannelInboundHandlerAdapter
{
private java.sql.Connection dbConnection = null; //DB connection saved as private member of logic handler
//........................

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
    EventExecutor ex = ctx.executor(); //Get channel executor
    synchronized(execConsMap)
    {
    if (execConsMap.containsKey(ex)) //If already processed get DB connection from hashmap
    {
        dbConnection = execConsMap.get(ex);
    }
    else //Else create new connection and save in hashmap
    {
        java.sql.Connection dbc = DriverManager.getConnection(dbConnectParams[0], dbConnectParams[1], dbConnectParams[2]);
        if (dbc != null)
        {
        execConsMap.put(ex, dbc);
        dbConnection = dbc;
        }
        else
        {
        throw new SQLException("Connection to database failed");
        }
    }
    }

    System.out.println("New client connected");
    super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
{
    try
    {
    if (!dbConnection.getAutoCommit())
    {
        dbConnection.rollback();
        dbConnection.setAutoCommit(true);
    }
    }
    catch (SQLException e) 
    { 
    e.printStackTrace(); 
    }

    System.out.println("Client disconnected");
    super.channelInactive(ctx);
}

DB-Verbindungen beim Serverstopp geschlossen:

logicExecutor.shutdownGracefully().addListener(new GenericFutureListener()
     {
        @Override
        public void operationComplete(Future future) throws Exception
        {
            for (java.sql.Connection conn : execConsMap.values())
            {
                try 
                {
                    if (!conn.getAutoCommit())
                    conn.rollback();
                } catch (SQLException e) 
                { e.printStackTrace(); }

                try 
                {
                    conn.close();
                } catch (SQLException e) 
                { e.printStackTrace(); }
            }
        }
     });
2
jetc 4 Jän. 2016 im 21:50

Wenn Sie eine Variable pro Thread möchten, können Sie ein ThreadLocal-Feld verwenden.

Etwas wie das:

private static final ThreadLocal<DatabaseConnection> databaseConnection =
     new ThreadLocal<DatabaseConnection>() {
         @Override protected DatabaseConnection initialValue() {
             return DriverManager.getConnection(dbConnectParams[0], dbConnectParams[1], dbConnectParams[2]);
     }
 };

Dabei hat jeder Thread eine andere Datenbankverbindung. Wenn der erste Aufruf der Methode get durchgeführt wird, wird die Verbindung für den Thread durch Aufrufen der Methode initialValue() initialisiert, mit der die Datenbankverbindung erstellt wird. Nachfolgende Aufrufe geben denselben vorherigen Wert zurück. Sie können jedoch auch manuell einen neuen Wert für dieses Feld festlegen.

0
Prim 4 Jän. 2016 im 21:23