




























































































































































































































































































































import { FormBlock, LoadingCloneModal, Scrollbar } from '@/app/components';
import { useAxios, useFeatureFlags } from '@/app/composable';
import { useRoute } from '@/app/composable/router';
import store from '@/app/store';
import { maxLengthValidator, requiredValidator } from '@/app/validators';
import { AccessPolicy } from '@/modules/access-policy/components';
import { AuthzResourceType } from '@/modules/access-policy/constants';
import { AccessLevel, AccessLevelsExtensiveOptions } from '@/modules/access-policy/constants/access-levels.constants';
import { GeneralPolicy } from '@/modules/access-policy/models';
import { CheckIcon, ChevronLeftIcon, XIcon } from '@vue-hero-icons/outline';
import { computed, defineComponent, onMounted, onUnmounted, ref } from '@vue/composition-api';
import { OrbitSpinner } from 'epic-spinners';
import * as R from 'ramda';
import { ValidationObserver, ValidationProvider, extend } from 'vee-validate';
import { ApolloAPI, RunnerAPI } from '../api';
import { ExecutionLocation, HarvesterBlockId, PreprocessingBlockId, WorkflowStatus } from '../constants';
import { ApolloPipeline, ApolloTask, TaskOption } from '../types';

extend('required', requiredValidator);
extend('max', maxLengthValidator);

