import {Injectable} from '@angular/core';
import {DateTime} from 'luxon';
import {asyncScheduler, Observable, Subject, throwError} from 'rxjs';
import {buffer, delay, map, tap, throttleTime} from 'rxjs/operators';

import {AssetsListRequestParams, AssetsPatchRequestParams} from 'api/ias/api/projects.service';
import {ApiAsset, ApiAssetsListAssetsResponse, ApiAssetUserRenditions, ApiCopy, ApiSnippet} from 'api/ias/model/models';
import {assertTruthy, checkExhaustive, checkExhaustiveAllowing} from 'asserts/asserts';
import {AssetCopyStats, AssetRendition, Metadata, SharedLink, SignedUrl} from 'models';


import {environment} from '../environments/environment';
import {GcmaQueryExpressions} from '../query_expressions/gcma_query_expressions';

import {BatchedResourceChanges, IasApiClient, ResourceChange} from './api_client.module';
import {AssetCopy, ListResponse, Original} from './asset_service';
import {ApiApprovalState, ApiApprovalStateStateEnum, ApiAssetStateEnum, ApiEventMetadataStreamingState} from './ias_types';
import {SignUrlResponse} from './shared_links_api_service';

/** Time window, in milliseconds, to batch asset changes together. */
const ASSET_CHANGED_BUFFER_WINDOW_MS = 300;

/**
 * Id of the internal only schema generated by the backend to pre-populate
 * asset titles from filenames on ingestion.
 */
export const PRE_INGESTION_SCHEMA_ID = 'pre-ingestion';

/** Max supported asset copies constraint that shapes UX assumptions. */
export const MAX_SUPPORTED_ASSET_COPIES = 30;

/** Special name assigned to an asset copy before it is persisted. */
export const NEW_ASSET_COPY_NAME = 'new_copy';

/**
 * AssetApiService to interact with IAS backend asset related APIs.
 *
 * Each API client call is wrapped in `try`/`catch` so that a synchronous
 * failure of the call is surfaced as an erroring observable. Methods that
 * modify an asset additionally publish the change through `assetChanged$`
 * (single changes) and `assetsChanged$` (batched changes).
 */
@Injectable({providedIn: 'root'})
export class AssetApiService {
  /** Internal source of single-asset change events. */
  private readonly assetChangedInternal$ =
      new Subject<ResourceChange<Original>>();

  /** Emits whenever an asset is updated via the api. */
  readonly assetChanged$ = this.assetChangedInternal$.pipe(delay(0));

  /**
   * Emits asset updates batched together within
   * `ASSET_CHANGED_BUFFER_WINDOW_MS` (300ms).
   */
  readonly assetsChanged$ = this.watchBatchChanges(this.assetChanged$);

  constructor(
      private readonly apiClient: IasApiClient,
      private readonly gcmaQuery: GcmaQueryExpressions,
  ) {}

  /**
   * Returns the most recent assets that are ready and approved.
   *
   * Currently take the first 3. Default order is by created_time.
   */
  getRecents(): Observable<Original[]> {
    const params: Partial<AssetsListRequestParams> = {
      pageSize: 3,
      filter: this.gcmaQuery.and([
        this.gcmaQuery.is('state', ApiAssetStateEnum.STATE_READY),
        this.gcmaQuery.is('approvalState', ApiApprovalStateStateEnum.APPROVED),
      ]),
    };
    return this.list(params).pipe(map((response) => response.assets));
  }

  /**
   * Fetches a single asset and converts it to the UI asset type.
   *
   * @param assetName Resource name of the asset to fetch.
   */
  getOne(assetName: string) {
    try {
      return this.apiClient.assetsGetEnhanced({name: assetName})
          .pipe(map(response => convertApiAssetToUiAsset(response.asset)));
    } catch (error: unknown) {
      return throwError(() => error);
    }
  }

  /** Re-fetches an asset from api and generates AssetChanged event */
  refreshAsset(asset: Original) {
    return this.getOne(asset.name)
        .pipe(
            tap(updated => this.assetChangedInternal$.next(
                    this.prepareChange(asset, updated))));
  }

