Java在Debian上的并发处理实现
一 环境准备与基础
sudo apt update && sudo apt install openjdk-11-jdkjava -version二 代码模板与常见模式
示例(线程池 + 生产者消费者 + 并行归约):
import java.util.concurrent.*;
import java.util.*;
public class ConcurrentDemo {
private static final int N = 4; // 并行度(可按CPU核数调整)
private static final int TASKS = 100;
static class Task implements Callable<Integer> {
final int id;
Task(int id) { this.id = id; }
public Integer call() {
// 模拟计算
try { Thread.sleep(10); } catch (InterruptedException ignore) {}
return id * id;
}
}
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newFixedThreadPool(N);
BlockingQueue<String> logQueue = new LinkedBlockingQueue<>(1000);
// 生产者线程:提交任务
Future<?> submitter = exec.submit(() -> {
for (int i = 0; i < TASKS; i++) {
exec.submit(new Task(i));
}
});
// 消费者线程:从队列消费日志(示例)
Future<?> logger = exec.submit(() -> {
while (!(submitter.isDone() && logQueue.isEmpty())) {
String msg = logQueue.poll(100, TimeUnit.MILLISECONDS);
if (msg != null) System.out.println(msg);
}
});
// 并行归约:收集结果
List<Future<Integer>> futures = new ArrayList<>();
for (int i = 0; i < TASKS; i++) {
futures.add(exec.submit(new Task(i)));
}
long sum = 0;
for (Future<Integer> f : futures) {
sum += f.get(5, TimeUnit.SECONDS);
}
System.out.println("Sum of squares: " + sum);
// 关闭
exec.shutdown();
exec.awaitTermination(10, TimeUnit.SECONDS);
}
}
要点:
三 运行与JVM调优
javac ConcurrentDemo.javajava ConcurrentDemo-Xms4g -Xmx4g(避免运行期频繁扩缩堆)-XX:+UseG1GC-XX:ParallelGCThreads=... -XX:ConcGCThreads=...-XX:ThreadStackSize=...四 系统层面与容器场景
nofile 65536),并重启会话或系统使其生效。<Connector port="8080" protocol="org.apache.coyote.http11.Http11Nio2Protocol" maxThreads="500" minSpareThreads="20" acceptCount="1000" />五 实践要点与排错清单