import {IKafkaValues, ITriggerService} from '@core/types/Test.types';
import Validator from '@core/utils/Validator';
import KafkaRequest from '@core/models/KafkaRequest.model';
import {Attributes} from '../../constants/SpanAttribute.constants';

const KafkaTriggerService = (): ITriggerService => ({
  async validateDraft(draft) {
    const {brokerUrls, topic, messageValue} = draft as IKafkaValues;

    const isValid = Validator.required(brokerUrls) && Validator.required(topic) && Validator.required(messageValue);

    return isValid;
  },
  async getRequest(values) {
    const {brokerUrls, topic, authentication, sslVerification, headers, messageKey, messageValue} =
      values as IKafkaValues;
    const parsedHeaders = headers.filter(({key}) => key);

    return KafkaRequest({
      brokerUrls,
      topic,
      authentication,
      sslVerification,
      headers: parsedHeaders,
      messageKey,
      messageValue,
    });
  },

  getInitialValues(request) {
    const {brokerUrls, topic, authentication, sslVerification, headers, messageKey, messageValue} =
      request as KafkaRequest;

    return {
      brokerUrls,
      topic,
      authentication,
      sslVerification,
      headers,
      messageKey,
      messageValue,
    };
  },

  getParsedDefinition() {
    return {
      assets: [],
    };
  },

  getRequestFromSpan(span) {
    const topic =
      span.attributes[Attributes.MESSAGING_DESTINATION_NAME]?.value ||
      span.attributes[Attributes.MESSAGING_DESTINATION]?.value ||
      '';
    const messageKey = span.attributes['messaging.kafka.message.key']?.value ?? `\${var:MESSAGE_KEY}`;
    const brokerUrls = [`\${var:BROKER_URL}`];
    return KafkaRequest({
      brokerUrls,
      messageKey,
      topic,
    });
  },
});

export default KafkaTriggerService();
