I am publishing Kafka messages in batches using the Produce method. If a message fails to publish, I store the entire batch in a database table. The issue I am facing is that if there are, for example, 1000 messages in a batch and they fail, the batch gets stored 1000 times in the database, because the delivery callback fires once per failed message and each invocation stores the whole batch. How can I store those 1000 messages just once? Note: I don't want to use await in this method.
public bool PublishToKafka(List&lt;GenericData&gt; recordList)
{
    // Register the schema up front, blocking on the async call since await is not used here.
    Task.Run(async () =>
    {
        schemaId = await _cachedSchemaRegistryClient
            .RegisterSchemaAsync(_kafkatable.KafkaTopic + "-value", GenericData._SCHEMA.ToString())
            .ConfigureAwait(false);
    }).GetAwaiter().GetResult();

    var serializer = new AvroSerializer&lt;GenericData&gt;(_cachedSchemaRegistryClient, _avroSerializerConfig).AsSyncOverAsync();

    using (IProducer&lt;Null, GenericData&gt; producer = new ProducerBuilder&lt;Null, GenericData&gt;(_producerConfig)
        .SetValueSerializer(serializer)
        .Build())
    {
        try
        {
            int avroRecordsCount = 0;
            foreach (GenericData genericData in recordList)
            {
                avroRecordsCount++;
                producer.Produce(_kafkatable.KafkaTopic, new Message&lt;Null, GenericData&gt;
                {
                    Value = genericData,
                    Headers = new Headers { new Header(SchemaIdHeader, BitConverter.GetBytes(schemaId)) }
                }, deliveryReport =>
                {
                    if (deliveryReport.Status == PersistenceStatus.NotPersisted)
                    {
                        Logs.Export.Err("Failed message delivery: error reason: {0} ", deliveryReport.Error.Reason);
                        // This callback runs once per failed message, so the whole
                        // batch is written to the database once for every failure.
                        byte[] bytes = ToBytes(recordList);
                        StoreClass.StoreData(bytes);
                    }
                });
            }
            // Flush once after the loop so all pending delivery reports are processed.
            producer.Flush();
            return true;
        }
        catch (Exception ex)
        {
            Logs.Export.Err("Failed to publish batch: {0}", ex.Message);
        }
    }
    return false;
}
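Is a once-only guard the right direction here? Below is a rough sketch of what I have in mind (untested; storedOnce is just an illustrative local, and I'm assuming Interlocked.CompareExchange from System.Threading is the right tool since delivery callbacks run on the producer's poll thread, not the thread calling Produce):

using System.Threading; // for Interlocked

// Inside PublishToKafka, before the foreach loop:
int storedOnce = 0; // 0 = batch not yet stored, 1 = already stored

foreach (GenericData genericData in recordList)
{
    producer.Produce(_kafkatable.KafkaTopic, new Message&lt;Null, GenericData&gt;
    {
        Value = genericData,
        Headers = new Headers { new Header(SchemaIdHeader, BitConverter.GetBytes(schemaId)) }
    }, deliveryReport =>
    {
        if (deliveryReport.Status == PersistenceStatus.NotPersisted)
        {
            Logs.Export.Err("Failed message delivery: error reason: {0} ", deliveryReport.Error.Reason);

            // Atomically flip the flag; only the first failed report wins and
            // performs the store, every later callback sees 1 and skips it.
            if (Interlocked.CompareExchange(ref storedOnce, 1, 0) == 0)
            {
                byte[] bytes = ToBytes(recordList);
                StoreClass.StoreData(bytes);
            }
        }
    });
}
producer.Flush();

Would this be safe with the producer's delivery handler, or is there a more idiomatic way to persist the batch exactly once on failure?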