/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.kafka;

import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.flink.table.store.shaded.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.types.RowKind;

/**
 * A {@link KafkaSerializationSchema} that converts a {@link SinkRecord} into a Kafka
 * {@link ProducerRecord} for the log topic.
 *
 * <p>When a primary-key serializer is configured, the record key is the serialized primary key
 * and the record value is only written for some rows (see {@link #serialize}). Without a
 * primary-key serializer the key is always {@code null} and the value is always the serialized
 * row.
 */
public class KafkaLogSerializationSchema
implements KafkaSerializationSchema<SinkRecord> {
    private static final long serialVersionUID = 1L;

    /** Name of the Kafka topic every produced record is addressed to. */
    private final String topic;

    /** Serializer for the record key; {@code null} when the table has no primary key. */
    @Nullable
    private final SerializationSchema<RowData> primaryKeySerializer;

    /** Serializer for the record value (the full row). */
    private final SerializationSchema<RowData> valueSerializer;

    /** Changelog mode that decides whether non-insert rows keep their value. */
    private final CoreOptions.LogChangelogMode changelogMode;

    /**
     * Creates the schema.
     *
     * @param topic target Kafka topic
     * @param primaryKeySerializer serializer for the primary key, or {@code null} for a table
     *     without primary key
     * @param valueSerializer serializer for the row
     * @param changelogMode changelog mode of the log
     * @throws IllegalArgumentException if {@code changelogMode} is
     *     {@link CoreOptions.LogChangelogMode#UPSERT} but no primary-key serializer is given
     */
    public KafkaLogSerializationSchema(String topic, @Nullable SerializationSchema<RowData> primaryKeySerializer, SerializationSchema<RowData> valueSerializer, CoreOptions.LogChangelogMode changelogMode) {
        // Upsert mode writes key-only records for some rows, so a key serializer is mandatory.
        if (changelogMode == CoreOptions.LogChangelogMode.UPSERT && primaryKeySerializer == null) {
            throw new IllegalArgumentException("Can not use upsert changelog mode for non-pk table.");
        }
        this.topic = topic;
        this.primaryKeySerializer = primaryKeySerializer;
        this.valueSerializer = valueSerializer;
        this.changelogMode = changelogMode;
    }

    /**
     * Opens the wrapped serializers: the primary-key serializer first (when present), then the
     * value serializer.
     *
     * @param context initialization context forwarded to each wrapped serializer
     * @throws Exception if one of the wrapped serializers fails to open
     */
    @Override
    public void open(SerializationSchema.InitializationContext context) throws Exception {
        if (this.primaryKeySerializer != null) {
            this.primaryKeySerializer.open(context);
        }
        this.valueSerializer.open(context);
    }

    /**
     * Builds the Kafka record for one sink record.
     *
     * <ul>
     *   <li>Partition: {@code element.bucket()}.
     *   <li>Key: serialized {@code element.primaryKey()} when a primary-key serializer is
     *       configured, otherwise {@code null}.
     *   <li>Value: serialized {@code element.row()} when there is no primary-key serializer, or
     *       the changelog mode is {@code ALL}, or the row kind is {@code INSERT} /
     *       {@code UPDATE_AFTER}; otherwise {@code null}.
     * </ul>
     *
     * @param element the record to serialize
     * @param timestamp element timestamp; not used by this implementation
     * @return the record to send to Kafka
     */
    @Override
    public ProducerRecord<byte[], byte[]> serialize(SinkRecord element, @Nullable Long timestamp) {
        RowKind kind = element.row().getRowKind();

        byte[] primaryKeyBytes = null;
        if (this.primaryKeySerializer != null) {
            // No cast: the serializer is typed on RowData, an Object argument does not type-check.
            primaryKeyBytes = this.primaryKeySerializer.serialize(element.primaryKey());
        }

        // A keyed record outside ALL mode keeps its value only for INSERT / UPDATE_AFTER rows;
        // the remaining row kinds are emitted key-only (null value).
        // NOTE(review): presumably the reader treats a null value as a deletion of the key - confirm.
        boolean writeValue = this.primaryKeySerializer == null
                || this.changelogMode == CoreOptions.LogChangelogMode.ALL
                || kind == RowKind.INSERT
                || kind == RowKind.UPDATE_AFTER;
        byte[] valueBytes = writeValue ? this.valueSerializer.serialize(element.row()) : null;

        return new ProducerRecord<>(this.topic, element.bucket(), primaryKeyBytes, valueBytes);
    }
}

