- CountDownLatch
- CyclicBarrier
- Semaphore
- ExChanger
1 2 3 4 5 6 7 8 9 10 11 | /** * Constructs a {@code CountDownLatch} initialized with the given count. * * @param count the number of times {@link #countDown} must be invoked * before threads can pass through {@link #await} * @throws IllegalArgumentException if {@code count} is negative */ public CountDownLatch( int count) { if (count < 0 ) throw new IllegalArgumentException( "count < 0" ); this .sync = new Sync(count); } |
- CountDownLatch:初始化方法
- await:等待方法,同时带参数的是超时重载方法
- countDown:每执行一次,计数器减一,就是初始化传入的数字,也代表着一个线程完成了任务
- getCount:获取当前值
- toString:这个就不用说了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | package org.dance.tools; import java.util.concurrent.TimeUnit; /** * 类说明:线程休眠辅助工具类 */ public class SleepTools { /** * 按秒休眠 * @param seconds 秒数 */ public static final void second( int seconds) { try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { } } /** * 按毫秒数休眠 * @param seconds 毫秒数 */ public static final void ms( int seconds) { try { TimeUnit.MILLISECONDS.sleep(seconds); } catch (InterruptedException e) { } } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 | package org.dance.day2.util; import org.dance.tools.SleepTools; import java.util.concurrent.CountDownLatch; /** * CountDownLatch的使用,有五个线程,6个扣除点 * 扣除完成后主线程和业务线程,才能执行工作 * 扣除点一般都是大于等于需要初始化的线程的 * @author ZYGisComputer */ public class UseCountDownLatch { /** * 设置为6个扣除点 */ static CountDownLatch countDownLatch = new CountDownLatch( 6 ); /** * 初始化线程 */ private static class InitThread implements Runnable { @Override public void run() { System.out.println( "thread_" + Thread.currentThread().getId() + " ready init work ....." ); // 执行扣减 扣减不代表结束 countDownLatch.countDown(); for ( int i = 0 ; i < 2 ; i++) { System.out.println( "thread_" + Thread.currentThread().getId() + ".....continue do its work" ); } } } /** * 业务线程 */ private static class BusiThread implements Runnable { @Override public void run() { // 业务线程需要在等初始化完毕后才能执行 try { countDownLatch.await(); for ( int i = 0 ; i < 3 ; i++) { System.out.println( "BusiThread " + Thread.currentThread().getId() + " do business-----" ); } } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { // 创建单独的初始化线程 new Thread(){ @Override public void run() { SleepTools.ms( 1 ); System.out.println( "thread_" + Thread.currentThread().getId() + " ready init work step 1st....." ); // 扣减一次 countDownLatch.countDown(); System.out.println( "begin stop 2nd....." ); SleepTools.ms( 1 ); System.out.println( "thread_" + Thread.currentThread().getId() + " ready init work step 2nd....." ); // 扣减一次 countDownLatch.countDown(); } }.start(); // 启动业务线程 new Thread( new BusiThread()).start(); // 启动初始化线程 for ( int i = 0 ; i <= 3 ; i++) { new Thread( new InitThread()).start(); } // 主线程进入等待 try { countDownLatch.await(); System.out.println( "Main do ites work....." ); } catch (InterruptedException e) { e.printStackTrace(); } } } |
thread_13 ready init work .....
thread_13.....continue do its work
thread_13.....continue do its work
thread_14 ready init work .....
thread_14.....continue do its work
thread_14.....continue do its work
thread_15 ready init work .....
thread_15.....continue do its work
thread_11 ready init work step 1st.....
begin stop 2nd.....
thread_16 ready init work .....
thread_16.....continue do its work
thread_16.....continue do its work
thread_15.....continue do its work
thread_11 ready init work step 2nd.....
Main do ites work.....
BusiThread 12 do business-----
BusiThread 12 do business-----
BusiThread 12 do business-----
1 2 3 4 5 6 7 8 9 10 11 12 | /** * Creates a new {@code CyclicBarrier} that will trip when the * given number of parties (threads) are waiting upon it, and * does not perform a predefined action when the barrier is tripped. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier( int parties) { this (parties, null ); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | /** * Creates a new {@code CyclicBarrier} that will trip when the * given number of parties (threads) are waiting upon it, and which * will execute the given barrier action when the barrier is tripped, * performed by the last thread entering the barrier. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @param barrierAction the command to execute when the barrier is * tripped, or {@code null} if there is no action * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier( int parties, Runnable barrierAction) { if (parties <= 0 ) throw new IllegalArgumentException(); this .parties = parties; this .count = parties; this .barrierCommand = barrierAction; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 | package org.dance.day2.util; import org.dance.tools.SleepTools; import java.util.Map; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; /** * CyclicBarrier的使用 * * @author ZYGisComputer */ public class UseCyclicBarrier { /** * 存放子线程工作结果的安全容器 */ private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>(); private static CyclicBarrier cyclicBarrier = new CyclicBarrier( 5 , new CollectThread()); /** * 结果打印线程 * 用来演示CyclicBarrier的第二个参数,barrierAction */ private static class CollectThread implements Runnable { @Override public void run() { StringBuffer result = new StringBuffer(); for (Map.Entry<String, Long> workResult : resultMap.entrySet()) { result.append( "[" + workResult.getValue() + "]" ); } System.out.println( "the result = " + result); System.out.println( "do other business....." ); } } /** * 工作子线程 * 用于CyclicBarrier的一组线程 */ private static class SubThread implements Runnable { @Override public void run() { // 获取当前线程的ID long id = Thread.currentThread().getId(); // 放入统计容器中 resultMap.put(String.valueOf(id), id); Random random = new Random(); try { if (random.nextBoolean()) { Thread.sleep( 1000 + id); System.out.println( "Thread_" +id+ "..... do something" ); } System.out.println(id+ " is await" ); cyclicBarrier.await(); Thread.sleep( 1000 +id); System.out.println( "Thread_" +id+ ".....do its business" ); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } public static void main(String[] args) { for ( int i = 0 ; i <= 4 ; i++) { Thread thread = new Thread( new SubThread()); thread.start(); } } } |
11 is await
14 is await
15 is await
Thread_12..... do something
12 is await
Thread_13..... do something
13 is await
the result = [11][12][13][14][15]
do other business.....
Thread_11.....do its business
Thread_12.....do its business
Thread_13.....do its business
Thread_14.....do its business
Thread_15.....do its business
通过返回结果可以看出前面的11 14 15三个线程没有进入if语句块,在执行到await的时候进入了等待,而另外12 13两个线程进入到了if语句块当中,多休眠了1秒多,然后当5个线程同时到达await的时候,屏障开放,执行了barrierAction线程,然后线程组继续执行
CountDownLatch | CyclicBarrier | |
控制 | 第三方控制 | 自身控制 |
传入数量 | 大于等于线程数量 | 等于线程数量 |
1 2 3 4 5 6 7 8 9 10 11 | /** * Creates a {@code Semaphore} with the given number of * permits and nonfair fairness setting. * * @param permits the initial number of permits available. * This value may be negative, in which case releases * must occur before any acquires will be granted. */ public Semaphore( int permits) { sync = new NonfairSync(permits); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 | package org.dance.day2.util.pool; import org.dance.tools.SleepTools; import java.sql.*; import java.util.Map; import java.util.Properties; import java.util.concurrent.Executor; /** * 数据库连接 * @author ZYGisComputer */ public class SqlConnection implements Connection { /** * 获取数据库连接 * @return */ public static final Connection fetchConnection(){ return new SqlConnection(); } @Override public void commit() throws SQLException { SleepTools.ms( 70 ); } @Override public Statement createStatement() throws SQLException { SleepTools.ms( 1 ); return null ; } @Override public PreparedStatement prepareStatement(String sql) throws SQLException { return null ; } @Override public CallableStatement prepareCall(String sql) throws SQLException { return null ; } @Override public String nativeSQL(String sql) throws SQLException { return null ; } @Override public void setAutoCommit( boolean autoCommit) throws SQLException { } @Override public boolean getAutoCommit() throws SQLException { return false ; } @Override public void rollback() throws SQLException { } @Override public void close() throws SQLException { } @Override public boolean isClosed() throws SQLException { return false ; } @Override public DatabaseMetaData getMetaData() throws SQLException { return null ; } @Override public void setReadOnly( boolean readOnly) throws SQLException { } @Override public boolean isReadOnly() throws SQLException { return false ; } @Override public void setCatalog(String catalog) throws SQLException { } @Override public String getCatalog() throws SQLException { return null ; } @Override public void setTransactionIsolation( int level) throws SQLException { } @Override public int getTransactionIsolation() throws SQLException { return 0 ; } @Override public SQLWarning getWarnings() throws SQLException { return null ; } @Override public void clearWarnings() throws SQLException { } @Override public Statement createStatement( int resultSetType, int resultSetConcurrency) throws SQLException { return null ; } @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { return null ; } @Override public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { return null ; } @Override public Map<String, Class<?>> getTypeMap() throws SQLException { return null ; } @Override public void setTypeMap(Map<String, Class<?>> map) throws SQLException { } @Override public void setHoldability( int holdability) throws SQLException { } @Override public int getHoldability() throws SQLException { return 0 ; } @Override public Savepoint setSavepoint() throws SQLException { return null ; } @Override public Savepoint setSavepoint(String name) throws SQLException { return null ; } @Override public void rollback(Savepoint savepoint) throws SQLException { } @Override public void releaseSavepoint(Savepoint savepoint) throws SQLException { } @Override public Statement createStatement( int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { return null ; } @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { return null ; } @Override public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { return null ; } @Override public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { return null ; } @Override public PreparedStatement prepareStatement(String sql, int [] columnIndexes) throws SQLException { return null ; } @Override public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { return null ; } @Override public Clob createClob() throws SQLException { return null ; } @Override public Blob createBlob() throws SQLException { return null ; } @Override public NClob createNClob() throws SQLException { return null ; } @Override public SQLXML createSQLXML() throws SQLException { return null ; } @Override public boolean isValid( int timeout) throws SQLException { return false ; } @Override public void setClientInfo(String name, String value) throws SQLClientInfoException { } @Override public void setClientInfo(Properties properties) throws SQLClientInfoException { } @Override public String getClientInfo(String name) throws SQLException { return null ; } @Override public Properties getClientInfo() throws SQLException { return null ; } @Override public Array createArrayOf(String typeName, Object[] elements) throws SQLException { return null ; } @Override public Struct createStruct(String typeName, Object[] attributes) throws SQLException { return null ; } @Override public void setSchema(String schema) throws SQLException { } @Override public String getSchema() throws SQLException { return null ; } @Override public void abort(Executor executor) throws SQLException { } @Override public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { } @Override public int getNetworkTimeout() throws SQLException { return 0 ; } @Override public <T> T unwrap(Class<T> iface) throws SQLException { return null ; } @Override public boolean isWrapperFor(Class<?> iface) throws SQLException { return false ; } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 | package org.dance.day2.util.pool; import java.sql.Connection; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.concurrent.Semaphore; /** * 使用信号量控制数据库的链接和释放 * * @author ZYGisComputer */ public class DBPoolSemaphore { /** * 池容量 */ private final static int POOL_SIZE = 10 ; /** * useful 代表可用连接 * useless 代表已用连接 * 为什么要使用两个Semaphore呢?是因为,在连接池中不只有连接本身是资源,空位也是资源,也需要记录 */ private final Semaphore useful, useless; /** * 连接池 */ private final static LinkedList<Connection> POOL = new LinkedList<>(); /** * 使用静态块初始化池 */ static { for ( int i = 0 ; i < POOL_SIZE; i++) { POOL.addLast(SqlConnection.fetchConnection()); } } public DBPoolSemaphore() { // 初始可用的许可证等于池容量 useful = new Semaphore(POOL_SIZE); // 初始不可用的许可证容量为0 useless = new Semaphore( 0 ); } /** * 获取数据库连接 * * @return 连接对象 */ public Connection takeConnection() throws InterruptedException { // 可用许可证减一 useful.acquire(); Connection connection; synchronized (POOL) { connection = POOL.removeFirst(); } // 不可用许可证数量加一 useless.release(); return connection; } /** * 释放链接 * * @param connection 连接对象 */ public void returnConnection(Connection connection) throws InterruptedException { if ( null !=connection){ // 打印日志 System.out.println( "当前有" +useful.getQueueLength()+ "个线程等待获取连接,," + "可用连接有" +useful.availablePermits()+ "个" ); // 不可用许可证减一 useless.acquire(); synchronized (POOL){ POOL.addLast(connection); } // 可用许可证加一 useful.release(); } } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | package org.dance.day2.util.pool; import org.dance.tools.SleepTools; import java.sql.Connection; import java.util.Random; /** * 测试Semaphore * @author ZYGisComputer */ public class UseSemaphore { /** * 连接池 */ public static final DBPoolSemaphore pool = new DBPoolSemaphore(); private static class BusiThread extends Thread{ @Override public void run() { // 随机数工具类 为了让每个线程持有连接的时间不一样 Random random = new Random(); long start = System.currentTimeMillis(); try { Connection connection = pool.takeConnection(); System.out.println( "Thread_" +Thread.currentThread().getId()+ "_获取数据库连接耗时[" +(System.currentTimeMillis()-start)+ "]ms." ); // 模拟使用连接查询数据 SleepTools.ms( 100 +random.nextInt( 100 )); System.out.println( "查询数据完成归还连接" ); pool.returnConnection(connection); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { for ( int i = 0 ; i < 50 ; i++) { BusiThread busiThread = new BusiThread(); busiThread.start(); } } } |
1 2 3 4 5 6 | /** * Creates a new Exchanger. */ public Exchanger() { participant = new Participant(); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | package org.dance.day2.util; import java.util.HashSet; import java.util.Set; import java.util.concurrent.Exchanger; /** * 线程之间交换数据 * @author ZYGisComputer */ public class UseExchange { private static final Exchanger<Set<String>> exchanger = new Exchanger<>(); public static void main(String[] args) { new Thread(){ @Override public void run() { Set<String> aSet = new HashSet<>(); aSet.add( "A" ); aSet.add( "B" ); aSet.add( "C" ); try { Set<String> exchange = exchanger.exchange(aSet); for (String s : exchange) { System.out.println( "aSet" +s); } } catch (InterruptedException e) { e.printStackTrace(); } } }.start(); new Thread(){ @Override public void run() { Set<String> bSet = new HashSet<>(); bSet.add( "1" ); bSet.add( "2" ); bSet.add( "3" ); try { Set<String> exchange = exchanger.exchange(bSet); for (String s : exchange) { System.out.println( "bSet" +s); } } catch (InterruptedException e) { e.printStackTrace(); } } }.start(); } } |
以上就是详解JUC 常用4大并发工具类的详细内容,更多关于juc 并发工具类的资料请关注自学编程网其它相关文章!
- 本文固定链接: https://zxbcw.cn/post/197084/
- 转载请注明:必须在正文中标注并保留原文链接
- QQ群: PHP高手阵营官方总群(344148542)
- QQ群: Yii2.0开发(304864863)