// bullmq

import 'dotenv/config'
import { Worker, Queue, WorkerOptions, QueueOptions, Job, QueueEvents, QueueEventsOptions } from 'bullmq'
import { EventEmitter } from 'events'
import IORedis from 'ioredis'
import os from 'os'
import { assert } from 'is-any-type'

import { HydeLivingError } from '@helpers/helper.error'

/** Alias of BullMQ's `Job` type, re-exported under the name `BullJob`. */
export type BullJob = Job
/**
 * Publish/subscribe helper built on BullMQ queues that share one ioredis connection.
 *
 * - `publisher(key, value)` adds `value` as a job to the queue named `key`.
 * - `subscriber(key)` makes sure a worker and an event logger exist for `key`, then
 *   resolves with the data of the next job that worker processes.
 *
 * Workers and `QueueEvents` instances are created once per key and reused by later
 * `subscriber` calls. Nothing in this class closes them or the Redis connection.
 */
export class BullQueue {
	/** Port used when `REDIS_PORT` is unset or not a number (ioredis' own default port). */
	private static readonly DEFAULT_REDIS_PORT = 6379

	/** Internal bus that hands job payloads from the worker processor to `subscriber`. */
	private readonly emitter: EventEmitter = new EventEmitter({ captureRejections: true })
	private readonly redisConnection: InstanceType<typeof IORedis>
	/** One worker per queue name, so repeated `subscriber` calls do not spawn duplicates. */
	private readonly workers = new Map<string, Worker>()
	/** One event logger per queue name, for the same reason as `workers`. */
	private readonly queueEvents = new Map<string, QueueEvents>()

	/**
	 * Opens the Redis connection from `REDIS_HOST`, `REDIS_PORT` and `REDIS_PASSWORD`.
	 *
	 * @param db Redis logical database index to select.
	 */
	constructor(db: number) {
		const parsedPort = Number.parseInt(process.env.REDIS_PORT ?? '', 10)

		this.redisConnection = new IORedis({
			host: process.env.REDIS_HOST as string,
			// Without the fallback an unset REDIS_PORT would be passed on as NaN.
			port: Number.isNaN(parsedPort) ? BullQueue.DEFAULT_REDIS_PORT : parsedPort,
			password: (process.env.REDIS_PASSWORD as string) || '',
			// BullMQ's documentation asks for `null` here on connections used by workers.
			maxRetriesPerRequest: null,
			db: db
		})
	}

	/**
	 * Adds `value` as a new job to the queue named `key`.
	 *
	 * A new `Queue` instance is created on every call and handed back to the caller.
	 *
	 * @param key Queue name; also embedded in the generated job name.
	 * @param value Job payload, must be an object or an array.
	 * @returns The queue the job was added to.
	 * @throws HydeLivingError when `value` has the wrong type or the job cannot be added.
	 */
	async publisher(key: string, value: Record<string, any> | Record<string, any>[]): Promise<Queue | Error> {
		try {
			if (!assert.isObject(value as any) && !assert.isArray(value as any)) {
				throw new HydeLivingError('value must be an array or object')
			}

			const queueOptions: QueueOptions = {
				connection: this.redisConnection,
				defaultJobOptions: {
					removeOnComplete: true,
					sizeLimit: 5242880, // 5 * 1024 * 1024
					timeout: 1000 * 60,
					attempts: 5,
					backoff: {
						type: 'exponential',
						delay: 1000 * 3
					},
					priority: os.cpus().length
				}
			}

			const queue = new Queue(key, queueOptions)
			await queue.add(`hydeliving:${key}:${Date.now()}`, value)

			return queue
		} catch (e: unknown) {
			// Read `message` leniently so non-Error throwables still fall back to the default text.
			const message = (e as { message?: string } | null | undefined)?.message
			throw new HydeLivingError(message || 'Publisher crash add value to queue failed')
		}
	}

