The method this.aQueue.add(*args), which should return a job instance, never resolves when awaited. After receiving and deserializing the data, I try to get a job instance in RequestService.sendData (app.service.ts module), but the call never returns and the rest of the code does not run.
app.service.ts
import { Injectable } from '@nestjs/common';
import { Job, Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
import { plainToClass } from 'class-transformer';
import { RequestScheme, ResponseScheme } from './app.schemes';
@Injectable()
export class RequestService {
constructor(
@InjectQueue('request') private requestQueue: Queue
){}
async sendData(data: RequestScheme): Promise<ResponseScheme> {
let responseData: ResponseScheme
data = plainToClass(RequestScheme, data)
console.log("data in controller", data) // data is deserialized as I expect
const jobInstance = await this.requestQueue.add(
'request', data, { delay: data.wait }
) // this call is made, but the await never resolves
console.log(`Job: ${jobInstance}`)
async function setJobData(jobInstance: Job){
return new Promise((resolve, reject) => {
this.requestQueue.on('completed', function(job: Job, result: ResponseScheme, error){
if (jobInstance.id == job.id) {
responseData = result
job.remove()
resolve(result)
}
if (error) reject(error)
})
})}
await setJobData(jobInstance)
return responseData
}
}
app.processor.ts
import { Job } from 'bull';
import {
Processor,
Process,
OnQueueActive,
OnQueueCompleted
} from '@nestjs/bull';
import { ResponseScheme } from './app.schemes';
@Processor('request')
export class RequestConsumer {
@Process('request')
async process_request(job: Job){
console.log(`Job ${job.id} proceed`)
}
@OnQueueActive()
onActive(job: Job){
console.log(`Data ${job.data} were sended`)
}
@OnQueueCompleted()
onComplete(job: Job){
const response = new ResponseScheme()
response.answer = job.data.answer
return response
}
}
app.module.ts
import { Global, Module, NestModule } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { RequestController } from './app.controller';
import { RequestService } from './app.service';
import { RequestConsumer } from './app.processor';
@Module({
imports: [
BullModule.forRoot({
redis: {
host: 'localhost',
port: 6379,
maxRetriesPerRequest: null
}
}),
BullModule.registerQueue({
name: 'request'
})
],
controllers: [RequestController],
providers: [
RequestService
],
exports: [
RequestService
]
})
export class AppModule {
configure(consumer: RequestConsumer){}
}
app.controller.ts
import { Job } from 'bull';
import { Body, Controller, Get, HttpException, HttpStatus, Post, Res } from '@nestjs/common';
import { RequestService } from './app.service';
import { RequestScheme, ResponseScheme } from './app.schemes';
@Controller('request')
export class RequestController {
constructor(private readonly requestService: RequestService) {}
@Get()//this works good
about(): string{
return "Hello! This is the request"
}
@Post()
async getHello(@Body() data: RequestScheme): Promise<ResponseScheme> {
console.log("POST", "data", data) //client data, good as it's expected
let responseData: ResponseScheme
responseData = await this.requestService.sendData(data)
return responseData
}
}
According to the manual (https://docs.nestjs.com/techniques/queues), this is the standard way to add a job in NestJS:
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ lifo: true },
);
But my variant never resolves, regardless of whether the value in wait is a string or a number:
const jobInstance = await this.requestQueue.add(
'request', data, { delay: data.wait }
)
I also tried returning hardcoded data from RequestService.sendData with the "add a job" call disabled, and that works. But I need to add a job.
Oooookay. And what do you want to achieve with this setup? :)
Don't get me wrong, but from where I sit it looks as if you are dispatching a job from the controller, to be processed by a processor that does nothing except console.log, while the controller has to wait until the job is done and returns some data.
Also, the code inside the promise (the this.requestQueue.on('completed', ...) listener) is supposed to change responseData, which is outside of its scope and rather inaccessible from there, as that promise is not even executed in the context of sendData but run from within another promise function. Also, this is never defined, as your processor returns nothing. Plus, job.remove() is a promise itself.

Also, you have an if-else clause that says:
- jobInstance.id is equal-ish to job.id: then do sth,
- jobInstance.id isn't equal-ish to job.id, but there's no error: do nothing. Wait. Wait. Wait. Just wait ;)

Also, an async promise function inside of an async function is something that might lead to a lot of weird stuff going on. Besides, when you do async function xyz(...) { this.requestQueue... }, what do you mean by this?

If your intention was to hold the controller until the job is finished, then mutate some data inside of the controller method to display it to the end user, then this is absolutely not the way. But I can't determine if that was actually your intention, so I don't know if that's the direction you're interested in :)
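To make the points above about this, the id check, and the missing error path a bit more concrete, here is a minimal sketch of waiting for one job's result through events. It is not your code and not Bull itself: ResultWaiter, FakeJob, and waitFor are hypothetical names, and a plain Node EventEmitter stands in for the queue. It assumes the queue emits completed with (job, result) and failed with (job, error), which is how I understand Bull's local events to work.

```typescript
import { EventEmitter } from 'events';

// Hypothetical stand-in for the Bull Job type used in the question.
interface FakeJob {
  id: number | string;
  data: unknown;
}

class ResultWaiter {
  // `queue` is any emitter that fires 'completed' (job, result)
  // and 'failed' (job, error). Assumption: a Bull queue behaves this way.
  constructor(private readonly queue: EventEmitter) {}

  waitFor<T>(jobId: number | string): Promise<T> {
    // Arrow functions keep `this` bound to the ResultWaiter instance.
    // A nested `async function` declaration would get its own `this`,
    // which is undefined inside a class body.
    return new Promise<T>((resolve, reject) => {
      const onCompleted = (job: FakeJob, result: T): void => {
        if (String(job.id) !== String(jobId)) return; // someone else's job
        cleanup();
        resolve(result);
      };
      const onFailed = (job: FakeJob, error: Error): void => {
        if (String(job.id) !== String(jobId)) return;
        cleanup();
        reject(error);
      };
      // Remove both listeners once this job is settled,
      // so they do not pile up on every request.
      const cleanup = (): void => {
        this.queue.off('completed', onCompleted);
        this.queue.off('failed', onFailed);
      };
      this.queue.on('completed', onCompleted);
      this.queue.on('failed', onFailed);
    });
  }
}
```

The promise resolves with the value directly, so nothing outside of it has to be mutated. Note that a result only exists if the processor's handler returns one. Bull also offers job.finished(), which returns a promise for the job's outcome and may make a hand-written listener like this unnecessary.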