package org.eclipse.microprofile.rest.client.tck.sse;

import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.ws.rs.sse.InboundSseEvent;
import org.eclipse.microprofile.rest.client.RestClientBuilder;
import org.jboss.arquillian.container.test.api.Deployment;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.EmptyAsset;
import org.jboss.shrinkwrap.api.spec.WebArchive;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.testng.Assert;
import org.testng.annotations.Test;
import org.testng.log4testng.Logger;

/* loaded from: input_file:org/eclipse/microprofile/rest/client/tck/sse/BasicReactiveStreamsTest.class */
public class BasicReactiveStreamsTest extends AbstractSseTest {
    private static final Logger LOG = Logger.getLogger(BasicReactiveStreamsTest.class);

    /* loaded from: input_file:org/eclipse/microprofile/rest/client/tck/sse/BasicReactiveStreamsTest$InboundSseEventSubscriber.class */
    private static class InboundSseEventSubscriber implements Subscriber<InboundSseEvent>, AutoCloseable {
        final Set<String> data = new HashSet();
        final Set<String> comments = new HashSet();
        final Set<String> names = new HashSet();
        final Set<String> ids = new HashSet();
        final CountDownLatch eventLatch;
        Throwable throwable;
        boolean completed;
        Subscription subscription;
        long requestedEvents;

        InboundSseEventSubscriber(long j, CountDownLatch countDownLatch) {
            this.requestedEvents = j;
            this.eventLatch = countDownLatch;
        }

        public void onSubscribe(Subscription subscription) {
            BasicReactiveStreamsTest.LOG.debug("InboundSseEventSubscriber onSubscribe " + subscription);
            this.subscription = subscription;
            subscription.request(this.requestedEvents);
        }

        public void onNext(InboundSseEvent inboundSseEvent) {
            BasicReactiveStreamsTest.LOG.debug("InboundSseEventSubscriber onNext " + inboundSseEvent);
            this.data.add(inboundSseEvent.readData());
            this.comments.add(inboundSseEvent.getComment());
            this.names.add(inboundSseEvent.getName());
            this.ids.add(inboundSseEvent.getId());
            this.eventLatch.countDown();
        }

        public void onError(Throwable th) {
            BasicReactiveStreamsTest.LOG.debug("InboundSseEventSubscriber onError " + th);
            this.throwable = th;
        }

        public void onComplete() {
            BasicReactiveStreamsTest.LOG.debug("InboundSseEventSubscriber onComplete");
            this.completed = true;
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            BasicReactiveStreamsTest.LOG.debug("InboundSseEventSubscriber close");
            this.subscription.cancel();
        }
    }

    /* loaded from: input_file:org/eclipse/microprofile/rest/client/tck/sse/BasicReactiveStreamsTest$StringSubscriber.class */
    private static class StringSubscriber implements Subscriber<String>, AutoCloseable {
        final Set<String> eventStrings = new HashSet();
        final CountDownLatch eventLatch;
        Throwable throwable;
        Subscription subscription;
        long requestedEvents;

        StringSubscriber(long j, CountDownLatch countDownLatch) {
            this.requestedEvents = j;
            this.eventLatch = countDownLatch;
        }

        public void onSubscribe(Subscription subscription) {
            BasicReactiveStreamsTest.LOG.debug("StringSubscriber onSubscribe " + subscription);
            this.subscription = subscription;
            subscription.request(this.requestedEvents);
        }

        public void onNext(String str) {
            BasicReactiveStreamsTest.LOG.debug("StringSubscriber onNext " + str);
            this.eventStrings.add(str);
            this.eventLatch.countDown();
        }

        public void onError(Throwable th) {
            BasicReactiveStreamsTest.LOG.debug("StringSubscriber onError " + th);
            this.throwable = th;
        }

        public void onComplete() {
            BasicReactiveStreamsTest.LOG.debug("StringSubscriber onComplete");
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            BasicReactiveStreamsTest.LOG.debug("StringSubscriber close");
            this.subscription.cancel();
        }
    }

    /* loaded from: input_file:org/eclipse/microprofile/rest/client/tck/sse/BasicReactiveStreamsTest$WeatherEventSubscriber.class */
    private static class WeatherEventSubscriber implements Subscriber<WeatherEvent>, AutoCloseable {
        final Set<WeatherEvent> weatherEvents = new HashSet();
        final CountDownLatch eventLatch;
        Throwable throwable;
        Subscription subscription;
        long requestedEvents;

