/*
 * Decompiled with CFR 0.152.
 */
package org.jruby.ext.thread;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyHash;
import org.jruby.RubyModule;
import org.jruby.RubyNumeric;
import org.jruby.RubyThread;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.api.Convert;
import org.jruby.api.Error;
import org.jruby.ast.util.ArgsUtil;
import org.jruby.ext.thread.Queue;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.Visibility;
import org.jruby.runtime.builtin.IRubyObject;

/**
 * Ruby {@code SizedQueue}: a {@link Queue} whose {@code capacity} is set by the
 * caller, so producers wait (or fail, or time out) while the queue is full.
 *
 * <p>The element count, the {@code putLock}/{@code takeLock} pair and their
 * {@code notFull}/{@code notEmpty} conditions are inherited from {@link Queue};
 * this class only adds the producer-side capacity handling.
 */
@JRubyClass(name={"SizedQueue"}, parent="Queue")
public class SizedQueue
extends Queue {
    /**
     * Task used for an untimed blocking push: delegates to {@code putInternal}
     * and is woken up by interrupting the native thread.
     */
    private final RubyThread.Task<IRubyObject, IRubyObject> blockingPutTask = new RubyThread.Task<IRubyObject, IRubyObject>(){

        @Override
        public IRubyObject run(ThreadContext context, IRubyObject value2) throws InterruptedException {
            SizedQueue.this.putInternal(context, value2);
            return SizedQueue.this;
        }

        @Override
        public void wakeup(RubyThread thread2, IRubyObject value2) {
            thread2.getNativeThread().interrupt();
        }
    };

    /**
     * Allocator constructor: creates the object without running {@code initialize}.
     *
     * @param runtime2 the owning runtime
     * @param type2 the Ruby class of the new object
     */
    protected SizedQueue(Ruby runtime2, RubyClass type2) {
        super(runtime2, type2);
    }

    /**
     * Creates a queue and immediately initializes it with the given capacity.
     *
     * @param runtime2 the owning runtime
     * @param type2 the Ruby class of the new object
     * @param max2 the capacity; must be positive or an argument error is raised
     */
    public SizedQueue(Ruby runtime2, RubyClass type2, int max2) {
        super(runtime2, type2);
        ThreadContext context = runtime2.getCurrentContext();
        this.initialize(context, Convert.asFixnum(context, max2));
    }

    /**
     * Defines {@code SizedQueue} under {@code Thread2} (with {@code Queue2} as
     * superclass), binds the annotated methods of this class to it and also
     * publishes it as the constant {@code SizedQueue} on {@code Object2}.
     *
     * @return the value returned by {@code setConstant}, cast to {@link RubyClass}
     */
    public static RubyClass setup(ThreadContext context, RubyClass Thread2, RubyClass Queue2, RubyClass Object2) {
        RubyClass defined = (RubyClass)Thread2.defineClassUnder(context, "SizedQueue", Queue2, SizedQueue::new);
        RubyModule reified = (RubyModule)defined.reifiedClass(SizedQueue.class);
        IRubyObject withMethods = (IRubyObject)reified.defineMethods(context, SizedQueue.class);
        return (RubyClass)Object2.setConstant(context, "SizedQueue", withMethods);
    }

    /**
     * Ruby {@code max}.
     *
     * @return the current capacity as a fixnum
     */
    @JRubyMethod
    public RubyNumeric max(ThreadContext context) {
        return Convert.asFixnum(context, this.capacity);
    }

    /**
     * Ruby {@code max=}: replaces the capacity.
     *
     * <p>When the queue was at (or above) its old capacity and the new capacity
     * is larger, {@code notFull} is signalled once per newly available slot.
     *
     * @param arg2 the new capacity, converted with {@code Convert.toInt}
     * @return {@code arg2}, unchanged
     * @throws RuntimeException an argument error ("queue size must be positive")
     *         when the converted value is zero or negative
     */
    @JRubyMethod(name={"max="})
    public synchronized IRubyObject max_set(ThreadContext context, IRubyObject arg2) {
        this.initializedCheck(context);
        int max2 = Convert.toInt(context, arg2);
        if (max2 <= 0) {
            throw Error.argumentError(context, "queue size must be positive");
        }
        this.fullyLock();
        try {
            // Number of slots gained; only relevant if the queue was full before the change.
            int gained = 0;
            if (this.count.get() >= this.capacity && max2 > this.capacity) {
                gained = max2 - this.capacity;
            }
            this.capacity = max2;
            for (int i = 0; i < gained; i++) {
                this.notFull.signal();
            }
            return arg2;
        }
        finally {
            this.fullyUnlock();
        }
    }

    /**
     * Ruby {@code initialize}: sets the capacity from {@code arg2}.
     *
     * <p>The capacity is first set to {@code Integer.MAX_VALUE} and then
     * replaced through {@link #max_set}, which performs the validation.
     *
     * @return this queue
     */
    @Override
    @JRubyMethod(name={"initialize"}, visibility=Visibility.PRIVATE)
    public synchronized IRubyObject initialize(ThreadContext context, IRubyObject arg2) {
        this.capacity = Integer.MAX_VALUE;
        this.max_set(context, arg2);
        return this;
    }

    /**
     * Ruby {@code num_waiting}: the number of threads waiting on {@code notEmpty}
     * plus the number waiting on {@code notFull}, sampled while holding both locks.
     *
     * @return the combined wait-queue length as a fixnum
     * @throws RuntimeException the error built by {@code createInterruptedError}
     *         if the thread is interrupted while acquiring either lock
     */
    @Override
    @JRubyMethod
    public RubyNumeric num_waiting(ThreadContext context) {
        this.initializedCheck(context);
        ReentrantLock takeLock = this.takeLock;
        ReentrantLock putLock = this.putLock;
        try {
            takeLock.lockInterruptibly();
            try {
                putLock.lockInterruptibly();
                try {
                    int consumers = takeLock.getWaitQueueLength(this.notEmpty);
                    int producers = putLock.getWaitQueueLength(this.notFull);
                    return Convert.asFixnum(context, consumers + producers);
                }
                finally {
                    putLock.unlock();
                }
            }
            finally {
                takeLock.unlock();
            }
        }
        catch (InterruptedException ie) {
            throw this.createInterruptedError(context, "num_waiting");
        }
    }

    /**
     * Ruby {@code push}/{@code <<}/{@code enq} with a single argument: runs the
     * untimed blocking put task on the current thread.
     *
     * @param arg0 the value to enqueue
     * @return the result of the blocking put task (this queue)
     */
    @Override
    @JRubyMethod(name={"push", "<<", "enq"})
    public IRubyObject push(ThreadContext context, IRubyObject arg0) {
        this.initializedCheck(context);
        try {
            return context.getThread().executeTaskBlocking(context, arg0, this.blockingPutTask);
        }
        catch (InterruptedException ie) {
            throw this.createInterruptedError(context, "push");
        }
    }

    /**
     * Ruby {@code push}/{@code <<}/{@code enq} with a second argument that is
     * either a keyword hash (only {@code timeout} is read) or a non-block flag.
     *
     * @param arg0 the value to enqueue
     * @param nonblockOrOpts keywords if {@code ArgsUtil.extractKeywords} yields a
     *        hash, otherwise the truthiness of this value is the non-block flag
     * @return this queue on success, or {@code nil} when a timed push found no room
     */
    @JRubyMethod(name={"push", "<<", "enq"})
    public IRubyObject push(ThreadContext context, IRubyObject arg0, IRubyObject nonblockOrOpts) {
        this.initializedCheck(context);
        RubyHash opts = ArgsUtil.extractKeywords(nonblockOrOpts);
        if (opts == null) {
            // Not a keyword hash: the argument is the positional non-block flag.
            return this.pushCommon(context, arg0, nonblockOrOpts.isTrue(), false, 0L);
        }
        IRubyObject timeoutArg = ArgsUtil.extractKeywordArg(context, "timeout", opts);
        if (timeoutArg.isNil()) {
            return this.pushCommon(context, arg0, false, false, 0L);
        }
        long timeoutNS = SizedQueue.queueTimeoutToNanos(context, timeoutArg);
        return this.pushCommon(context, arg0, false, true, timeoutNS);
    }

    /**
     * Ruby {@code push}/{@code <<}/{@code enq} with both a non-block flag and a
     * keyword hash.
     *
     * @param arg0 the value to enqueue
     * @param _nonblock non-block flag (by truthiness)
     * @param _opts keyword hash; only {@code timeout} is read
     * @return this queue on success, or {@code nil} when a timed push found no room
     * @throws RuntimeException an argument error when a non-nil timeout is
     *         combined with a true non-block flag
     */
    @JRubyMethod(name={"push", "<<", "enq"})
    public IRubyObject push(ThreadContext context, IRubyObject arg0, IRubyObject _nonblock, IRubyObject _opts) {
        this.initializedCheck(context);
        boolean nonblock = _nonblock.isTrue();
        IRubyObject timeoutArg = ArgsUtil.extractKeywordArg(context, "timeout", _opts);
        if (timeoutArg.isNil()) {
            return this.pushCommon(context, arg0, nonblock, false, 0L);
        }
        if (nonblock) {
            throw Error.argumentError(context, "can't set a timeout if non_block is enabled");
        }
        long timeoutNS = SizedQueue.queueTimeoutToNanos(context, timeoutArg);
        return this.pushCommon(context, arg0, false, true, timeoutNS);
    }

    /**
     * Shared implementation behind the multi-argument {@code push} overloads.
     *
     * <p>{@code timed} records whether the caller supplied a timeout at all. It is
     * kept separate from {@code timeoutNS} because a supplied timeout of zero must
     * still take the timed path: if it were treated as "no timeout", a queue that
     * fills up between the lock-free capacity check and the put would make the
     * caller wait with no time limit.
     *
     * @param arg0 the value to enqueue
     * @param nonblock when true, fail with a thread error instead of waiting
     * @param timed whether a timeout was supplied by the caller
     * @param timeoutNS the timeout in nanoseconds; only meaningful when {@code timed}
     * @return this queue on success, or {@code nil} when a timed push found no room
     */
    private IRubyObject pushCommon(ThreadContext context, IRubyObject arg0, boolean nonblock, boolean timed, long timeoutNS) {
        // Fast path kept from the original: a zero timeout on a full queue yields nil
        // without taking the lock.
        if (timed && timeoutNS == 0L && this.count.get() == this.capacity) {
            return context.nil;
        }
        try {
            RubyThread thread = context.getThread();
            if (nonblock) {
                if (!this.offerInternal(context, arg0)) {
                    throw context.runtime.newThreadError("queue full");
                }
                return this;
            }
            RubyThread.Task<IRubyObject, IRubyObject> task = timed ? new BlockingOfferTask(timeoutNS) : this.blockingPutTask;
            return thread.executeTaskBlocking(context, arg0, task);
        }
        catch (InterruptedException ie) {
            throw this.createInterruptedError(context, "push");
        }
    }

    /**
     * Tries to enqueue {@code e} without waiting for room.
     *
     * @param e the value to enqueue; must not be {@code null}
     * @return {@code true} if the value was enqueued, {@code false} if the queue
     *         was at capacity (checked both before and after taking {@code putLock})
     * @throws NullPointerException if {@code e} is {@code null}
     */
    protected boolean offerInternal(ThreadContext context, IRubyObject e) {
        if (e == null) {
            throw new NullPointerException();
        }
        AtomicInteger count2 = this.count;
        // Lock-free early exit; re-checked below once the lock is held.
        if (count2.get() == this.capacity) {
            return false;
        }
        Queue.Node node = new Queue.Node(e);
        ReentrantLock lock = this.putLock;
        int previous;
        lock.lock();
        try {
            if (this.closed) {
                this.raiseClosedError(context);
            }
            if (count2.get() == this.capacity) {
                return false;
            }
            this.enqueue(node);
            previous = count2.getAndIncrement();
            // Still room after this insert: pass the signal on to another producer.
            if (previous + 1 < this.capacity) {
                this.notFull.signal();
            }
        }
        finally {
            lock.unlock();
        }
        // The queue was empty before this insert.
        if (previous == 0) {
            this.signalNotEmpty();
        }
        return true;
    }

    /**
     * Tries to enqueue {@code e}, waiting on {@code notFull} for at most the
     * given time while the queue is at capacity.
     *
     * @param e the value to enqueue; must not be {@code null}
     * @param timeout2 maximum time to wait, in {@code unit}
     * @param unit the unit of {@code timeout2}
     * @return {@code true} if the value was enqueued, {@code false} if the queue
     *         was still at capacity when the remaining time reached zero or below
     * @throws NullPointerException if {@code e} is {@code null}
     * @throws InterruptedException if interrupted while acquiring the lock or waiting
     */
    public boolean offerInternal(ThreadContext context, IRubyObject e, long timeout2, TimeUnit unit) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        long remaining = unit.toNanos(timeout2);
        ReentrantLock lock = this.putLock;
        AtomicInteger count2 = this.count;
        int previous;
        lock.lockInterruptibly();
        try {
            if (this.closed) {
                this.raiseClosedError(context);
            }
            // Re-read the closed flag on every pass so a close during the wait ends the loop.
            boolean sawClosed = this.closed;
            while (!sawClosed && count2.get() == this.capacity) {
                if (remaining <= 0L) {
                    return false;
                }
                remaining = this.notFull.awaitNanos(remaining);
                sawClosed = this.closed;
            }
            if (sawClosed) {
                // Pass the signal on before raising the closed error.
                this.notFull.signal();
                this.raiseClosedError(context);
            }
            this.enqueue(new Queue.Node(e));
            previous = count2.getAndIncrement();
            if (previous + 1 < this.capacity) {
                this.notFull.signal();
            }
        }
        finally {
            lock.unlock();
        }
        if (previous == 0) {
            this.signalNotEmpty();
        }
        return true;
    }

    /**
     * Array-argument form of {@code push}; dispatches on the number of arguments.
     *
     * @param argv2 one or two arguments
     * @return the result of the matching {@code push} overload
     * @throws RuntimeException an argument error for any other argument count
     */
    @Deprecated
    public IRubyObject push(ThreadContext context, IRubyObject[] argv2) {
        return switch (argv2.length) {
            case 1 -> this.push(context, argv2[0]);
            case 2 -> this.push(context, argv2[0], argv2[1]);
            default -> throw Error.argumentError(context, argv2.length, 1, 2);
        };
    }

    /**
     * Task used for a timed push: delegates to the timed {@code offerInternal}
     * and maps a failed offer to {@code nil}.
     */
    private final class BlockingOfferTask
    implements RubyThread.Task<IRubyObject, IRubyObject> {
        private final long timeoutNS;

        public BlockingOfferTask(long timeoutNS) {
            this.timeoutNS = timeoutNS;
        }

        @Override
        public IRubyObject run(ThreadContext context, IRubyObject value2) throws InterruptedException {
            boolean accepted = SizedQueue.this.offerInternal(context, value2, this.timeoutNS, TimeUnit.NANOSECONDS);
            return accepted ? SizedQueue.this : context.nil;
        }

        @Override
        public void wakeup(RubyThread thread2, IRubyObject sizedQueue) {
            thread2.getNativeThread().interrupt();
        }
    }
}

