import {Reply, SerializedReply, errStatus} from '@codesphere/reply-common/lib/Reply';
import {CommonReconnectingWebSocket} from '@codesphere/stubs-common/lib/messaging/CommonReconnectingWebSocket';
import {has} from '@codesphere/utils-common/lib/has';
import {Sequential} from '@codesphere/utils-common/lib/Sequential';
import {COMPLETE_METHOD} from '../stub/reservedRpcMethods';
import {CompleteListener, MessageListener, parse, send} from './utils';

/**
 * Base class of server streams and bidi streams.
 * Can send and receive messages from a pub sub server.
 *
 * Each stub draws its own endpoint id from the sequential passed to the constructor
 * and only reacts to socket messages that carry this id. A stream is completed at
 * most once: either by a server message flagged as `complete` or by the socket
 * closing, whichever happens first. Later completion signals are ignored.
 */
export abstract class WebSocketStreamStub<ClientMessageT, ServerMessageT> {
    protected readonly socket: CommonReconnectingWebSocket;
    protected readonly method: string;
    protected readonly endpointId: number;

    /** True once the server completed the stream or the socket was closed. */
    protected completed: boolean = false;
    protected readonly messageListeners: MessageListener<ServerMessageT>[] = [];
    protected readonly completeListeners: CompleteListener[] = [];

    /**
     * @param socket socket used for all traffic of this stream
     * @param endpointIdSequential source of the endpoint id; `next()` is called exactly once
     * @param method method name attached to every message sent via {@link send}
     */
    protected constructor(
        socket: CommonReconnectingWebSocket,
        endpointIdSequential: Sequential,
        method: string
    ) {
        this.socket = socket;
        this.method = method;
        this.endpointId = endpointIdSequential.next();

        // Declared before it is captured by the close handler so that the handler
        // never touches an uninitialized binding, even if it were invoked right away.
        const messageListener: (data: string) => void = (data) =>
            this.parseAndForwardServerMessage(data, messageListener);

        this.socket.onClose(message =>
            this.markCompletedAndNotify(errStatus(message), messageListener));
        this.socket.onMessage(messageListener);
    }

    /**
     * Registers a listener that is called when the stream completes.
     * Listeners registered after completion are not called retroactively.
     */
    public onCompleted(listener: CompleteListener): void {
        this.completeListeners.push(listener);
    }

    /**
     * Registers a listener that is called with the payload of every
     * non-completing server message addressed to this endpoint.
     */
    public onMessage(listener: MessageListener<ServerMessageT>): void {
        this.messageListeners.push(listener);
    }

    /**
     * Sends a completion message (reserved `COMPLETE_METHOD`) carrying the
     * serialized reply for this endpoint.
     *
     * This does not mark the stream as completed locally; `completed` only
     * changes when the server signals completion or the socket closes.
     */
    public complete(end: Reply): void {
        send<SerializedReply>(this.socket, {
            endpointId: this.endpointId,
            method: COMPLETE_METHOD,
            args: end.toSerializedReply(),
        });
    }

    /**
     * Sends a client message for this endpoint using the stream's method name.
     *
     * @throws Error if the stream has already been completed
     */
    protected send(message: ClientMessageT): void {
        if (this.completed) {
            throw new Error(
                'Can not send messages on a completed bidi stream'
                + ` (endpointId: ${this.endpointId}).`);
        }

        send(this.socket, {
            endpointId: this.endpointId,
            method: this.method,
            args: message,
        });
    }

    /**
     * Parses a raw socket message and dispatches it: messages for other endpoints
     * are ignored, messages flagged as `complete` finish the stream, everything
     * else is forwarded to the message listeners.
     *
     * @param data raw message as received from the socket
     * @param messageListener the socket listener to detach once the stream completes
     */
    private parseAndForwardServerMessage(
        data: string, messageListener: (data: string) => void,
    ): void {
        const message = parse<ServerMessageT | SerializedReply>(data);
        if (message.endpointId !== this.endpointId) {
            return;
        }
        if (has(message.complete) && message.complete) {
            const reply = Reply.createFromSerializedReply(message.reply as SerializedReply);
            this.markCompletedAndNotify(reply, messageListener);
        } else {
            this.messageListeners.forEach(listener => listener(message.reply as ServerMessageT));
        }
    }

    /**
     * Completes the stream exactly once: marks it as completed, notifies all
     * complete listeners and detaches the socket message listener.
     *
     * Without the guard a socket close following a server-side completion would
     * notify the complete listeners a second time with an error status.
     *
     * @param reply value handed to every complete listener
     * @param messageListener the socket listener to detach
     */
    private markCompletedAndNotify(
        reply: Parameters<CompleteListener>[0],
        messageListener: (data: string) => void,
    ): void {
        if (this.completed) {
            return;
        }
        this.completed = true;
        try {
            this.completeListeners.forEach(listener => listener(reply));
        } finally {
            // Detach even if a listener throws, so no further socket messages
            // are parsed for a stream that is already completed.
            this.socket.removeEventListener('message', messageListener);
        }
    }
}
