/*
 * Decompiled with CFR 0.152.
 */
package cz.vutbr.fit.persistence.cassandra.repository;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import cz.vutbr.fit.persistence.cassandra.entity.CassandraPacket;
import cz.vutbr.fit.persistence.cassandra.repository.AsyncOperations;
import cz.vutbr.fit.persistence.cassandra.repository.OnSuccessCallback;
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.cassandra.core.AsyncCassandraTemplate;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;

/**
 * Repository that stores and loads {@link CassandraPacket} rows asynchronously.
 *
 * <p>Every asynchronous request must hold a permit of an internal {@link Semaphore} while it is
 * in flight. The permit is taken before the request is submitted and is given back exactly once:
 * either when the driver future completes (successfully or not) or when submitting the request
 * fails synchronously.
 */
public class PacketRepositoryImpl implements AsyncOperations {
    private static final Logger LOGGER = LoggerFactory.getLogger(PacketRepositoryImpl.class);
    @Autowired
    private Cluster cluster;
    @Autowired
    private Session session;
    private AsyncCassandraTemplate asyncCassandraTemplate;
    private static final String INSERT_QUERY = "INSERT INTO packet (id, packet) VALUES(?, ?);";
    private PreparedStatement insertPreparedStatement;
    private static final String SELECT_QUERY = "SELECT * FROM packet WHERE id = ?";
    private PreparedStatement selectPreparedStatement;
    // Pooling limits of the connected host that offers the largest capacity (see initSemaphore).
    private int maxRequestsPerConnection = 0;
    private int maxConnectionsPerHost = 0;
    // Bounds the number of requests that may be in flight at the same time.
    private Semaphore semaphore;

    /** Validates the injected collaborators and prepares templates, statements and the semaphore. */
    @PostConstruct
    private void init() {
        this.postConstructValidation();
        this.postConstructInitialization();
    }

    /** Fails fast when either injected field is still {@code null}. */
    private void postConstructValidation() {
        Assert.notNull(this.cluster, "Cluster must be initialized");
        Assert.notNull(this.session, "Session must be initialized");
    }

    /** Runs the individual initialization steps; all of them use the validated session. */
    private void postConstructInitialization() {
        this.initAsyncCassandraTemplate();
        this.initQueries();
        this.initSemaphore();
    }

    private void initAsyncCassandraTemplate() {
        this.asyncCassandraTemplate = new AsyncCassandraTemplate(this.session);
    }

    /** Prepares the insert and select statements once so they can be re-bound for every request. */
    private void initQueries() {
        this.insertPreparedStatement = this.session.prepare(INSERT_QUERY);
        this.selectPreparedStatement = this.session.prepare(SELECT_QUERY);
    }

    /**
     * Sizes the semaphore as {@code maxConnectionsPerHost * maxRequestsPerConnection}.
     *
     * <p>The pooling limits are read for every connected host and the largest per-host capacity
     * wins, so the result no longer depends on the iteration order of the connected hosts. If no
     * positive capacity can be derived (for example, no host is connected yet), a single permit is
     * used instead, because a semaphore with zero permits would block every request forever.
     */
    private void initSemaphore() {
        LoadBalancingPolicy loadBalancingPolicy = this.cluster.getConfiguration().getPolicies().getLoadBalancingPolicy();
        PoolingOptions poolingOptions = this.cluster.getConfiguration().getPoolingOptions();
        this.session.getState().getConnectedHosts().forEach(host -> {
            HostDistance distance = loadBalancingPolicy.distance(host);
            int requestsPerConnection = poolingOptions.getMaxRequestsPerConnection(distance);
            int connectionsPerHost = poolingOptions.getMaxConnectionsPerHost(distance);
            // Keep the best capacity seen so far instead of blindly overwriting it per host.
            if (connectionsPerHost * requestsPerConnection > this.maxConnectionsPerHost * this.maxRequestsPerConnection) {
                this.maxRequestsPerConnection = requestsPerConnection;
                this.maxConnectionsPerHost = connectionsPerHost;
            }
        });
        int semaphorePermits = this.maxConnectionsPerHost * this.maxRequestsPerConnection;
        if (semaphorePermits < 1) {
            LOGGER.warn("No pooling capacity could be derived from the connected hosts; falling back to a single permit.");
            semaphorePermits = 1;
        }
        this.semaphore = new Semaphore(semaphorePermits);
        LOGGER.info("Semaphore initialized with {} permits.", semaphorePermits);
    }