	/**
	 * Returns the `QueueEvents` logger for `key`, creating and wiring it on first use.
	 *
	 * @param key Queue name to observe.
	 */
	private notification(key: string): QueueEvents {
		const cached = this.queueEvents.get(key)
		if (cached) return cached

		const eventOptions: QueueEventsOptions = {
			connection: this.redisConnection
		}
		const events = new QueueEvents(key, eventOptions)

		events.on('waiting', (args: { jobId: string }) => console.info(`jobs ${args.jobId} is waiting`))
		events.on('progress', (args: { jobId: string }) => console.info(`jobs ${args.jobId} in progress`))
		events.on('completed', (args: { jobId: string }) => console.log(`jobs ${args.jobId} is completed`))
		events.on('removed', (args: { jobId: string }) => console.info(`jobs ${args.jobId} is removed`))
		events.on('failed', (args: { jobId: string }) => console.error(`jobs ${args.jobId} is failed`))
		// Pass the error along so the cause is not lost.
		events.on('error', (err: globalThis.Error) => console.error(`jobs ${events.name} is error`, err))

		this.queueEvents.set(key, events)
		return events
	}

	/**
	 * Returns the worker for `key`, creating it on first use.
	 *
	 * The processor forwards each job's data to `subscriber` through the internal emitter.
	 *
	 * @param key Queue name to consume.
	 */
	private worker(key: string): Worker {
		const cached = this.workers.get(key)
		if (cached) return cached

		const workerOptions: WorkerOptions = {
			connection: this.redisConnection,
			concurrency: os.cpus().length,
			skipDelayCheck: true,
			runRetryDelay: 1000 * 3,
			settings: {
				backoffStrategies: {
					// Linear backoff: one second per attempt already made.
					custom(attemptsMade: number) {
						return Math.abs(attemptsMade * 1000)
					}
				}
			}
		}

		const worker = new Worker(
			key,
			async (job: Job): Promise<any> => {
				// NOTE(review): a job handed to the processor is presumably active, so these two
				// state checks look like they never fire — confirm before relying on them.
				if (await job.isCompleted()) await job.remove()
				if (await job.isFailed()) await job.retry('failed')
				// Serialised here and parsed again in `subscriber`, so the consumer gets a copy.
				if (job.data) this.emitter.emit(this.dataEvent(key), JSON.stringify(job.data))
				return job
			},
			workerOptions
		)

		// NOTE(review): resuming on 'paused' means the worker cannot stay paused — confirm intent.
		worker.on('active', () => {
			worker.resume()
		})
		worker.on('paused', () => {
			worker.resume()
		})
		worker.on('progress', () => {
			worker.resume()
		})
		// BullMQ's worker guide recommends always attaching an 'error' listener.
		worker.on('error', (err: globalThis.Error) => console.error(`worker ${key} is error`, err))

		this.workers.set(key, worker)
		return worker
	}

	/**
	 * Name of the internal emitter event carrying job data for `key`.
	 * Scoping it by key keeps subscribers of different queues from receiving each other's data.
	 */
	private dataEvent(key: string): string {
		return `data:${key}`
	}

	/**
	 * Waits for the next job processed on the queue named `key`.
	 *
	 * @param key Queue name to listen on.
	 * @returns The data of the first job processed after this call.
	 */
	async subscriber(key: string): Promise<any> {
		const eventName = this.dataEvent(key)

		return new Promise<any>((resolve, reject) => {
			// `once` removes the listener after it fires, so repeated calls do not pile up listeners.
			const onData = (data: string): void => resolve(JSON.parse(data))
			this.emitter.once(eventName, onData)

			try {
				// Listener is attached first because a cached worker may already be processing jobs.
				this.worker(key)
				this.notification(key)
			} catch (e: unknown) {
				this.emitter.removeListener(eventName, onData)
				reject(e)
			}
		})
	}
}
// Restu Wahyu Saputra