`
bofang
  • 浏览: 126620 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

jetty6中ThreadPool实现的代码解析

阅读更多

QueuedThreadPool承载了jetty6提交任务至线程工作的使命,在没有用concurrent包的情况下,作者实现了漂亮的线程池。

 

QueuedThreadPool内部维护了一个FIFO的job队列,该队列基于数组实现。

 

QueuedThreadPool的核心接口是dispatch(Runnable),正如字面所表达的那样,该方法将派发一个job到一个线程来执行。下面是对该方法的注释解析。

 

 

public boolean dispatch(Runnable job) 
    {  
    	//前置条件的判断
        if (!isRunning() || job==null)
            return false;
        
        //可以执行该job的线程
        PoolThread thread=null;
        
        //是否需要创建新线程的标识
        boolean spawn=false;
            
        synchronized(_lock)
        {
            // Look for an idle thread
            int idle=_idle.size();
            //判断是否有空闲线程,如果有,取出空闲线程列表的最后一个线程
            if (idle>0)
                thread=(PoolThread)_idle.remove(idle-1);
            else//已经没有空闲线程了,将该job放入到job队列中
            {
                // queue the job
            	//增加队列的大小,得到最新的队列大小
            	_queued++;
                
            	//如果最新的队列大小大于最大值,则将最大值设置为最新的队列大小
                if (_queued>_maxQueued)
                    _maxQueued=_queued;
                
                //设置插入位置的job为最新dispatch的job,并且递增下一个插入位置,_nextJobSlot记录的是下一个job可以放置的索引
                _jobs[_nextJobSlot++]=job;
                
                //如果已经插入位置的游标已经到达了队列的尾部,则从头开始
                if (_nextJobSlot==_jobs.length)
                    _nextJobSlot=0;
                
                //如果队列已经满了,则扩容队列。_nextJob是下一个可以获取job的位置,可以认为是队列的尾部,这是一个FIFO的队列
                if (_nextJobSlot==_nextJob)
                {
                    // Grow the job queue
                    Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];
                    int split=_jobs.length-_nextJob;
                    if (split>0)
                        System.arraycopy(_jobs,_nextJob,jobs,0,split);
                    if (_nextJob!=0)
                        System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
                    
                    _jobs=jobs;
                    _nextJob=0;
                    _nextJobSlot=_queued;
                }
                  
                spawn=_queued>_spawnOrShrinkAt;
            }
        }
        
        //如果得到了可以工作的空闲线程,则将利用该线程执行job
        if (thread!=null)
        {
            thread.dispatch(job);
        }
        else if (spawn)//如果需要增加新的线程,则创建一个新的线程
        {
            newThread();
        }
        
        //不管怎么说,这个job要么被一个线程运行,要么成功放到了job的队列,我们的工作算是成功做完了。
        return true;
    }

 

可以看到实现逻辑大致如下:首先判断是否有空闲线程,如果有,从空闲线程列表中移除最后一个线程,用该线程来执行提交的job。如果没有空闲线程,就把该job放到job的队列中。如果放入队列后,队列的长度大于_spawnOrShrinkAt,就说明可以创建一个新的线程。

 

QueuedThreadPool是用PoolThread作为工作线程的。PoolThread是Thread的子类。看看run方法有什么事情要做。

 

 

public void run()
        {
            boolean idle=false;
            Runnable job=null;
            try
            {
            	//只要系统还在运行,就不让该线程停止工作
                while (isRunning())
                {   
                    //有job要做了
                    if (job!=null)
                    {
                        final Runnable todo=job;
                        job=null;
                        
                        //标识这个线程不在空闲,因为我有工作要做了。
                        idle=false;
                        todo.run();
                        //我觉得这个地方应该加上 idle = true;下面if(!idle)改成if(idle)语义上更清楚一些
                    }
                    
                    //需要操作FIFO的job队列,给临界代码加锁
                    synchronized(_lock)
                    {
                        //哈哈,有job可以做了
                    	if (_queued>0)
                        {
                    		//递减队列的长度
                            _queued--;
                            
                            //从队列的头部取出一个job
                            job=_jobs[_nextJob];
                            
                            //将头部的位置值为null,并且递增_nextJob,更新头部
                            _jobs[_nextJob++]=null;
                            
                            //如果到达了数组的尾部,则从数组的头开始
                            if (_nextJob==_jobs.length)
                                _nextJob=0;
                            
                            //非常从容地得到了一个需要执行的job,跳到while的开始,执行该job
                            continue;
                        }

                        //如果job队列为空,那是不是我这个线程是多余的线程,job能很快地被执行掉,需要亲手干掉自己?下面做一些判断
                    	
                    	//所有线程的数量总和
                        final int threads=_threads.size();
                        
                        //如果从数量上判断可以清除掉该线程
                        if (threads>_minThreads && 
                            (threads>_maxThreads || 
                             _idle.size()>_spawnOrShrinkAt))   
                        {
                        	
                        	//空闲时间的条件是否能满足。如果有一个兄弟在不久之前已经自杀了,我需要再晚点。
                            long now = System.currentTimeMillis();
                            if ((now-_lastShrink)>getMaxIdleTimeMs())
                            {
                            	//记录我自杀的时间(即最后一个线程自杀的时间)
                                _lastShrink=now;
                                
                                //从空闲队列中移除自己
                                _idle.remove(this);
                                
                                //使命完成,寿终正寝
                                return;
                            }
                        }
                        
                        //如果我做完了自己的job,我已经空闲了,进入空闲线程池里
                        if (!idle)
                        {   
                            // Add ourselves to the idle set.
                            _idle.add(this);
                            idle=true;
                        }
                    }

                    //既然我已经空闲了,如果这个时候还没有job让我来做,我就再等一会吧。
                    //如果等待的时间到了,或者等待的时候有新的job分配给我做,我就不再等待。
                    synchronized (this)
                    {
                        if (_job==null)
                            this.wait(getMaxIdleTimeMs());
                        job=_job;
                        _job=null;
                    }
                }
            }
            catch (InterruptedException e)
            {
                Log.ignore(e);
            }
            //我死掉了(自杀,或者在执行job时发生异常意外死亡),善后工作
            finally
            {
            	//从空闲线程池中把我的尸体抬走
                synchronized (_lock)
                {
                    _idle.remove(this);
                }
                //从总的线程池中把我的尸体抬走。这个地方可以看到锁的分离。_idle和_threads两个集合用不同的锁控制,提高了性能
                synchronized (_threadsLock)
                {
                    _threads.remove(this);
                }
                synchronized (this)
                {
                    job=_job;
                }
                
                //如果我在处理job时意外死亡,就把我未完成的事业交给下一个兄弟来处理
                if (job!=null)
                {
                    QueuedThreadPool.this.dispatch(job);
                }
            }
        }
        
 

 

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics