











































































































































































































































































import * as R from 'ramda';
import { defineComponent, computed, ref, onMounted, onUnmounted, Ref, nextTick } from '@vue/composition-api';
import { ValidationObserver, ValidationProvider, extend } from 'vee-validate';
import { useAxios } from '@/app/composable';
import { FormBlock } from '@/app/components';
import { AssetsAPI } from '../../asset/api';
import { maxLengthValidator, localPathValidator, requiredValidator, minLengthValidator } from '@/app/validators';
import { regex } from 'vee-validate/dist/rules';
import { HarvesterBlockId, PreprocessingBlockId, blockIdToTaskMap } from '@/modules/apollo/constants';
import { ApolloAPI } from '@/modules/apollo/api';
import { ApolloTask, WizardAction } from '../types';
import { LoaderConfiguration } from '../types/loader.type';
import { ScheduleAPI } from '@/modules/workflow-designer/api';
import { Asset } from '@/modules/asset/types';
import { useApolloPipeline, useApolloTask } from '../composable';
import { ApolloTaskShell } from '../components';

// Register the validation rules with vee-validate under the names
// 'required', 'max', 'min' and 'path' (in that order).
const loaderValidationRules = {
    required: requiredValidator,
    max: maxLengthValidator,
    min: minLengthValidator,
    path: localPathValidator,
};
Object.entries(loaderValidationRules).forEach(([ruleName, rule]) => extend(ruleName, rule));

// The stock `regex` rule is reused as-is; only its error message is overridden.
const regexRuleWithMessage = {
    ...regex,
    message: 'Title must contain only alphanumeric characters, dashes, underscores, spaces and at least one letter.',
};
extend('regex', regexRuleWithMessage);

/**
 * `Loader` component ("Data Loading" page).
 *
 * Loads the pipeline identified by `id` together with its `loader` task and, on
 * finalisation, creates (or reuses) the asset, stores the asset id in the loader
 * configuration as `collection`, and finalises both the loader task and the pipeline.
 *
 * Props:
 *  - `id`: pipeline id used for every API call made by this component.
 *  - `queryParams`: JSON-encoded query object forwarded to the `data-checkin-jobs`
 *    route when the job is restarted.
 */