  /**
   * Returns assets that are processing and approved.
   *
   * @param pageSize Maximum number of assets requested from the API.
   */
  getProcessing(pageSize: number): Observable<ListResponse<Original>> {
    const params: Partial<AssetsListRequestParams> = {
      pageSize,
      filter: this.gcmaQuery.and([
        this.gcmaQuery.is('state', ApiAssetStateEnum.STATE_PROCESSING),
        this.gcmaQuery.is('approvalState', ApiApprovalStateStateEnum.APPROVED),
      ]),
    };
    return this.list(params);
  }

  /**
   * Requests signed versions of the given raw URLs.
   *
   * Entries that come back without a signed URL fall back to their raw URL.
   * The API response objects are left untouched; the fallback is applied on a
   * copy.
   *
   * @param rawUrls URLs to sign.
   */
  signUrls(rawUrls: string[]): Observable<SignUrlResponse> {
    try {
      return this.apiClient
          .assetsSignUrls({
            parent: environment.mamApi.parent,
            body: {urls: rawUrls},
          })
          .pipe(map(
              resp => ({
                signedUrls: (resp.signedUrls ?? [])
                                .map(
                                    u => new SignedUrl(
                                        u.signedUrl ?
                                            u :
                                            {...u, signedUrl: u.rawUrl})),
              })));
    } catch (error: unknown) {
      return throwError(() => error);
    }
  }

  /**
   * Replaces the asset metadata (schema and JSON payload) of an asset.
   *
   * @param asset Asset to update.
   * @param schema Metadata schema sent along with the payload.
   * @param jsonMetadata Metadata values to store.
   */
  updateMetadata(
      asset: Original, schema: string, jsonMetadata: Record<string, unknown>) {
    const params: Partial<AssetsPatchRequestParams> = {
      updateMask: 'assetMetadata',
    };

    return this.patch(
        asset, {
          assetMetadata: {
            jsonMetadata: jsonMetadata as Record<string, object>,
            schema,
          },
        },
        params);
  }

  /**
   * Updates only the approval state of an asset.
   *
   * @param asset Asset to update.
   * @param state New approval state.
   */
  updateApprovalState(asset: Original, state: ApiApprovalState):
      Observable<Original> {
    const apiAsset: ApiAsset = {approvalState: {state}};

    return this.patch(asset, apiAsset, {updateMask: 'approvalState'});
  }

  /**
   * Lists assets under the configured parent and converts them to UI assets.
   *
   * @param params Extra list parameters (filter, page size, ...). They are
   *     spread after `parent`, so a `parent` given here takes precedence.
   */
  list(params: Partial<AssetsListRequestParams> = {}):
      Observable<ListResponse<Original>> {
    try {
      return this.apiClient
          .assetsList({parent: environment.mamApi.parent, ...params})
          .pipe(map((response) => this.convertToUiListResponse(response)));
    } catch (error: unknown) {
      return throwError(() => error);
    }
  }

  /**
   * Deletes a VoD asset and publishes the resulting asset change.
   *
   * Errors (with a string message) without calling the API when the asset is
   * live.
   */
  delete(asset: Original): Observable<Original> {
    if (asset.isLive) {
      return throwError(() => 'assets:delete only supports VoD assets.');
    }

    try {
      return this.apiClient.assetsDelete({name: asset.name})
          .pipe(
              map(apiAsset => convertApiAssetToUiAsset(apiAsset)),
              tap(updated => this.assetChangedInternal$.next(
                      this.prepareChange(asset, updated))));
    } catch (error: unknown) {
      return throwError(() => error);
    }
  }

  /**
   * Undeletes a VoD asset and publishes the resulting asset change.
   *
   * Errors (with a string message) without calling the API when the asset is
   * live.
   */
  undelete(asset: Original): Observable<Original> {
    if (asset.isLive) {
      return throwError(() => 'assets:undelete only supports VoD assets.');
    }

    try {
      return this.apiClient.assetsUndelete({name: asset.name, body: {}})
          .pipe(
              map(apiAsset => convertApiAssetToUiAsset(apiAsset)),
              tap(updated => this.assetChangedInternal$.next(
                      this.prepareChange(asset, updated))));
    } catch (error: unknown) {
      return throwError(() => error);
    }
  }

