Merge pull request #92 from utilitywarehouse/revert-91-migrate-off-bsm
Revert "Change kafka source to use sarama ConsumerGroup API"
povilasv authored Oct 4, 2019
2 parents 09d6067 + d0f458c commit cdb8f20
Showing 1 changed file with 101 additions and 52 deletions.
153 changes: 101 additions & 52 deletions kafka/kafkasource.go
@@ -2,11 +2,12 @@ package kafka

import (
"context"
"sync"
"time"

"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"github.com/utilitywarehouse/go-pubsub"
"golang.org/x/sync/errgroup"
)

var _ pubsub.MessageSource = (*messageSource)(nil)
@@ -61,59 +62,41 @@ func NewMessageSource(config MessageSourceConfig) pubsub.ConcurrentMessageSource
}

func (mq *messageSource) ConsumeMessages(ctx context.Context, handler pubsub.ConsumerMessageHandler, onError pubsub.ConsumerErrorHandler) error {
lock := &sync.Mutex{}
return mq.ConsumeMessagesConcurrently(ctx, func(message pubsub.ConsumerMessage) error {
lock.Lock()
defer lock.Unlock()

return handler(message)
}, onError)
}
config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = mq.offset
config.Metadata.RefreshFrequency = mq.metadataRefreshFrequency
config.Consumer.Offsets.Retention = mq.offsetsRetention

// consumerGroupHandler is a handler wrapper for the sarama ConsumerGroup API.
// sarama will make multiple concurrent calls to ConsumeClaim.
type consumerGroupHandler struct {
ctx context.Context
handler pubsub.ConsumerMessageHandler
onError pubsub.ConsumerErrorHandler
errChan <-chan error
}
if mq.Version != nil {
config.Version = *mq.Version
}

// Setup creates consumer group session
func (h *consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
c, err := cluster.NewConsumer(mq.brokers, mq.consumergroup, []string{mq.topic}, config)
if err != nil {
return err
}

// Cleanup cleans up the consumer group session
func (h *consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
defer func() {
_ = c.Close()
}()

// ConsumeClaim calls out to pubsub handler
func (h *consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
select {
case msg, ok := <-claim.Messages():
if !ok {
// message channel closed so bail cleanly
return nil
}

case msg := <-c.Messages():
message := pubsub.ConsumerMessage{Data: msg.Value}
err := h.handler(message)
err := handler(message)
if err != nil {
err = h.onError(message, err)
err = onError(message, err)
if err != nil {
return err
}
}
sess.MarkMessage(msg, "")

case err, ok := <-h.errChan:
if !ok {
// error channel closed so bail cleanly. If we don't do this we will kill the
// consumer every time there is a consumer rebalance
return nil
}
c.MarkOffset(msg, "")
case err := <-c.Errors():
return err

case <-h.ctx.Done():
case <-ctx.Done():
return nil
}
}
@@ -123,17 +106,18 @@ func (h *consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, cl
// in the context of Kafka this is done by spawning a new routine per partition made available to
// the application by Kafka at runtime
func (mq *messageSource) ConsumeMessagesConcurrently(ctx context.Context, handler pubsub.ConsumerMessageHandler, onError pubsub.ConsumerErrorHandler) error {
config := sarama.NewConfig()
config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = mq.offset
config.Metadata.RefreshFrequency = mq.metadataRefreshFrequency
config.Consumer.Offsets.Retention = mq.offsetsRetention
config.Group.Mode = cluster.ConsumerModePartitions

if mq.Version != nil {
config.Version = *mq.Version
}

c, err := sarama.NewConsumerGroup(mq.brokers, mq.consumergroup, config)
c, err := cluster.NewConsumer(mq.brokers, mq.consumergroup, []string{mq.topic}, config)
if err != nil {
return err
}
@@ -142,19 +126,84 @@ func (mq *messageSource) ConsumeMessagesConcurrently(ctx context.Context, handle
_ = c.Close()
}()

h := &consumerGroupHandler{
ctx: ctx,
handler: handler,
onError: onError,
errChan: c.Errors(),
}
pGroup, pContext := errgroup.WithContext(ctx)

err = c.Consume(ctx, []string{mq.topic}, h)
if err != nil {
return err
for {
select {
case part, ok := <-c.Partitions():
// Partitions() will emit a nil pointer (part) when the parent channel is tombed as
// the client is closed; ok will be false when the partitions channel is closed.
// In both cases we want to wait for the errgroup to handle any errors correctly and
// gracefully close the subroutines, so if either part is nil or ok is false we
// simply ignore it to give the errgroup and subroutines time to finish
if ok && part != nil {
pGroup.Go(newConcurrentMessageHandler(pContext, c, part, handler, onError))
}
case err := <-c.Errors():

if err == nil {
// our parent channel was possibly closed due to context cancel, in which case
// we have stopped consuming but should wait for any errors returned from
// subroutines
return pGroup.Wait()
}
// wait for the errgroup's running subroutines to finish; we're ignoring any other errors
// at this point
pGroup.Wait()
// return the original consumer error
return err
case <-ctx.Done():
// main context was cancelled, our errgroup will also be cancelled so we should
// gracefully wait until subroutines finish just in case one returns an error
return pGroup.Wait()
case <-pContext.Done():
// gracefully wait for any error
return pGroup.Wait()
}
}
}

return nil
func newConcurrentMessageHandler(
ctx context.Context,
consumer *cluster.Consumer,
part cluster.PartitionConsumer,
handler pubsub.ConsumerMessageHandler,
onError pubsub.ConsumerErrorHandler) func() error {

return func() error {
for {
select {
case msg, ok := <-part.Messages():
if !ok {
// message channel closed so bail cleanly
return nil
}
message := pubsub.ConsumerMessage{Data: msg.Value}
err := handler(message)
if err != nil {
err = onError(message, err)
if err != nil {
// this error will cause the errgroup to cancel its context, which triggers
// a graceful shutdown of the consumer and bubbles the error back up to the
// main partition loop
return err
}
}
consumer.MarkOffset(msg, "")
case err, ok := <-part.Errors():
if !ok {
// error channel closed so bail cleanly. If we don't do this we will kill the
// consumer every time there is a consumer rebalance
return nil
}
return err
case <-ctx.Done():
// another routine has encountered an error, so we should stop
// processing any other partitions
return nil
}
}
}
}

// Status reports the status of the message source
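
For readers less familiar with the fan-out pattern this revert restores in ConsumeMessagesConcurrently, here is a minimal sketch of the same errgroup-per-partition lifecycle with the Kafka specifics stripped out. The names consumeAll, partitions, and process are illustrative only and not part of go-pubsub; a toy channel of channels stands in for cluster.Consumer.Partitions().

// Minimal sketch: one goroutine per "partition" channel, managed by an errgroup.
// The first worker error cancels gctx; Wait collects that error after all
// workers return, mirroring the pGroup handling in the diff above.
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func consumeAll(ctx context.Context, partitions <-chan (<-chan string), process func(string) error) error {
	g, gctx := errgroup.WithContext(ctx)
	for {
		select {
		case part, ok := <-partitions:
			if !ok {
				// no more partitions; wait for in-flight workers to finish
				return g.Wait()
			}
			g.Go(func() error {
				for {
					select {
					case msg, ok := <-part:
						if !ok {
							return nil // partition channel closed, bail cleanly
						}
						if err := process(msg); err != nil {
							return err // cancels gctx for every other worker
						}
					case <-gctx.Done():
						return nil // another worker failed, stop processing
					}
				}
			})
		case <-ctx.Done():
			return g.Wait()
		case <-gctx.Done():
			// a worker returned an error; Wait hands it back to the caller
			return g.Wait()
		}
	}
}

func main() {
	p := make(chan string, 2)
	p <- "a"
	p <- "b"
	close(p)

	parts := make(chan (<-chan string), 1)
	parts <- p
	close(parts)

	err := consumeAll(context.Background(), parts, func(string) error { return nil })
	fmt.Println("consumeAll returned:", err)
}

The key property, as in the reverted-to code, is that a single failing handler shuts down every partition routine gracefully instead of leaking goroutines.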

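And a hedged usage sketch of the restored source from a caller's point of view. The import path follows the file location kafka/kafkasource.go inside go-pubsub; the MessageSourceConfig field names (Brokers, ConsumerGroup, Topic) and the broker address are assumptions not confirmed by this diff, so check the package before copying.

package main

import (
	"context"
	"log"

	"github.com/utilitywarehouse/go-pubsub"
	"github.com/utilitywarehouse/go-pubsub/kafka"
)

func main() {
	// Config field names here are assumed, not confirmed by this diff.
	src := kafka.NewMessageSource(kafka.MessageSourceConfig{
		Brokers:       []string{"localhost:9092"},
		ConsumerGroup: "example-group",
		Topic:         "example-topic",
	})

	handler := func(m pubsub.ConsumerMessage) error {
		log.Printf("received %d bytes", len(m.Data))
		return nil
	}

	// Per the select loops in the diff: if the handler fails, onError decides whether
	// consumption continues (return nil) or the consumer shuts down (return the error).
	onError := func(m pubsub.ConsumerMessage, err error) error {
		log.Printf("handler error: %v", err)
		return nil
	}

	if err := src.ConsumeMessages(context.Background(), handler, onError); err != nil {
		log.Fatal(err)
	}
}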