package org.springframework.integration.ip.tcp.connection;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLSession;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
import org.springframework.integration.util.CompositeExecutor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/integration/ip/tcp/connection/TcpNioConnection.class */
public class TcpNioConnection extends TcpConnectionSupport {
    private static final long DEFAULT_PIPE_TIMEOUT = 60000;
    private final SocketChannel socketChannel;
    private final ChannelOutputStream channelOutputStream;
    private final ChannelInputStream channelInputStream;
    private volatile OutputStream bufferedOutputStream;
    private volatile boolean usingDirectBuffers;
    private volatile CompositeExecutor taskExecutor;
    private volatile ByteBuffer rawBuffer;
    private volatile int maxMessageSize;
    private volatile long lastRead;
    private volatile long lastSend;
    private final AtomicInteger executionControl;
    private volatile boolean writingToPipe;
    private volatile CountDownLatch writingLatch;
    private volatile long pipeTimeout;
    private volatile boolean timedOut;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/integration/ip/tcp/connection/TcpNioConnection$ChannelInputStream.class */
    public class ChannelInputStream extends InputStream {
        private static final int BUFFER_LIMIT = 5;
        private volatile byte[] currentBuffer;
        private volatile int currentOffset;
        private volatile boolean isClosed;
        private final BlockingQueue<byte[]> buffers = new LinkedBlockingQueue(BUFFER_LIMIT);
        private final AtomicInteger available = new AtomicInteger();

        ChannelInputStream() {
        }

        @Override // java.io.InputStream
        public int read(byte[] bArr, int i, int i2) throws IOException {
            Assert.notNull(bArr, "byte[] cannot be null");
            if (i < 0 || i2 < 0 || i2 > bArr.length - i) {
                throw new IndexOutOfBoundsException();
            }
            if (i2 == 0) {
                return 0;
            }
            int i3 = 0;
            while (true) {
                if ((this.available.get() > 0 || i3 == 0) && i3 < i2) {
                    int read = read();
                    if (read < 0) {
                        if (i3 == 0) {
                            return -1;
                        }
                        return i3;
                    }
                    int i4 = i3;
                    i3++;
                    bArr[i + i4] = (byte) read;
                }
            }
            return i3;
        }

        @Override // java.io.InputStream
        public synchronized int read() throws IOException {
            if (this.isClosed && this.available.get() == 0) {
                if (TcpNioConnection.this.timedOut) {
                    throw new SocketTimeoutException("Connection has timed out");
                }
                return -1;
            }
            if (this.currentBuffer == null) {
                this.currentBuffer = getNextBuffer();
                this.currentOffset = 0;
                if (this.currentBuffer == null) {
                    if (TcpNioConnection.this.timedOut) {
                        throw new SocketTimeoutException("Connection has timed out");
                    }
                    return -1;
                }
            }
            byte[] bArr = this.currentBuffer;
            int i = this.currentOffset;
            this.currentOffset = i + 1;
            int i2 = bArr[i] & 255;
            this.available.decrementAndGet();
            if (this.currentOffset >= this.currentBuffer.length) {
                this.currentBuffer = null;
            }
            return i2;
        }

