μ€λ λνμ μμ μ²λ¦¬μ μ¬μ©λλ μ€λ λλ₯Ό μ νλ κ°μλ§νΌ μ ν΄ λκ³ μμ νμ λ€μ΄μ€λ μμ λ€μ νλμ© μ€λ λκ° λ§‘μ μ²λ¦¬νλ€.
λ³λ ¬ μμ μ²λ¦¬κ° λ§μμ§λ©΄ μ€λ λ κ°μκ° μ¦κ°λλλ° κ·Έμ λ°λ₯Έ μ€λ λ μμ±, μ€μΌμ€λ§μΌλ‘ μΈν΄ CPUκ° λ°λΉ μ Έ λ©λͺ¨λ¦¬ μ¬μ©λμ΄ λμ΄λλ€.
λ°λΌμ μ ν리μΌμ΄μ μ μ±λ₯μ΄ μ ν λλ―λ‘ μ€λ λ νμ μ¬μ©νμ¬ μ€λ λ μ 체 κ°μκ° λμ΄λμ§ μλλ‘ ν΄μΌνλ€.
μ€λ λν μμ±
public class ExecutorExample {
public static void main(String[] args) {
// 1κ°μ μ€λ λλ₯Ό μ¬μ©νλ μ€λ λν μμ±
ExecutorService singleThread = Executors.newSingleThreadExecutor();
// μ€λ λλ₯Ό μ ν μμ΄ μ¬μ©νλ μ€λ λν μμ±
ExecutorService cachedThread = Executors.newCachedThreadPool();
// 3κ°μ μ€λ λλ₯Ό μ¬μ©νλ μ€λ λν μμ±
ExecutorService fixedThread = Executors.newFixedThreadPool(3);
// CPU μ½μ΄μ μλ§νΌ μ΅λ μ€λ λλ₯Ό μ¬μ©νλ μ€λ λν μμ±
ExecutorService maxFixedThread = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
}
}
μ€λ λν μ’ λ£
μ€λ λνμ μ€λ λλ κΈ°λ³Έμ μΌλ‘ λ°λͺ¬ μ€λ λκ° μλκΈ° λλ¬Έμ mian μ€λ λκ° μ’ λ£λμ΄λ μμ μ μ²λ¦¬νκΈ° μν΄ κ³μ μ€ν μνλ‘ λ¨μμλ€.
// λ¨μμλ μμ
μ λ§λ¬΄λ¦¬νκ³ μ’
λ£νλ€.
executorService.shutdown();
// λ¨μμλ μμ
κ³Ό μκ΄μμ΄ κ°μ λ‘ μ’
λ£νλ€.
executorService.shutdownNow();
μ€λ λ ν μμ μμ±
νλμ μμ μ Runnable , Callable ꡬν ν΄λμ€λ‘ νννλ€. λμ μ°¨μ΄μ μ
Runnable -> 리ν΄κ°μ΄ μ‘΄μ¬νμ§ μμ.
Runnable task = new Runnable(){
@Override
public void run(){
// μμ
λ΄μ©
}
}
Callable -> 리ν΄κ°μ΄ μ‘΄μ¬ν¨
Callable<T> task = new Callable<T>(){
@Override
public T call() throws Exceoption{
//μμ
λ΄μ©
return T;
}
}
μμ μ²λ¦¬ μμ²
ExecutorServiceμ μμ νμ Runnable λλ Callable κ°μ²΄λ₯Ό λ£λ νμλ€.
execute()λ μμ μ²λ¦¬ κ²°κ³Όλ₯Ό λ°μ§ λͺ»νκ³ μμ μ²λ¦¬ λμ€ μμΈκ° λ°μνλ©΄ μ€λ λκ° μ’ λ£λλ€.
submit() μ μμ μ²λ¦¬ κ²°κ³Όλ₯Ό λ°μ μ μλλ‘ Futureλ₯Ό 리ν΄νκ³ μμ μ²λ¦¬ λμ€ μμΈκ° λ°μνλλΌλ μ€λ λλ μ’ λ£λμ§ μκ³ λ€μ μμ μ μν΄ μ¬μ¬μ© νλ€.
-> κ°κΈμ μμ± μ€λ²ν€λλ₯Ό μ€μ΄κΈ° μν΄ submit()μ μ¬μ©νλ κ²μ΄ μ’λ€.
execute() λ₯Ό μ¬μ©νμ λ
μ€λ λμ κ°μλ λ³ν¨μ΄ μκ³ μ€ν μ€λ λμ μ΄λ¦μ΄ λͺ¨λ λ€λ₯Έκ²μ νμΈν μ μλ€.
μ΄κ²μ μμ μ²λ¦¬ λμ€ μμΈκ° λ°μνκΈ° λλ¬Έμ ν΄λΉ μ€λ λλ μ κ±°λκ³ μ μ€λ λκ° κ³μ μμ±λκΈ° λλ¬Έμ΄λ€.
for (int i=0;i<10;i++){
Runnable runnable = new Runnable() {
@Override
public void run() {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) fixedThread;
int poolSize = threadPoolExecutor.getPoolSize();
String threadName = Thread.currentThread().getName();
System.out.println("μ΄ μ€λ λ κ°μ : "+poolSize+"μμ
μ€λ λ μ΄λ¦ :" + threadName);
//μμΈ λ°μ μν΄
int value = Integer.parseInt("μΌ");
}
};
// μμ
μ²λ¦¬ μμ²
fixedThread.execute(runnable);
submit() μ μ¬μ©νμ λ
μλμ κ°μ΄ μ€λ λ μ΄λ¦μ΄ λμ΄λμ§ μλκ²μ 보면 μ€λ λκ° μ’ λ£λμ§ μκ³ μ¬μ¬μ© λλ κ²μ λ³Ό μ μλ€.
λΈλ‘νΉ λ°©μμ μμ μλ£ ν΅λ³΄
submit() λ©μλλ 맀κ°κ°μΌλ‘ μ€ Runnable μ΄λ Callable μμ μ μ€λ λ νμ μμ νμ μ μ₯νκ³ μ¦μ Futureκ°μ²΄λ₯Ό 리ν΄νλ€.
Future κ°μ±λ μμ κ²°κ³Όκ° μλλΌ μμ μ΄ μλ£λ λκΉμ§ κΈ°λ€λ Έλ€κ°(λΈλ‘νΉ) μ΅μ’ κ²°κ³Όλ₯Ό μ»λλ° μ¬μ©λλ€. (μ§μ° μλ£ κ°μ±)
Futrueμ get() λ©μλλ₯Ό μ¬μ©νλ©΄ μμ μ΄ μλ£λ λκΉμ§ λΈλ‘νΉλμλ€κ° μ²λ¦¬κ²°κ³Ό Vλ₯Ό 리ν΄νλ€.(Future<V>)
get(long timeout,TimeUnit unit) μ μ¬μ©νλ©΄ μκ° μ μ μμ μ΄ μλ£λλ©΄ κ²°κ³Ό Vλ₯Ό 리ν΄νμ§λ§ μλ£λμ§ μμΌλ©΄ Exceptionμ λ°μμν¨λ€.
submit(Runnable task) future.get() -> null 리ν΄
submit(Runnable task, Integer result) future.get() -> int λ°ν
submit(Callable<String> task) future.get() -> String λ°ν
μμ κ°μ΄ λΈλ‘νΉ λ°©μμ μμ μλ£ ν΅λ³΄μμ μ£Όμν μ μ μμ μ μ²λ¦¬νλ μ€λ λκ° μμ μ μλ£νκΈ° κΉμ§λ
get() λ©μλκ° λΈλ‘νΉ λλ―λ‘ λ€λ₯Έ μ½λλ₯Ό μ€νν μ μλ€.
κ·Έλ κΈ° λλ¬Έμ μλμ κ°μ΄ get() λ©μλλ₯Ό νΈμΆνλ μ€λ λλ μλ‘μ΄ μ€λ λμ΄κ±°λ μ€λ λν μμ μλ λ€λ₯Έ μ€λ λκ° λμ΄μΌ νλ€.
// μλ‘μ΄ μ€λ λλ₯Ό μμ±ν΄μ νΈμΆ
new Thread (new Runnalbe(){
@Overrid
public void run(){
try{
future.get();
}catch (Exception e){
e.printStackTrace();
}
}
}
}).start();
// μ€λ λνμ μ€λ λκ° νΈμΆ
executorService.submit(new Runnable(){
@Override
public void run(){
try{
future.get();
}catch (Excetion e){
e.printStackTrace();
}
}
});
Future κ°μ²΄λ μμ κ²°κ³Όλ₯Ό μ»κΈ° μν get() λ©μλ μ΄μΈμλ
cancel : μμ μμμ 맀κ°κ°κ³Ό μκ΄μμ΄ μ·¨μ ν true /μμ μ΄ μ§ν μ€μΌ κ²½μ° λ§€κ°κ°μ΄ true μΌ κ²½μ°μλ§ μμ μ€λ λλ₯Ό interrupt
isCancelled : μμ μ΄ μλ£λμκ±°λ μ΄λ€ μ΄μ λ‘ μΈν΄ μ·¨μν μ μλ€λ©΄ false
isDone : μμ μ΄ μ μμ ,μμΈ,μ·¨μ λ± μλ£λμλ€λ©΄ true
1. 리ν΄κ°μ΄ μλ μμ μλ£ ν΅λ³΄
submit λ©μλλ₯Ό μ¬μ©νλ©΄ λλ€.
κ²°κ³Όκ°μ΄ μμμλ λΆκ΅¬νκ³ Futureκ°μ²΄λ₯Ό 리ν΄νλλ°, μ΄κ²μ μ€λ λκ° μμ μ²λ¦¬λ₯Ό μ μμ μΌλ‘ μλ£νλμ§ μμΈκ° λ°μνλμ§ νμΈνκΈ° μν΄μμ΄λ€.
Future future = executorService.submit(task);
// nullμ 리ν΄νμ§λ§ interruptλκ±°λ μμΈκ° λ°μν κ²½μ°λ₯Ό μν΄ μμΈ μ²λ¦¬ μ½λκ° νμνλ€.
try{
future.get(); // null
}catch(InterruptedException e){
// μμ
μ²λ¦¬ λμ€ μ€λ λκ° interrupt λ κ²½μ° μ€νν μ½λ
}catch(ExecutionException e){
// μμ
μ²λ¦¬ λμ€ μμΈκ° λ°μλ κ²½μ° μ€νν μ½λ
}
2. 리ν΄κ°μ΄ μλ μμ μλ£ ν΅λ³΄
μ€λ λκ° μμ ν μ²λ¦¬ κ²°κ³Όλ₯Ό μ»μ΄μΌ νλ€λ©΄ Callable κ°μ²΄λ₯Ό μμ±νλ©΄ λλ€.
// Callable κ°μ²΄ μμ±
Callable<T> task = new Callable<T>() {
@Override
public T call() throws Exception{
//μ€λ λκ° μ²λ¦¬ν μμ
λ΄μ©
return T;
}
};
// submit λ©μλλ₯Ό μ¬μ©ν΄ Future<T>λ₯Ό 리ν΄
Future<T> future = executorService.submit(task);
// μμΈμ²λ¦¬
try{
T result = future.get();
}catch(InterruptedException e){
// μμ
μ²λ¦¬ λμ€ μ€λ λκ° interrupt λ κ²½μ°
}catch(ExecutionException e){
// μμ
μ²λ¦¬ λμ€ μμΈ
3. μμ μ²λ¦¬ κ²°κ³Όλ₯Ό μΈλΆ κ°μ²΄μ μ μ₯
μΈλΆ Result κ°μ²΄μ μμ κ²°κ³Όλ₯Ό μ μ₯νμ¬ μ ν리μΌμ΄μ μ΄ κ°μ²΄λ₯Ό μ¬μ©ν΄ μ΄λ€ μμ μ μ§νν μ μκ² ν΄μ€λ€.
λκ² Result κ°μ²΄λ 곡μ κ°μ²΄κ° λμ΄, λ κ° μ΄μμ μ€λ λ μμ μ μ·¨ν©ν λͺ©μ μΌλ‘ μ¬μ©λλ€.
μλμ κ°μ΄ 곡μ κ°μ²΄κ° λμ΄ μ¬μ©λλ€.
// 곡μ κ°μ²΄ μμ±
Result result = new Result();
// μμ
κ°μ²΄ μμ±
Runnable task = new Task(result);
// μμ
μ²λ¦¬ μμ²
Future<Result> future = excutorServie.submit(task,result);
// κ²°κ³Ό λ°κΈ° or μμΈ μ²λ¦¬
try{
result = future.get();
}catch (Excetion e){
e.printStackTrace();
}
// 곡μ κ°μ²΄
class Result {
int accumValue;
synchronized void addValue(int value){
accumValue += value;
}
}
4. μμ μλ£ μμΌλ‘ ν΅λ³΄
μμ μ μκ³Ό μ€λ λ μ€μΌμ€λ§μ λ°λΌμ λ¨Όμ μμ²ν μμ μ΄ λμ€μ μλ£λλ κ²½μ°λ λ°μνλ€.
μμ°¨μ μΌ νμκ° μμ κ²½μ° μ²λ¦¬κ° μλ£λ κ²λΆν° κ²°κ³Όλ₯Ό μ»μ΄ μ¬μ©ν μ μλ€.
CompletionService μ poll() take() λ₯Ό μ¬μ©νλ©΄ λλ€.
poll() -> μλ£λ μμ μ΄ μλ€λ©΄ μ¦μ null return
take() -> μλ£λ μμ μ΄ μλ€λ©΄ μμ λκΉμ§ λΈλ‘νΉλ¨
ExcutorService executorService = Excutors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors());
// κ°μ²΄λ₯Ό μμ±ν λ μμ±μ 맀κ°κ°μΌλ‘ ExecutorServiceλ₯Ό μ 곡
CompletionService<V> completionService = new ExecutorCompletionService<V>{
executorService);
// submit() λ©μλλ‘ μμ
μ²λ¦¬ μμ²
// μ€λ λνμκ² μμ
μ²λ¦¬ μμ²
completionService.submit(Callable<T> task);
completionService.submit(Runable task, V result);
// whileλ¬Έμ΄ μμ κ²½μ° μ ν리μΌμ΄μ
μ΄ μ’
λ£λ λκΉμ§ λ°λ³΅ μ€νν΄μΌ νλ―λ‘ μ€λ λνμ μ€λ λμμ μ€ννλ κ²μ΄ μ’λ€.
executorService.submit(new Runnable(){
@Override
public void run(){
while(true){
try{
// μλ£λ μ§μ
κ°μ Έμ€κΈ°
Future<Integer> future = completionService.task();
int value = future.get();
} catch (Exciption e) {
break;
}
5. μ½λ°± λ°©μμ μμ μλ£ ν΅λ³΄
μ€λ λκ° μμ μ μλ£νλ©΄ νΉμ λ©μλλ₯Ό μλ μ€ννλ κΈ°λ²μ΄λ€. μ΄λ μλ μ€νλλ λ©μλλ₯Ό μ½λ°± λ©μλλΌκ³ νλ€.
λΈλ‘νΉ λ°©μκ³Ό μ½λ°± λ°©μμ μ°¨μ΄μ μ 무μμΌκΉ?
λΈλ‘νΉ λ°©μμ μμ μ²λ¦¬λ₯Ό μμ²ν ν μμ μ΄ μλ£λ λκΉμ§ λΈλ‘νΉλμ§λ§
μ½λ°± λ°©μμ μμ μ²λ¦¬λ₯Ό μμ²ν ν κ²°κ³Όλ₯Ό κΈ°λ€λ¦΄ νμ μμ΄ λ€λ₯Έ κΈ°λ₯μ μνν μ μλ€. -> μμ μ²λ¦¬κ° μλ£λλ©΄ μλμΌλ‘ μ½λ°± λ©μλκ° μ€νλμ΄ κ²°κ³Όλ₯Ό μ μ μκΈ° λλ¬Έ
ExecutorServicesms μ μ½λ°±μ μν΄ λ³λμ κΈ°λ₯μ μ 곡νμ§ μμ§λ§ Runnable ꡬν ν΄λμ€λ₯Ό μμ±ν λ μ½λ°± κΈ°λ₯μ ꡬνν μ μλ€.
μ§μ μ μν΄λ μ’κ³ CompletionHandlerλ₯Ό μ΄μ©ν΄λ μ’λ€.
CompletionHandler<V,A> callback = new CompletionHandler<V,A>(){
@Override
public void completed(V result,A attachment){
// μμ
μ μ μ μ²λ¦¬ μλ£νμ λ νΈμΆλλ μ½λ°± λ©μλ
}
@Override
public void failed(Throwable exc, A attachemnt){
// μμ
μ²λ¦¬ λμ€ μμΈκ° λ°μνμ λ νΈμΆλλ μ½λ°± λ©μλ
};
}
Runnable task = new Runnable(){
@Override
public void run(){
try{
// μμ
μ²λ¦¬
V result = ...;
// μμ
μ μ μ μ²λ¦¬νμ κ²½μ° νΈμΆ
callback.completed(result,null);
}catch(Exception e){
// μμΈκ° λ°μ νμ κ²½μ° νΈμΆ
callback.failed(e,null);
}
}
};
μ¬κΈ°μ Vνμ νλΌλ―Έν°λ κ²°κ³Όκ°μ νμ μ΄κ³ Aλ 첨λΆκ°(μ½λ°± λ©μλμ κ²°κ³Όκ° μ΄μΈ μΆκ°μ μΌλ‘ μ λ¬ νλ κ°μ²΄)μ νμ μ΄λ€.
λ§μ½ 첨λΆκ°μ΄ νμ μλ€λ©΄ Voidλ‘ μ§μ ν΄μ£Όλ©΄ λλ€.
(μμ )
import javax.annotation.processing.Completion;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CallbackExample {
// μ€λ λν μ μΈ
private ExecutorService executorService;
// μ€λ λ νμ μ€λ λ κ°―μ μ΄κΈ°ν
public CallbackExample(){
executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
}
// V - Integer A - Void μΌλ‘ μ½λ°±λ©μλλ₯Ό κ°μ§ κ°μ²΄ μμ±
private CompletionHandler<Integer,Void> callback =
new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
// μμ
μ΄ μ μ μ²λ¦¬ λμμ λ νΈμΆ
System.out.println("completed() μ€ν : " +result);
}
@Override
public void failed(Throwable exc, Void attachment) {
// μμΈκ° λ°μ νμ κ²½μ° νΈμΆ
System.out.println("failed() μ€ν : " + exc.toString());
}
};
public void doWork(final String x, final String y){
Runnable task = new Runnable() {
@Override
public void run() {
try{
int intX = Integer.parseInt(x);
int intY = Integer.parseInt(y);
int result = intX+intY;
// μ μμ²λ¦¬ μ½λ°±ν¨μ νΈμΆ
callback.completed(result,null);
}catch (NumberFormatException e){
// μμΈ μ½λ°±ν¨μ νΈμΆ
callback.failed(e,null);
}
}
};
// μ€λ λνμκ² μμ
μμ²
executorService.submit(task);
}
public void finish(){
executorService.shutdown();
}
public static void main(String[] args) {
CallbackExample example = new CallbackExample();
example.doWork("3","3");
// μλ¬ λ°μ
example.doWork("3","μΌ");
example.finish();
}
}