        WeatherEventSubscriber(long j, CountDownLatch countDownLatch) {
            this.requestedEvents = j;
            this.eventLatch = countDownLatch;
        }

        public void onSubscribe(Subscription subscription) {
            BasicReactiveStreamsTest.LOG.debug("WeatherEventSubscriber onSubscribe " + subscription);
            this.subscription = subscription;
            subscription.request(this.requestedEvents);
        }

        public void onNext(WeatherEvent weatherEvent) {
            BasicReactiveStreamsTest.LOG.debug("WeatherEventSubscriber onNext " + weatherEvent);
            this.weatherEvents.add(weatherEvent);
            this.eventLatch.countDown();
        }

        public void onError(Throwable th) {
            BasicReactiveStreamsTest.LOG.debug("WeatherEventSubscriber onError " + th);
            this.throwable = th;
        }

        public void onComplete() {
            BasicReactiveStreamsTest.LOG.debug("WeatherEventSubscriber onComplete");
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            BasicReactiveStreamsTest.LOG.debug("WeatherEventSubscriber close");
            this.subscription.cancel();
        }
    }

    /**
     * Assembles the web archive for this test: a WAR named after the test class,
     * containing the test and its SSE support classes, plus an empty
     * {@code beans.xml} under WEB-INF.
     *
     * @return the archive to deploy
     */
    @Deployment
    public static WebArchive createDeployment() {
        String archiveName = BasicReactiveStreamsTest.class.getSimpleName() + ".war";
        return ShrinkWrap.create(WebArchive.class, archiveName)
                .addClasses(
                        AbstractSseTest.class,
                        BasicReactiveStreamsTest.class,
                        HttpSseServer.class,
                        MyEventSource.class,
                        MyEventSourceServlet.class,
                        RsSseClient.class,
                        RsWeatherEventClient.class,
                        WeatherEvent.class,
                        WeatherEventProvider.class)
                .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml");
    }

    /**
     * Emits three data-only events from the test server and checks that a
     * {@code Publisher<InboundSseEvent>} obtained from {@code RsSseClient} delivers
     * all three payloads to the subscriber.
     *
     * <p>The final assertion checks the subscriber's recorded {@code onError}
     * throwable. The earlier version asserted on a local {@code AtomicReference}
     * that nothing ever populated, so that check could never fail.
     */
    @Override
    @Test
    public void testDataOnlySse_InboundSseEvent() throws Exception {
        CountDownLatch resultsLatch = new CountDownLatch(3);
        // NOTE(review): the returned reference presumably carries a server-side failure,
        // if any — confirm against AbstractSseTest.launchServer.
        AtomicReference<Throwable> serverException = launchServer(resultsLatch, es -> {
            es.emitData("foo");
            es.emitData("bar");
            es.emitData("baz");
        });

        RsSseClient client = RestClientBuilder.newBuilder()
                .baseUri(URI.create("http://localhost:" + PORT + "/string/sse"))
                .build(RsSseClient.class);
        Publisher<InboundSseEvent> publisher = client.getEvents();
        InboundSseEventSubscriber subscriber = new InboundSseEventSubscriber(3L, resultsLatch);
        publisher.subscribe(subscriber);

        // The latch reaches zero once the subscriber has seen three events.
        Assert.assertTrue(resultsLatch.await(30L, TimeUnit.SECONDS));
        Assert.assertEquals(subscriber.data, new HashSet<>(Arrays.asList("foo", "bar", "baz")));
        Assert.assertNull(serverException.get());
        Assert.assertNull(subscriber.throwable);
    }

    /**
     * Emits three data-only events from the test server and checks that a
     * {@code Publisher<String>} obtained from {@code RsSseClient} delivers all three
     * payloads, with no error recorded by the subscriber.
     */
    @Override
    @Test
    public void testDataOnlySse_String() throws Exception {
        LOG.debug("testDataOnlySse_String");
        CountDownLatch resultsLatch = new CountDownLatch(3);
        AtomicReference<Throwable> serverException = launchServer(resultsLatch, es -> {
            es.emitData("foo2");
            es.emitData("bar2");
            es.emitData("baz2");
        });

        URI baseUri = URI.create("http://localhost:" + PORT + "/string/sse");
        RsSseClient client = RestClientBuilder.newBuilder().baseUri(baseUri).build(RsSseClient.class);
        Publisher<String> publisher = client.getStrings();
        StringSubscriber subscriber = new StringSubscriber(3L, resultsLatch);
        publisher.subscribe(subscriber);

        // Wait until the subscriber has counted down once per expected item.
        boolean allReceived = resultsLatch.await(30L, TimeUnit.SECONDS);
        Assert.assertTrue(allReceived);

        Set<String> expected = new HashSet<>(Arrays.asList("foo2", "bar2", "baz2"));
        Assert.assertEquals(subscriber.eventStrings, expected);
        Assert.assertNull(serverException.get());
        Assert.assertNull(subscriber.throwable);
    }