        private byte[] getNextBuffer() throws IOException {
            byte[] bArr = null;
            while (bArr == null) {
                try {
                    bArr = this.buffers.poll(1L, TimeUnit.SECONDS);
                    if (bArr == null && this.isClosed) {
                        return null;
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while waiting for data", e);
                }
            }
            return bArr;
        }

        public void write(byte[] bArr, int i) throws IOException {
            if (i > 0) {
                byte[] bArr2 = new byte[i];
                System.arraycopy(bArr, 0, bArr2, 0, i);
                this.available.addAndGet(i);
                if (TcpNioConnection.this.writingLatch != null) {
                    TcpNioConnection.this.writingLatch.countDown();
                }
                try {
                    if (!this.buffers.offer(bArr2, TcpNioConnection.this.pipeTimeout, TimeUnit.MILLISECONDS)) {
                        throw new IOException("Timed out waiting for buffer space");
                    }
                    TcpNioConnection.this.writingLatch = new CountDownLatch(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while waiting for buffer space", e);
                }
            }
        }

        @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            super.close();
            this.isClosed = true;
        }

        @Override // java.io.InputStream
        public int available() throws IOException {
            return this.available.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/integration/ip/tcp/connection/TcpNioConnection$ChannelOutputStream.class */
    public class ChannelOutputStream extends OutputStream {
        private Selector selector;
        private int soTimeout;

        /* JADX INFO: Access modifiers changed from: package-private */
        public ChannelOutputStream() {
        }

        @Override // java.io.OutputStream
        public void write(int i) throws IOException {
            doWrite(ByteBuffer.wrap(new byte[]{(byte) i}));
        }

        @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            TcpNioConnection.this.doClose();
        }

        @Override // java.io.OutputStream, java.io.Flushable
        public void flush() throws IOException {
        }

        @Override // java.io.OutputStream
        public void write(byte[] bArr, int i, int i2) throws IOException {
            doWrite(ByteBuffer.wrap(bArr, i, i2));
        }

        @Override // java.io.OutputStream
        public void write(byte[] bArr) throws IOException {
            doWrite(ByteBuffer.wrap(bArr));
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public synchronized void doWrite(ByteBuffer byteBuffer) throws IOException {
            if (TcpNioConnection.this.logger.isDebugEnabled()) {
                TcpNioConnection.this.logger.debug(TcpNioConnection.this.getConnectionId() + " writing " + byteBuffer.remaining());
            }
            TcpNioConnection.this.socketChannel.write(byteBuffer);
            int remaining = byteBuffer.remaining();
            if (remaining == 0) {
                return;
            }
            if (this.selector == null) {
                this.selector = Selector.open();
                this.soTimeout = TcpNioConnection.this.socketChannel.socket().getSoTimeout();
            }
            TcpNioConnection.this.socketChannel.register(this.selector, 4);
            while (remaining > 0) {
                if (this.selector.select(this.soTimeout) == 0) {
                    throw new SocketTimeoutException("Timeout on write");
                }
                this.selector.selectedKeys().clear();
                TcpNioConnection.this.socketChannel.write(byteBuffer);
                remaining = byteBuffer.remaining();
            }
        }
    }

    public TcpNioConnection(SocketChannel socketChannel, boolean z, boolean z2, ApplicationEventPublisher applicationEventPublisher, String str) throws Exception {
        super(socketChannel.socket(), z, z2, applicationEventPublisher, str);
        this.channelInputStream = new ChannelInputStream();
        this.maxMessageSize = 61440;
        this.executionControl = new AtomicInteger();
        this.pipeTimeout = DEFAULT_PIPE_TIMEOUT;
        this.socketChannel = socketChannel;
        if (socketChannel.socket().getReceiveBufferSize() <= 0) {
            int i = this.maxMessageSize;
        }
        this.channelOutputStream = new ChannelOutputStream();
    }

    public void setPipeTimeout(long j) {
        this.pipeTimeout = j;
    }

    @Override // org.springframework.integration.ip.tcp.connection.TcpConnectionSupport, org.springframework.integration.ip.tcp.connection.TcpConnection
    public void close() {
        setNoReadErrorOnClose(true);
        doClose();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doClose() {
        try {
            this.channelInputStream.close();
        } catch (IOException e) {
        }
        try {
            this.socketChannel.close();
        } catch (Exception e2) {
        }
        super.close();
    }

    @Override // org.springframework.integration.ip.tcp.connection.TcpConnection
    public boolean isOpen() {
        return this.socketChannel.isOpen();
    }

    @Override // org.springframework.integration.ip.tcp.connection.TcpConnection
    public void send(Message<?> message) throws Exception {
        synchronized (this.socketChannel) {
            if (this.bufferedOutputStream == null) {
                int sendBufferSize = this.socketChannel.socket().getSendBufferSize();
                this.bufferedOutputStream = new BufferedOutputStream(getChannelOutputStream(), sendBufferSize > 0 ? sendBufferSize : 8192);
            }
            Object fromMessage = getMapper().fromMessage(message);
            this.lastSend = System.currentTimeMillis();
            try {
                getSerializer().serialize(fromMessage, this.bufferedOutputStream);
                this.bufferedOutputStream.flush();
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(getConnectionId() + " Message sent " + message);
                }
            } catch (Exception e) {
                publishConnectionExceptionEvent(new MessagingException(message, "Failed TCP serialization", e));
                closeConnection(true);
                throw e;
            }
        }
    }

    @Override // org.springframework.integration.ip.tcp.connection.TcpConnection
    public Object getPayload() throws Exception {
        return getDeserializer().deserialize(this.channelInputStream);
    }

    @Override // org.springframework.integration.ip.tcp.connection.TcpConnection
    public int getPort() {
        return this.socketChannel.socket().getPort();
    }

    @Override // org.springframework.integration.ip.tcp.connection.TcpConnection
    public Object getDeserializerStateKey() {
        return this.channelInputStream;
    }

    @Override // org.springframework.integration.ip.tcp.connection.TcpConnection
    public SSLSession getSslSession() {
        return null;
    }

    protected ByteBuffer allocate(int i) {
        return this.usingDirectBuffers ? ByteBuffer.allocateDirect(i) : ByteBuffer.allocate(i);
    }

    @Override // java.lang.Runnable
    public void run() {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace(getConnectionId() + " Nio message assembler running...");
        }
        boolean z = true;
        while (z) {
            try {
                try {
                    if (dataAvailable()) {
                        Message<?> convert = convert();
                        if (dataAvailable()) {
                            this.executionControl.incrementAndGet();
                            try {
                                this.taskExecutor.execute2(this);
                            } catch (RejectedExecutionException e) {
                                this.executionControl.decrementAndGet();
                                if (this.logger.isInfoEnabled()) {
                                    this.logger.info(getConnectionId() + " Insufficient threads in the assembler fixed thread pool; consider increasing this task executor pool size; data avail: " + this.channelInputStream.available());
                                }
                            }
                        }
                        this.executionControl.decrementAndGet();
                        if (convert != null) {
                            sendToChannel(convert);
                        }
                    } else {
                        this.executionControl.decrementAndGet();
                    }
                    z = false;
                    try {
                        if (dataAvailable()) {
                            synchronized (this.executionControl) {
                                if (this.executionControl.incrementAndGet() <= 1) {
                                    this.executionControl.set(1);
                                    z = true;
                                } else {
                                    this.executionControl.decrementAndGet();
                                }
                            }
                        }
                        if (z) {
                            if (this.logger.isTraceEnabled()) {
                                this.logger.trace(getConnectionId() + " Nio message assembler continuing...");
                            }
                        } else if (this.logger.isTraceEnabled()) {
                            this.logger.trace(getConnectionId() + " Nio message assembler exiting... avail: " + this.channelInputStream.available());
                        }
                    } catch (IOException e2) {
                        this.logger.error("Exception when checking for assembler", e2);
                    }
                } catch (Throwable th) {
                    boolean z2 = false;
                    try {
                        if (dataAvailable()) {
                            synchronized (this.executionControl) {
                                if (this.executionControl.incrementAndGet() <= 1) {
                                    this.executionControl.set(1);
                                    z2 = true;
                                } else {
                                    this.executionControl.decrementAndGet();
                                }
                            }
                        }
                        if (z2) {
                            if (this.logger.isTraceEnabled()) {
                                this.logger.trace(getConnectionId() + " Nio message assembler continuing...");
                            }
                        } else if (this.logger.isTraceEnabled()) {
                            this.logger.trace(getConnectionId() + " Nio message assembler exiting... avail: " + this.channelInputStream.available());
                        }
                    } catch (IOException e3) {
                        this.logger.error("Exception when checking for assembler", e3);
                        throw th;
                    }
                    throw th;
                }
            } catch (Exception e4) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.error("Read exception " + getConnectionId(), e4);
                } else if (!isNoReadErrorOnClose()) {
                    this.logger.error("Read exception " + getConnectionId() + " " + e4.getClass().getSimpleName() + ":" + e4.getCause() + ":" + e4.getMessage());
                } else if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Read exception " + getConnectionId() + " " + e4.getClass().getSimpleName() + ":" + e4.getCause() + ":" + e4.getMessage());
                }
                closeConnection(true);
                sendExceptionToListener(e4);
                boolean z3 = false;
                try {
                    if (dataAvailable()) {
                        synchronized (this.executionControl) {
                            if (this.executionControl.incrementAndGet() <= 1) {
                                this.executionControl.set(1);
                                z3 = true;
                            } else {
                                this.executionControl.decrementAndGet();
                            }
                        }
                    }
                    if (z3) {
                        if (this.logger.isTraceEnabled()) {
                            this.logger.trace(getConnectionId() + " Nio message assembler continuing...");
                        }
                    } else if (this.logger.isTraceEnabled()) {
                        this.logger.trace(getConnectionId() + " Nio message assembler exiting... avail: " + this.channelInputStream.available());
                    }
                    return;
                } catch (IOException e5) {
                    this.logger.error("Exception when checking for assembler", e5);
                    return;
                }
            }
        }
    }

    private boolean dataAvailable() throws IOException {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace(getConnectionId() + " checking data avail: " + this.channelInputStream.available() + " pending: " + this.writingToPipe);
        }
        return this.writingToPipe || this.channelInputStream.available() > 0;
    }

    private synchronized Message<?> convert() throws Exception {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace(getConnectionId() + " checking data avail (convert): " + this.channelInputStream.available() + " pending: " + this.writingToPipe);
        }
        if (this.channelInputStream.available() <= 0) {
            try {
                if (!this.writingLatch.await(60L, TimeUnit.SECONDS)) {
                    throw new IOException("Timed out waiting for IO");
                }
                if (this.channelInputStream.available() <= 0) {
                    return null;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted waiting for IO");
            }
        }
        try {
            return getMapper().toMessage((TcpConnection) this);
        } catch (Exception e2) {
            closeConnection(true);
            if (!(e2 instanceof SocketTimeoutException)) {
                if (e2 instanceof SoftEndOfStreamException) {
                    return null;
                }
                throw e2;
            }
            if (!this.logger.isDebugEnabled()) {
                return null;
            }
            this.logger.debug("Closing socket after timeout " + getConnectionId());
            return null;
        }
    }

    private void sendToChannel(Message<?> message) {
        if (message != null) {
            try {
                TcpListener listener = getListener();
                if (listener == null) {
                    throw new NoListenerException("No listener");
                }
                listener.onMessage(message);
            } catch (Exception e) {
                if (!(e instanceof NoListenerException)) {
                    this.logger.error("Exception sending message: " + message, e);
                } else if (this.logger.isWarnEnabled()) {
                    this.logger.warn("Unexpected message - no endpoint registered with connection: " + getConnectionId() + " - " + message);
                }
            }
        }
    }

    private void doRead() throws Exception {
        if (this.rawBuffer == null) {
            this.rawBuffer = allocate(this.maxMessageSize);
        }
        this.writingLatch = new CountDownLatch(1);
        this.writingToPipe = true;
        try {
            try {
                if (this.taskExecutor == null) {
                    ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
                    this.taskExecutor = new CompositeExecutor(newCachedThreadPool, newCachedThreadPool);
                }
                checkForAssembler();
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Before read:" + this.rawBuffer.position() + "/" + this.rawBuffer.limit());
                }
                if (this.socketChannel.read(this.rawBuffer) < 0) {
                    this.writingToPipe = false;
                    closeConnection(true);
                }
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("After read:" + this.rawBuffer.position() + "/" + this.rawBuffer.limit());
                }
                this.rawBuffer.flip();
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("After flip:" + this.rawBuffer.position() + "/" + this.rawBuffer.limit());
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Read " + this.rawBuffer.limit() + " into raw buffer");
                }
                sendToPipe(this.rawBuffer);
                this.writingToPipe = false;
                this.writingLatch.countDown();
            } catch (RejectedExecutionException e) {
                throw e;
            } catch (Exception e2) {
                publishConnectionExceptionEvent(e2);
                throw e2;
            }
        } catch (Throwable th) {
            this.writingToPipe = false;
            this.writingLatch.countDown();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendToPipe(ByteBuffer byteBuffer) throws IOException {
        Assert.notNull(byteBuffer, "rawBuffer cannot be null");
        if (this.logger.isTraceEnabled()) {
            this.logger.trace(getConnectionId() + " Sending " + byteBuffer.limit() + " to pipe");
        }
        this.channelInputStream.write(byteBuffer.array(), byteBuffer.limit());
        byteBuffer.clear();
    }

    private void checkForAssembler() {
        synchronized (this.executionControl) {
            if (this.executionControl.incrementAndGet() <= 1) {
                this.executionControl.set(1);
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(getConnectionId() + " Running an assembler");
                }
                try {
                    this.taskExecutor.execute2(this);
                } catch (RejectedExecutionException e) {
                    this.executionControl.decrementAndGet();
                    if (this.logger.isInfoEnabled()) {
                        this.logger.info("Insufficient threads in the assembler fixed thread pool; consider increasing this task executor pool size");
                    }
                    throw e;
                }
            } else {
                this.executionControl.decrementAndGet();
            }
        }
    }

    public void readPacket() {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(getConnectionId() + " Reading...");
        }
        try {
            doRead();
        } catch (ClosedChannelException e) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug(getConnectionId() + " Channel is closed");
            }
            closeConnection(true);
        } catch (RejectedExecutionException e2) {
            throw e2;
        } catch (Exception e3) {
            this.logger.error("Exception on Read " + getConnectionId() + " " + e3.getMessage(), e3);
            closeConnection(true);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void timeout() {
        this.timedOut = true;
        closeConnection(true);
    }

    public void setTaskExecutor(Executor executor) {
        if (executor instanceof CompositeExecutor) {
            this.taskExecutor = (CompositeExecutor) executor;
        } else {
            this.taskExecutor = new CompositeExecutor(executor, executor);
        }
    }

    public void setUsingDirectBuffers(boolean z) {
        this.usingDirectBuffers = z;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isUsingDirectBuffers() {
        return this.usingDirectBuffers;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ChannelOutputStream getChannelOutputStream() {
        return this.channelOutputStream;
    }

    public long getLastRead() {
        return this.lastRead;
    }

    public void setLastRead(long j) {
        this.lastRead = j;
    }

    public long getLastSend() {
        return this.lastSend;
    }
}
