package com.corundumstudio.socketio.store;

import com.corundumstudio.socketio.store.pubsub.PubSubListener;
import com.corundumstudio.socketio.store.pubsub.PubSubMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import io.netty.util.internal.PlatformDependent;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.redisson.Redisson;
import org.redisson.core.MessageListener;
import org.redisson.core.RTopic;

/* loaded from: classes.dex */
/**
 * {@link PubSubStore} implementation that publishes and subscribes through Redisson topics.
 *
 * <p>Every outgoing message is stamped with this store's node id, and incoming messages
 * carrying the same node id are dropped, so a node never delivers its own publications
 * to its local listeners.
 *
 * <p>Listener registration ids returned by Redisson are remembered per topic name so that
 * {@link #unsubscribe(String)} can remove them again.
 */
public class RedissonPubSubStore implements PubSubStore {
    /** Topic name -> registration ids of the listeners added through {@link #subscribe}. */
    private final ConcurrentMap<String, Queue<Integer>> map = PlatformDependent.newConcurrentHashMap();
    private final Long nodeId;
    private final Redisson redissonPub;
    private final Redisson redissonSub;

    /**
     * @param redissonPub client used by {@link #publish}
     * @param redissonSub client used by {@link #subscribe} and {@link #unsubscribe}
     * @param nodeId      id stamped on outgoing messages and used to filter incoming ones
     */
    public RedissonPubSubStore(Redisson redissonPub, Redisson redissonSub, Long nodeId) {
        this.redissonPub = redissonPub;
        this.redissonSub = redissonSub;
        this.nodeId = nodeId;
    }

    /**
     * Stamps {@code msg} with this store's node id and publishes it on the topic {@code name}.
     *
     * @param name topic name
     * @param msg  message to publish; its node id is overwritten
     */
    @Override
    public void publish(String name, PubSubMessage msg) {
        msg.setNodeId(nodeId);
        redissonPub.getTopic(name).publish(msg);
    }

    /** Does nothing; this store releases no resources here. */
    @Override
    public void shutdown() {
    }

    /**
     * Registers {@code listener} on the topic {@code name}. Messages whose node id equals this
     * store's node id are not forwarded to the listener.
     *
     * @param name     topic name
     * @param listener callback invoked for messages published by other nodes
     * @param clazz    message class; not used by this implementation
     */
    @Override
    public <T extends PubSubMessage> void subscribe(String name, final PubSubListener<T> listener, Class<T> clazz) {
        RTopic<T> topic = redissonSub.getTopic(name);
        int regId = topic.addListener(new MessageListener<T>() {
            @Override
            public void onMessage(T msg) {
                // Skip messages this node published itself.
                if (!nodeId.equals(msg.getNodeId())) {
                    listener.onMessage(msg);
                }
            }
        });

        Queue<Integer> regIds = map.get(name);
        if (regIds == null) {
            Queue<Integer> fresh = new ConcurrentLinkedQueue<Integer>();
            // putIfAbsent returns the queue another thread installed first, or null if ours won.
            regIds = map.putIfAbsent(name, fresh);
            if (regIds == null) {
                regIds = fresh;
            }
        }
        regIds.add(regId);
    }

    /**
     * Removes every listener previously registered on the topic {@code name} through
     * {@link #subscribe}. Does nothing when no listener is registered for that name.
     *
     * @param name topic name
     */
    @Override
    public void unsubscribe(String name) {
        Queue<Integer> regIds = map.remove(name);
        if (regIds == null) {
            // Never subscribed, or already unsubscribed: nothing to remove.
            return;
        }
        RTopic<?> topic = redissonSub.getTopic(name);
        for (Integer regId : regIds) {
            topic.removeListener(regId.intValue());
        }
    }
}