export default defineComponent({
    name: 'ConfigurePipeline',
    components: {
        FormBlock,
        OrbitSpinner,
        ValidationObserver,
        ValidationProvider,
        Scrollbar,
        AccessPolicy,
        LoadingCloneModal,
        XIcon,
        CheckIcon,
        ChevronLeftIcon,
    },
    metaInfo() {
        return {
            title: `${(this as any).actionText} Data Check-in Pipeline`,
        };
    },
    props: {
        id: {
            type: String,
            required: false,
        },
        backTo: {
            type: String,
            default: 'data-checkin-jobs',
        },
    },

    setup(props, { root }) {
        const route = useRoute();
        const { flag, isEnabled: isFeatureEnabled } = useFeatureFlags();
        const pipelineRef = ref<any>(null);
        const id: string | null = props.id ?? null;
        const accessLevel = ref<any>(AccessLevel.Private); //default
        const accessPolicies = ref<any>({ generalPolicy: GeneralPolicy.DENY_ALL, policies: [] });
        const loadingCloning = ref<boolean>(false);
        const isOnPremiseRunnerEnabled = flag('on-premise');

        const user = computed(() => store.state.auth.user);
        const isNew = computed<boolean>(() => props.id === undefined);
        const isClone = computed<boolean>(() => route.name === 'data-checkin-jobs:clone');
        const isUpdate = computed<boolean>(() => !isNew.value && !isClone.value);
        const actionText = computed(() => (isClone.value ? 'Clone' : isNew.value ? 'Create' : 'Update'));

        const pipeline = ref<ApolloPipeline>({
            name: '',
            description: '',
            runnerId: null,
            policies: [],
            tasks: [],
            accessLevel: accessLevel.value,
        });
        const runners = ref<any[]>([]);
        const selectedExecutionLocation = ref<string>(ExecutionLocation.Cloud);
        const selectedHarvester = ref<HarvesterBlockId | null>(null);
        const selectedPreprocessingMethods = ref<any[]>(['dc.Loader']);
        const selectedRunner = ref<any>(null);
        const previousAccessLevel = ref<string | null>(null);
        const previousAccessPolicies = ref<any>([]);
        const { exec, loading, error } = useAxios(true);

        const runnerId = computed(() =>
            selectedExecutionLocation.value === ExecutionLocation.Local ? selectedRunner.value : null,
        );
        const isOnPremise = computed(() => selectedExecutionLocation.value === ExecutionLocation.Local);
        const isUserJobCreator = computed(() => user.value.id === pipeline.value.createdById);
        const isFileUpload = computed(() => selectedHarvester.value === HarvesterBlockId.File);
        const isLargeFiles = computed(() => selectedHarvester.value === HarvesterBlockId.LargeFiles);
        const isDatabaseHarvester = computed(() =>
            selectedHarvester.value
                ? [HarvesterBlockId.BigQuery, HarvesterBlockId.SQL, HarvesterBlockId.SAPHana].includes(
                      selectedHarvester.value,
                  )
                : false,
        );
        const isOtherFile = computed(() => selectedHarvester.value === HarvesterBlockId.OtherFile);

        const executionLocationOptions = computed<any>(() => [
            {
                title: 'Cloud Execution',
                description: 'The data check-in pipeline will run on the cloud.',
                value: ExecutionLocation.Cloud,
                disabled: isClone.value ? !isFileUpload.value && !isDatabaseHarvester.value : !isNew.value,
            },
            {
                title: 'On-Premise Execution',
                description: 'The data check-in pipeline will run on-premise through a registered runner.',
                value: ExecutionLocation.Local,
                disabled:
                    (isClone.value ? !isFileUpload.value && !isDatabaseHarvester.value : !isNew.value) ||
                    !runners.value.length,
            },
        ]);

        const harvestingOptions = computed<TaskOption[]>(() => [
            {
                title: 'File Upload',
                description: 'Direct file upload (CSV, JSON, XML, PARQUET)',
                blockId: HarvesterBlockId.File,
                show: isNew.value || selectedHarvester.value === HarvesterBlockId.File,
                disabled: isUpdate.value || isClone.value,
            },
            {
                title: 'Other Files Upload',
                description: 'Direct file upload of any format without further processing',
                blockId: HarvesterBlockId.OtherFile,
                show: !isOnPremise.value && (isNew.value || selectedHarvester.value === HarvesterBlockId.OtherFile),
                disabled: isUpdate.value || isClone.value,
            },
            {
                title: "Very Large Files Upload through the platform's storage",
                description:
                    'Upload one or multiple very large files (in csv or parquet format) through the dedicated S3 storage provided by the platform',
                blockId: HarvesterBlockId.LargeFiles,
                show:
                    isFeatureEnabled('harvester.large-files') &&
                    !isOnPremise.value &&
                    (isNew.value || selectedHarvester.value === HarvesterBlockId.LargeFiles),
                disabled: isUpdate.value || isClone.value,
            },
            {
                title: "Platform's Kafka Data Ingestion",
                description: 'Publish streaming data in the Kafka service provided by the Platform',
                blockId: HarvesterBlockId.Kafka,
                show:
                    isFeatureEnabled('harvester.platform-kafka') &&
                    !isOnPremise.value &&
                    (isNew.value || selectedHarvester.value === HarvesterBlockId.Kafka),
                disabled: isUpdate.value || isClone.value,
            },
            {
                title: "Platform's MQTT Data Ingestion",
                description: 'Publish streaming data in the MQTT service provided by the Platform',
                blockId: HarvesterBlockId.MQTT,
                show:
                    isFeatureEnabled('harvester.platform-mqtt') &&
                    !isOnPremise.value &&
                    (isNew.value || selectedHarvester.value === HarvesterBlockId.MQTT),
                disabled: isUpdate.value || isClone.value,
            },
            {
                title: "Data Provider's Kafka Data Ingestion",
                description: 'Collect streaming data from the Kafka service available to the data provider',
                blockId: HarvesterBlockId.ExternalKafka,
                show:
                    (isFeatureEnabled('harvester.kafka') && !isOnPremise.value && isNew.value) ||
                    selectedHarvester.value === HarvesterBlockId.ExternalKafka,
                disabled: isUpdate.value || isClone.value,
            },
            {
                title: "Data Provider's MQTT Data Ingestion",
                description: 'Collect streaming data from the MQTT service available to the data provider',
                blockId: HarvesterBlockId.ExternalMQTT,
                show:
                    isFeatureEnabled('harvester.mqtt') &&
                    !isOnPremise.value &&
                    (isNew.value || selectedHarvester.value === HarvesterBlockId.ExternalMQTT),
                disabled: isUpdate.value || isClone.value,
            },
            {
                title: "Data Provider's Available API",
                description:
                    'Collect data from the APIs provided by applications and systems of the data provider or from open APIs',
                blockId: HarvesterBlockId.Api,
                show:
                    isFeatureEnabled('harvester.api') &&
                    !isOnPremise.value &&
                    (isNew.value || selectedHarvester.value === HarvesterBlockId.Api),
                disabled: isUpdate.value || isClone.value,
            },
            {
                title: "Platform's API",
                description: "Upload data to the Platform's APIs",
                blockId: HarvesterBlockId.InternalApi,
                show:
                    isFeatureEnabled('harvester.platform-api') &&
                    !isOnPremise.value &&
                    (isNew.value || selectedHarvester.value === HarvesterBlockId.InternalApi),
                disabled: isUpdate.value || isClone.value,
            },
            {
                title: 'Direct Retrieval from a Data Provider’s BigQuery Data Warehouse',
                description: 'Connect to a data provider’s BigQuery data warehouse and collect the selected data',
                blockId: HarvesterBlockId.BigQuery,
                show:
                    isFeatureEnabled('harvester.big-query') &&
                    (isNew.value || selectedHarvester.value === HarvesterBlockId.BigQuery),
                disabled: isUpdate.value || isClone.value,
            },
            {
                title: 'Direct Retrieval from a Data Provider’s SQL Database',
                description:
                    'Connect to a data provider’s SQL database, e.g. MS SQL Server, PostgreSQL, MySQL, MariaDB, and collect the selected data',
                blockId: HarvesterBlockId.SQL,
                show:
                    isFeatureEnabled('harvester.sql') &&
                    (isNew.value || selectedHarvester.value === HarvesterBlockId.SQL),
                disabled: isUpdate.value || isClone.value,
            },
            {
                title: 'Direct Retrieval from a Data Provider’s SAP Hana Database',
                description: 'Connect to a data provider’s SAP Hana database instance or data lake',
                blockId: HarvesterBlockId.SAPHana,
                show:
                    isFeatureEnabled('harvester.sap-hana') &&
                    (isNew.value || selectedHarvester.value === HarvesterBlockId.SAPHana),
                disabled: isUpdate.value || isClone.value,
            },
        ]);

        const preprocessingOptions = computed<TaskOption[]>(() => [
            {
                title: 'Mapping',
                description: 'Map your data to the common data model',
                blockId: PreprocessingBlockId.Mapping,
                show: !isOtherFile.value,
                disabled: true,
            },
            {
                title: 'Cleaning',
                description: 'Detect and address data quality issues, such as corrupt or inaccurate records',
                blockId: PreprocessingBlockId.Cleaning,
                show: !isOtherFile.value,
                disabled: isUpdate.value,
            },
            {
                title: 'Anonymisation',
                description: 'Anonymise your data to prevent an individual from being identified',
                blockId: PreprocessingBlockId.Anonymisation,
                show:
                    isFeatureEnabled('anonymisation') &&
                    (isFileUpload.value || isLargeFiles.value || (isDatabaseHarvester.value && !isOnPremise.value)),
                disabled: isUpdate.value,
            },
            {
                title: 'Encryption',
                description: 'Encrypt your data to make it unreadable to a person or entity without permission',
                blockId: PreprocessingBlockId.Encryption,
                show: isFeatureEnabled('encryption') && !isOtherFile.value && isOnPremise.value,
                disabled: isUpdate.value,
            },
            {
                title: 'Loader',
                description: 'Load the processed data to the data storage',
                blockId: PreprocessingBlockId.Loader,
                show: true,
                disabled: true,
            },
        ]);

        // reset selected runner in case it does not exist
        const resetRunner = () => {
            if (selectedRunner.value && !runners.value.find((runner: any) => runner.id === selectedRunner.value))
                selectedRunner.value = null;
        };

        const changeExecutionLocation = () => {
            selectedRunner.value = null;

            if (!isNew.value) return;

            selectedPreprocessingMethods.value = ['dc.Loader'];
            selectedHarvester.value = null;

            if (isOnPremise.value) selectedPreprocessingMethods.value.push('dc.DataMapper');
        };

        const changeHarvestingMethod = () => {
            if (!isOtherFile.value) selectedPreprocessingMethods.value = ['dc.DataMapper', 'dc.Loader'];
            else selectedPreprocessingMethods.value = ['dc.Loader'];
        };

        const goBackTo = () => {
            if (props.backTo === 'data-checkin-jobs')
                root.$router.push({ name: props.backTo, query: store.state.queryParams.jobs });
            else if (props.backTo === 'assets')
                root.$router.push({ name: props.backTo, query: store.state.queryParams.assets });
            else root.$router.push({ name: props.backTo });
        };

        // Load pipeline (if an id is provided)
        if (id) {
            exec(ApolloAPI.get(id))
                .then((res: any) => {
                    pipeline.value = res.data;

                    // in case of pipeline cloning reset policies
                    if (isClone.value) {
                        pipeline.value.policies = [];
                    }

                    if (!isClone.value && pipeline.value.status === WorkflowStatus.Suspended) {
                        (root as any).$toastr.w(
                            `This data check-in pipeline is suspended as its output asset (dataset) was deleted. You need to edit the Loader step and configure a new output asset in order to be able to use this pipeline again`,
                            'Warning',
                        );
                        goBackTo();
                    }

                    accessLevel.value = isClone.value ? AccessLevel.Private : pipeline.value.accessLevel;
                    previousAccessLevel.value = pipeline.value.accessLevel;
                    previousAccessPolicies.value = pipeline.value.policies;
                    selectedRunner.value = pipeline.value.runnerId;
                    selectedExecutionLocation.value = selectedRunner.value
                        ? ExecutionLocation.Local
                        : ExecutionLocation.Cloud;
                    selectedHarvester.value = pipeline.value.tasks.find(
                        (task: ApolloTask) => !task.upstreamTaskIds.length,
                    )?.blockId as HarvesterBlockId;

                    selectedPreprocessingMethods.value = pipeline.value.tasks
                        .filter((task: ApolloTask) => task.upstreamTaskIds.length)
                        .map((task: ApolloTask) => task.blockId);

                    resetRunner();
                })
                .catch((e) => {
                    if (e.response) {
                        switch (e.response.status) {
                            case 404:
                                (root as any).$toastr.e('The Data Check-in Pipeline was not found', 'Error');
                                break;
                            case 403:
                                (root as any).$toastr.e('Access to the Data Check-in Pipeline is forbidden', 'Error');
                                break;
                            default:
                                (root as any).$toastr.e('Retrieving Data Check-in Pipeline failed', 'Error');
                        }
                    }
                    goBackTo();
                });
        }

        if (isOnPremiseRunnerEnabled.value)
            // Load registered runners
            exec(RunnerAPI.all()).then((response: any) => {
                runners.value = response.data;
                resetRunner();
            });

        const validForms = ref<any>({
            accessLevelDetails: false,
        });

        const submitForms = () => {
            validForms.value = {
                accessLevelDetails: false,
            };
            (root as any).$formulate.submit('accessLevelDetails');
        };

        const formSubmitted = async (name: string) => {
            validForms.value[name] = true;
            await saveChanges();
        };

        const isFormulateFormsValid = computed(() =>
            Object.values(validForms.value).every((value: any) => value === true),
        );

        const saveChanges = async () => {
            const valid = await pipelineRef.value.validate();

            if (valid && isFormulateFormsValid.value) {
                // keep previous access level and policies before updates
                previousAccessLevel.value = R.clone(pipeline.value.accessLevel);
                previousAccessPolicies.value = R.clone(pipeline.value.policies);

                const availablePreprocessingMethods = preprocessingOptions.value.filter(
                    (option: TaskOption) => option.show,
                );

                const newJob: any = {
                    ...pipeline.value,
                    enabledTasks: [
                        selectedHarvester.value,
                        ...R.pluck('blockId', availablePreprocessingMethods).filter((blockId: string) =>
                            selectedPreprocessingMethods.value.includes(blockId),
                        ),
                    ],
                    runnerId: runnerId.value,
                    accessLevel: accessLevel.value,
                    newPolicies: accessPolicies.value?.policies?.add,
                };

                if (id) {
                    if (isClone.value) {
                        loadingCloning.value = true;
                        try {
                            await exec(ApolloAPI.clone(id, newJob));
                            loadingCloning.value = false;
                        } catch (e: any) {
                            if (e?.response?.status && e.response.status === 400) {
                                error.value = e;
                            }
                            loadingCloning.value = false;
                        }
                    } else {
                        pipeline.value.accessLevel = accessLevel.value;
                        const policies = {
                            newPolicies: accessPolicies.value.policies.add,
                            removePolicies: accessPolicies.value.policies.remove,
                        };

                        try {
                            await exec(ApolloAPI.update(id, { ...pipeline.value, ...policies } as any));
                        } catch (e: any) {
                            if (e?.response?.status && e.response.status === 400) {
                                error.value = e;
                                // on error reset pipeline's access level and policies
                                pipeline.value.accessLevel = previousAccessLevel.value as AccessLevel;
                                pipeline.value.policies = previousAccessPolicies.value;
                            }
                        }
                    }
                } else {
                    await exec(ApolloAPI.create(newJob));
                }

                if (!error.value) goBackTo();
            }
        };

        const lockDcj = async (pipelineId: string) => {
            exec(ApolloAPI.lock(pipelineId))
                .then(() => {
                    return;
                })
                .catch((e: { response: { status: any } }) => {
                    if (e.response) {
                        switch (e.response.status) {
                            case 403:
                                (root as any).$toastr.e(
                                    'The Data Check-in Pipeline is locked by another user',
                                    'Error',
                                );
                                break;
                            default:
                                (root as any).$toastr.e('Retrieving Data Check-in Pipeline failed', 'Error');
                        }
                    }
                    goBackTo();
                });
        };

        const customError = computed<{ title: string; message: string | { assetId: number } }>(() => {
            if (error.value?.response?.status === 403) {
                return {
                    title: 'Access Forbidden!',
                    message: 'You do not have access to edit the specific data checkin pipeline',
                };
            } else if (
                error.value?.response?.status === 400 &&
                error.value.response?.data?.message === 'Access Policy Restriction'
            ) {
                return {
                    title: 'Access Policy Restriction!',
                    message: {
                        assetId: pipeline.value.provenanceAssetIds && pipeline.value.provenanceAssetIds[0],
                    },
                };
            }

            return { title: 'An error has occurred!', message: error.value?.response?.data?.message };
        });

        const unlockJob = async () => {
            if (!isNew.value && !isClone.value) await exec(ApolloAPI.unlock(props.id as string));
        };

        onMounted(async () => {
            if (!isNew.value && !isClone.value) await lockDcj(props.id as string);
            window.addEventListener('beforeunload', unlockJob);
        });

        onUnmounted(async () => {
            unlockJob();
        });

        return {
            ExecutionLocation,
            executionLocationOptions,
            harvestingOptions,
            preprocessingOptions,
            selectedExecutionLocation,
            selectedHarvester,
            selectedPreprocessingMethods,
            actionText,
            goBackTo,
            error,
            isNew,
            isUpdate,
            pipeline,
            pipelineRef,
            loading,
            saveChanges,
            submitForms,
            runners,
            runnerId,
            selectedRunner,
            isOnPremise,
            isClone,
            AccessLevelsExtensiveOptions,
            accessLevel,
            AccessLevel,
            user,
            accessPolicies,
            isUserJobCreator,
            formSubmitted,
            previousAccessLevel,
            customError,
            loadingCloning,
            AuthzResourceType,
            isFileUpload,
            isOnPremiseRunnerEnabled,
            changeExecutionLocation,
            changeHarvestingMethod,
        };
    },
});
