//This is an override class for AWS' own PubSub module.
//This solution comes from this GitHub issue - https://github.com/aws-amplify/amplify-js/issues/3039
//and was posted as a solution to recurring problems with AWS' module resulting in
//AMJS0008I Socket closed errors. As of 10/6/21, the AWS exposed PubSub module still has this issue.
import Observable from 'zen-observable-ts';
import { Auth } from 'aws-amplify';
import {
	Amplify,
	INTERNAL_AWS_APPSYNC_PUBSUB_PROVIDER,
	INTERNAL_AWS_APPSYNC_REALTIME_PUBSUB_PROVIDER,
} from '@aws-amplify/core';
import {
	AWSAppSyncProvider,
	AWSAppSyncRealTimeProvider,
} from '@aws-amplify/pubsub/lib/Providers';
// @ts-ignore
import { MqttOverWSProvider } from '@aws-amplify/pubsub/lib/Providers';

/**
 * Drop-in replacement for the Amplify PubSub category.
 *
 * Keeps a list of pluggable providers, fans `publish`/`subscribe` calls out to
 * them, and can rebuild the MQTT-over-WebSocket provider with a fresh token
 * via `restartMqttOverWSProvider()`.
 */
export class PubSubClass {
	/**
	 * Initialize PubSub with AWS configurations
	 *
	 * @param {PubSubOptions} options - Configuration object for PubSub
	 */
	constructor(options) {
		this._options = options;
		this._pluggables = [];
		// Bound so `subscribe` can be passed around detached from the instance.
		this.subscribe = this.subscribe.bind(this);
		// Remembered by addPluggable() and reused by restartMqttOverWSProvider().
		this.aws_pubsub_region = '';
		this.aws_pubsub_endpoint = '';
	}

	/**
	 * Lazy instantiate AWSAppSyncProvider when it is required by the API category
	 */
	get awsAppSyncProvider() {
		if (!this._awsAppSyncProvider) {
			this._awsAppSyncProvider = new AWSAppSyncProvider(this._options);
		}
		return this._awsAppSyncProvider;
	}

	/**
	 * Lazy instantiate AWSAppSyncRealTimeProvider when it is required by the API category
	 */
	get awsAppSyncRealTimeProvider() {
		if (!this._awsAppSyncRealTimeProvider) {
			this._awsAppSyncRealTimeProvider = new AWSAppSyncRealTimeProvider(this._options);
		}
		return this._awsAppSyncRealTimeProvider;
	}

	/**
	 * @return {string} - The category name, always 'PubSub'
	 */
	getModuleName() {
		return 'PubSub';
	}

	/**
	 * Configure PubSub part with configurations
	 *
	 * Merges the given options (or their `PubSub` sub-object when present) over
	 * the current ones and passes the merged result to every registered pluggable.
	 *
	 * @param {PubSubOptions} options - Configuration for PubSub
	 * @return {Object} - The current configuration
	 */
	configure(options) {
		const opt = options ? options.PubSub || options : {};
		this._options = Object.assign({}, this._options, opt);
		this._pluggables.forEach((pluggable) => {
			pluggable.configure(this._options);
		});
		return this._options;
	}

	/**
	 * Add a plugin to the PubSub category and remember the region/endpoint so
	 * the provider can be rebuilt later by restartMqttOverWSProvider().
	 *
	 * @param {Object} pluggable - an instance of the plugin
	 * @param {string} awsRegion - stored as `aws_pubsub_region`
	 * @param {string} endpoint - stored as `aws_pubsub_endpoint`; used as the
	 *   host part of the `wss://` URL built on restart
	 * @return {Promise<Object|undefined>} - whatever `pluggable.configure()`
	 *   returned, or undefined when no pluggable was given
	 */
	async addPluggable(pluggable, awsRegion, endpoint) {
		// Region and endpoint are recorded even when no pluggable is supplied.
		this.aws_pubsub_region = awsRegion;
		this.aws_pubsub_endpoint = endpoint;
		if (pluggable) {
			this._pluggables.push(pluggable);
			return pluggable.configure(this._options);
		}
	}