    /**
     * Emits three JSON payloads from the test server and checks that a
     * {@code Publisher<WeatherEvent>} obtained from {@code RsWeatherEventClient}
     * delivers the three corresponding {@code WeatherEvent} objects, with no error
     * recorded by the subscriber.
     */
    @Override
    @Test
    public void testDataOnlySse_JsonObject() throws Exception {
        LOG.debug("testDataOnlySse_JsonObject");
        CountDownLatch resultsLatch = new CountDownLatch(3);
        AtomicReference<Throwable> serverException = launchServer(resultsLatch, es -> {
            es.emitData("{\"date\":\"2020-01-21\", \"description\":\"Significant snowfall\"}");
            es.emitData("{\"date\":\"2020-02-16\", \"description\":\"Hail storm\"}");
            es.emitData("{\"date\":\"2020-04-12\", \"description\":\"Blizzard\"}");
        });

        URI baseUri = URI.create("http://localhost:" + PORT + "/string/sse");
        RsWeatherEventClient client =
                RestClientBuilder.newBuilder().baseUri(baseUri).build(RsWeatherEventClient.class);
        Publisher<WeatherEvent> publisher = client.getEvents();
        WeatherEventSubscriber subscriber = new WeatherEventSubscriber(3L, resultsLatch);
        publisher.subscribe(subscriber);

        // Wait until the subscriber has counted down once per expected event.
        boolean allReceived = resultsLatch.await(30L, TimeUnit.SECONDS);
        Assert.assertTrue(allReceived);

        // Expected events are built from the same date strings and descriptions that were emitted.
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
        WeatherEvent snowfall = new WeatherEvent(dateFormat.parse("2020-01-21"), "Significant snowfall");
        WeatherEvent hail = new WeatherEvent(dateFormat.parse("2020-02-16"), "Hail storm");
        WeatherEvent blizzard = new WeatherEvent(dateFormat.parse("2020-04-12"), "Blizzard");
        Set<WeatherEvent> expected = new HashSet<>(Arrays.asList(snowfall, hail, blizzard));

        Assert.assertEquals(subscriber.weatherEvents, expected);
        Assert.assertNull(serverException.get());
        Assert.assertNull(subscriber.throwable);
    }

    /**
     * Emits three comment-only events from the test server and checks that the
     * subscriber sees all three comments via {@code InboundSseEvent.getComment()}.
     *
     * <p>The final assertion checks the subscriber's recorded {@code onError}
     * throwable. The earlier version asserted on a local {@code AtomicReference}
     * that nothing ever populated, so that check could never fail.
     */
    @Override
    @Test
    public void testCommentOnlySse() throws Exception {
        CountDownLatch resultsLatch = new CountDownLatch(3);
        // NOTE(review): the returned reference presumably carries a server-side failure,
        // if any — confirm against AbstractSseTest.launchServer.
        AtomicReference<Throwable> serverException = launchServer(resultsLatch, es -> {
            es.emitComment("huey");
            es.emitComment("dewey");
            es.emitComment("louie");
        });

        RsSseClient client = RestClientBuilder.newBuilder()
                .baseUri(URI.create("http://localhost:" + PORT + "/string/sse"))
                .build(RsSseClient.class);
        Publisher<InboundSseEvent> publisher = client.getEvents();
        InboundSseEventSubscriber subscriber = new InboundSseEventSubscriber(3L, resultsLatch);
        publisher.subscribe(subscriber);

        // The latch reaches zero once the subscriber has seen three events.
        Assert.assertTrue(resultsLatch.await(30L, TimeUnit.SECONDS));
        Assert.assertEquals(subscriber.comments, new HashSet<>(Arrays.asList("huey", "dewey", "louie")));
        Assert.assertNull(serverException.get());
        Assert.assertNull(subscriber.throwable);
    }

