import { channel, buffers } from 'redux-saga';
import { put, call, fork, take, delay, takeEvery, cancel, getContext } from 'redux-saga/effects';
import { isArray, get, isUndefined } from 'lodash';
import { EVENTS, MARKET_REQUESTS, MARKET_TYPES, SERVICE_NAMES } from 'Constants';
import * as actions from 'Store/actions';
import getEventChannelCreator from 'Store/sagas/utils/getEventChannelCreator';
import { prepareInputUint64Value } from 'Utils/uint64';
import logger from 'Utils/logger';

const log = logger('Quote subscription:');

const UPDATE_SYMBOL_DELAY = 800;
const DEFAULT_BUFFER_SIZE = 1000;

const SUBSCRIBE_TYPE = actions.market.subscribeToSymbolQuote.toString();
const UNSUBSCRIBE_TYPE = actions.market.unsubscribeFromSymbolQuote.toString();

const prepareQuotesData = (data) => {
    const bidPrice = prepareInputUint64Value(get(data, 'bidPrice', 0));
    const askPrice = prepareInputUint64Value(get(data, 'askPrice', 0));
    const time = Number(get(data, 'time', 0));

    return {
        ...data,
        bidPrice,
        askPrice,
        time
    };
};

export default function* initSymbolQuoteSubscription() {
    const marketService = yield getContext(SERVICE_NAMES.MARKET);

    // create custom channel for buffering actions
    const symbolQuoteSubscriptionActionChannel = yield call(channel, buffers.expanding(DEFAULT_BUFFER_SIZE));

    // call run channel watcher and pass custom channel to channel watcher
    yield fork(watchSymbolQuoteSubscriptionActions, symbolQuoteSubscriptionActionChannel, marketService);

    // take every quotes subscribe/unsubscribe actions and put it into custom channel
    yield takeEvery(actions.market.subscribeToSymbolQuote, function* (action) {
        yield put(symbolQuoteSubscriptionActionChannel, action);
    });
    yield takeEvery(actions.market.unsubscribeFromSymbolQuote, function* (action) {
        yield put(symbolQuoteSubscriptionActionChannel, action);
    });
}

function* watchSymbolQuoteSubscriptionActions(customActionsChannel, marketService) {
    const unsubscribeChannels = {};
    let subscriptionsCounters = {};

    while (true) {
        const { type, payload: symbol } = yield take(customActionsChannel);

        log('Got subscription action: %s, symbols: %o', type, symbol);

        const symbols = isArray(symbol) ? symbol : [symbol];
        const updatedSubscriptionCounters = getUpdatedSubscriptionCounters(subscriptionsCounters, symbols, type);

        log('Counters current: %o, updated: %o', subscriptionsCounters, updatedSubscriptionCounters);

        if (type === SUBSCRIBE_TYPE) {
            const symbolsToSubscribe = getSymbolsToSubscribe(subscriptionsCounters, updatedSubscriptionCounters);
            log('Symbols to subscribe: %o', symbolsToSubscribe);

            subscriptionsCounters = updatedSubscriptionCounters;

            for (let symbolKey of symbolsToSubscribe) {
                const unsubscribeChannel = yield call(channel);
                unsubscribeChannels[symbolKey] = unsubscribeChannel;
                yield fork(handleSubscribeToSymbolQuote, symbolKey, unsubscribeChannel, marketService);
            }
        }

        if (type === UNSUBSCRIBE_TYPE) {
            const symbolsForUnsubscribe = getSymbolsToUnsubscribe(subscriptionsCounters, updatedSubscriptionCounters);
            subscriptionsCounters = updatedSubscriptionCounters;

            log('Symbols to unsubscribe: %o', symbolsForUnsubscribe);

            for (let symbolKey of symbolsForUnsubscribe) {
                if (unsubscribeChannels.hasOwnProperty(symbolKey)) {
                    yield put(unsubscribeChannels[symbolKey], true);
                }
            }
        }
    }
}

const getSymbolsToSubscribe = (currentCounters, updatedCounters) =>
    Object.keys(updatedCounters)
        .filter((symbol) => updatedCounters[symbol] > 0)
        .filter((symbol) => !currentCounters[symbol]);

const getSymbolsToUnsubscribe = (currentCounters, updatedCounters) =>
    Object.keys(currentCounters)
        .filter((symbol) => currentCounters[symbol] > 0)
        .filter((symbol) => updatedCounters[symbol] <= 0);

const getUpdatedSubscriptionCounters = (subscriptionsCounters, symbols, type) => {
    const summand = type === SUBSCRIBE_TYPE ? 1 : -1;
    const counters = { ...subscriptionsCounters };
    for (let symbol of symbols) {
        if (isUndefined(counters[symbol])) {
            counters[symbol] = summand;
        } else {
            counters[symbol] += summand;
        }
    }
    return counters;
};

function* handleSubscribeToSymbolQuote(symbol, unsubscribeChannel, marketService) {
    try {
        const { subscriptionId } = yield marketService.requestSubscribe(MARKET_REQUESTS.QUOTE_REQUEST, {
            quoteType: MARKET_TYPES.QUOTE_TYPE.BEST,
            symbol
        });

        // create event channel around MarketWebSocketService quotes events
        const createEventChannel = getEventChannelCreator(marketService, EVENTS.MARKET.CLOSE);
        const channel = yield call(createEventChannel, EVENTS.MARKET.QUOTES);

        let currentQuote;

        // fork endless cycle for watch and put symbol data with delay
        yield fork(function* () {
            while (true) {
                // take every 'quotes' event on channel
                const { subscriptionId: eventId, quotes } = yield take(channel);
                if (eventId === subscriptionId) {
                    currentQuote = prepareQuotesData(quotes);
                }
            }
        });

        const updateStateTask = yield fork(function* () {
            while (true) {
                if (!isUndefined(currentQuote)) {
                    // dispatch gotSymbolQuote action
                    yield put(actions.market.gotSymbolQuote({ symbol, quote: currentQuote }));
                }
                // delay is used because of performance considerations
                yield delay(UPDATE_SYMBOL_DELAY);
            }
        });

        // watch unsubscribeFromSymbolQuoteInner action
        yield take(unsubscribeChannel);

        // stop channel, endless watch cycle will be broken automatically, (without cancel(task))
        channel.close();

        yield cancel(updateStateTask);

        yield marketService.requestUnsubscribe(MARKET_REQUESTS.QUOTE_REQUEST, subscriptionId);
    } catch (err) {
        yield put(actions.market.symbolQuoteSubscriptionError({ symbol, error: err.message }));
    }
}