  /**
   * Deletes the event behind a live asset and publishes the asset change.
   *
   * The event name is derived from the asset name by replacing `/assets/`
   * with `/events/`. Errors (with a string message) without calling the API
   * when the asset is not live.
   */
  deleteEvent(asset: Original): Observable<Original> {
    if (!asset.isLive) {
      return throwError(() => 'assets:deleteEvent only supports live assets.');
    }

    try {
      return this.apiClient
          .eventsDelete({name: asset.name.replace('/assets/', '/events/')})
          .pipe(
              // The `eventsClient` does not return the deleted asset or event,
              // so we simulate an asset with its `isDeleted` property set.
              map(() => ({...asset, isDeleted: true})),
              tap(updated => this.assetChangedInternal$.next(
                      this.prepareChange(asset, updated))));
    } catch (error: unknown) {
      return throwError(() => error);
    }
  }

  /**
   * Requests an upload URI for a local file.
   *
   * The file is placed under `local-uploads/<yyyy>/<MM>/` based on the
   * current local date.
   *
   * @param fileName Name of the file to upload.
   * @param mimeType MIME type of the file.
   * @returns The upload URI, or an empty string when the response has none.
   */
  generateUploadUri(fileName: string, mimeType: string): Observable<string> {
    const prefix = `local-uploads/${DateTime.local().toFormat('yyyy/MM')}`;
    try {
      return this.apiClient
          .assetsGenerateUploadUri({
            parent: environment.mamApi.parent,
            body: {
              fileName: `${prefix}/${fileName}`,
              mimeType,
            }
          })
          .pipe(map((response) => response?.uploadUri ?? ''));
    } catch (error: unknown) {
      return throwError(() => error);
    }
  }

  /**
   * Creates a copy (full or cut-down) of an asset.
   *
   * A full copy is sent with `full` set and clips carryover `MOVE`; any other
   * copy is sent as a `cut` with start/end offsets in seconds and clips
   * carryover `KEEP`. Annotations are always carried over with `COPY`.
   *
   * @param asset Parent asset of the copy.
   * @param copy UI description of the copy to create.
   */
  createCopy(asset: Original, copy: AssetCopy): Observable<AssetCopy> {
    const apiCopy: ApiCopy = {
      full: copy.isFullCopy || undefined,
      cut: copy.isFullCopy ? undefined : {
        startCutOffset: `${copy.startOffset}s`,
        endCutOffset: `${copy.endOffset}s`,
      },
      copyMetadata: copy.metadata,
      file: copy.fileName,
      clipsCarryover: copy.isFullCopy ? 'MOVE' : 'KEEP',
      annotationsCarryover: 'COPY',
    };

    try {
      return this.apiClient
          .assetsCopiesCreate({parent: asset.name, body: apiCopy})
          .pipe(map(convertToUiAssetCopy));
    } catch (error: unknown) {
      return throwError(() => error);
    }
  }

  /**
   * Lists the copies of an asset, requesting at most
   * `MAX_SUPPORTED_ASSET_COPIES` of them.
   */
  listCopies(asset: Original): Observable<AssetCopy[]> {
    try {
      return this.apiClient
          .assetsCopiesList(
              {parent: asset.name, pageSize: MAX_SUPPORTED_ASSET_COPIES})
          .pipe(map(resp => (resp.copies ?? []).map(c => convertToUiAssetCopy(c))));
    } catch (error: unknown) {
      return throwError(() => error);
    }
  }

  /**
   * Approves a persisted asset copy.
   *
   * The copy must have a name other than `NEW_ASSET_COPY_NAME`; this is
   * asserted before the API is called.
   */
  approveCopy(copy: AssetCopy): Observable<AssetCopy> {
    assertTruthy(
        copy.name && copy.name !== NEW_ASSET_COPY_NAME,
        'AssetApiService->approveCopy: cannot approve unsaved copy');
    try {
      return this.apiClient.assetsCopiesApprove({name: copy.name})
          .pipe(map(convertToUiAssetCopy));
    } catch (error: unknown) {
      return throwError(() => error);
    }
  }

  /**
   * Deletes a persisted asset copy; the observable emits `null` on success.
   *
   * The copy must have a name other than `NEW_ASSET_COPY_NAME`; this is
   * asserted before the API is called.
   */
  deleteCopy(copy: AssetCopy): Observable<null> {
    assertTruthy(
        copy.name && copy.name !== NEW_ASSET_COPY_NAME,
        'AssetApiService->deleteCopy: cannot delete unsaved copy');
    try {
      return this.apiClient.assetsCopiesDelete({name: copy.name})
          .pipe(map(() => null));
    } catch (error: unknown) {
      return throwError(() => error);
    }
  }

  /**
   * Replaces the metadata of a persisted asset copy.
   *
   * The copy must have a name other than `NEW_ASSET_COPY_NAME`; this is
   * asserted before the API is called.
   *
   * @param copy Copy to update.
   * @param copyMetadata New metadata for the copy.
   */
  updateCopyMetadata(copy: AssetCopy, copyMetadata: Metadata):
      Observable<AssetCopy> {
    assertTruthy(
        copy.name && copy.name !== NEW_ASSET_COPY_NAME,
        'AssetApiService->updateCopyMetadata: cannot update unsaved copy');
    try {
      const payload: ApiCopy = {copyMetadata};
      return this.apiClient
          .assetsCopiesPatch(
              {name: copy.name, body: payload, updateMask: 'copyMetadata'})
          .pipe(map(convertToUiAssetCopy));
    } catch (error: unknown) {
      return throwError(() => error);
    }
  }

  /**
   * Patches an asset and publishes the resulting asset change.
   *
   * The change event carries a copy of the converted response with the
   * asset's `source` restored; the returned observable emits the converted
   * response itself.
   *
   * @param asset Asset being patched; also updated in place by
   *     `prepareChange`.
   * @param newAsset Fields to send as the patch body.
   * @param params Extra patch parameters such as `updateMask`.
   */
  private patch(
      asset: Original, newAsset: ApiAsset,
      params: Partial<AssetsPatchRequestParams> = {}): Observable<Original> {
    try {
      const { source } = asset;

      return this.apiClient
          .assetsPatch({name: asset.name, body: newAsset, ...params})
          .pipe(
              map(apiAsset => convertApiAssetToUiAsset(apiAsset)),
              tap(updated => this.assetChangedInternal$.next(
                      this.prepareChange(asset, {...updated, source}))));
    } catch (error: unknown) {
      return throwError(() => error);
    }
  }

  /**
   * Builds the `UPDATE` change event for an asset and syncs select properties
   * between the previously known asset and the updated one.
   *
   * Both arguments are mutated: `updated` receives the renditions (and, when
   * it has none, the `source`) of `current`, and `current` receives the
   * refreshed properties of `updated`.
   *
   * @param current Asset as it was known before the update.
   * @param updated Asset as returned by the API after the update.
   */
  private prepareChange(current: Original, updated: Original):
      ResourceChange<Original> {
    // Currently renditions returned by the patch method contain unsigned
    // "gs://..." style urls.
    updated.renditions = current.renditions;

    // Assets converted from API responses carry no `source`. Keep the one
    // already known for this asset so that refresh/delete/undelete do not
    // wipe it (patch already restores it before calling this method).
    if (updated.source == null) {
      updated.source = current.source;
    }

    // TODO: Remove once all asset/clip caches invalidate on
    // assetChanged$ emission.
    // Update select properties to avoid issue with rendition format. See above.
    current.assetMetadata = updated.assetMetadata;
    current.title = updated.title;
    current.isDeleted = updated.isDeleted;
    current.approved = updated.approved;
    current.state = updated.state;
    current.streamingState = updated.streamingState;
    current.errorReason = updated.errorReason;
    current.copyStats = updated.copyStats;
    current.archiveName = updated.archiveName;
    current.source = updated.source;
    current.gcsLocationUrl = updated.gcsLocationUrl;

    return {item: updated, type: 'UPDATE'};
  }