    /**
     * Inserts the given packet asynchronously.
     *
     * @param cassandraPacket packet whose id and payload are written
     * @return the driver future of the insert statement
     */
    @Override
    public ResultSetFuture insertAsync(CassandraPacket cassandraPacket) {
        return this.insertAsync(cassandraPacket.getId(), cassandraPacket.getPacket());
    }

    /** Binds and submits the insert statement; the permit is returned when the future completes. */
    private ResultSetFuture insertAsync(UUID id, ByteBuffer packet) {
        ResultSetFuture resultSetFuture = this.executeThrottled(this.insertPreparedStatement, id, packet);
        Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {

            @Override
            public void onSuccess(ResultSet rows) {
                PacketRepositoryImpl.this.unlockSemaphore();
            }

            @Override
            public void onFailure(Throwable throwable) {
                // The failure itself stays observable through the returned future.
                PacketRepositoryImpl.this.unlockSemaphore();
            }
        }, MoreExecutors.directExecutor());
        return resultSetFuture;
    }

    /**
     * Loads the packet with the given id asynchronously.
     *
     * <p>{@code onSuccessCallback} is invoked with the loaded packet when the query succeeds and
     * returns a row. It is not invoked when the query fails or when no row matches the id.
     *
     * @param id                id of the packet to load
     * @param onSuccessCallback receiver of the loaded packet
     * @return the driver future of the select statement
     */
    @Override
    public ResultSetFuture selectAsync(UUID id, final OnSuccessCallback onSuccessCallback) {
        ResultSetFuture resultSetFuture = this.executeThrottled(this.selectPreparedStatement, id);
        Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {

            @Override
            public void onSuccess(ResultSet result) {
                // Give the permit back before touching the result, so that nothing thrown
                // below (row mapping, user callback) can leak it.
                PacketRepositoryImpl.this.unlockSemaphore();
                Row row = result.one();
                if (row == null) {
                    // one() yields null for an empty result set; there is nothing to hand over.
                    LOGGER.warn("No packet found for id {}", id);
                    return;
                }
                CassandraPacket loadedPacket = new CassandraPacket();
                loadedPacket.setId(row.get("id", UUID.class));
                loadedPacket.setPacket(row.get("packet", ByteBuffer.class));
                onSuccessCallback.onSuccess(loadedPacket);
            }

            @Override
            public void onFailure(Throwable throwable) {
                // The failure itself stays observable through the returned future.
                PacketRepositoryImpl.this.unlockSemaphore();
            }
        }, MoreExecutors.directExecutor());
        return resultSetFuture;
    }

    /**
     * Takes a permit, binds {@code values} to {@code statement} and submits it.
     *
     * <p>If binding or submitting throws, the permit is returned immediately and the error is
     * rethrown. Otherwise the caller is responsible for releasing the permit once the returned
     * future completes.
     */
    private ResultSetFuture executeThrottled(PreparedStatement statement, Object... values) {
        this.lockSemaphore();
        try {
            return this.session.executeAsync(statement.bind(values));
        }
        catch (RuntimeException | Error submissionFailure) {
            this.unlockSemaphore();
            throw submissionFailure;
        }
    }

    /**
     * Blocks until a permit is available.
     *
     * <p>The wait is deliberately uninterruptible: the method always returns holding a permit, so
     * each later release is matched by exactly one acquire. An interrupt that arrives while
     * waiting is not lost; the thread's interrupt status is set when the method returns.
     */
    private void lockSemaphore() {
        this.semaphore.acquireUninterruptibly();
    }

    private void unlockSemaphore() {
        this.semaphore.release();
    }

    /** Inserts the packet through the Spring Data template, throttled by the same semaphore. */
    private void insertAsyncSpringData(CassandraPacket cassandraPacket) {
        this.lockSemaphore();
        ListenableFuture<CassandraPacket> listenableFuture;
        try {
            listenableFuture = this.asyncCassandraTemplate.insert(cassandraPacket);
        }
        catch (RuntimeException | Error submissionFailure) {
            // Nothing was submitted, so no callback will ever return the permit.
            this.unlockSemaphore();
            throw submissionFailure;
        }
        listenableFuture.addCallback(this::onSuccess, this::onFailure);
    }

    private void onSuccess(CassandraPacket cassandraPacket) {
        this.unlockSemaphore();
    }

    private void onFailure(Throwable throwable) {
        LOGGER.error(throwable.getMessage(), throwable);
        this.unlockSemaphore();
    }
}