    /**
     * Emits three named events, 500 ms apart, and checks that the subscriber sees
     * both the event names and the raw JSON payloads.
     *
     * <p>The final assertion checks the subscriber's recorded {@code onError}
     * throwable. The earlier version asserted on a local {@code AtomicReference}
     * that nothing ever populated, so that check could never fail.
     */
    @Override
    @Test
    public void testNamedEventSse() throws Exception {
        // Each payload is declared once so the emitted and the expected values cannot drift apart.
        final String snowfallJson = "{\"date\":\"2020-01-21\", \"description\":\"Significant snowfall\"}";
        final String hailJson = "{\"date\":\"2020-02-16\", \"description\":\"Hail storm\"}";
        final String blizzardJson = "{\"date\":\"2020-04-12\", \"description\":\"Blizzard\"}";

        CountDownLatch resultsLatch = new CountDownLatch(3);
        // NOTE(review): the returned reference presumably carries a server-side failure,
        // if any — confirm against AbstractSseTest.launchServer.
        AtomicReference<Throwable> serverException = launchServer(resultsLatch, es -> {
            es.emitNamedEvent("1", snowfallJson);
            sleep(500L);
            es.emitNamedEvent("2", hailJson);
            sleep(500L);
            es.emitNamedEvent("3", blizzardJson);
        });

        RsSseClient client = RestClientBuilder.newBuilder()
                .baseUri(URI.create("http://localhost:" + PORT + "/string/sse"))
                .build(RsSseClient.class);
        Publisher<InboundSseEvent> publisher = client.getEvents();
        InboundSseEventSubscriber subscriber = new InboundSseEventSubscriber(3L, resultsLatch);
        publisher.subscribe(subscriber);

        // The latch reaches zero once the subscriber has seen three events.
        Assert.assertTrue(resultsLatch.await(40L, TimeUnit.SECONDS));
        Assert.assertEquals(subscriber.names, new HashSet<>(Arrays.asList("1", "2", "3")));
        Assert.assertEquals(subscriber.data, new HashSet<>(Arrays.asList(snowfallJson, hailJson, blizzardJson)));
        Assert.assertNull(serverException.get());
        Assert.assertNull(subscriber.throwable);
    }

    /**
     * Emits five data events and then closes the event source on the server side.
     * Checks that the subscriber receives all five payloads and is then completed.
     *
     * <p>The latch counts six signals: five {@code onNext} calls plus the
     * {@code onComplete} counted by the anonymous subclass below. The subscriber
     * requests 20 items, more than are emitted.
     *
     * <p>The final assertion checks the subscriber's recorded {@code onError}
     * throwable. The earlier version asserted on a local {@code AtomicReference}
     * that nothing ever populated, so that check could never fail.
     */
    @Override
    @Test
    public void testServerClosesConnection() throws Exception {
        CountDownLatch resultsLatch = new CountDownLatch(6);
        // NOTE(review): the returned reference presumably carries a server-side failure,
        // if any — confirm against AbstractSseTest.launchServer.
        AtomicReference<Throwable> serverException = launchServer(resultsLatch, es -> {
            es.emitData("one");
            es.emitData("two");
            sleep(500L);
            es.emitData("three");
            sleep(500L);
            es.emitData("four");
            es.emitData("five");
            sleep(500L);
            es.close();
        });

        RsSseClient client = RestClientBuilder.newBuilder()
                .baseUri(URI.create("http://localhost:" + PORT + "/string/sse"))
                .build(RsSseClient.class);
        Publisher<InboundSseEvent> publisher = client.getEvents();
        InboundSseEventSubscriber subscriber = new InboundSseEventSubscriber(20L, resultsLatch) {
            @Override
            public void onComplete() {
                // Set the completed flag first, then release the sixth latch count.
                super.onComplete();
                this.eventLatch.countDown();
            }
        };
        publisher.subscribe(subscriber);

        Assert.assertTrue(resultsLatch.await(45L, TimeUnit.SECONDS));
        Assert.assertEquals(subscriber.data, new HashSet<>(Arrays.asList("one", "two", "three", "four", "five")));
        Assert.assertTrue(subscriber.completed);
        Assert.assertNull(serverException.get());
        Assert.assertNull(subscriber.throwable);
    }
}