  /**
   * Groups single asset changes into batches.
   *
   * Changes are buffered until the trailing edge of a
   * `ASSET_CHANGED_BUFFER_WINDOW_MS` throttle window over the same stream;
   * each batch keeps only the latest change per asset name.
   *
   * @param assetChanged$ Stream of single asset changes.
   */
  private watchBatchChanges(assetChanged$:
                                Observable<ResourceChange<Original>>):
      Observable<BatchedResourceChanges<Original>> {
    return assetChanged$.pipe(
        buffer(assetChanged$.pipe(throttleTime(
            ASSET_CHANGED_BUFFER_WINDOW_MS, asyncScheduler,
            {leading: false, trailing: true}))),
        map(changes => {
          const updates = new Map<string, Original>();
          for (const change of changes) {
            // Currently there is no way to create an asset. Asset deletion is
            // handled as 'update' because it is a soft delete.
            assertTruthy(
                change.type === 'UPDATE',
                `AssetApiService unexpected change type: ${change.type}`);
            updates.set(change.item.name, change.item);
          }
          return {updates};
        }),
        // If we ever need this to be ReplaySubject we need to use
        // "sharedReplay(...), delay(0)" or use asapScheduler to make sure that
        // this observable is not executed synchronously.
    );
  }

  // Converts to UI List Response, once we sync the Asset type from UI with
  // backend proto, this step can be deleted.
  private convertToUiListResponse(response: ApiAssetsListAssetsResponse):
      ListResponse<Original> {
    const {assets, nextPageToken} = response;
    return {
      assets:
          (assets ?? []).map(asset => convertApiAssetToUiAsset(asset.asset)),
      nextPageToken,
    };
  }
}

/**
 * Builds the UI representation (`Original`) of an API asset.
 *
 * Why the conversion exists:
 * 1. The API Asset is still under construction and not stable to use yet.
 *    Proto:
 *    google3/google/cloud/video/intelligent_asset_service/v1/asset.proto;l=40
 * 2. The UI might not need everything the API provides.
 * 3. A normalized asset is easier for the UI to consume.
 *
 * @param apiAsset Asset as returned by the API; defaults to an empty asset.
 * @returns The normalized UI asset.
 */
export function convertApiAssetToUiAsset(apiAsset: ApiAsset = {}): Original {
  const {snippet, eventMetadata: event, copyStats} = apiAsset;
  const stateInfo = getStateInfo(apiAsset);
  const live = stateInfo.isLive;

  const startTime = 0;
  // TODO: Consider using `snippet.duration` once it is fixed.
  const endTime = Number(apiAsset.endTime || 0) / 1000;

  const assetMetadata = new Metadata(apiAsset.assetMetadata);
  const fields = assetMetadata.jsonMetadata;

  // Camera details are all read from the asset's JSON metadata.
  const correlationId: string = fields[MetadataField.CORRELATION_ID] ?? '';
  const angleLabel: string = fields[MetadataField.CAMERA_ANGLE_TYPE] ?? '';
  const camera = {
    isBroadcast: fields[MetadataField.IS_BROADCAST] === 'true',
    label: angleLabel,
    correlationId,
    totalCount: Number(fields[MetadataField.CAMERA_COUNT] ?? 1),
  };

  // Event time preference: the `EventDate` metadata, then the snippet's
  // `captureTime`, and finally the scheduled event `startTime`, which is the
  // best approximation for live streams that have neither of the former.
  const eventDate: string|undefined = fields['EventDate'];
  const eventTimeText =
      (eventDate || snippet?.captureTime || event?.startTime) ?? '';

  const rawLocation = (apiAsset.gcsLocation?.url || apiAsset.liveGcsUrl) ?? '';
  const gcsLocationUrl = convertToGsUri(rawLocation);

  // Assets carrying the pre-ingestion schema are treated as having no schema.
  if (assetMetadata.schema.endsWith(PRE_INGESTION_SCHEMA_ID)) {
    assetMetadata.schema = '';
  }

  let archiveName = undefined;
  if (live) {
    archiveName = copyStats?.full?.completeVodAssetNames?.[0];
  }

  return {
    title: snippet?.title || '(Untitled)',
    name: apiAsset.name ?? '',
    duration: endTime - startTime,
    startTime,
    endTime,
    updateTime: Date.parse(apiAsset.updateTime ?? ''),
    // startTime/endTime in eventMetadata are ISO date strings.
    eventStartTime: new Date(event?.startTime ?? 0).getTime(),
    eventEndTime: new Date(event?.endTime ?? 0).getTime(),
    startTimecode: getStartTimecode(snippet ?? {}),
    // `preCut` and `postCut` are given in microseconds, converted into
    // seconds for consistency with other `Asset` times.
    preCut: Number(event?.preCut ?? 0) / 1e6,
    postCut: Number(event?.postCut ?? 0) / 1e6,
    createTime: Date.parse(apiAsset.createTime ?? ''),
    eventTime: Date.parse(eventTimeText),
    renditions: (apiAsset.renditions ?? []).map(r => new AssetRendition(r)),
    gcsLocationUrl,
    state: stateInfo.state,
    streamingState: stateInfo.streamingState,
    assetMetadata,
    camera,
    isLive: live,
    hasError: stateInfo.hasError,
    errorReason: apiAsset.stateDetail?.errorDetails?.message ||
        event?.streamingErrorReason,
    isDeleted: stateInfo.isDeleted,
    rawSourceUrl: getRawSourceSignedUrl(apiAsset.userRenditions ?? {}),
    videoFormat: apiAsset.videoFormat ?? {},
    copyStats: new AssetCopyStats(copyStats),
    approved: apiAsset.approvalState?.state === 'APPROVED' ||
        (live && !!copyStats?.full?.approvedCount),
    archiveName,
  };
}

