“kafkajs” Réponses codées

kafkajs

const { Kafka } = require('kafkajs')

// Settings for the single KafkaJS client shared by the producer and consumer
// helpers exported from this module.
// NOTE(review): the two timeouts are presumably milliseconds, per the KafkaJS
// client options — confirm against the library version in use.
const kafkaOptions = {
	clientId: 'node-app',
	brokers: ['localhost:9092'],
	connectionTimeout: 6000,
	requestTimeout: 3000,
	ssl: false
}

const kafka = new Kafka(kafkaOptions)

exports.producer = async (eventName, data) => {
	const producer = kafka.producer()

	// event kafka producer notification
	await producer.on('producer.connect', () => console.info('producer kafka connected'))
	await producer.on('producer.disconnect', () => console.error('producer kafka disconnect'))
	await producer.on('producer.network.request_timeout', () => console.error('producer kafka network timeout'))

	await producer.connect()
	await producer.send({
		topic: eventName,
		messages: data,
		acks: true,
		compression: 1
	})

	await producer.disconnect()
}

exports.consumer = async (eventName, callback) => {
	const consumer = kafka.consumer({
		groupId: 'test-group',
		maxBytes: 1048576000, // 1GB
		maxBytesPerPartition: 1048576000, // 1GB
		sessionTimeout: 60000,
		heartbeatInterval: 6000,
		rebalanceTimeout: 30000
	})

	// event kafka consumer notification
	await consumer.on('consumer.connect', () => console.info('consumer kafka connected'))
	await consumer.on('consumer.disconnect', () => console.error('consumer kafka disconnect'))
	await consumer.on('consumer.crash', () => console.error('consumer kafka crash'))
	await consumer.on('consumer.stop', () => console.error('consumer kafka stop'))
	await consumer.on('consumer.network.request_timeout', () => console.error('consumer kafka network timeout'))

	await consumer.connect()
	await consumer.subscribe({ topic: eventName, fromBeginning: true })
	await consumer.run({ autoCommit: true, eachMessage: callback })
}
Restu Wahyu Saputra

kafkajs

// An example of how to use this class is available here:
// https://pastebin.com/Wu5hG6WK

import {
  Consumer,
  ConsumerConfig,
  ConsumerSubscribeTopics,
  EachBatchHandler,
  EachMessagePayload,
  Kafka as KafkaJs,
  KafkaConfig,
  Producer,
  ProducerBatch,
  ProducerConfig,
  ProducerRecord,
  Transaction
} from 'kafkajs'

/**
 * Options that `Kafka.subscriber` spreads into its `consumer.run` call.
 *
 * `eachMessage` is deliberately absent: `subscriber` always supplies it from
 * its callback argument.
 *
 * NOTE(review): the field names mirror KafkaJS's own run options; see the
 * KafkaJS consuming documentation for their exact semantics.
 */
interface ConsumerRunConfig {
  autoCommit?: boolean
  autoCommitInterval?: number | null
  autoCommitThreshold?: number | null
  eachBatchAutoResolve?: boolean
  partitionsConsumedConcurrently?: number
  // Forwarded alongside the `eachMessage` handler that `subscriber` always sets.
  eachBatch?: EachBatchHandler
}

/**
 * Argument bundle for `Kafka.subscriber`.
 */
interface SubscriberPayload {
  /** Passed to `consumer.subscribe`. */
  subscribeConfig: ConsumerSubscribeTopics
  /** Passed to `kafka.consumer` when the consumer is created. */
  consumerConfig: ConsumerConfig
  /**
   * Extra options spread into `consumer.run`. Optional because `subscriber`
   * already falls back to an empty object when it is missing.
   */
  runConfig?: ConsumerRunConfig
}

/**
 * Argument bundle for `Kafka.publisher`.
 */
interface PublisherPayload {
  // 'single' sends `sendConfig` as one ProducerRecord; any other value sends it as a ProducerBatch.
  type: 'single' | 'multiple'
  // Cast to ProducerRecord or ProducerBatch according to `type`; the pairing is not checked by the compiler.
  sendConfig: ProducerRecord | ProducerBatch
  // Passed to `kafka.producer`; an empty object is used when omitted.
  producerConfig?: ProducerConfig
}

/**
 * Argument bundle for `Kafka.publisherTransaction`. Same shape as
 * `PublisherPayload`.
 */
interface PublisherTransactionPayload {
  // 'single' sends `sendConfig` as one ProducerRecord; any other value sends it as a ProducerBatch.
  type: 'single' | 'multiple'
  // Cast to ProducerRecord or ProducerBatch according to `type`; the pairing is not checked by the compiler.
  sendConfig: ProducerRecord | ProducerBatch
  // Passed to `kafka.producer`; an empty object is used when omitted.
  // NOTE(review): KafkaJS transactions presumably need a `transactionalId` here — confirm.
  producerConfig?: ProducerConfig
}

/**
 * Convenience wrapper around a KafkaJS client.
 *
 * Each publish call creates its own producer, sends, and disconnects it again;
 * `subscriber` creates a consumer and leaves it running. Failures are logged
 * with `console.error` and never rethrown, so every public method resolves.
 */
export class Kafka {
  private readonly config: KafkaConfig
  private readonly kafka: KafkaJs

  /**
   * @param config - Client configuration handed to the KafkaJS constructor.
   */
  constructor(config: KafkaConfig) {
    this.config = config
    this.kafka = new KafkaJs(this.config)
  }

