/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.microprofile.rest.client.tck.sse;

import jakarta.ws.rs.sse.InboundSseEvent;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.rest.client.RestClientBuilder;
import org.eclipse.microprofile.rest.client.tck.sse.AbstractSseTest;
import org.eclipse.microprofile.rest.client.tck.sse.HttpSseServer;
import org.eclipse.microprofile.rest.client.tck.sse.MyEventSource;
import org.eclipse.microprofile.rest.client.tck.sse.MyEventSourceServlet;
import org.eclipse.microprofile.rest.client.tck.sse.RsSseClient;
import org.eclipse.microprofile.rest.client.tck.sse.RsWeatherEventClient;
import org.eclipse.microprofile.rest.client.tck.sse.WeatherEvent;
import org.eclipse.microprofile.rest.client.tck.sse.WeatherEventProvider;
import org.jboss.arquillian.container.test.api.Deployment;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.Asset;
import org.jboss.shrinkwrap.api.asset.EmptyAsset;
import org.jboss.shrinkwrap.api.spec.WebArchive;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.testng.Assert;
import org.testng.annotations.Test;
import org.testng.log4testng.Logger;

public class BasicReactiveStreamsTest
extends AbstractSseTest {
    /** Logger used by test methods and the nested subscribers of this class for debug tracing. */
    private static final Logger LOG = Logger.getLogger(BasicReactiveStreamsTest.class);

    /**
     * Assembles the web archive used as the deployment for this test class.
     *
     * <p>The archive is named after this class's simple name with a {@code .war} suffix. It
     * contains this test, its abstract base class and the SSE server, client and event helper
     * classes listed below, plus an empty {@code beans.xml} added as a WEB-INF resource.
     *
     * @return the assembled archive
     */
    @Deployment
    public static WebArchive createDeployment() {
        String archiveName = BasicReactiveStreamsTest.class.getSimpleName() + ".war";
        return ShrinkWrap.create(WebArchive.class, archiveName)
                .addClasses(
                        AbstractSseTest.class,
                        BasicReactiveStreamsTest.class,
                        HttpSseServer.class,
                        MyEventSource.class,
                        MyEventSourceServlet.class,
                        RsSseClient.class,
                        RsWeatherEventClient.class,
                        WeatherEvent.class,
                        WeatherEventProvider.class)
                .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml");
    }

    /**
     * Checks that three data-only events emitted by the server reach a subscriber of the
     * {@code Publisher<InboundSseEvent>} returned by {@link RsSseClient#getEvents()}.
     *
     * <p>The subscriber counts the latch down once per received event; the test waits up to
     * 30 seconds for it to reach zero, then verifies the collected data and that neither the
     * server nor the subscriber recorded an error.
     *
     * @throws Exception if waiting on the latch is interrupted, or if thrown while setting up the
     *         server or client
     */
    @Override
    @Test
    public void testDataOnlySse_InboundSseEvent() throws Exception {
        LOG.debug("testDataOnlySse_InboundSseEvent");
        CountDownLatch resultsLatch = new CountDownLatch(3);
        AtomicReference<Throwable> serverException = launchServer(resultsLatch, es -> {
            es.emitData("foo");
            es.emitData("bar");
            es.emitData("baz");
        });

        RsSseClient client = RestClientBuilder.newBuilder()
                .baseUri(URI.create("http://localhost:" + PORT + "/string/sse"))
                .build(RsSseClient.class);
        Publisher<InboundSseEvent> publisher = client.getEvents();
        InboundSseEventSubscriber subscriber = new InboundSseEventSubscriber(3L, resultsLatch);
        publisher.subscribe(subscriber);

        Assert.assertTrue(resultsLatch.await(30L, TimeUnit.SECONDS));
        Assert.assertEquals(subscriber.data, new HashSet<String>(Arrays.asList("foo", "bar", "baz")));
        Assert.assertNull(serverException.get());
        // Check the error the subscriber actually recorded in onError; the previous local
        // AtomicReference was never written to, so asserting on it could not fail.
        Assert.assertNull(subscriber.throwable);
    }

    /**
     * Checks that three data-only events emitted by the server are delivered as plain strings
     * through the {@code Publisher<String>} returned by {@link RsSseClient#getStrings()}.
     *
     * <p>Waits up to 30 seconds for the subscriber to count the latch down to zero, then compares
     * the received strings with the emitted ones and verifies that no error was recorded by the
     * server or by the subscriber.
     *
     * @throws Exception if waiting on the latch is interrupted, or if thrown while setting up the
     *         server or client
     */
    @Override
    @Test
    public void testDataOnlySse_String() throws Exception {
        LOG.debug("testDataOnlySse_String");

        final CountDownLatch latch = new CountDownLatch(3);
        final AtomicReference<Throwable> serverFailure = launchServer(latch, source -> {
            source.emitData("foo2");
            source.emitData("bar2");
            source.emitData("baz2");
        });

        final URI endpoint = URI.create("http://localhost:" + PORT + "/string/sse");
        final RsSseClient sseClient = RestClientBuilder.newBuilder()
                .baseUri(endpoint)
                .build(RsSseClient.class);
        final Publisher<String> strings = sseClient.getStrings();
        final StringSubscriber stringSubscriber = new StringSubscriber(3L, latch);
        strings.subscribe(stringSubscriber);

        final boolean allReceived = latch.await(30L, TimeUnit.SECONDS);
        Assert.assertTrue(allReceived);

        final Set<String> expected = new HashSet<>(Arrays.asList("foo2", "bar2", "baz2"));
        Assert.assertEquals(stringSubscriber.eventStrings, expected);
        Assert.assertNull(serverFailure.get());
        Assert.assertNull(stringSubscriber.throwable);
    }

    /**
     * Checks that three JSON payloads emitted by the server are delivered through
     * {@link RsWeatherEventClient#getEvents()} as {@link WeatherEvent} objects.
     *
     * <p>After waiting up to 30 seconds for the latch, the received events are compared with
     * {@code WeatherEvent} instances built from the same dates (parsed as {@code yyyy-MM-dd}) and
     * descriptions. Neither the server nor the subscriber may have recorded an error.
     *
     * @throws Exception if waiting on the latch is interrupted, if a date literal fails to parse,
     *         or if thrown while setting up the server or client
     */
    @Override
    @Test
    public void testDataOnlySse_JsonObject() throws Exception {
        LOG.debug("testDataOnlySse_JsonObject");

        final CountDownLatch latch = new CountDownLatch(3);
        final AtomicReference<Throwable> serverFailure = launchServer(latch, source -> {
            source.emitData("{\"date\":\"2020-01-21\", \"description\":\"Significant snowfall\"}");
            source.emitData("{\"date\":\"2020-02-16\", \"description\":\"Hail storm\"}");
            source.emitData("{\"date\":\"2020-04-12\", \"description\":\"Blizzard\"}");
        });

        final URI endpoint = URI.create("http://localhost:" + PORT + "/string/sse");
        final RsWeatherEventClient weatherClient = RestClientBuilder.newBuilder()
                .baseUri(endpoint)
                .build(RsWeatherEventClient.class);
        final Publisher<WeatherEvent> events = weatherClient.getEvents();
        final WeatherEventSubscriber weatherSubscriber = new WeatherEventSubscriber(3L, latch);
        events.subscribe(weatherSubscriber);

        final boolean allReceived = latch.await(30L, TimeUnit.SECONDS);
        Assert.assertTrue(allReceived);

        final SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");
        final Set<WeatherEvent> expected = new HashSet<>(Arrays.asList(
                new WeatherEvent(dayFormat.parse("2020-01-21"), "Significant snowfall"),
                new WeatherEvent(dayFormat.parse("2020-02-16"), "Hail storm"),
                new WeatherEvent(dayFormat.parse("2020-04-12"), "Blizzard")));
        Assert.assertEquals(weatherSubscriber.weatherEvents, expected);
        Assert.assertNull(serverFailure.get());
        Assert.assertNull(weatherSubscriber.throwable);
    }

    /**
     * Checks that three comment-only events emitted by the server reach a subscriber of the
     * {@code Publisher<InboundSseEvent>} returned by {@link RsSseClient#getEvents()}.
     *
     * <p>Waits up to 30 seconds for the subscriber to count the latch down to zero, then verifies
     * the collected comments and that neither the server nor the subscriber recorded an error.
     *
     * @throws Exception if waiting on the latch is interrupted, or if thrown while setting up the
     *         server or client
     */
    @Override
    @Test
    public void testCommentOnlySse() throws Exception {
        LOG.debug("testCommentOnlySse");
        CountDownLatch resultsLatch = new CountDownLatch(3);
        AtomicReference<Throwable> serverException = launchServer(resultsLatch, es -> {
            es.emitComment("huey");
            es.emitComment("dewey");
            es.emitComment("louie");
        });

        RsSseClient client = RestClientBuilder.newBuilder()
                .baseUri(URI.create("http://localhost:" + PORT + "/string/sse"))
                .build(RsSseClient.class);
        Publisher<InboundSseEvent> publisher = client.getEvents();
        InboundSseEventSubscriber subscriber = new InboundSseEventSubscriber(3L, resultsLatch);
        publisher.subscribe(subscriber);

        Assert.assertTrue(resultsLatch.await(30L, TimeUnit.SECONDS));
        Assert.assertEquals(subscriber.comments, new HashSet<String>(Arrays.asList("huey", "dewey", "louie")));
        Assert.assertNull(serverException.get());
        // Check the error the subscriber actually recorded in onError; the previous local
        // AtomicReference was never written to, so asserting on it could not fail.
        Assert.assertNull(subscriber.throwable);
    }

    /**
     * Checks that three named events emitted by the server (with 500 ms pauses in between) reach a
     * subscriber of the {@code Publisher<InboundSseEvent>} returned by
     * {@link RsSseClient#getEvents()} with both their names and their data intact.
     *
     * <p>Waits up to 40 seconds for the subscriber to count the latch down to zero, then verifies
     * the collected names and data and that neither the server nor the subscriber recorded an
     * error.
     *
     * @throws Exception if waiting on the latch is interrupted, or if thrown while setting up the
     *         server or client
     */
    @Override
    @Test
    public void testNamedEventSse() throws Exception {
        LOG.debug("testNamedEventSse");
        // Payloads are shared between the emitting side and the expectation below.
        final String snowfall = "{\"date\":\"2020-01-21\", \"description\":\"Significant snowfall\"}";
        final String hailStorm = "{\"date\":\"2020-02-16\", \"description\":\"Hail storm\"}";
        final String blizzard = "{\"date\":\"2020-04-12\", \"description\":\"Blizzard\"}";

        CountDownLatch resultsLatch = new CountDownLatch(3);
        AtomicReference<Throwable> serverException = launchServer(resultsLatch, es -> {
            es.emitNamedEvent("1", snowfall);
            sleep(500L);
            es.emitNamedEvent("2", hailStorm);
            sleep(500L);
            es.emitNamedEvent("3", blizzard);
        });

        RsSseClient client = RestClientBuilder.newBuilder()
                .baseUri(URI.create("http://localhost:" + PORT + "/string/sse"))
                .build(RsSseClient.class);
        Publisher<InboundSseEvent> publisher = client.getEvents();
        InboundSseEventSubscriber subscriber = new InboundSseEventSubscriber(3L, resultsLatch);
        publisher.subscribe(subscriber);

        Assert.assertTrue(resultsLatch.await(40L, TimeUnit.SECONDS));
        Assert.assertEquals(subscriber.names, new HashSet<String>(Arrays.asList("1", "2", "3")));
        Assert.assertEquals(subscriber.data, new HashSet<String>(Arrays.asList(snowfall, hailStorm, blizzard)));
        Assert.assertNull(serverException.get());
        // Check the error the subscriber actually recorded in onError; the previous local
        // AtomicReference was never written to, so asserting on it could not fail.
        Assert.assertNull(subscriber.throwable);
    }

    /**
     * Checks that when the server closes the event source after emitting five data events, the
     * subscriber receives all five events followed by {@code onComplete}.
     *
     * <p>The latch starts at six: one count per data event plus one for the completion signal,
     * which the anonymous subscriber below counts down in {@code onComplete}. The subscriber
     * requests 20 events, more than the server emits. The test waits up to 45 seconds, then
     * verifies the data, the completion flag and that neither the server nor the subscriber
     * recorded an error.
     *
     * @throws Exception if waiting on the latch is interrupted, or if thrown while setting up the
     *         server or client
     */
    @Override
    @Test
    public void testServerClosesConnection() throws Exception {
        LOG.debug("testServerClosesConnection");
        CountDownLatch resultsLatch = new CountDownLatch(6);
        AtomicReference<Throwable> serverException = launchServer(resultsLatch, es -> {
            es.emitData("one");
            es.emitData("two");
            sleep(500L);
            es.emitData("three");
            sleep(500L);
            es.emitData("four");
            es.emitData("five");
            sleep(500L);
            es.close();
        });

        RsSseClient client = RestClientBuilder.newBuilder()
                .baseUri(URI.create("http://localhost:" + PORT + "/string/sse"))
                .build(RsSseClient.class);
        Publisher<InboundSseEvent> publisher = client.getEvents();
        InboundSseEventSubscriber subscriber = new InboundSseEventSubscriber(20L, resultsLatch) {

            @Override
            public void onComplete() {
                // Record completion first, then release the sixth latch count so the waiting
                // test thread observes the flag once await returns.
                super.onComplete();
                this.eventLatch.countDown();
            }
        };
        publisher.subscribe(subscriber);

        Assert.assertTrue(resultsLatch.await(45L, TimeUnit.SECONDS));
        Assert.assertEquals(subscriber.data, new HashSet<String>(Arrays.asList("one", "two", "three", "four", "five")));
        Assert.assertTrue(subscriber.completed);
        Assert.assertNull(serverException.get());
        // Check the error the subscriber actually recorded in onError; the previous local
        // AtomicReference was never written to, so asserting on it could not fail.
        Assert.assertNull(subscriber.throwable);
    }

    private static class InboundSseEventSubscriber
    implements Subscriber<InboundSseEvent>,
    AutoCloseable {
        final Set<String> data = new HashSet<String>();
        final Set<String> comments = new HashSet<String>();
        final Set<String> names = new HashSet<String>();
        final Set<String> ids = new HashSet<String>();
        final CountDownLatch eventLatch;
        Throwable throwable;
        boolean completed;
        Subscription subscription;
        long requestedEvents;

        InboundSseEventSubscriber(long requestedEvents, CountDownLatch eventLatch) {
            this.requestedEvents = requestedEvents;
            this.eventLatch = eventLatch;
        }

        public void onSubscribe(Subscription s) {
            LOG.debug((Object)("InboundSseEventSubscriber onSubscribe " + s));
            this.subscription = s;
            s.request(this.requestedEvents);
        }

        public void onNext(InboundSseEvent event) {
            LOG.debug((Object)("InboundSseEventSubscriber onNext " + event));
            this.data.add(event.readData());
            this.comments.add(event.getComment());
            this.names.add(event.getName());
            this.ids.add(event.getId());
            this.eventLatch.countDown();
        }

        public void onError(Throwable t) {
            LOG.debug((Object)("InboundSseEventSubscriber onError " + t));
            this.throwable = t;
        }

        public void onComplete() {
            LOG.debug((Object)"InboundSseEventSubscriber onComplete");
            this.completed = true;
        }

        @Override
        public void close() throws Exception {
            LOG.debug((Object)"InboundSseEventSubscriber close");
            this.subscription.cancel();
        }
    }

    private static class StringSubscriber
    implements Subscriber<String>,
    AutoCloseable {
        final Set<String> eventStrings = new HashSet<String>();
        final CountDownLatch eventLatch;
        Throwable throwable;
        Subscription subscription;
        long requestedEvents;

        StringSubscriber(long requestedEvents, CountDownLatch eventLatch) {
            this.requestedEvents = requestedEvents;
            this.eventLatch = eventLatch;
        }

        public void onSubscribe(Subscription s) {
            LOG.debug((Object)("StringSubscriber onSubscribe " + s));
            this.subscription = s;
            s.request(this.requestedEvents);
        }

        public void onNext(String s) {
            LOG.debug((Object)("StringSubscriber onNext " + s));
            this.eventStrings.add(s);
            this.eventLatch.countDown();
        }

        public void onError(Throwable t) {
            LOG.debug((Object)("StringSubscriber onError " + t));
            this.throwable = t;
        }

        public void onComplete() {
            LOG.debug((Object)"StringSubscriber onComplete");
        }

        @Override
        public void close() throws Exception {
            LOG.debug((Object)"StringSubscriber close");
            this.subscription.cancel();
        }
    }

    private static class WeatherEventSubscriber
    implements Subscriber<WeatherEvent>,
    AutoCloseable {
        final Set<WeatherEvent> weatherEvents = new HashSet<WeatherEvent>();
        final CountDownLatch eventLatch;
        Throwable throwable;
        Subscription subscription;
        long requestedEvents;

        WeatherEventSubscriber(long requestedEvents, CountDownLatch eventLatch) {
            this.requestedEvents = requestedEvents;
            this.eventLatch = eventLatch;
        }

        public void onSubscribe(Subscription s) {
            LOG.debug((Object)("WeatherEventSubscriber onSubscribe " + s));
            this.subscription = s;
            s.request(this.requestedEvents);
        }

        public void onNext(WeatherEvent s) {
            LOG.debug((Object)("WeatherEventSubscriber onNext " + s));
            this.weatherEvents.add(s);
            this.eventLatch.countDown();
        }

        public void onError(Throwable t) {
            LOG.debug((Object)("WeatherEventSubscriber onError " + t));
            this.throwable = t;
        }

        public void onComplete() {
            LOG.debug((Object)"WeatherEventSubscriber onComplete");
        }

        @Override
        public void close() throws Exception {
            LOG.debug((Object)"WeatherEventSubscriber close");
            this.subscription.cancel();
        }
    }
}