/**
 * Converts an API Snippet timecode into a number of milliseconds.
 *
 * @returns The timecode as epoch milliseconds, or `undefined` when the
 *     snippet has no timecode or it equals the null date
 *     `1970-01-01T00:00:00Z`.
 */
export function getStartTimecode(snippet: ApiSnippet) {
  const {startTimecode} = snippet;
  if (!startTimecode || startTimecode === '1970-01-01T00:00:00Z') {
    return undefined;
  }
  return new Date(startTimecode).getTime();
}

/**
 * Converts a `https://storage...` URL (signed or unsigned) to a `gs://` URL,
 * and removes query parameters.
 *
 * The query is stripped before percent-decoding, so an encoded `?` (`%3F`)
 * that is part of the object name is not mistaken for the start of the query.
 * When the URL cannot be percent-decoded (e.g. it contains a literal `%`), it
 * is used undecoded instead of throwing.
 *
 * @param url URL to convert; empty values are returned unchanged.
 * @returns The `gs://` form of the URL, without query parameters.
 */
export function convertToGsUri(url: string) {
  if (!url) return url;

  const withoutQuery = url.replace(/\?.+/, '');

  let decoded: string;
  try {
    decoded = decodeURIComponent(withoutQuery);
  } catch {
    // Malformed percent-encoding: keep the text as is.
    decoded = withoutQuery;
  }

  return decoded.replace('https://storage.cloud.google.com/', 'gs://')
      .replace('https://storage.googleapis.com/', 'gs://');
}

/**
 * Gets the signed raw source URL from user renditions.
 *
 * @returns The `url` of the `RAW_SOURCE` entry of the rendition map, or
 *     `undefined` when the renditions, the map, or the entry is missing.
 */
export function getRawSourceSignedUrl(userRenditions: ApiAssetUserRenditions):
    string|undefined {
  const renditionMap = userRenditions?.renditionMap;
  if (!renditionMap) return undefined;
  return renditionMap['RAW_SOURCE']?.url;
}

/**
 * Generates unique cut-down id based on parent asset name and offsets.
 *
 * The id has the form `<parentName>_<startOffset>_<endOffset>`.
 */
export function buildCutdownId(
    parentName: string, startOffset: number, endOffset: number) {
  const parts = [String(parentName), String(startOffset), String(endOffset)];
  return parts.join('_');
}

/**
 * Converts an API copy into the UI `AssetCopy` type.
 *
 * Cut offsets are read from the API's string form by dropping the last
 * character before the numeric conversion; missing offsets become 0. Missing
 * text fields become empty strings and a missing state becomes
 * `STATE_UNSPECIFIED`.
 */
