Implementarea unui Ring Buffer în Java

1. Prezentare generală

În acest tutorial, vom învăța cum să implementăm un buffer de inel în Java.

2. Inel tampon

Ring Buffer (sau tampon circular) este o structură de date circulară mărginită care este utilizată pentru tamponarea datelor între două sau mai multe fire . Pe măsură ce scriem într-un tampon de inel, acesta se înfășoară în timp ce ajunge la final.

2.1. Cum functioneaza

Un inel tampon este implementat folosind o matrice de dimensiuni fixe care se înfășoară la limite .

În afară de matrice, ține evidența a trei lucruri:

  • următorul slot disponibil în buffer pentru a insera un element,
  • următorul element necitit din buffer,
  • și sfârșitul matricei - punctul în care tamponul se înfășoară până la începutul matricei

Mecanica modului în care un buffer inel gestionează aceste cerințe variază în funcție de implementare. De exemplu, intrarea Wikipedia de pe subiect arată o metodă care utilizează patru indicatori.

Vom împrumuta abordarea din implementarea disruptorului tamponului de inel folosind secvențe.

Primul lucru pe care trebuie să-l știm este capacitatea - dimensiunea maximă fixă ​​a tamponului. În continuare, vom folosi două secvențe în creștere monotonă :

  • Scrie secvența: începând cu -1, crește cu 1 pe măsură ce introducem un element
  • Citește secvența: începând cu 0, crește cu 1 pe măsură ce consumăm un element

Putem asocia o secvență la un index din matrice utilizând o operație mod:

arrayIndex = sequence % capacity 

Operația mod înfășoară secvența în jurul limitelor pentru a obține un slot în buffer :

Să vedem cum am insera un element:

buffer[++writeSequence % capacity] = element 

Pre-incrementăm secvența înainte de a insera un element.

Pentru a consuma un element facem un post-increment:

element = buffer[readSequence++ % capacity] 

În acest caz, efectuăm o post-incrementare a secvenței. Consumul unui element nu îl elimină din buffer - rămâne doar în matrice până când este suprascris .

2.2. Tampoane goale și complete

Pe măsură ce înfășurăm matricea, vom începe să suprascriem datele din buffer. Dacă bufferul este plin, putem alege fie să suprascrieți cele mai vechi date, indiferent dacă cititorul le-a consumat, fie să preveniți suprascrierea datelor care nu au fost citite .

Dacă cititorul își poate permite să rateze valorile intermediare sau vechi (de exemplu, un bifator de preț pentru acțiuni), putem suprascrie datele fără a aștepta ca acestea să fie consumate. Pe de altă parte, dacă cititorul trebuie să consume toate valorile (cum ar fi în cazul tranzacțiilor de comerț electronic), ar trebui să așteptăm (blocare / ocupare-așteptare) până când bufferul are un slot disponibil.

Bufferul este plin dacă dimensiunea bufferului este egală cu capacitatea sa , unde dimensiunea sa este egală cu numărul de elemente necitite:

size = (writeSequence - readSequence) + 1 isFull = (size == capacity) 

Dacă secvența de scriere rămâne în urma secvenței de citire, tamponul este gol :

isEmpty = writeSequence < readSequence 

Tamponul returnează o valoare nulă dacă este gol.

2.2. Avantaje și dezavantaje

Un buffer inelar este un buffer FIFO eficient. Folosește o matrice de dimensiuni fixe care poate fi alocată în prealabil și permite un model eficient de acces la memorie. Toate operațiile tampon sunt în timp constant O (1) , inclusiv consumarea unui element, deoarece nu necesită o deplasare a elementelor.

Pe de altă parte, determinarea dimensiunii corecte a tamponului inelar este critică. De exemplu, operațiile de scriere se pot bloca pentru o lungă perioadă de timp dacă tamponul este subdimensionat și citirile sunt lente. Putem folosi dimensionarea dinamică, dar ar necesita mutarea datelor și vom pierde majoritatea avantajelor discutate mai sus.

3. Implementare în Java

Acum că înțelegem cum funcționează un buffer de inel, să continuăm să îl implementăm în Java.

3.1. Inițializare

În primul rând, să definim un constructor care inițializează tamponul cu o capacitate predefinită:

public CircularBuffer(int capacity) { this.capacity = capacity; this.data = (E[]) new Object[capacity]; this.readSequence = 0; this.writeSequence = -1; } 

Aceasta va crea un tampon gol și va inițializa câmpurile de secvență așa cum s-a discutat în secțiunea anterioară.

3.3. Oferi