	/**
	 * Look up a provider by name.
	 *
	 * @param {string|symbol} providerName - one of the internal AppSync provider
	 *   identifiers, or the value a pluggable returns from `getProviderName()`
	 * @return {Object|undefined} - the matching provider, or undefined
	 */
	getProviderByName(providerName) {
		if (providerName === INTERNAL_AWS_APPSYNC_PUBSUB_PROVIDER) {
			return this.awsAppSyncProvider;
		}
		if (providerName === INTERNAL_AWS_APPSYNC_REALTIME_PUBSUB_PROVIDER) {
			return this.awsAppSyncRealTimeProvider;
		}
		return this._pluggables.find(
			(pluggable) => pluggable.getProviderName() === providerName
		);
	}

	/**
	 * Resolve which providers a call should be sent to.
	 *
	 * @param {Object} [options] - when `options.provider` is set only that
	 *   provider is returned; otherwise all registered pluggables are returned
	 * @return {Object[]} - the selected providers
	 * @throws {Error} when `options.provider` names an unknown provider
	 */
	getProviders(options = {}) {
		// The parameter default only covers `undefined`; also tolerate `null`.
		const { provider: providerName } = options || {};
		if (!providerName) {
			return this._pluggables;
		}
		const provider = this.getProviderByName(providerName);
		if (!provider) {
			throw new Error(`Could not find provider named ${providerName}`);
		}
		return [provider];
	}

	/**
	 * Replace the first registered pluggable with a new MqttOverWSProvider
	 * whose endpoint URL carries the current user's ID token.
	 *
	 * NOTE(review): this assumes the MQTT provider lives at index 0 of the
	 * pluggable list, and the previous instance is not explicitly disconnected
	 * here — confirm both against the callers.
	 *
	 * @return {Promise<void>}
	 */
	async restartMqttOverWSProvider() {
		const cognitoUser = await Auth.currentAuthenticatedUser();
		const jwtToken = cognitoUser.signInUserSession.idToken.jwtToken || '';
		this.log('Restarting MqttOverWSProvider...');
		this._pluggables[0] = new MqttOverWSProvider({
			aws_pubsub_region: this.aws_pubsub_region,
			aws_pubsub_endpoint: `wss://${this.aws_pubsub_endpoint}/mqtt?token=${jwtToken}`,
		});
	}

	/**
	 * Publish a message through every selected provider.
	 *
	 * @param {string|string[]} topics - forwarded unchanged to each provider
	 * @param {*} msg - forwarded unchanged to each provider
	 * @param {Object} [options] - see getProviders(); also forwarded to providers
	 * @return {Promise<Array>} - resolves with each provider's publish result
	 */
	async publish(topics, msg, options) {
		return Promise.all(
			this.getProviders(options).map((provider) => provider.publish(topics, msg, options))
		);
	}

	/**
	 * Subscribe to topics on every selected provider.
	 *
	 * Values are delivered as `{ provider, value }` and errors as
	 * `{ provider, error }`. Unsubscribing from the returned observable
	 * unsubscribes from every underlying provider subscription.
	 *
	 * @param {string|string[]} topics - forwarded unchanged to each provider
	 * @param {Object} [options] - see getProviders(); also forwarded to providers
	 * @return {Observable} - merged stream over all selected providers
	 */
	subscribe(topics, options) {
		// Providers are resolved eagerly, so an unknown provider name throws
		// from subscribe() itself rather than when the observable is consumed.
		const providers = this.getProviders(options);
		return new Observable((observer) => {
			const observables = providers.map((provider) => ({
				provider,
				observable: provider.subscribe(topics, options),
			}));
			const subscriptions = observables.map(({ provider, observable }) =>
				observable.subscribe({
					// NOTE(review): passes whatever `start` receives to console.error;
					// kept as-is to preserve existing output.
					start: console.error,
					next: (value) => observer.next({ provider, value }),
					error: (error) => observer.error({ provider, error }),
					// TODO: when all inner observables have completed, complete the outer one.
				})
			);
			return () => subscriptions.forEach((subscription) => subscription.unsubscribe());
		});
	}

	/**
	 * Write a prefixed message to the console.
	 *
	 * @param {string} message - text appended to the 'CustomPubSub: ' prefix
	 * @param {*} [data] - optional extra value; omitted from the output when undefined
	 */
	log(message, data) {
		const text = `CustomPubSub: ${message}`;
		if (data === undefined) {
			console.log(text);
			return;
		}
		console.log(text, data);
	}
}
// Shared module-level instance, created with no initial options; options can
// be merged in afterwards through configure().
export const PubSub = new PubSubClass(null);
// Hand the instance to Amplify.register at import time (presumably so that
// Amplify forwards its configuration to it — confirm against @aws-amplify/core).
Amplify.register(PubSub);