function convertToUiAssetCopy(apiCopy: ApiCopy): AssetCopy {
  const {cut, stateDetail} = apiCopy;

  return {
    name: apiCopy.name ?? '',
    isFullCopy: !!apiCopy.full,
    startOffset: Number(cut?.startCutOffset?.slice(0, -1) ?? 0),
    endOffset: Number(cut?.endCutOffset?.slice(0, -1) ?? 0),
    fileName: apiCopy.file ?? '',
    metadata: new Metadata(apiCopy.copyMetadata),
    state: stateDetail?.state ?? 'STATE_UNSPECIFIED',
    errorReason: stateDetail?.errorDetails?.message ?? '',
    vodAssetName: apiCopy.vodAssetName,
  };
}

/**
 * Extracts state-related properties from an API asset or shared link.
 *
 * Combines the item's `stateDetail.state` with its streaming state into a
 * UI-level `AssetState` plus the `hasError`, `isLive` and `isDeleted` flags.
 * The streaming state read from the item is passed through unchanged in every
 * returned result.
 *
 * @param apiItem API asset or shared link to inspect.
 * @returns The derived state information.
 */
export function getStateInfo(apiItem: ApiAsset|SharedLink): AssetStateInfo {
  // Shared links expose the streaming state directly; API assets carry it
  // inside their event metadata (if any).
  const streamingState = apiItem instanceof SharedLink ?
      apiItem.streamingState :
      (apiItem.eventMetadata?.streamingState ?? 'STREAMING_STATE_UNSPECIFIED');

  // Baseline flags; each branch below overrides only what differs.
  const defaults = {
    streamingState,
    hasError: false,
    isLive: false,
    isDeleted: false,
  } as const;

  const apiAssetState = apiItem.stateDetail?.state ?? 'STATE_UNSPECIFIED';

  // TODO-HLS: Use HLS manifest.
  if (apiAssetState === 'STATE_PROCESSING') {
    // STATE_PROCESSING can refer to either live asset being converted to VoD or
    // to VoD asset processing. To tell these cases apart we can check for
    // rendition that has to be always present for live events and never for
    // VoDs.
    const isLive =
        (apiItem.renditions ?? []).some(r => r.version === 'LIVE_MAIN_DASH');
    return {...defaults, state: AssetState.PROCESSING, isLive};
  }

  // Deleted items are always reported as (non-live) VoD.
  if (apiAssetState === 'STATE_DELETED') {
    return {...defaults, state: AssetState.VOD, isDeleted: true};
  }

  // It is possible that streamingState is briefly STREAMING_STATE_UNSPECIFIED
  // for live asset so we need to check asset state as well.
  if (streamingState === 'STREAMING_STATE_UNSPECIFIED' &&
      apiAssetState !== 'STATE_STREAMING') {
    return {
      ...defaults,
      state: AssetState.VOD,
      hasError: apiAssetState === 'STATE_ERROR',
    };
  }

  // From here on the item either has a specified streaming state or is in
  // STATE_STREAMING.
  switch (apiAssetState) {
    case 'STATE_SCHEDULED':
      return {...defaults, state: AssetState.SCHEDULED, isLive: true};
    case 'STATE_STREAMING':
    case 'STATE_STREAMING_ERROR':
      // https://cloud.google.com/internal/intelligent-asset-service/rest/v1/StreamingState
      switch (streamingState) {
        case 'PENDING':
          // Added for completeness. In this case asset state should be
          // STATE_SCHEDULED so we should never get here.
          return {...defaults, state: AssetState.SCHEDULED, isLive: true};
        case 'CREATING':
        case 'STREAMING_STATE_UNSPECIFIED':
          return {...defaults, state: AssetState.PENDING, isLive: true};
        case 'CREATING_ERROR':
          return {
            ...defaults,
            state: AssetState.PENDING,
            hasError: true,
            isLive: true,
          };
        case 'AWAITING_INPUT':
        case 'STREAMING':
        case 'PAUSED':
        case 'STOPPED':
        case 'STOPPING':
        case 'NO_INPUT':
          return {...defaults, state: AssetState.AIRING, isLive: true};
        case 'STREAMING_ERROR':
        case 'STOPPING_ERROR':
        case 'REPAIRING':  // REPAIRING is deprecated.
          return {
            ...defaults,
            state: AssetState.AIRING,
            hasError: true,
            isLive: true,
          };
      }
      // Reached only for a streaming state not listed above.
      return checkExhaustive(
          streamingState, `Unsupported streamingState = ${streamingState}`);
    case 'STATE_STREAMING_STOPPED':
      return {...defaults, state: AssetState.ENDED, isLive: true};
    default:
      // Remaining asset states combined with a specified streaming state.
      // NOTE(review): nothing is returned after this call; presumably
      // `checkExhaustiveAllowing` never returns normally here — confirm.
      checkExhaustiveAllowing<'STATE_UNSPECIFIED'|'STATE_ERROR'|
                              'STATE_PENDING_MEDIA'|'STATE_READY'|
                              'STATE_DELETED'>(
          apiAssetState,
          `Cannot interpret api asset with state = ${
              apiAssetState} and streaming state = ${streamingState}`);
  }
}