  /**
   * Sends one record or one batch with a short-lived producer.
   *
   * The producer is disconnected in a `finally` block, so the connection is
   * released even when connecting or sending fails.
   *
   * @param options - What to send and how to configure the producer.
   * @returns Resolves when the producer has been released; errors are logged.
   */
  async publisher(options: PublisherPayload): Promise<void> {
    // Kept local so concurrent calls cannot overwrite each other's producer.
    let producer: Producer | undefined
    try {
      producer = this.kafka.producer(options.producerConfig ?? {})
      this.notification('publisher', producer)
      await producer.connect()

      if (options.type === 'single') {
        await producer.send(options.sendConfig as ProducerRecord)
      } else {
        await producer.sendBatch(options.sendConfig as ProducerBatch)
      }
    } catch (e: unknown) {
      console.error(`publisher is not working: ${e}`)
    } finally {
      await this.release(producer, 'publisher')
    }
  }

  /**
   * Sends one record or one batch inside a producer transaction.
   *
   * The producer is connected before the transaction is opened, and messages
   * are sent through the transaction object so that `commit`/`abort` apply to
   * them. On failure an active transaction is aborted; the producer is always
   * disconnected afterwards.
   *
   * NOTE(review): per the KafkaJS transaction documentation the producer
   * config is expected to carry a `transactionalId`; without it opening the
   * transaction presumably fails and the error is logged here.
   *
   * @param options - What to send and how to configure the producer.
   * @returns Resolves when the producer has been released; errors are logged.
   */
  async publisherTransaction(options: PublisherTransactionPayload): Promise<void> {
    let producer: Producer | undefined
    let transaction: Transaction | undefined
    try {
      producer = this.kafka.producer(options.producerConfig ?? {})
      this.notification('publisher', producer)
      await producer.connect()
      transaction = await producer.transaction()

      if (options.type === 'single') {
        await transaction.send(options.sendConfig as ProducerRecord)
      } else {
        await transaction.sendBatch(options.sendConfig as ProducerBatch)
      }

      await transaction.commit()
    } catch (e: unknown) {
      if (transaction?.isActive()) {
        try {
          // Awaited so the abort has finished before the producer disconnects.
          await transaction.abort()
        } catch (abortError: unknown) {
          console.error(`publisher transaction is not working: ${abortError}`)
        }
      }
      console.error(`publisher transaction is not working: ${e}`)
    } finally {
      await this.release(producer, 'publisher transaction')
    }
  }

  /**
   * Creates a consumer, subscribes it and starts delivering messages to `cb`.
   *
   * On success the consumer keeps running after this method resolves. If any
   * startup step fails, the error is logged and the consumer is disconnected.
   *
   * @param options - Consumer, subscription and run configuration.
   * @param cb - Handler passed to `consumer.run` as `eachMessage`.
   */
  async subscriber(options: SubscriberPayload, cb: (payload: EachMessagePayload) => Promise<void>): Promise<void> {
    let consumer: Consumer | undefined
    try {
      consumer = this.kafka.consumer(options.consumerConfig)
      this.notification('subscriber', consumer)
      await consumer.connect()
      await consumer.subscribe(options.subscribeConfig)
      await consumer.run({ ...(options.runConfig ?? {}), eachMessage: cb })
    } catch (e: unknown) {
      console.error(`subscriber is not working: ${e}`)
      await this.release(consumer, 'subscriber')
    }
  }

  /**
   * Disconnects a client if one was created, logging instead of throwing.
   *
   * @param client - Producer or consumer to disconnect; `undefined` is a no-op.
   * @param context - Prefix for the error log line.
   */
  private async release(client: Producer | Consumer | undefined, context: string): Promise<void> {
    if (!client) return
    try {
      await client.disconnect()
    } catch (e: unknown) {
      console.error(`${context} is not working: ${e}`)
    }
  }

  /**
   * Attaches logging listeners for the client's instrumentation events.
   *
   * The disconnect and stop listeners only log. Calling `disconnect()` or
   * `stop()` from inside the listener for that same event would trigger the
   * event again. NOTE(review): KafkaJS appears to emit these events on every
   * call — confirm for the version in use.
   *
   * @param type - Which event set to register.
   * @param handler - The producer or consumer to listen on.
   */
  private notification(type: 'publisher' | 'subscriber', handler: Producer | Consumer): void {
    try {
      if (type === 'subscriber') {
        const consumer = handler as Consumer
        consumer.on('consumer.connect', () => console.info('consumer kafka connected'))
        consumer.on('consumer.network.request_timeout', () => console.error('consumer kafka network timeout'))
        consumer.on('consumer.crash', async (): Promise<void> => {
          try {
            await consumer.disconnect()
          } catch (e: unknown) {
            // Caught here so a failed disconnect cannot become an unhandled rejection.
            console.error(`notification is not working: ${e}`)
          }
          console.error('consumer kafka crash')
        })
        consumer.on('consumer.disconnect', () => console.error('consumer kafka disconnect'))
        consumer.on('consumer.stop', () => console.error('consumer kafka stop'))
      }

      if (type === 'publisher') {
        const producer = handler as Producer
        producer.on('producer.connect', (): void => console.info('producer kafka connected'))
        producer.on('producer.network.request_timeout', (): void => console.error('producer kafka network timeout'))
        producer.on('producer.disconnect', (): void => console.error('producer kafka disconnect'))
      }
    } catch (e: unknown) {
      console.error(`notification is not working: ${e}`)
    }
  }
}
Restu Wahyu Saputra

Réponses similaires à “kafkajs”

Parcourir les réponses de code populaires par langue

Parcourir d'autres langages de code