Java 并发编程

Posted by whaler404 on January 25, 2024

线程

线程概念

进程就是执行中的程序,每个进程都有自己独立的内存空间、系统资源

Java程序通过流控制来执行程序流

  • 程序中单个顺序的流控制:线程
  • 多线程:多个线程同时运行

线程创建

  • 继承方式创建线程

    继承java.lang.Thread,覆盖Thread类的run()方法创建线程,Thread是具体类

    Thread(ThreadGroup group, Runnable target, String  name);
    //参数target是线程执行的目标对象,即线程执行的代码;
    //group是线程所在的组;
    //name是线程的名字。
    
    class MyThread extends Thread {
        public void run() {
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getId()
                                   + " Value " + i);
            }
        }
    }
    public class Example {
        public static void main(String args[]) {
            MyThread t1 = new MyThread();
            t1.start();  // 启动第一个线程
            MyThread t2 = new MyThread();
            t2.start();  // 启动第二个线程
        }
    }
      
    
  • 实现接口创建线程

    实现java.lang.Runnable,只定义了run()方法

    class MyRunnable implements Runnable {
        public void run() {
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getId()
                                   + " Value " + i);
            }
        }
    }
    public class Example {
        public static void main(String args[]) {
            Thread t1 = new Thread(new MyRunnable());
            t1.start();
            Thread t2 = new Thread(new MyRunnable());
            t2.start();
        }
    }
    

