常見隊列介紹與緩沖實現
- 一 常見隊列
- 二 常見隊列的方法
- 1.ArrayBlockingQueue
- 2.LinkedBlockingQueue
- 3.LinkedBlockingDeque
- 4.ConcurrentLinkedQueue
- 5.SynchronousQueue
- 6.LinkedTransferQueue
-
一 常見隊列
隊列說明
ArrayBlockingQueue | 有界 |
LinkedBlockingQueue | 有/無界 |
LinkedBlockingDeque | 無界 |
ConcurrentLinkedQueue | 無界 |
SynchronousQueue | 無界 |
LinkedTransferQueue | 無界 |
DelayQueue | 延遲 |
二 常見隊列的方法
1.ArrayBlockingQueue
public static void arrayBlockingQueue() throws InterruptedException {/*** 有界阻塞隊列,初始化必須指定大小*/ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);/*** 阻塞插入,容量不足會阻塞,直到有空閑位置*/queue.put(1);/*** 非阻塞插入,返回插入狀態 true/false*/queue.offer(1);/*** 阻塞指定時長插入,阻塞時間內有空閑位置則可插入,返回插入狀態 true/false*/queue.offer(1,1, TimeUnit.SECONDS);/*** 異常插入,無空閑位置則拋出異常*///queue.add(1);/*** 非阻塞拉取,取出并移除元素*/queue.poll();/*** 阻塞指定時長拉取,阻塞時間內有數據則直接取數,取出并移除元素*/queue.poll(1,TimeUnit.SECONDS);/*** 非阻塞拉取,隊首為空返回NULL,取出不移除元素*/queue.peek();/*** 阻塞拉取,直到有元素*/queue.take();}
2.LinkedBlockingQueue
public static void linkedBlockingQueue() throws InterruptedException {/*** 指定長度即為有界* 不指定長度即為無界*/LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);/*** 阻塞插入,容量不足會阻塞,直到有空閑位置*/queue.put(1);/*** 非阻塞插入,返回插入狀態 true/false*/queue.offer(1);/*** 阻塞指定時長插入,阻塞時間內有空閑位置則可插入,返回插入狀態 true/false*/queue.offer(1,1, TimeUnit.SECONDS);/*** 異常插入,無空閑位置則拋出異常*///queue.add(1);/*** 非阻塞拉取,取出并移除元素*/queue.poll();/*** 阻塞指定時長拉取,阻塞時間內有數據則直接取數,取出并移除元素*/queue.poll(1,TimeUnit.SECONDS);/*** 非阻塞拉取,隊首為空返回NULL,取出不移除元素*/queue.peek();/*** 阻塞拉取,直到有元素*/queue.take();}
3.LinkedBlockingDeque
public static void linkedBlockingDeque() throws InterruptedException {/*** 消費默認從頭開始*/LinkedBlockingDeque<Integer> deque = new LinkedBlockingDeque<>(1);/*** 阻塞插入,容量不足會阻塞,直到有空閑位置*/deque.put(1);/*** 非阻塞插入,返回插入狀態 true/false*/deque.offer(1);/*** 阻塞指定時長插入,阻塞時間內有空閑位置則可插入,返回插入狀態 true/false*/deque.offer(1,1, TimeUnit.SECONDS);/*** 異常插入,無空閑位置則拋出異常*///deque.add(1);/*** 從頭非阻塞插入,返回插入狀態 true/false*/deque.offerFirst(1);/*** 從頭阻塞指定時長插入,阻塞時間內有空閑位置則可插入,返回插入狀態 true/false*/deque.offerFirst(1,1,TimeUnit.SECONDS);/*** 從尾非阻塞插入,返回插入狀態 true/false*/deque.offerLast(1);/*** 從尾阻塞指定時長插入,阻塞時間內有空閑位置則可插入,返回插入狀態 true/false*/deque.offerFirst(1,1,TimeUnit.SECONDS);/*** 非阻塞拉取,取出并移除元素*/deque.poll();/*** 阻塞指定時長拉取,阻塞時間內有數據則直接取數,取出并移除元素*/deque.poll(1,TimeUnit.SECONDS);/*** 非阻塞拉取頭元素,取出并移除元素*/deque.pollFirst();/*** 阻塞指定時長拉取頭元素,阻塞時間內有數據則直接取數,取出并移除元素*/deque.pollFirst(1,TimeUnit.SECONDS);/*** 非阻塞拉取尾元素,取出并移除元素*/deque.pollLast();/*** 阻塞指定時長拉取尾元素,阻塞時間內有數據則直接取數,取出并移除元素*/deque.pollLast(1,TimeUnit.SECONDS);/*** 非阻塞拉取,隊首為空返回NULL,取出不移除元素*/deque.peek();/*** 阻塞拉取,直到有元素*/deque.take();/*** 阻塞拉取,直到頭有元素*/deque.takeFirst();/*** 阻塞拉取,直到尾有元素*/deque.takeLast();}
4.ConcurrentLinkedQueue
public static void concurrentLinkedQueue(){/*** 無界,不支持指定大小* 無鎖,通過 CAS 控制并發;可能存在讀寫不一致*/ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();/*** 非阻塞插入,返回插入狀態 true/false*/queue.offer(1);/*** 內部調用了 offer 方法*/queue.add(1);/*** 非阻塞拉取,取出并移除元素*/queue.poll();/*** 非阻塞拉取,隊首為空返回NULL,取出不移除元素*/queue.peek();}public static void synchronousQueue () throws InterruptedException {ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 100,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>());/*** 同步隊列 不存儲數據 匹配生產者和消費者線程*/SynchronousQueue<Integer> queue = new SynchronousQueue<>();/*** 阻塞插入,阻塞直到有消費線程與之匹配*/executor.execute(()->{try {queue.put(1);} catch (InterruptedException e) {throw new RuntimeException(e);}});/*** 非阻塞插入,沒有消費線程直接返回 true/false*/queue.offer(1);/*** 阻塞指定時長插入,阻塞時間內有消費線程與之匹配則可插入,返回插入狀態 true/false*/queue.offer(1,3, TimeUnit.SECONDS);/*** 異常插入,無消費線程則拋出異常*///queue.add(1);/*** 非阻塞拉取,如果有生產線程,則消費其數據*/System.out.println("First:" + queue.poll());/*** 阻塞指定時長拉取,阻塞時間內有生產線程與之匹配則直接取數,取出并移除元素*/System.out.println("Second:" + queue.poll(3,TimeUnit.SECONDS));/*** 無效方法 直接返回 NULL*/queue.peek();/*** 阻塞插入,阻塞直到有消費線程與之匹配*/executor.execute(()->{try {queue.offer(999999,3, TimeUnit.SECONDS);} catch (InterruptedException e) {throw new RuntimeException(e);}});/*** 阻塞拉取,直到有生產線程與之匹配*/System.out.println("Third:" + queue.take());}
5.SynchronousQueue
public static void synchronousQueue () throws InterruptedException {ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 100,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>());/*** 同步隊列 不存儲數據 匹配生產者和消費者線程*/SynchronousQueue<Integer> queue = new SynchronousQueue<>();/*** 阻塞插入,阻塞直到有消費線程與之匹配*/executor.execute(()->{try {queue.put(1);} catch (InterruptedException e) {throw new RuntimeException(e);}});/*** 非阻塞插入,沒有消費線程直接返回 true/false*/queue.offer(1);/*** 阻塞指定時長插入,阻塞時間內有消費線程與之匹配則可插入,返回插入狀態 true/false*/queue.offer(1,3, TimeUnit.SECONDS);/*** 異常插入,無消費線程則拋出異常*///queue.add(1);/*** 非阻塞拉取,如果有生產線程,則消費其數據*/System.out.println("First:" + queue.poll());/*** 阻塞指定時長拉取,阻塞時間內有生產線程與之匹配則直接取數,取出并移除元素*/System.out.println("Second:" + queue.poll(3,TimeUnit.SECONDS));/*** 無效方法 直接返回 NULL*/queue.peek();/*** 阻塞插入,阻塞直到有消費線程與之匹配*/executor.execute(()->{try {queue.offer(999999,3, TimeUnit.SECONDS);} catch (InterruptedException e) {throw new RuntimeException(e);}});/*** 阻塞拉取,直到有生產線程與之匹配*/System.out.println("Third:" + queue.take());}
6.LinkedTransferQueue
public static void linkedTransferQueue() throws InterruptedException {/*** 無界非阻塞隊列 類似 LinkedBlockingQueue + SynchronousQueue* 可以存儲實體并實現生產者和消費者線程匹配*/LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();/*** 非阻塞插入:實際都是調用 xfer(e, true, ASYNC, 0L)*/queue.put(1);queue.offer(2);queue.offer(3,1,TimeUnit.SECONDS);queue.add(4);/*** 非阻塞獲取,并移除元素*/System.out.println(queue.poll());/*** 阻塞指定時長獲取,并移除元素*/System.out.println(queue.poll(1, TimeUnit.SECONDS));/*** 非阻塞獲取,不移除元素*/System.out.println(queue.peek());/*** 阻塞獲取*/System.out.println(queue.take());}
三 緩沖隊列實現
隊列常用語 FIFO 場景的數據處理,同時如多線程生產單線程消費、多線程生產多線程消費和
單線程生產多線程消費
1.定義緩沖隊列類
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.
semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;/*** 任務緩沖隊列 用于批量處理 或 順序緩沖處理** @author * @date 2023-05-16 19:05* @since 1.8*/
public class TaskQueue<T,R> {/*** 日志打印*/private static final Logger logger = LoggerFactory.getLogger(TaskQueue.class);/*** 任務批量大小*/private int taskSize = 300;/*** 延遲處理時間設置 默認 1 MIN*/private long handDelayTime = 1000 * 60 ;/*** 緩沖隊列大小 默認 1W*/private int capacity = 10000;/*** 無界阻塞隊列,用于數據/任務緩沖*/private LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(capacity);/*** 待處理數據/任務集合*/private List<T> taskList = new ArrayList<>(taskSize);/*** 單線程處理*/private Semaphore handSemaphore = new Semaphore(1);/*** 是否強制處理*/private boolean isForceDeal = false;/*** 最遲處理超時時間(MS) 默認 3 * 60 * 1000*/private int lastDealTTL = 180000;/*** 上次處理時間*/private long lastDealTime = 0;/*** 阻塞隊列阻塞超時時間(MS) 默認 3 * 60 * 1000*/private int queueTTL = 180000;/*** 同步信號量阻塞超時(MS) 默認 1MIN*/private int semaphoreTTL = 6000;/*** 定義消費線程池 僅通過核心線程循環消費隊列數據即可*/private ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(1));/*** 自定義處理方法*/private Function<List<T>,R> function;/*** 無參*/public TaskQueue(){}/*** 初始化隊列大小* @param capacity*/public TaskQueue(int capacity){this.capacity = capacity;}/*** 初始化鎖 用于線程檢查*/private static final Integer INIT_LOCK = 0;/*** 消費方法* @return*/private void taskHandlerThread(){executor.execute(()->{//定義任務/數據對象T temp;//循環消費while (true){try {//取數據temp = queue.poll(queueTTL,TimeUnit.MILLISECONDS);//判斷是否為 NULL 或是否超過最遲處理時間if (temp == null || isNeedDeal()){//阻塞等待后仍沒有新數據則直接處理taskHandler();//阻塞直到有新數據進來if (temp == null){temp = queue.take();}}//填充到集合taskList.add(temp);// 判斷是否需要發送if (taskList.size() >= taskSize){taskHandler();}} catch (InterruptedException e) {logger.error("Take Consumer InterruptedException:",e);} catch (Exception e){logger.error("Take Consumer Exception:",e);}}});}/*** 是否需要處理(針對超時情況)* 避免數據遲遲無法處理,否則最遲情況可能為 queueTTL * taskSize 時長無法處理* @return*/private boolean isNeedDeal(){return isForceDeal && System.currentTimeMillis() > lastDealTime;}/*** 數據/任務集合處理方法* Spring 管理下可用 @PreDestroy 在實例銷毀前觸發一次處理*/private void taskHandler(){boolean acq = false;try {//本次許可超時則等待下次觸發(1MIN)acq = handSemaphore.tryAcquire(semaphoreTTL,TimeUnit.MILLISECONDS);//如果成功獲取許可才發送if (acq){if (!CollectionUtils.isEmpty(taskList)){deal(taskList);}//重置時間lastDealTime = System.currentTimeMillis() + lastDealTTL;//處理完成清空集合taskList.clear();}} catch (Exception e) {logger.error("Task List Handle Exception:",e);} finally {if (acq){handSemaphore.release();}}}/*** 檢查線程池是否就緒*/private boolean isHandlerReady(){//如果變量為 nullif (executor == null ){synchronized (INIT_LOCK){if (executor == null ){{executor = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(1));}}}taskHandlerThread();return false;} else {//如果活動任務數小于 1 重新添加任務int count = executor.getActiveCount();if (count < 1){synchronized (INIT_LOCK){count = executor.getActiveCount();if (count < 1){taskHandlerThread();}}return false;} else {//否則線程池健康 則返回 True 允許使用return true;}}}/*** 入隊,失敗則直接處理* @param t*/private boolean put(T t) {try {return queue.offer(t);} catch (Exception e) {logger.error("Offer Queue Exception:",e);return false;}}/*** 填充到阻塞隊列,如果失敗就立即發送* @param t*/private void handleByQueue(T t){boolean putSuccess = this.put(t);if (!putSuccess){//否則直接處理this.deal(Collections.singletonList(t));}}/*** 前置過濾,校驗消費者狀態* @param t*/public void taskHandlerPre(T t){// 默認使用連接池if (isHandlerReady()){this.handleByQueue(t);} else {//否則直接處理deal(Collections.singletonList(t));}}/*** 設置 Function* @param function*/public void setFunction(Function<List<T>,R> function){this.function = function;}/*** 重寫處理方法* @param list*/public R deal(List<T> list) {if (null == function){return null;}return function.apply(list);}
}
2.定義數據對象
public class User {private String code;private String name;private int age;public User(int age,String name){this.age = age;this.name = name;}public User(String code,String name,int age){this.code = code;this.name = name;this.age = age;}public String getCode() {return code;}public void setCode(String code) {this.code = code;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString(){return "{\"Code\":\"" + code + "\",\"Name\":\"" + name + "\",\"age\":" + age + "}";}
}
3.測試代碼
public void task(){/*** 初始化延遲隊列* User 數據/任務類* String 返回值類型* 其他初始化參數自行修改或支持外部修改*/TaskQueue<User,String> queue = new TaskQueue<>();/*** 批量發送* 批量入庫* 批量計算*/queue.setFunction(t->{t.forEach(k->{System.out.println(k);});return "";});/*** 模擬發送數據到緩沖隊列*/for (int i=0;i<302;i++){queue.taskHandlerPre(new User(1,"Alycia:" + i));}}