Here’s a comprehensive example for a Kafka
integration with a Golang
service that includes a Producer
, a Consumer Group
, error and success channel handling, and logging with Uber’s Zap
(Popular for high-performance, structured JSON logging, often used in production). The example includes configuration, retry logic
, metadata
, and logging
, while minimizing message loss with Kafka best practices.
To prepare for this section you can refer to Kafka Quick Setup and Common Kafka Commands
Project Structure
kafka-example/
├── config/
│ ├── config.go # Parsed config from YAML
│ ├── config.yaml # YAML config file
├── constant/
│ ├── constant.go # App constants
├── consumer/
│ ├── main.go # Entry point for the consumer ├── kafka/
│ ├── kafka_consumer_group.go # Consumer Group implementation
│ ├── kafka_producer.go # Producer implementation
├── listener/
│ ├── listener.go # Async Producer listener implementation
├── logger/
│ ├── logger.go # Zap logger configuration
├── logs/ # Folder to store generated log files
├── producer/
│ ├── main.go # Entry point for the producer ├── go.mod
├── go.sum
1. Configuration (config/config.yaml
)
The configuration file will store Kafka credentials, log rotation settings, and producer/consumer options.
kafka: brokers: - "localhost:29092" username: "dev-user" password: "dev-password" topic: "latestMsgToRedis" retries: 10 producer_return_successes: true
log: rotation_size: 50 # 50MB rotation_count: 7 # 7 days level: "info"
2. Config Parsing (config/config.go
)
These structs read the configuration from config.yaml
:
package config import ( "log" "os" "gopkg.in/yaml.v3"
) type KafkaConfig struct { Brokers []string `yaml:"brokers"` Username string `yaml:"username"` Password string `yaml:"password"` Topic string `yaml:"topic"` Retries int `yaml:"retries"` ProducerReturnSuccesses bool `yaml:"producer_return_successes"`
} type LogConfig struct { RotationSize int `yaml:"rotation_size"` RotationCount int `yaml:"rotation_count"`
} type Config struct { Kafka KafkaConfig `yaml:"kafka"` Log LogConfig `yaml:"log"`
} func LoadConfig(configPath string) (*Config, error) { _, err := os.Stat(configPath) if os.IsNotExist(err) { log.Fatalf("Config file does not exist: %v", err) } file, err := os.Open(configPath) if err != nil { return nil, err } defer file.Close() var cfg Config decoder := yaml.NewDecoder(file) if err := decoder.Decode(&cfg); err != nil { return nil, err } return &cfg, nil
}
3. Constant (constant/constant.go
)
package constant // Define custom key types to avoid key collisions
type ContextKey string const ( OperationID ContextKey = "operationID" // For tracking, debugging OpUserID ContextKey = "opUserID" // For indentifying user accross micro services
)
4. Logger Setup (logger/logger.go
)
Configuring Uber Zap for file logging with rotation:
package logger import ( "fmt" "go.uber.org/zap" "go.uber.org/zap/zapcore" "gopkg.in/natefinch/lumberjack.v2"
) // NewLogger initializes a new zap.Logger with log rotation settings
func NewLogger(processID string, rotationSize int, rotationCount int) *zap.Logger { // Configure lumberjack to handle log rotation by size and age w := zapcore.AddSync(&lumberjack.Logger{ Filename: fmt.Sprintf("./logs/%s.log", processID), // Log file path based on processID MaxAge: rotationCount, // Number of days to retain old log files MaxSize: rotationSize, // Rotate log when it reaches rotationSize MB }) // Set up the core logging configuration core := zapcore.NewCore( zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), // Use JSON format for log entries w, // Set log writer with rotation settings zapcore.InfoLevel, // Set minimum log level to Info ) // Return the logger with caller information enabled return zap.New(core, zap.AddCaller())
}
5. Kafka Producer Inplementation (kafka/kafka_producer.go
)
package kafka import ( "context" "errors" "fmt" "kafka-example/config" "kafka-example/constant" "time" "github.com/IBM/sarama" "go.uber.org/zap"
) type MProducer struct { // producer sarama.AsyncProducer producer sarama.SyncProducer topic string logger *zap.Logger config *config.KafkaConfig
} func NewProducer(cfg *config.KafkaConfig, topic string, log *zap.Logger) (*MProducer, error) { saramaConfig := sarama.NewConfig() // The total number of times to retry sending a message (default 3) // the producer will stop retrying to send the message after 5 failed attempts. // This means the message could be dropped if it hasn't successfully been sent after these retries, potentially resulting in message loss unless other safeguards (like error handling or dead-letter queues) are in place. saramaConfig.Producer.Retry.Max = 5 // WaitForAll waits for all in-sync replicas to commit before responding. // The minimum number of in-sync replicas is configured on the broker via the `min.insync.replicas` configuration key. saramaConfig.Producer.RequiredAcks = sarama.WaitForAll // Setting saramaConfig.Producer.Partitioner = sarama.NewHashPartitioner configures the Kafka producer to use a hash-based partitioner for determining which partition a message should go to. // This partitioner applies a hash function to the message key, ensuring messages with the same key consistently go to the same partition. This is useful for maintaining ordering for specific keys, as all messages with that key will always be sent to the same partition. // When sending a message, we must specify the key value of the message. If there is no key, the partition will be selected randomly saramaConfig.Producer.Partitioner = sarama.NewHashPartitioner // In sarama.SyncProducer, setting Producer.Return.Successes = true is required to receive message acknowledgments after successful sends. // Without this, SyncProducer won’t wait for broker acknowledgment, making it impossible to return partition and offset information for sent messages. Setting this option ensures that SendMessage can confirm successful delivery with metadata, enhancing reliability. saramaConfig.Producer.Return.Successes = true if cfg.Username != "" && cfg.Password != "" { saramaConfig.Net.SASL.Enable = true saramaConfig.Net.SASL.User = cfg.Username saramaConfig.Net.SASL.Password = cfg.Password } // Following only for working with AsyncProducer, where we handle Errors and Succcess asynchronously // saramaConfig.Producer.Return.Errors = true // saramaConfig.Producer.Return.Successes = cfg.Kafka.ProducerReturnSuccesses // ListenAsyncProducerStatus(asyncProcuder,log) var prod sarama.SyncProducer var err error for i := 0; i <= cfg.Retries; i++ { // prod, err := sarama.NewAsyncProducer(cfg.Kafka.Brokers, saramaConfig) prod, err = sarama.NewSyncProducer(cfg.Brokers, saramaConfig) if err == nil { break } else { log.Error("Failed to create producer", zap.Int("tryTime", i), zap.Error(err)) } time.Sleep(time.Duration(1) * time.Second) } if err != nil { log.Error("Failed to create producer after many tries", zap.Error(err)) return nil, err } log.Info("Success to create producer") // The main differences between sarama.SyncProducer and sarama.AsyncProducer are: // Message Delivery Mechanism: // SyncProducer: Sends messages synchronously. Each SendMessage call waits for the broker’s acknowledgment, making it blocking and ensuring delivery order. // AsyncProducer: Sends messages asynchronously through channels (Input() for messages, Errors() for errors, and optionally Successes() for successful deliveries). It’s non-blocking and faster for high-throughput needs. // Use Cases: // SyncProducer: Ideal for low-throughput scenarios where message delivery guarantees and ordering are critical. // AsyncProducer: Suitable for high-throughput applications where latency is prioritized, and managing message acknowledgment and error handling is feasible. return &MProducer{producer: prod, topic: topic, config: cfg, logger: log}, nil
} // Send context Data between producer consumers via Header
func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) { operationID, ok := ctx.Value(constant.OperationID).(string) if !ok { err := errors.New("ctx missing operationID") return nil, err } opUserID, ok := ctx.Value(constant.OpUserID).(string) if !ok { err := errors.New("ctx missing userID") return nil, err } return []sarama.RecordHeader{ {Key: []byte(constant.OperationID), Value: []byte(operationID)}, {Key: []byte(constant.OpUserID), Value: []byte(opUserID)}, }, nil
} func (p *MProducer) SendMessage(ctx context.Context, key, msgValue string) error { header, err := GetMQHeaderWithContext(ctx) if err != nil { p.logger.Error("Failed to get Header", zap.Error(err)) } kafkaMsg := &sarama.ProducerMessage{ Topic: p.topic, Key: sarama.StringEncoder(key), Value: sarama.StringEncoder(msgValue), Headers: header, } partition, offset, err := p.producer.SendMessage(kafkaMsg) if err != nil { p.logger.Error("Failed to send message", zap.Error(err)) return err } fmt.Println("[Message Sent] ", "topic:", p.topic, " - key:", key, " - msg:", msgValue, " - partition:", partition, " - offset:", offset) // Logging message sent // p.logger.Info("Message sent", // zap.String("topic", p.topic), // zap.String("key", key), // zap.String("msg", msgValue), // zap.Int32("partition", partition), // zap.Int64("offset", offset), // ) return nil
} func (p *MProducer) Close() error { return p.producer.Close()
}
5. Producer Entry Point (producer/main.go
)
The producer sends a message every 3 seconds and includes headers like OperationID
and UserID
for management.
package main import ( "context" "fmt" "math/rand" "kafka-example/config" "kafka-example/constant" "kafka-example/kafka" "kafka-example/logger" "time" "go.uber.org/zap"
) func startProducer(ctx context.Context, cfg *config.Config, log *zap.Logger) error { producer, err := kafka.NewProducer(&cfg.Kafka, cfg.Kafka.Topic, log) if err != nil { return err } defer producer.Close() ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() // Send Messages: A loop that sends messages to Kafka every 3 seconds. for counter := 1; ; counter++ { <-ticker.C producer.SendMessage(ctx, fmt.Sprintf("msg-key-%d", counter), fmt.Sprintf("Counter message %d", counter)) }
} func main() { cfg, _ := config.LoadConfig("config/config.yaml") log := logger.NewLogger("producer", cfg.Log.RotationSize, cfg.Log.RotationCount) ctx := context.Background() opID := fmt.Sprintf("op-%d", rand.Intn(1000)) ctx = context.WithValue(ctx, constant.OperationID, opID) ctx = context.WithValue(ctx, constant.OpUserID, "user-396") if err := startProducer(ctx, cfg, log); err != nil { log.Fatal("Failed to start producer", zap.Error(err)) }
}
6. Kafka Consumer Group Inplementation (kafka/kafka_consumer_group.go
)
package kafka import ( "context" "kafka-example/config" "kafka-example/constant" "time" "github.com/IBM/sarama" "go.uber.org/zap"
) type MConsumerGroup struct { config *config.KafkaConfig topic string group sarama.ConsumerGroup logger *zap.Logger
} func NewConsumerGroup(cfg *config.KafkaConfig, topic string, groupId string, consumerId string, logger *zap.Logger) (*MConsumerGroup, error) { saramaConfig := sarama.NewConfig() // OffsetOldest stands for the oldest offset available on the broker for a partition. // We can send this to a client's GetOffset method to get this offset, or when calling ConsumePartition to start consuming from the oldest offset that is still available on the broker. saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest // If enabled, any errors that occurred while consuming are returned on the Errors channel (default disabled). saramaConfig.Consumer.Return.Errors = true // Setting saramaConfig.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()} specifies how Kafka partitions are assigned to consumers within a consumer group. // The Range strategy (NewBalanceStrategyRange) divides partitions among consumers by assigning consecutive partitions to each consumer. // This ensures a balanced distribution of partitions, especially when the number of partitions is divisible by the number of consumers. This strategy is often used to maintain a predictable partition assignment. // Alternative strategies, like RoundRobin, distribute partitions more evenly in cases with mismatched partition-consumer counts. saramaConfig.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()} if cfg.Username != "" && cfg.Password != "" { saramaConfig.Net.SASL.Enable = true saramaConfig.Net.SASL.User = cfg.Username saramaConfig.Net.SASL.Password = cfg.Password } group, err := sarama.NewConsumerGroup(cfg.Brokers, groupId, saramaConfig) if err != nil { logger.Error("Failed to create consumer group", zap.Error(err)) return nil, err } logger.Info("Success to create or connect to existed consumerGroup", zap.String("consumerID", consumerId)) // Handle errors in consumer group go func() { // Handle Errors: Listen for errors in the consumer group by checking the Errors() method on the consumer group session, which provides error events. for err := range group.Errors() { logger.Error("Consumer group error", zap.Error(err)) } }() return &MConsumerGroup{config: cfg, topic: topic, group: group, logger: logger}, nil
} func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context { var values []string for _, recordHeader := range cMsg.Headers { values = append(values, string(recordHeader.Value)) } mapper := []constant.ContextKey{constant.OperationID, constant.OpUserID} ctx := context.Background() for i, value := range values { ctx = context.WithValue(ctx, mapper[i], value) } return ctx
} func (c *MConsumerGroup) RegisterHandlerAndConsumeMessages(ctx context.Context, handler sarama.ConsumerGroupHandler) { defer c.group.Close() for { if err := c.group.Consume(ctx, []string{c.topic}, handler); err != nil { c.logger.Error("Error consuming messages", zap.Error(err)) time.Sleep(2 * time.Second) // retry delay } }
} func (c *MConsumerGroup) Close() error { return c.group.Close()
}
7. Kafka Consumer Entry Point (consumer/main.go
)
Each consumer instance uses a unique clientID
and joins a common consumer group
.
package main import ( "context" "fmt" "math/rand" "os" "os/signal" "kafka-example/config" "kafka-example/kafka" "kafka-example/logger" "syscall" "time" "github.com/IBM/sarama" "go.uber.org/zap"
) type ConsumerGroupHandler struct { clientID string Logger *zap.Logger consumerGroup *kafka.MConsumerGroup
} func (handler ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (handler ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (handler ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { ctx := handler.consumerGroup.GetContextFromMsg(msg) fmt.Println("[Message Recieved] ", " timeStamp:", msg.Timestamp.Format("2006-01-02 15:04:05"), "consumerId:", handler.clientID, "context:", ctx, " - topic:", msg.Topic, " - key:", string(msg.Key), " - msgValue:", string(msg.Value), " - partition:", msg.Partition, " - offset:", msg.Offset) // handler.Logger.Info("Message received", // zap.String("consumerId", handler.clientID), // zap.Any("context", ctx), // zap.String("topic", msg.Topic), // zap.ByteString("key", msg.Key), // zap.ByteString("value", msg.Value), // zap.Int32("partition", msg.Partition), // zap.Int64("offset", msg.Offset), // zap.Time("timestamp", msg.Timestamp), // ) session.MarkMessage(msg, "") } return nil
} func startConsumer(ctx context.Context, cfg *config.Config, log *zap.Logger) error { clientID := fmt.Sprintf("consumer-%d", rand.Intn(1000)) group, err := kafka.NewConsumerGroup(&cfg.Kafka, cfg.Kafka.Topic, "my-consumer-group", clientID, log) if err != nil { return err } defer group.Close() handler := ConsumerGroupHandler{Logger: log, clientID: clientID, consumerGroup: group} group.RegisterHandlerAndConsumeMessages(ctx, handler) return nil
} func main() { cfg, _ := config.LoadConfig("config/config.yaml") log := logger.NewLogger("consumer", cfg.Log.RotationSize, cfg.Log.RotationCount) // Start Consumer in Background ctx, cancel := context.WithCancel(context.Background()) go func() { if err := startConsumer(ctx, cfg, log); err != nil { log.Fatal("Failed to start consumer", zap.Error(err)) } }() // Handle graceful shutdown sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) <-sigChan cancel() time.Sleep(2 * time.Second) log.Info("Shutting down gracefully")
}
8. Kafka Async Producer Listener (Optional) (listener/listener.go
)
The background jobs listens to Return.Errors
and Return.Successes
channels of Async Producer, logging message status.
package listener import ( "github.com/IBM/sarama" "go.uber.org/zap"
) // In case work with asyncProducer
func ListenAsyncProducerStatus(producer sarama.AsyncProducer, log *zap.Logger) { go func() { for err := range producer.Errors() { // Convert sarama.Encoder to []byte, then to string valueBytes, _ := err.Msg.Value.Encode() log.Error("Producer error", zap.Error(err.Err), zap.String("msg", string(valueBytes))) } }() go func() { for msg := range producer.Successes() { log.Info("Message acknowledged", zap.String("topic", msg.Topic), zap.Int32("partition", msg.Partition), zap.Int64("offset", msg.Offset)) } }()
}
9. Run Producer & Comsumer
Run Producer
, check to see console log and auto generated logging files in logs
folder
go run producer/main.go
Run Comsumer
in another Terminal Window
go run consumer/main.go
This setup includes a reusable Kafka producer and consumer service with error handling, retry, logging to both file and console, and metadata (headers). Demonstrating a Kafka producer
and Consumer Group
integration using sarama
and logs withuber/zap
. The components are modular, making it easy to extend the functionality to new Kafka topics
and reuse the logger
setup.
If you found this helpful, let me know by leaving a 👍 or a comment!, or if you think this post could help someone, feel free to share it! Thank you very much! 😃