/* eslint max-classes-per-file: ["error", 2] */

import {RealtimeConnection} from "./RealtimeConnection"

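/**
 * The config for a subscription cache.
 *
 * @typedef {Object} RealtimeSubscriptionCacheConfig
 * @property {RealtimeConnection} [connection] The connection to cache subscriptions for.
 * @property {number} [tickFrequency] The number of milliseconds between each 'tick'. Defaults to 1000.
 * @property {number} [maxAge] The number of 'ticks' after which unused subscriptions
 *   should be removed from the cache. Defaults to 10.
 */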


/**
 * Caches subscriptions to the realtime connection.
 * Keeps the real subscription alive for a certain time
 * after unsubscription so that if another subscriber comes along
 * before timeout it can reuse the already-active subscription.
 *
 * This is useful because almost all observable type libraries and approaches
 * rely on the fact that `subscribe()` and `unsubscribe()` are relatively cheap
 * operations, but our approach means they involve roundtrips to the server, so
 * we want to reduce their frequency as much as we can.
 *
 *
 * The cache checks for inactive subscribers every `tickFrequency` milliseconds.
 * If it sees an inactive subscription, it increments the `age` for that subscription
 * by 1. If the subscription reaches `maxAge`, it will be removed from the cache.
 * If a new subscriber comes along for the subscription, its `age` will be reset to 0.
 *
 * By default the subscription cache has a `tickFrequency` of 1000 (1 second) and a
 * `maxAge` of 10, so inactive subscriptions will be removed after 10 seconds.
 *
 * @example
 *
 *     import {RealtimeSubscriptionCache} from "@vimana/lib-realtime";
 *
 *     const cache = new RealtimeSubscriptionCache({connection: myConnection});
 *     const subscription1 = cache.subscribe(["something", {id: 123}], {
 *        next: value => console.log(value),
 *        complete: () => console.log("complete"),
 *        error: err => console.error(err)
 *     });
 *
 *     // Remove the subscription after 1 second.
 *     setTimeout(() => subscription1.unsubscribe(), 1000);
 *
 *     // Add another subscription after 3 seconds.
 *     setTimeout(() => {
 *       // underlying subscription is reused.
 *       const subscription2 = cache.subscribe(["something", {id: 123}], {
 *        next: value => console.log(value),
 *        complete: () => console.log("complete"),
 *        error: err => console.error(err)
 *       });
 *     }, 3000);
 *
 */
export class RealtimeSubscriptionCache {
  /**
   * Initialize the cache and start its eviction ticker.
   *
   * @param {Object} [config] Cache settings. When omitted, every default applies.
   * @param {Object} [config.connection] The connection to subscribe through. When
   *   absent, entries are created without an underlying subscription until
   *   `onConnection()` supplies a connection.
   * @param {number} [config.tickFrequency] Milliseconds between ticks (default 1000).
   * @param {number} [config.maxAge] Age, in ticks, beyond which an unused entry is
   *   removed (default 10).
   */
  constructor(config) {
    // A missing config is treated as "all defaults" instead of throwing.
    const settings = config == null ? {} : config;

    /**
     * The connection to cache subscriptions for.
     */
    this.connection = settings.connection != null ? settings.connection : undefined;

    /**
     * The number of milliseconds between each 'tick'.
     */
    this.tickFrequency = settings.tickFrequency == null ? 1000 : settings.tickFrequency;

    /**
     * The number of 'ticks' after which unused subscriptions
     * should be removed from the cache.
     */
    this.maxAge = settings.maxAge == null ? 10 : settings.maxAge;

    /**
     * The entries in the cache.
     */
    this.entries = new Set();

    /**
     * The id of the timer for the ticker.
     * @private
     */
    this._timeoutID = undefined;

    /**
     * Indicates whether this cache has been disposed.
     * @private
     */
    this._hasDisposed = false;

    this._scheduleNextTick();
  }

  /**
   * Indicates whether this cache has been disposed.
   *
   * @returns {boolean}
   */
  get hasDisposed() {
    return this._hasDisposed;
  }

  /**
   * Invoked when the cache receives a new connection.
   *
   * Every entry's still-open subscription is unsubscribed, and the entry is
   * then subscribed through the new connection. A nullish `connection`
   * detaches the entries without resubscribing them.
   *
   * @param {Object} connection The replacement connection, or a nullish value.
   */
  onConnection(connection) {
    this.connection = connection;
    for (const entry of this.entries) {
      const previous = entry.subscription;
      if (previous && !previous.closed) {
        // Closing the old subscription may report "complete" to the entry;
        // the flag tells the entry not to forward that to its subscribers.
        entry.ignoreNextComplete = true;
        try {
          previous.unsubscribe();
        } finally {
          // Always reset, otherwise a later genuine completion would be swallowed.
          entry.ignoreNextComplete = false;
        }
      }
      entry.subscription = connection
        ? connection.subscribe(entry.params, entry)
        : undefined;
    }
  }

  /**
   * Subscribe to a given topic.
   * If we have a subscription already, it will be reused.
   *
   * @param {*} params The subscription parameters; their JSON form is the cache key.
   * @param {Object} subscriber Receives `next`, `complete` and `error` calls.
   * @returns {Object} A handle exposing `closed` and `unsubscribe()`.
   * @throws {Error} If the cache has been disposed.
   */
  subscribe(params, subscriber) {
    if (this._hasDisposed) {
      throw new Error("The subscription cache has been disposed of, can no longer subscribe to it.");
    }
    const key = JSON.stringify(params);

    const existing = this._findEntry(key);
    if (existing) {
      return existing.subscribe(subscriber);
    }

    const entry = new RealtimeSubscriptionCacheEntry(this, key, params);
    this.entries.add(entry);

    // Register the subscriber before opening the underlying subscription.
    const subscription = entry.subscribe(subscriber);

    if (this.connection) {
      entry.subscription = this.connection.subscribe(params, entry);
    }

    return subscription;
  }

  /**
   * Get the associated cache entry for the given params.
   *
   * @param {*} params The subscription parameters.
   * @returns {Object|undefined} The matching entry, or `undefined` when there is none.
   */
  getCacheEntry(params) {
    return this._findEntry(JSON.stringify(params));
  }

  /**
   * Get the latest value from the cache for the given parameters.
   *
   * @param {*} params The subscription parameters.
   * @returns {*} The entry's `lastValue`, or `undefined` when no entry matches.
   */
  getCachedValue(params) {
    const entry = this._findEntry(JSON.stringify(params));
    return entry ? entry.lastValue : undefined;
  }

  /**
   * Destroy the subscription cache: stop the ticker and dispose every entry.
   * Calling it again only logs a warning.
   *
   * @throws The first error raised while disposing an entry, after all
   *   entries have been given the chance to dispose.
   */
  dispose() {
    if (this._hasDisposed) {
      console.warn(".dispose() called more than once on subscription cache.");
      return;
    }
    // Flag first, so callbacks fired during teardown see a disposed cache and
    // the ticker cannot re-arm itself.
    this._hasDisposed = true;

    if (this._timeoutID != null) {
      clearTimeout(this._timeoutID);
      this._timeoutID = null;
    }

    let failed = false;
    let firstError;
    for (const entry of this.entries) {
      try {
        entry.dispose();
      } catch (e) {
        // Keep going so one faulty entry does not prevent disposing the rest.
        if (!failed) {
          failed = true;
          firstError = e;
        }
      }
    }
    if (failed) {
      throw firstError;
    }
  }

  /**
   * Find the entry stored under an already-serialized key.
   *
   * @private
   * @param {string} key The JSON form of the subscription parameters.
   * @returns {Object|undefined} The matching entry, if any.
   */
  _findEntry(key) {
    for (const candidate of this.entries) {
      if (candidate.key === key) {
        return candidate;
      }
    }
    return undefined;
  }

  /**
   * Schedule the next tick, using the `tickFrequency` value.
   * Any pending tick is cancelled first; nothing is scheduled once the cache
   * has been disposed.
   */
  _scheduleNextTick() {
    if (this._timeoutID != null) {
      clearTimeout(this._timeoutID);
      this._timeoutID = null;
    }
    if (this._hasDisposed) {
      // dispose() may have been called from inside a tick; do not re-arm.
      return;
    }
    this._timeoutID = setTimeout(() => {
      this._timeoutID = null;
      try {
        this._tick();
      } catch (e) {
        console.error(`Error in tick:`, e);
      }
      this._scheduleNextTick();
    }, this.tickFrequency);
  }

  /**
   * Check the cache for unused subscriptions.
   *
   * Each inactive entry has its `age` incremented, and any entry whose `age`
   * is strictly greater than `maxAge` is disposed.
   */
  _tick() {
    const {maxAge} = this;
    for (const entry of this.entries) {
      if (!entry.isActive) {
        entry.age++;
      }
      if (entry.age > maxAge) {
        entry.dispose();
      }
    }
  }
}

/**
 * A cached realtime subscription.
 */
export class RealtimeSubscriptionCacheEntry {
  /**
   * Initialize the cache entry.
   *
   * @param {Object} cache The owning cache; the entry removes itself from
   *   `cache.entries` when it completes or errors.
   * @param {string} key The cache key for the entry.
   * @param {*} params The subscription parameters.
   */
  constructor(cache, key, params) {
    /**
     * The cache this entry belongs to.
     */
    this.cache = cache;

    /**
     * The cache key for the entry.
     */
    this.key = key;

    /**
     * The subscription parameters.
     */
    this.params = params;

    /**
     * The subscription for this cache entry.
     */
    this.subscription = undefined;

    /**
     * The last value seen over this subscription.
     */
    this.lastValue = undefined;

    /**
     * The number of ticks since this entry was last active.
     */
    this.age = 0;

    /**
     * Whether the cache entry should ignore the next "complete" event.
     * This is used when the cache receives a new connection.
     */
    this.ignoreNextComplete = false;

    /**
     * The subscribers to the cache entry.
     */
    this.subscribers = new Set();
  }

  /**
   * Whether or not this cache entry is considered 'active',
   * i.e. it has at least one subscriber.
   *
   * @returns {boolean}
   */
  get isActive() {
    return this.subscribers.size > 0;
  }

  /**
   * Subscribe to the cache entry.
   *
   * Resets the entry's age. If a value has already been seen, it is replayed
   * to the subscriber asynchronously (in a microtask), unless the
   * subscription has been closed by then.
   *
   * @param {Object} subscriber Receives `next`, `complete` and `error` calls.
   * @returns {{closed: boolean, unsubscribe: Function}} A handle. `closed`
   *   becomes true after `unsubscribe()` or once the entry has completed or
   *   errored. `unsubscribe()` removes the subscriber and calls its `complete()`.
   */
  subscribe(subscriber) {
    this.age = 0;
    this.subscribers.add(subscriber);
    let closed = false;

    // The entry drops subscribers when it completes or errors, so set
    // membership is the source of truth for whether this handle is still open.
    const isClosed = () => {
      if (!closed && !this.subscribers.has(subscriber)) {
        closed = true;
      }
      return closed;
    };

    if (this.lastValue !== undefined) {
      // We have a value already, provide it in the next tick.
      Promise.resolve().then(() => {
        // Never deliver a value after the subscriber has been terminated.
        if (!isClosed() && this.lastValue !== undefined) {
          subscriber.next(this.lastValue);
        }
      });
    }

    const unsubscribe = () => {
      const wasSubscribed = this.subscribers.delete(subscriber);
      closed = true;
      if (wasSubscribed) {
        subscriber.complete();
      }
    };

    return {
      get closed() {
        return isClosed();
      },
      unsubscribe
    };
  }

  /**
   * Invoked when we receive a value over the subscription.
   * Stores the value as `lastValue` and forwards it to the current subscribers.
   *
   * @param {*} value The received value.
   */
  next(value) {
    this.lastValue = value;
    // Iterate over a snapshot: a subscriber added by a callback receives the
    // value through the replay in subscribe(), not a second time from here.
    for (const subscriber of Array.from(this.subscribers)) {
      // Skip subscribers that an earlier callback unsubscribed.
      if (this.subscribers.has(subscriber)) {
        subscriber.next(value);
      }
    }
  }

  /**
   * Invoked when the subscription completes successfully.
   * Completes every subscriber and removes the entry from its cache, unless
   * `ignoreNextComplete` is set, in which case the flag is cleared and the
   * event is dropped.
   */
  complete() {
    if (this.ignoreNextComplete) {
      this.ignoreNextComplete = false;
      return;
    }
    this._terminate((subscriber) => subscriber.complete());
  }

  /**
   * Invoked when the subscription terminates in an error.
   * Forwards the error to every subscriber and removes the entry from its cache.
   *
   * @param {*} error The error to forward.
   */
  error(error) {
    this._terminate((subscriber) => subscriber.error(error));
  }

  /**
   * Destroy the cache entry: complete the subscribers, then close the
   * underlying subscription (even when a subscriber callback throws).
   */
  dispose() {
    const {subscription} = this;
    try {
      this.complete();
    } finally {
      if (subscription) {
        subscription.unsubscribe();
        this.subscription = undefined;
      }
    }
  }

  /**
   * Detach the entry from the cache, drop all subscribers, then notify each
   * of them exactly once.
   *
   * State is cleaned up before any callback runs, so a callback that
   * re-subscribes is not terminated again by this same call.
   *
   * @private
   * @param {Function} notify Called with each former subscriber.
   * @throws The first error raised by a subscriber callback, after all
   *   subscribers have been notified.
   */
  _terminate(notify) {
    const recipients = Array.from(this.subscribers);
    this.subscribers.clear();
    this.cache.entries.delete(this);

    let failed = false;
    let firstError;
    for (const subscriber of recipients) {
      try {
        notify(subscriber);
      } catch (e) {
        if (!failed) {
          failed = true;
          firstError = e;
        }
      }
    }
    if (failed) {
      throw firstError;
    }
  }
}
