/**
 * Device Data Saga
 * @author priyanka.ambawane@shorelineiot.com
 */

import { AnyAction } from 'redux';
import { take, takeEvery, put, call } from 'redux-saga/effects';
import { eventChannel, END, SagaIterator, EventChannel } from 'redux-saga';
import * as ACTIONS from './deviceLiveData.actions';
import * as ACTION_TYPES from './deviceLiveData.actionTypes';
import { SafeSaga } from '../../../../../framework';
import { AuthService } from '../../../../../framework/services/auth.service';

function startEventChannel(topics: Array<string>) {
  const authService = new AuthService();
  return eventChannel((emitter) => {
    const task = authService.getDataFromTopic(topics).subscribe({
      next: (data: any) => {
        emitter({ value: data.value });
      },
      error: (error: any) => emitter(error),
      complete: () => emitter(END)
    });
    // The subscriber must return an unsubscribe function
    return () => {
      task.unsubscribe();
    };
  });
}

export function* initiateLiveDataSubscription(action: AnyAction) {
  const authService = new AuthService();
  yield authService.connectToMQTTBroker();
  const channel: EventChannel<any> = yield call(startEventChannel, action.payload);
  yield takeEvery(channel, function* ({ value }: any) {
    if (value) {
      yield put(ACTIONS.getDeviceDataSuccess(value));
    }
  });
  yield take(ACTION_TYPES.CANCEL_SUBSCRIPTION_TO_TOPIC);
  channel.close();
}

function* errorHandler(error: any) {
  yield put(ACTIONS.getDeviceDataFailure(error));
}

export function* watcherSagas(): SagaIterator {
  yield takeEvery(
    ACTION_TYPES.INITIATE_SUBSCRIPTION_TO_TOPIC,
    SafeSaga(
      initiateLiveDataSubscription,
      ACTION_TYPES.INITIATE_SUBSCRIPTION_TO_TOPIC,
      errorHandler
    )
  );
}