线程生命周期和调度

  • 线程生命周期
    • run方法:线程体thread body
    • sleep:以毫秒为单位,线程阻塞一段时间,线程不释放占用的资源Thread.sleep()调用
    • suspend和resume:配套使用,suspend使线程进入阻塞,不会自动会恢复,必须其对应的resume被调用,线程才重新唤醒,线程释放占用的资源,JVM调度进入临时空间
    • wait和notify:配套使用,wait使得线程进入阻塞,可以指定一段时间后恢复,也可以不主动恢复。前者当对应的notify被调用或超出指定时间时重新就绪,后者必须要notify被调用才能被唤醒。由互斥锁调用,private final Object lock=new Object(); synchronized(lock){},wait和notify比suspend和resume先进。
    public class test extends Thread{
    	private final Object lock=new Object();
    	static int count=0;
    	public synchronized void add(){count++;System.out.println(Thread.currentThread().getId()+":add");}
    	public synchronized int get(){System.out.println(Thread.currentThread().getId()+":get");return count;}
    	public void run(){
    		while(true){
    			add(); 
    			try {
    				Thread.sleep(500) ;
    				synchronized(lock){lock.wait(500);}
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			get();
    		}
    	}
    	public static void main(String[]args){
    		test a=new test();test b=new test();
    		a.start();b.start();
    	}
    }
    
  • 线程调度和优先级
    • 调度scheduling:只有一个CPU,以某种顺序在单CPU情况下执行多线程
    • 抢先式调度:可被抢占,相同优先级,交替运行
    • Java线程分10个优先级,守护进程daemon优先级最低

线程互斥

  • 临界资源:多线程共享的资源

  • 临界代码段

  • Java提供关键字synchronized,配合对象使用互斥锁

    • 同步方法
    public class MyClass {
        private int count = 0;
      
        // 同步方法
        public synchronized void increment() {
            count++;
        }
      
        public synchronized int getCount() {
            return count;
        }
    }
    //在方法的声明中使用 synchronized 关键字可以确保整个方法在同一时刻只能被一个线程执行
    
    • 同步代码块
    public class MyClass {
        private int count = 0;
        private Object lock = new Object();
      
        public void increment() {
            synchronized (lock) {
                count++;
            }
        }
      
        public int getCount() {
            synchronized (lock) {
                return count;
            }
        }
    }
    //使用一个对象 lock 作为锁对象
    //然后在 increment 和 getCount 方法中使用 synchronized 块
    //确保对 count 的操作是线程安全的
    

    使用 synchronized 会带来一定的性能开销,因此在某些情况下,可能会选择使用更轻量级的同步机制,如 java.util.concurrent 包中的工具类,例如 ReentrantLockReadWriteLock 等。这些工具类提供了更灵活的同步控制选项

线程同步

  • synchronized适合用来竞争,同时lock的wait和notify实现持续持有和释放
  • 不适合用来同步,如果要使用,比如生产者消费者问题,先持有lock后持有consume,先释放lock再释放consume这种错开的,可以使用线程内局部变量对释放的动作标记
public class test{
    private static int cnt=0;
    private static int cap=5;
    private final static Object lock=new Object();
    private final static Object c=new Object();
    private final static Object p=new Object();
	public class Comsumer implements Runnable{
        public void run(){
			while(true){
				synchronized(c){
					boolean iflock=false;
					synchronized(lock){
						if(cnt==0)iflock=true;
					}  
					if(iflock)
						try {
							c.wait();
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
				}
				boolean ifnotify=false;
				synchronized(lock){
					cnt--;
					synchronized(out){
					System.out.println("in comsumer:"+cnt);}
					if(cnt<cap){
						ifnotify=true;
					}
				}
				if(ifnotify)
					synchronized(p){
						p.notify();
					}
			}
        }
    }
    public class Producer implements Runnable{
        public void run(){
			while(true){
				synchronized(p){
					boolean iflock=false;
					synchronized(lock){
						if(cnt==cap)iflock=true;
					}
					if(iflock)
						try {
							p.wait();
						} catch (InterruptedException e) {
							e.printStackTrace();
						} 
					
				}
				boolean ifnotify=false;
				synchronized(lock){
					cnt++;
					System.out.println("in producer:"+cnt);
					if(cnt>0){
						ifnotify=true;
					}
				}
				if(ifnotify)
					synchronized(c){
						c.notify();
					}
			}
        }
    }
    public static void main(String[]args){
		test t=new test(); // 内部类的构造还是要用到外部类
        Thread comsumer=new Thread(t.new Comsumer());// 看是静态还是非静态内部类
		Thread producer=new Thread(t.new Producer());// 实现线程接口和继承线程接口不一样
		comsumer.start();producer.start();
    }
}

线程通讯

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

class Sender extends Thread {
    private PipedOutputStream pipedOutputStream;

    public Sender(PipedOutputStream pipedOutputStream) {
        this.pipedOutputStream = pipedOutputStream;
    }

    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                pipedOutputStream.write(i);
                System.out.println("Sent: " + i);
                Thread.sleep(500);
            }
            pipedOutputStream.close();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Receiver extends Thread {
    private PipedInputStream pipedInputStream;

    public Receiver(PipedInputStream pipedInputStream) {
        this.pipedInputStream = pipedInputStream;
    }

    public void run() {
        try {
            int data;
            while ((data = pipedInputStream.read()) != -1) {
                System.out.println("Received: " + data);
            }
            pipedInputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

public class PipeExample {
    public static void main(String[] args) {
        try {
            PipedOutputStream pipedOutputStream = new PipedOutputStream();
            PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream);

            Sender sender = new Sender(pipedOutputStream);
            Receiver receiver = new Receiver(pipedInputStream);

            sender.start();
            receiver.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

线程死锁

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Philosopher extends Thread {
    private int id;
    private Lock leftFork;
    private Lock rightFork;

    public Philosopher(int id, Lock leftFork, Lock rightFork) {
        this.id = id;
        this.leftFork = leftFork;
        this.rightFork = rightFork;
    }

    private void think() throws InterruptedException {
        System.out.println("Philosopher " + id + " is thinking");
        Thread.sleep(1000);
    }

    private void eat() throws InterruptedException {
        System.out.println("Philosopher " + id + " is eating");
        Thread.sleep(1000);
    }

    @Override
    public void run() {
        try {
            while (true) {
                think();
                leftFork.lock();
                System.out.println("Philosopher " + id + " picked up left fork");
                rightFork.lock();
                System.out.println("Philosopher " + id + " picked up right fork");
                eat();
                leftFork.unlock();
                System.out.println("Philosopher " + id + " put down left fork");
                rightFork.unlock();
                System.out.println("Philosopher " + id + " put down right fork");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class DiningPhilosophers {
    public static void main(String[] args) {
        int numPhilosophers = 5;
        Lock[] forks = new Lock[numPhilosophers];

        for (int i = 0; i < numPhilosophers; i++) {
            forks[i] = new ReentrantLock();
        }

        Philosopher[] philosophers = new Philosopher[numPhilosophers];
        for (int i = 0; i < numPhilosophers; i++) {
            philosophers[i] = new Philosopher(i, forks[i], forks[(i + 1) % numPhilosophers]);
            philosophers[i].start();
        }
    }
}

相关资料