1. <nobr id="easjo"><address id="easjo"></address></nobr>

      <track id="easjo"><source id="easjo"></source></track>
      1. 
        

      2. <bdo id="easjo"><optgroup id="easjo"></optgroup></bdo>
      3. <track id="easjo"><source id="easjo"><em id="easjo"></em></source></track><option id="easjo"><span id="easjo"><em id="easjo"></em></span></option>
          貴州做網站公司
          貴州做網站公司~專業!靠譜!
          10年網站模板開發經驗,熟悉國內外開源網站程序,包括DEDECMS,WordPress,ZBlog,Discuz! 等網站程序,可為您提供網站建設,網站克隆,仿站,網頁設計,網站制作,網站推廣優化等服務。我們專注高端營銷型網站,企業官網,集團官網,自適應網站,手機網站,網絡營銷,網站優化,網站服務器環境搭建以及托管運維等。為客戶提供一站式網站解決方案?。?!

          Java 常見隊列的使用與緩沖實現

          來源:互聯網轉載 時間:2023-09-13 09:35:23

          常見隊列介紹與緩沖實現

          • 一 常見隊列
          • 二 常見隊列的方法
            • 1.ArrayBlockingQueue
            • 2.LinkedBlockingQueue
            • 3.LinkedBlockingDeque
            • 4.ConcurrentLinkedQueue
            • 5.SynchronousQueue
            • 6.LinkedTransferQueue
              • 1.定義緩沖隊列類
              • 2.定義數據對象
              • 3.測試代碼

          一 常見隊列

          隊列說明
          ArrayBlockingQueue有界
          LinkedBlockingQueue有/無界
          LinkedBlockingDeque無界
          ConcurrentLinkedQueue無界
          SynchronousQueue無界
          LinkedTransferQueue無界
          DelayQueue延遲

          二 常見隊列的方法

          1.ArrayBlockingQueue

          public static void arrayBlockingQueue() throws InterruptedException {/*** 有界阻塞隊列,初始化必須指定大小*/ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);/*** 阻塞插入,容量不足會阻塞,直到有空閑位置*/queue.put(1);/*** 非阻塞插入,返回插入狀態 true/false*/queue.offer(1);/*** 阻塞指定時長插入,阻塞時間內有空閑位置則可插入,返回插入狀態 true/false*/queue.offer(1,1, TimeUnit.SECONDS);/*** 異常插入,無空閑位置則拋出異常*///queue.add(1);/*** 非阻塞拉取,取出并移除元素*/queue.poll();/*** 阻塞指定時長拉取,阻塞時間內有數據則直接取數,取出并移除元素*/queue.poll(1,TimeUnit.SECONDS);/*** 非阻塞拉取,隊首為空返回NULL,取出不移除元素*/queue.peek();/*** 阻塞拉取,直到有元素*/queue.take();}

          2.LinkedBlockingQueue

          public static void linkedBlockingQueue() throws InterruptedException {/*** 指定長度即為有界* 不指定長度即為無界*/LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);/*** 阻塞插入,容量不足會阻塞,直到有空閑位置*/queue.put(1);/*** 非阻塞插入,返回插入狀態 true/false*/queue.offer(1);/*** 阻塞指定時長插入,阻塞時間內有空閑位置則可插入,返回插入狀態 true/false*/queue.offer(1,1, TimeUnit.SECONDS);/*** 異常插入,無空閑位置則拋出異常*///queue.add(1);/*** 非阻塞拉取,取出并移除元素*/queue.poll();/*** 阻塞指定時長拉取,阻塞時間內有數據則直接取數,取出并移除元素*/queue.poll(1,TimeUnit.SECONDS);/*** 非阻塞拉取,隊首為空返回NULL,取出不移除元素*/queue.peek();/*** 阻塞拉取,直到有元素*/queue.take();}

          3.LinkedBlockingDeque

          public static void linkedBlockingDeque() throws InterruptedException {/*** 消費默認從頭開始*/LinkedBlockingDeque<Integer> deque = new LinkedBlockingDeque<>(1);/*** 阻塞插入,容量不足會阻塞,直到有空閑位置*/deque.put(1);/*** 非阻塞插入,返回插入狀態 true/false*/deque.offer(1);/*** 阻塞指定時長插入,阻塞時間內有空閑位置則可插入,返回插入狀態 true/false*/deque.offer(1,1, TimeUnit.SECONDS);/*** 異常插入,無空閑位置則拋出異常*///deque.add(1);/*** 從頭非阻塞插入,返回插入狀態 true/false*/deque.offerFirst(1);/*** 從頭阻塞指定時長插入,阻塞時間內有空閑位置則可插入,返回插入狀態 true/false*/deque.offerFirst(1,1,TimeUnit.SECONDS);/*** 從尾非阻塞插入,返回插入狀態 true/false*/deque.offerLast(1);/*** 從尾阻塞指定時長插入,阻塞時間內有空閑位置則可插入,返回插入狀態 true/false*/deque.offerFirst(1,1,TimeUnit.SECONDS);/*** 非阻塞拉取,取出并移除元素*/deque.poll();/*** 阻塞指定時長拉取,阻塞時間內有數據則直接取數,取出并移除元素*/deque.poll(1,TimeUnit.SECONDS);/*** 非阻塞拉取頭元素,取出并移除元素*/deque.pollFirst();/*** 阻塞指定時長拉取頭元素,阻塞時間內有數據則直接取數,取出并移除元素*/deque.pollFirst(1,TimeUnit.SECONDS);/*** 非阻塞拉取尾元素,取出并移除元素*/deque.pollLast();/*** 阻塞指定時長拉取尾元素,阻塞時間內有數據則直接取數,取出并移除元素*/deque.pollLast(1,TimeUnit.SECONDS);/*** 非阻塞拉取,隊首為空返回NULL,取出不移除元素*/deque.peek();/*** 阻塞拉取,直到有元素*/deque.take();/*** 阻塞拉取,直到頭有元素*/deque.takeFirst();/*** 阻塞拉取,直到尾有元素*/deque.takeLast();}

          4.ConcurrentLinkedQueue

          public static void concurrentLinkedQueue(){/*** 無界,不支持指定大小* 無鎖,通過 CAS 控制并發;可能存在讀寫不一致*/ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();/*** 非阻塞插入,返回插入狀態 true/false*/queue.offer(1);/*** 內部調用了 offer 方法*/queue.add(1);/*** 非阻塞拉取,取出并移除元素*/queue.poll();/*** 非阻塞拉取,隊首為空返回NULL,取出不移除元素*/queue.peek();}public static void synchronousQueue () throws InterruptedException {ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 100,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>());/*** 同步隊列 不存儲數據 匹配生產者和消費者線程*/SynchronousQueue<Integer> queue = new SynchronousQueue<>();/*** 阻塞插入,阻塞直到有消費線程與之匹配*/executor.execute(()->{try {queue.put(1);} catch (InterruptedException e) {throw new RuntimeException(e);}});/*** 非阻塞插入,沒有消費線程直接返回 true/false*/queue.offer(1);/*** 阻塞指定時長插入,阻塞時間內有消費線程與之匹配則可插入,返回插入狀態 true/false*/queue.offer(1,3, TimeUnit.SECONDS);/*** 異常插入,無消費線程則拋出異常*///queue.add(1);/*** 非阻塞拉取,如果有生產線程,則消費其數據*/System.out.println("First:" + queue.poll());/*** 阻塞指定時長拉取,阻塞時間內有生產線程與之匹配則直接取數,取出并移除元素*/System.out.println("Second:" + queue.poll(3,TimeUnit.SECONDS));/*** 無效方法 直接返回 NULL*/queue.peek();/*** 阻塞插入,阻塞直到有消費線程與之匹配*/executor.execute(()->{try {queue.offer(999999,3, TimeUnit.SECONDS);} catch (InterruptedException e) {throw new RuntimeException(e);}});/*** 阻塞拉取,直到有生產線程與之匹配*/System.out.println("Third:" + queue.take());}

          5.SynchronousQueue

          public static void synchronousQueue () throws InterruptedException {ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 100,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>());/*** 同步隊列 不存儲數據 匹配生產者和消費者線程*/SynchronousQueue<Integer> queue = new SynchronousQueue<>();/*** 阻塞插入,阻塞直到有消費線程與之匹配*/executor.execute(()->{try {queue.put(1);} catch (InterruptedException e) {throw new RuntimeException(e);}});/*** 非阻塞插入,沒有消費線程直接返回 true/false*/queue.offer(1);/*** 阻塞指定時長插入,阻塞時間內有消費線程與之匹配則可插入,返回插入狀態 true/false*/queue.offer(1,3, TimeUnit.SECONDS);/*** 異常插入,無消費線程則拋出異常*///queue.add(1);/*** 非阻塞拉取,如果有生產線程,則消費其數據*/System.out.println("First:" + queue.poll());/*** 阻塞指定時長拉取,阻塞時間內有生產線程與之匹配則直接取數,取出并移除元素*/System.out.println("Second:" + queue.poll(3,TimeUnit.SECONDS));/*** 無效方法 直接返回 NULL*/queue.peek();/*** 阻塞插入,阻塞直到有消費線程與之匹配*/executor.execute(()->{try {queue.offer(999999,3, TimeUnit.SECONDS);} catch (InterruptedException e) {throw new RuntimeException(e);}});/*** 阻塞拉取,直到有生產線程與之匹配*/System.out.println("Third:" + queue.take());}

          6.LinkedTransferQueue

          public static void linkedTransferQueue() throws InterruptedException {/*** 無界非阻塞隊列 類似 LinkedBlockingQueue + SynchronousQueue* 可以存儲實體并實現生產者和消費者線程匹配*/LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();/*** 非阻塞插入:實際都是調用 xfer(e, true, ASYNC, 0L)*/queue.put(1);queue.offer(2);queue.offer(3,1,TimeUnit.SECONDS);queue.add(4);/*** 非阻塞獲取,并移除元素*/System.out.println(queue.poll());/*** 阻塞指定時長獲取,并移除元素*/System.out.println(queue.poll(1, TimeUnit.SECONDS));/*** 非阻塞獲取,不移除元素*/System.out.println(queue.peek());/*** 阻塞獲取*/System.out.println(queue.take());}

          三 緩沖隊列實現

          隊列常用語 FIFO 場景的數據處理,同時如多線程生產單線程消費、多線程生產多線程消費和 單線程生產多線程消費

          1.定義緩沖隊列類

          import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.CollectionUtils;import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Function;/*** 任務緩沖隊列 用于批量處理 或 順序緩沖處理** @author * @date 2023-05-16 19:05* @since 1.8*/ public class TaskQueue<T,R> {/*** 日志打印*/private static final Logger logger = LoggerFactory.getLogger(TaskQueue.class);/*** 任務批量大小*/private int taskSize = 300;/*** 延遲處理時間設置 默認 1 MIN*/private long handDelayTime = 1000 * 60 ;/*** 緩沖隊列大小 默認 1W*/private int capacity = 10000;/*** 無界阻塞隊列,用于數據/任務緩沖*/private LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(capacity);/*** 待處理數據/任務集合*/private List<T> taskList = new ArrayList<>(taskSize);/*** 單線程處理*/private Semaphore handSemaphore = new Semaphore(1);/*** 是否強制處理*/private boolean isForceDeal = false;/*** 最遲處理超時時間(MS) 默認 3 * 60 * 1000*/private int lastDealTTL = 180000;/*** 上次處理時間*/private long lastDealTime = 0;/*** 阻塞隊列阻塞超時時間(MS) 默認 3 * 60 * 1000*/private int queueTTL = 180000;/*** 同步信號量阻塞超時(MS) 默認 1MIN*/private int semaphoreTTL = 6000;/*** 定義消費線程池 僅通過核心線程循環消費隊列數據即可*/private ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(1));/*** 自定義處理方法*/private Function<List<T>,R> function;/*** 無參*/public TaskQueue(){}/*** 初始化隊列大小* @param capacity*/public TaskQueue(int capacity){this.capacity = capacity;}/*** 初始化鎖 用于線程檢查*/private static final Integer INIT_LOCK = 0;/*** 消費方法* @return*/private void taskHandlerThread(){executor.execute(()->{//定義任務/數據對象T temp;//循環消費while (true){try {//取數據temp = queue.poll(queueTTL,TimeUnit.MILLISECONDS);//判斷是否為 NULL 或是否超過最遲處理時間if (temp == null || isNeedDeal()){//阻塞等待后仍沒有新數據則直接處理taskHandler();//阻塞直到有新數據進來if (temp == null){temp = queue.take();}}//填充到集合taskList.add(temp);// 判斷是否需要發送if (taskList.size() >= taskSize){taskHandler();}} catch (InterruptedException e) {logger.error("Take Consumer InterruptedException:",e);} catch (Exception e){logger.error("Take Consumer Exception:",e);}}});}/*** 是否需要處理(針對超時情況)* 避免數據遲遲無法處理,否則最遲情況可能為 queueTTL * taskSize 時長無法處理* @return*/private boolean isNeedDeal(){return isForceDeal && System.currentTimeMillis() > lastDealTime;}/*** 數據/任務集合處理方法* Spring 管理下可用 @PreDestroy 在實例銷毀前觸發一次處理*/private void taskHandler(){boolean acq = false;try {//本次許可超時則等待下次觸發(1MIN)acq = handSemaphore.tryAcquire(semaphoreTTL,TimeUnit.MILLISECONDS);//如果成功獲取許可才發送if (acq){if (!CollectionUtils.isEmpty(taskList)){deal(taskList);}//重置時間lastDealTime = System.currentTimeMillis() + lastDealTTL;//處理完成清空集合taskList.clear();}} catch (Exception e) {logger.error("Task List Handle Exception:",e);} finally {if (acq){handSemaphore.release();}}}/*** 檢查線程池是否就緒*/private boolean isHandlerReady(){//如果變量為 nullif (executor == null ){synchronized (INIT_LOCK){if (executor == null ){{executor = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(1));}}}taskHandlerThread();return false;} else {//如果活動任務數小于 1 重新添加任務int count = executor.getActiveCount();if (count < 1){synchronized (INIT_LOCK){count = executor.getActiveCount();if (count < 1){taskHandlerThread();}}return false;} else {//否則線程池健康 則返回 True 允許使用return true;}}}/*** 入隊,失敗則直接處理* @param t*/private boolean put(T t) {try {return queue.offer(t);} catch (Exception e) {logger.error("Offer Queue Exception:",e);return false;}}/*** 填充到阻塞隊列,如果失敗就立即發送* @param t*/private void handleByQueue(T t){boolean putSuccess = this.put(t);if (!putSuccess){//否則直接處理this.deal(Collections.singletonList(t));}}/*** 前置過濾,校驗消費者狀態* @param t*/public void taskHandlerPre(T t){// 默認使用連接池if (isHandlerReady()){this.handleByQueue(t);} else {//否則直接處理deal(Collections.singletonList(t));}}/*** 設置 Function* @param function*/public void setFunction(Function<List<T>,R> function){this.function = function;}/*** 重寫處理方法* @param list*/public R deal(List<T> list) {if (null == function){return null;}return function.apply(list);} }

          2.定義數據對象

          public class User {private String code;private String name;private int age;public User(int age,String name){this.age = age;this.name = name;}public User(String code,String name,int age){this.code = code;this.name = name;this.age = age;}public String getCode() {return code;}public void setCode(String code) {this.code = code;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString(){return "{\"Code\":\"" + code + "\",\"Name\":\"" + name + "\",\"age\":" + age + "}";} }

          3.測試代碼

          public void task(){/*** 初始化延遲隊列* User 數據/任務類* String 返回值類型* 其他初始化參數自行修改或支持外部修改*/TaskQueue<User,String> queue = new TaskQueue<>();/*** 批量發送* 批量入庫* 批量計算*/queue.setFunction(t->{t.forEach(k->{System.out.println(k);});return "";});/*** 模擬發送數據到緩沖隊列*/for (int i=0;i<302;i++){queue.taskHandlerPre(new User(1,"Alycia:" + i));}}

          網絡推廣與網站優化公司(網絡優化與推廣專家)作為數字營銷領域的核心服務提供方,其價值在于通過技術手段與策略規劃幫助企業提升線上曝光度、用戶轉化率及品牌影響力。這...

          在當今數字化時代,公司網站已成為企業展示形象、傳遞信息和開展業務的重要平臺。然而,對于許多公司來說,網站建設的價格是一個關鍵考量因素。本文將圍繞“公司網站建設價...

          在當今的數字化時代,企業網站已成為企業展示形象、吸引客戶和開展業務的重要平臺。然而,對于許多中小企業來說,高昂的網站建設費用可能會成為其發展的瓶頸。幸運的是,隨...

          泰安與泰山有什么區別?的泰山是指泰山的一座山。;安,和泰 安是一座城市。泰 安包括泰和;;安,但泰 安沒有。泰 安歷史悠久,居五岳之首,是古代中原王朝重要的文化發祥地?。?!泰安只是建國后設立的一個地級市。;(),文化很短,和泰安 美國的經濟在全國名列前茅??!泰 安、是市名,泰山在泰山之內。;安,五岳之首,風景旅游區。泰 安,是一個地級市的名字,泰山是一座名山,有五座獨特的山。泰安與泰山有什么區別?...

          windows server自帶的backup怎么恢復?1. 安裝windows server備份服務。2使用Windows Server Backup進行本地一次性備份。選擇一次性備份后,將啟動一次性備份向導,并在備份向導中選擇備份方法。三。除特殊要求外,不建議備份整個服務器。首先,不建議備份不必要的數據。其次,備份過程相當緩慢。因此,您可以選擇自定義備份。4在選擇要備份的項目頁面中,選擇添加項...

          天津到深圳的機票多少錢?打幾折?一般機票1000元左右。如果是晚上,可能會有300元左右的特價機票。具體來說,你得開一家航空公司。從天津到深圳坐飛機大概多錢?天津到深圳全價1850,全程飛行約3小時10分鐘。北京到深圳全價1750,全程飛行時間和天津一樣。天津機場有到深圳寶安機場的航班嗎?天津機場到深圳寶安機場都有航班。1.天津作為四大直轄市之一,也是重要的進出口城市,而深圳作為社會主義第一示范區...

          TOP
          国产初高中生视频在线观看|亚洲一区中文|久久亚洲欧美国产精品|黄色网站入口免费进人
          1. <nobr id="easjo"><address id="easjo"></address></nobr>

              <track id="easjo"><source id="easjo"></source></track>
              1. 
                

              2. <bdo id="easjo"><optgroup id="easjo"></optgroup></bdo>
              3. <track id="easjo"><source id="easjo"><em id="easjo"></em></source></track><option id="easjo"><span id="easjo"><em id="easjo"></em></span></option>