/** All possible UI-relevant states of an asset. */
export enum AssetState {
  /** Video on Demand (as opposed to live stream). */
  VOD = 'vod',
  /** Live event scheduled to start at a later time. */
  SCHEDULED = 'scheduled',
  /** Live event has been started but there is no live stream yet. */
  PENDING = 'pending',
  /** Live event currently being broadcast. */
  AIRING = 'airing',
  /** Live event has been finished. */
  ENDED = 'ended',
  /**
   * Asset is processing: either a live event being converted to VoD or a VoD
   * asset being processed.
   */
  PROCESSING = 'processing',
}

/** Asset metadata fields that MAM relies on. */
export enum MetadataField {
  IS_BROADCAST = 'IsBroadcast',
  CORRELATION_ID = 'CorrelationId',
  CAMERA_ANGLE_TYPE = 'CameraAngleType',
  CAMERA_COUNT = 'CameraCount',
  HOME_TEAM = 'EventHomeTeam',
  AWAY_TEAM = 'EventAwayTeam',
  COURTESY = 'EventCourtesy',
  SOURCE_ROUTER = 'SourceRouter',
  DESCRIPTION = 'Description',
  CONTENT_TYPE = 'ContentType',
  SPORT = 'Sport',
  SITE = 'Site',
  /** @deprecated has been replaced by `HI_RES_FILE_PATH` */
  VOD_FILENAME = 'VoD_Filename',
  /** On-prem location of the source file with EVS prefix. */
  HI_RES_FILE_PATH = 'HiResFilePath',
  /**
   * When the source file comes from Doha, `HI_RES_FILE_PATH` points to the
   * location in Doha, and `SECONDARY_HI_RES_FILE_PATH` is an array whose first
   * and only entry may point to either a GCS location (starting with the
   * bucket name), or a secondary location where MediaCache is operating
   * (similar to other sources that don't come from Doha).
   */
  SECONDARY_HI_RES_FILE_PATH = 'SecondaryHiResFilePath',
  TITLE = 'Title',
}

/**
 * Metadata fields that may hold the source file location of a live asset,
 * listed from highest to lowest priority.
 */
export const LIVE_ASSET_LOCATION_METADATA_FIELDS = [
  // Top priority when it exists, as it contains the filename that MediaCache
  // knows in order to locate the file.
  MetadataField.SECONDARY_HI_RES_FILE_PATH,
  // Regular files that have no secondary location.
  MetadataField.HI_RES_FILE_PATH,
  // eslint-disable-next-line deprecation/deprecation -- Deprecated value for old assets, replaced by HI_RES_FILE_PATH.
  MetadataField.VOD_FILENAME,
] as const;

/** State-related properties derived by `getStateInfo`. */
interface AssetStateInfo {
  /** UI-level state of the asset. */
  state: AssetState;
  /** Streaming state as read from the API item. */
  streamingState: ApiEventMetadataStreamingState;
  /** Whether the item is in an error state. */
  hasError: boolean;
  /** Whether the item is in the deleted state. */
  isDeleted: boolean;
  /** Whether the item is a live asset (as opposed to VoD). */
  isLive: boolean;
}