Apoi, vom implementa operațiunea de ofertă care introduce un element în buffer la următorul slot disponibil și se întoarce adevărat la succes. Revine fals dacă tamponul nu poate găsi un slot gol, adică nu putem suprascrie valorile necitite .

Să implementăm metoda ofertei în Java:

public boolean offer(E element) { boolean isFull = (writeSequence - readSequence) + 1 == capacity; if (!isFull) { int nextWriteSeq = writeSequence + 1; data[nextWriteSeq % capacity] = element; writeSequence++; return true; } return false; } 

Deci, incrementăm secvența de scriere și calculăm indexul în matrice pentru următorul slot disponibil. Apoi, scriem datele în buffer și stocăm secvența de scriere actualizată.

Să încercăm:

@Test public void givenCircularBuffer_whenAnElementIsEnqueued_thenSizeIsOne() { CircularBuffer buffer = new CircularBuffer(defaultCapacity); assertTrue(buffer.offer("Square")); assertEquals(1, buffer.size()); } 

3.4. Poll

Finally, we'll implement the poll operation that retrieves and removes the next unread element. The poll operation doesn't remove the element but increments the read sequence.

Let's implement it:

public E poll() { boolean isEmpty = writeSequence < readSequence; if (!isEmpty) { E nextValue = data[readSequence % capacity]; readSequence++; return nextValue; } return null; } 

Here, we're reading the data at the current read sequence by computing the index in the array. Then, we're incrementing the sequence and returning the value, if the buffer is not empty.

Let's test it out:

@Test public void givenCircularBuffer_whenAnElementIsDequeued_thenElementMatchesEnqueuedElement() { CircularBuffer buffer = new CircularBuffer(defaultCapacity); buffer.offer("Triangle"); String shape = buffer.poll(); assertEquals("Triangle", shape); } 

4. Producer-Consumer Problem

We've talked about the use of a ring buffer for exchanging data between two or more threads, which is an example of a synchronization problem called the Producer-Consumer problem. In Java, we can solve the producer-consumer problem in various ways using semaphores, bounded queues, ring buffers, etc.

Let's implement a solution based on a ring buffer.

4.1. volatile Sequence Fields

Our implementation of the ring buffer is not thread-safe. Let's make it thread-safe for the simple single-producer and single-consumer case.

The producer writes data to the buffer and increments the writeSequence, while the consumer only reads from the buffer and increments the readSequence. So, the backing array is contention-free and we can get away without any synchronization.

But we still need to ensure that the consumer can see the latest value of the writeSequence field (visibility) and that the writeSequence is not updated before the data is actually available in the buffer (ordering).

We can make the ring buffer concurrent and lock-free in this case by making the sequence fields volatile:

private volatile int writeSequence = -1, readSequence = 0; 

In the offer method, a write to the volatile field writeSequence guarantees that the writes to the buffer happen before updating the sequence. At the same time, the volatile visibility guarantee ensures that the consumer will always see the latest value of writeSequence.

4.2. Producer

Let's implement a simple producer Runnable that writes to the ring buffer:

public void run() { for (int i = 0; i < items.length;) { if (buffer.offer(items[i])) { System.out.println("Produced: " + items[i]); i++; } } } 

The producer thread would wait for an empty slot in a loop (busy-waiting).

4.3. Consumer

We'll implement a consumer Callable that reads from the buffer:

public T[] call() { T[] items = (T[]) new Object[expectedCount]; for (int i = 0; i < items.length;) { T item = buffer.poll(); if (item != null) { items[i++] = item; System.out.println("Consumed: " + item); } } return items; } 

Firul de consum continuă fără tipărire dacă primește o valoare nulă din buffer.

Să scriem codul nostru de conducător auto:

executorService.submit(new Thread(new Producer(buffer))); executorService.submit(new Thread(new Consumer(buffer))); 

Executarea programului nostru producător-consumator produce rezultate ca mai jos:

Produced: Circle Produced: Triangle Consumed: Circle Produced: Rectangle Consumed: Triangle Consumed: Rectangle Produced: Square Produced: Rhombus Consumed: Square Produced: Trapezoid Consumed: Rhombus Consumed: Trapezoid Produced: Pentagon Produced: Pentagram Produced: Hexagon Consumed: Pentagon Consumed: Pentagram Produced: Hexagram Consumed: Hexagon Consumed: Hexagram 

5. Concluzie

În acest tutorial, am învățat cum să implementăm un Ring Buffer și am explorat modul în care acesta poate fi utilizat pentru a rezolva problema producător-consumator.

Ca de obicei, codul sursă pentru toate exemplele este disponibil pe GitHub.