I am trying to poll an ArrayBlockingQueue to get the item:
public void run() {
    while (true) {
        if (Thread.currentThread().isInterrupted()){
            closeAppender();
            break;
        }
        if (!TASK_BUFFER.isEmpty() && !TASK_ADDRESS.isEmpty() && !TASK_MARKER.isEmpty()){
            buffer = TASK_BUFFER.poll();
            remoteAdd = TASK_ADDRESS.poll();
            marker = TASK_MARKER.poll();
            // if (buffer.hasArray()){ //if byte buffer is used just get array
            //     byteArr = buffer.array();
            // }else{
            //     byteArr = directArr; //if direct buffer is used, we get into direct arr
            ////     Arrays.fill(byteArr, (byte)0); //reset every value to "null" value
            //     buffer.get(byteArr, 0, buffer.remaining());
            // }
            // remaining = buffer.remaining();
            // runTask();
            BUFFER_POOL.returnBuffer(buffer);
        }
    }
    LOGGER.info(mainMarker,"Task Scheduler Thread has terminated...");
}
However, when I run jvisualvm, I can see that whenever I "schedule" a task and poll the queue, it is allocating bytes. I have tried commenting and uncommenting the other parts of the code but only the polling and scheduling will affect the allocated bytes.
I have checked this link but I am not looking to iterate the whole array. I am looking to get one item at a time to process then remove it. What is the proper way to approach this?
edit: I have a separate thread that "produces" instances of ByteBuffer, InetSocketAddress and Marker and puts them into the ArrayBlockingQueues. I also just checked the source code for poll() and dequeue(), and it doesn't seem to do anything that should warrant the allocation of bytes. If I am creating the objects in the main thread and adding them to the queue, it shouldn't allocate bytes if this thread is only used for processing the queue values.
The constructor looks like this:
private final BlockingQueue<ByteBuffer> TASK_BUFFER;
private final BlockingQueue<InetSocketAddress> TASK_ADDRESS;
private final BlockingQueue<Marker> TASK_MARKER;
private final BufferPool BUFFER_POOL;
private final ExcerptAppender APPENDER;

public TaskScheduler(BufferPool bufferPool, ChronicleQueue chronicleQueue){
    TASK_BUFFER = new ArrayBlockingQueue<>(bufferPool.getCapacity());
    TASK_ADDRESS = new ArrayBlockingQueue<>(bufferPool.getCapacity());
    TASK_MARKER = new ArrayBlockingQueue<>(bufferPool.getCapacity());
    BUFFER_POOL = bufferPool;
    APPENDER = chronicleQueue.acquireAppender();
}
and this puts the produced values inside the queue:
public void scheduleTask(final ByteBuffer buffer, final InetSocketAddress remoteAdd, final Marker marker) throws InterruptedException {
    TASK_BUFFER.put(buffer);
    TASK_ADDRESS.put(remoteAdd);
    TASK_MARKER.put(marker);
}

Whenever an attempt to acquire a ReentrantLock is made, there’s potentially a memory allocation. When there’s no contention or the lock is released during the spinning phase, the lock() method will return without allocations. But when the thread is added to the queue of waiting threads, a node object is allocated for that.
This applies to all classes using a ReentrantLock under the hood, like ArrayBlockingQueue, but also to all concurrency classes using AbstractQueued[Long]Synchronizer behind the scenes in general.
So your code looks like an attempted premature optimization that backfired. To avoid creating a simple object holding the ByteBuffer, InetSocketAddress, and Marker belonging to a task, you have three blocking queues, which require not only three poll() calls but also three isEmpty() checks to guard them. So you end up with up to six allocations in the worst case, instead of the one you’d have with a single poll() and null check when using a single queue of objects holding all three values.
But the entire loop is literally a polling loop, repeatedly calling poll() without ever giving up the CPU. It’s creating the very overhead it (apparently) was supposed to avoid. Just calling take(), which only returns when there is a new element, would avoid this. This would queue the thread if there is no new element but, as said, a thread gets queued anyway when there’s contention at the lock, and repeatedly calling poll() in a tight loop is a recipe for contention sooner or later. On the other hand, take() will work exactly the same as poll() when there is a new element.
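As a rough illustration of the two suggestions above (one queue of holder objects, and take() instead of a busy poll() loop), here is a minimal sketch. The names SingleQueueScheduler, Task, handler and demo() are hypothetical and not from the question; a plain String stands in for the Marker type, and the handler is only a placeholder for runTask() and BUFFER_POOL.returnBuffer(buffer). Note that the holder itself is one small allocation per scheduled task.

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

public class SingleQueueScheduler implements Runnable {

    /** Hypothetical holder for the three values that make up one task. */
    public static final class Task {
        final ByteBuffer buffer;
        final InetSocketAddress remoteAdd;
        final String marker; // assumption: stand-in for the question's Marker type

        Task(ByteBuffer buffer, InetSocketAddress remoteAdd, String marker) {
            this.buffer = buffer;
            this.remoteAdd = remoteAdd;
            this.marker = marker;
        }
    }

    private final BlockingQueue<Task> tasks;
    private final Consumer<Task> handler; // placeholder for runTask() and returning the buffer

    public SingleQueueScheduler(int capacity, Consumer<Task> handler) {
        this.tasks = new ArrayBlockingQueue<>(capacity);
        this.handler = handler;
    }

    public void scheduleTask(ByteBuffer buffer, InetSocketAddress remoteAdd, String marker)
            throws InterruptedException {
        tasks.put(new Task(buffer, remoteAdd, marker)); // one put() instead of three
    }

    @Override
    public void run() {
        try {
            while (true) {
                Task task = tasks.take(); // waits without spinning until an element exists
                handler.accept(task);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag before cleaning up
        }
        // closeAppender() and the termination log from the question would go here
    }

    /** Demo: schedules three tasks, waits until they are handled, then interrupts. */
    public static List<String> demo() {
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        SingleQueueScheduler scheduler = new SingleQueueScheduler(4, task -> {
            seen.add(task.marker);
            done.countDown();
        });
        Thread consumer = new Thread(scheduler, "task-scheduler");
        consumer.start();
        try {
            for (int i = 0; i < 3; i++) {
                scheduler.scheduleTask(ByteBuffer.allocate(16),
                        InetSocketAddress.createUnresolved("localhost", 9000 + i), "task-" + i);
            }
            done.await();         // all three tasks have been handled
            consumer.interrupt(); // take() throws InterruptedException, ending the loop
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Interruption still ends the loop as in the question, but through the InterruptedException thrown by take() rather than an explicit isInterrupted() check on every iteration. The demo waits for the tasks to be handled before interrupting, because take() may throw on an interrupted thread even when elements are still queued.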