export default defineComponent({
    name: 'Loader',
    metaInfo() {
        return { title: `Data Loading` };
    },
    props: {
        id: {
            type: String,
            required: true,
        },
        queryParams: {
            type: String,
            default: '{}',
        },
    },
    components: {
        FormBlock,
        ValidationProvider,
        ValidationObserver,
        ApolloTaskShell,
    },
    setup(props, { root }: any) {
        const { loading, error: loaderError, exec } = useAxios(true);

        // Template ref of the validation form; null until the form is rendered.
        const loaderRef: Ref<any> = ref(null);
        // Value passed to `resetSuspended` when a suspended job is finalised.
        const resetKafka: Ref<any> = ref(null);

        const loader: Ref<ApolloTask<LoaderConfiguration> | undefined> = ref();
        const asset: Ref<Asset> = ref({ name: '', description: null }) as any;

        const showFinalizeModal = ref<boolean>(false);

        // True while `finalise` is running; also guards against re-entrant calls.
        const loadingFinalise = ref(false);

        const {
            pipeline,
            loading: pipelineLoading,
            harvester,
            isFinalized,
            isSuspended,
            expiredRetrieveUntilDate,
            isOnPremise,
            error: pipelineError,
            resetSuspended,
            fetchPipeline,
        } = useApolloPipeline(props.id, undefined, false);

        const { canRevise } = useApolloTask<LoaderConfiguration>(loader);

        const isLoading = computed(() => loading.value || pipelineLoading.value || loadingFinalise.value);

        const error = computed(() => loaderError.value || pipelineError.value);

        /** Whether the harvester block is one of the Kafka variants. */
        const isKafka = computed(() =>
            [HarvesterBlockId.Kafka, HarvesterBlockId.ExternalKafka].includes(harvester.value?.blockId as any),
        );

        /** Whether any pipeline task is the encryption preprocessing block. */
        const hasEncryption = computed(() =>
            pipeline.value?.tasks.some((task) => task.blockId === PreprocessingBlockId.Encryption),
        );

        /**
         * Finalisation is allowed when the pipeline is not finalised yet, the form
         * validation has passed, a suspended Kafka job has a `resetKafka` value set,
         * and a Kafka job's retrieve-until date has not expired.
         */
        const canFinalize = computed(() => {
            return (
                !isFinalized.value &&
                loaderRef.value?.flags.passed &&
                (!(isKafka.value && isSuspended.value) || !R.isNil(resetKafka.value)) &&
                !(isKafka.value && expiredRetrieveUntilDate.value)
            );
        });

        /**
         * Creates the asset for this pipeline, or returns the current one if it
         * already has an id. The result is also stored in `asset`.
         */
        const createAsset = async () => {
            if (asset.value?.id) return asset.value; // asset already created so return it
            const schedules = (await ScheduleAPI.getSchedules(props.id))?.data;
            const data = {
                ...asset.value,
                assetTypeId: 1,
                provenance: {
                    id: props.id,
                    type: 'data-checkin',
                    name: pipeline.value?.name,
                    runnerId: pipeline.value?.runnerId,
                },
                accessLevel: pipeline.value?.accessLevel,
                preprocessingTasks: pipeline.value?.tasks.map((task) => ({
                    blockId: task.blockId,
                    type: blockIdToTaskMap[task.blockId],
                    // use the in-memory loader configuration for the loader task itself
                    configuration: task.id === loader.value?.id ? loader.value?.configuration : task.configuration,
                })),
                processedSample: loader.value?.inputSample,
                schedules,
                createdById: pipeline.value?.createdById,
            };
            asset.value = (await exec(AssetsAPI.createAsset(data)))?.data;
            return asset.value;
        };

        /**
         * Validates the form and, if valid, finalises the loader task and the pipeline.
         *
         * The `loadingFinalise` flag is always cleared in `finally`, so an early
         * return (invalid form, missing loader, consumer group still stable) or a
         * thrown error can no longer leave the page stuck in a loading state.
         */
        const finalise = async () => {
            if (loadingFinalise.value) return;
            loadingFinalise.value = true;

            try {
                // check if loader can be finalised; the form ref may not be rendered
                const isValid = await loaderRef.value?.validate();
                if (!isValid || !loader.value) return;

                if (isSuspended.value) {
                    try {
                        await resetSuspended(resetKafka.value);
                    } catch (e: any) {
                        // `response` is absent on e.g. network errors, hence the optional chain
                        if (
                            e?.response?.status === 400 &&
                            e?.response?.data?.message === 'Consumer group is in Stable state'
                        ) {
                            (root as any).$toastr.w(
                                'The consumer group is being deleted. Please try again later.',
                                'Consumer group is in Stable state',
                            );
                            return;
                        }
                        // NOTE(review): any other failure is ignored and finalisation
                        // continues, as before — confirm this is intended.
                    }
                }

                // create asset
                const { id: collection } = await createAsset();

                // update loader
                const updatedConfiguration = { ...loader.value.configuration, collection };
                await exec(ApolloAPI.updateTask(props.id, 'loader', updatedConfiguration));

                // finalise loader
                await exec(ApolloAPI.finalizeTask(props.id, 'loader'));

                // finalise pipeline
                await exec(ApolloAPI.finalize(props.id));

                // show modal to go back to data checkin jobs
                showFinalizeModal.value = true;
            } finally {
                loadingFinalise.value = false;
            }
        };

        /** Runs the pipeline again and navigates to the `data-checkin-jobs` route. */
        const restart = async () => {
            if (!loader.value) return;
            await exec(ApolloAPI.run(props.id));
            root.$router.push({ name: 'data-checkin-jobs', query: JSON.parse(props.queryParams) });
        };

        /** Loads the pipeline, the loader task and (if one exists) the linked asset. */
        const fetch = async () => {
            await fetchPipeline();
            loader.value = (await exec(ApolloAPI.getTask(props.id, 'loader')))?.data;
            if (pipeline.value?.provenanceAssetIds?.length) {
                // asset already exists so fetch it to initialise the form
                const res = await exec(AssetsAPI.getAsset(pipeline.value.provenanceAssetIds[0]));
                asset.value = res?.data;
                // once the form is rendered, validate it in order to enable the finalize button;
                // the ref can still be null if the form is not rendered or the component is gone
                nextTick(() => loaderRef.value?.validate());
            }
        };

        /** Calls the unlock endpoint for this pipeline. */
        const unlockJob = async () => {
            await exec(ApolloAPI.unlock(props.id));
        };

        const wizardActions = computed<Partial<WizardAction>[]>(() => [
            { key: 'finalize', show: !isFinalized.value, enabled: canFinalize.value },
            { key: 'revise', label: 'Restart', show: canRevise.value, enabled: canRevise.value },
        ]);

        onMounted(() => {
            window.addEventListener('beforeunload', unlockJob);
        });

        onUnmounted(() => {
            // detach the listener so it does not outlive the component and
            // unlock the job again after the user has left this page
            window.removeEventListener('beforeunload', unlockJob);
            unlockJob();
        });

        fetch();

        return {
            error,
            isFinalized,
            isLoading,
            loaderRef,
            asset,
            finalise,
            loader,
            canFinalize,
            isOnPremise,
            restart,
            resetKafka,
            harvester,
            expiredRetrieveUntilDate,
            isKafka,
            isSuspended,
            showFinalizeModal,
            wizardActions,
            hasEncryption,
        };
    },
});
