Mega Code Archive

 
Categories / Java / Threads
 

Wait exclusive semaphore with wait - notify primitives

/*  * JBoss, Home of Professional Open Source  * Copyright 2005, JBoss Inc., and individual contributors as indicated  * by the @authors tag. See the copyright.txt in the distribution for a  * full listing of individual contributors.  *  * This is free software; you can redistribute it and/or modify it  * under the terms of the GNU Lesser General Public License as  * published by the Free Software Foundation; either version 2.1 of  * the License, or (at your option) any later version.  *  * This software is distributed in the hope that it will be useful,  * but WITHOUT ANY WARRANTY; without even the implied warranty of  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU  * Lesser General Public License for more details.  *  * You should have received a copy of the GNU Lesser General Public  * License along with this software; if not, write to the Free  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.  */ import java.io.PrintWriter; import java.io.StringWriter; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.Map; /*  * JBoss, Home of Professional Open Source Copyright 2005, JBoss Inc., and  * individual contributors as indicated by the @authors tag. See the  * copyright.txt in the distribution for a full listing of individual  * contributors.  *   * This is free software; you can redistribute it and/or modify it under the  * terms of the GNU Lesser General Public License as published by the Free  * Software Foundation; either version 2.1 of the License, or (at your option)  * any later version.  *   * This software is distributed in the hope that it will be useful, but WITHOUT  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS  * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more  * details.  *   * You should have received a copy of the GNU Lesser General Public License  * along with this software; if not, write to the Free Software Foundation,  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF  * site: http://www.fsf.org.  */ /**  * Wait exclusive semaphore with wait - notify primitives  *   * @author <a href="mailto:simone.bordet@compaq.com">Simone Bordet</a>  * @version $Revision: 1958 $  */ public class WaitSemaphore extends Semaphore implements WaitSync {   // Constants -----------------------------------------------------   private final static int MAX_USERS_ALLOWED = 1;   // Attributes ----------------------------------------------------   private int m_waiters;   // Static --------------------------------------------------------   // Constructors --------------------------------------------------   public WaitSemaphore() {     super(MAX_USERS_ALLOWED);   }   // Public --------------------------------------------------------   public void doWait() throws InterruptedException {     synchronized (this) {       release();       ++m_waiters;       waitImpl(this);       --m_waiters;       acquire();     }   }   public void doNotify() throws InterruptedException {     synchronized (this) {       if (getWaiters() > 0) {         acquire();         notify();         release();       }     }   }   public int getWaiters() {     synchronized (this) {       return m_waiters;     }   }   // Object overrides ---------------------------------------------------   public String toString() {     return super.toString() + " - " + m_waiters;   }   // Package protected ---------------------------------------------   // Protected -----------------------------------------------------   // Private -------------------------------------------------------   // Inner classes ------------------------------------------------- } /**  * Semaphore that can allow a specified number of threads to enter, blocking the  * others. If the specified number of threads is 1, it acts as an exclusive  * semaphore and can be used instead of synchronized blocks  *   * @author <a href="mailto:simone.bordet@compaq.com">Simone Bordet</a>  * @version $Revision: 2787 $  */ @SuppressWarnings("unchecked") class Semaphore implements Sync {   // Constants -----------------------------------------------------   private static final long DEADLOCK_TIMEOUT = 5 * 60 * 1000;   // Attributes ----------------------------------------------------   private final static boolean m_debug = false;   private int m_users;   private int m_allowed;   private Map m_logMap;   // Static --------------------------------------------------------   // Constructors --------------------------------------------------   public Semaphore(int allowed) {     if (allowed < 1)       throw new IllegalArgumentException();     m_users = 0;     m_allowed = allowed;     m_logMap = new HashMap();   }   // Public --------------------------------------------------------   public int getUsers() {     synchronized (this) {       return m_users;     }   }   // Sync implementation ----------------------------------------------   public void acquire() throws InterruptedException {     synchronized (this) {       logAcquire();       // One user more called acquire, increase users       ++m_users;       boolean waitSuccessful = false;       while (m_allowed <= 0) {         waitSuccessful = waitImpl(this);         if (!waitSuccessful) {           // Dealock was detected, restore status, 'cause it's like a release()           // that will probably be never called           --m_users;           ++m_allowed;         }       }       --m_allowed;     }   }   public void release() {     synchronized (this) {       logRelease();       --m_users;       ++m_allowed;       notify();     }   }   // Object overrides ---------------------------------------------------   public String toString() {     return super.toString() + " - " + m_users;   }   // Package protected ---------------------------------------------   // Protected -----------------------------------------------------   protected boolean waitImpl(Object lock) throws InterruptedException {     // Wait (forever) until notified. To discover deadlocks,     // turn on debugging of this class     long start = System.currentTimeMillis();     lock.wait(DEADLOCK_TIMEOUT);     long end = System.currentTimeMillis();     if ((end - start) > (DEADLOCK_TIMEOUT - 1000)) {       logDeadlock();       return false;     }     return true;   }   protected void logAcquire() {     if (m_debug) {       // Check if thread is already mapped       Thread thread = Thread.currentThread();       // Create stack trace       StringWriter sw = new StringWriter();       new Exception().printStackTrace(new PrintWriter(sw));       String trace = sw.toString();       LinkedList list = (LinkedList) m_logMap.get(thread);       if (list != null) {         // Thread is mapped         // Add info         Info prevInfo = (Info) list.getLast();         Info info = new Info(thread, m_users, trace);         list.add(info);       } else {         // Thread is not mapped, create list and add counter         list = new LinkedList();         Info info = new Info(thread, m_users, trace);         list.add(info);         // Map thread         m_logMap.put(thread, list);       }     }   }   protected void logDeadlock() {     System.err.println();     System.err.println("DEADLOCK ON SEMAPHORE " + this);     if (m_debug) {       for (Iterator i = m_logMap.values().iterator(); i.hasNext();) {         LinkedList list = (LinkedList) i.next();         for (Iterator iter = list.iterator(); iter.hasNext();) {           System.err.println(iter.next());         }       }     }     System.err.println();   }   protected void logRelease() {     if (m_debug) {       // Find a matching thread and remove info for it       Thread thread = Thread.currentThread();       LinkedList list = (LinkedList) m_logMap.get(thread);       if (list != null) {         Info info = new Info(thread, 0, "");         if (!list.remove(info)) {           System.err.println("LOG INFO SIZE: " + list);           new IllegalStateException("BUG: semaphore log list does not contain required info")               .printStackTrace();         }         // If no info left, remove the mapping         int size = list.size();         if (size < 1) {           m_logMap.remove(thread);         }       } else {         throw new IllegalStateException("Semaphore log failed: release called without acquire");       }     }   }   // Private -------------------------------------------------------   // Inner classes -------------------------------------------------   private class Info {     private Info(Thread t, int i, String s) {       m_thread = t;       m_counter = i;       m_trace = s;     }     private Thread m_thread;     private int m_counter;     private String m_trace;     public boolean equals(Object o) {       Info other = (Info) o;       return m_thread == other.m_thread;     }     public String toString() {       return m_thread + " - " + m_counter + "\n" + m_trace;     }   } } /*  * JBoss, Home of Professional Open Source Copyright 2005, JBoss Inc., and  * individual contributors as indicated by the @authors tag. See the  * copyright.txt in the distribution for a full listing of individual  * contributors.  *   * This is free software; you can redistribute it and/or modify it under the  * terms of the GNU Lesser General Public License as published by the Free  * Software Foundation; either version 2.1 of the License, or (at your option)  * any later version.  *   * This software is distributed in the hope that it will be useful, but WITHOUT  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS  * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more  * details.  *   * You should have received a copy of the GNU Lesser General Public License  * along with this software; if not, write to the Free Software Foundation,  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF  * site: http://www.fsf.org.  */ /**  * Interface that gives synchronization semantic to implementors  *   * @see Semaphore  *   * @author <a href="mailto:simone.bordet@compaq.com">Simone Bordet</a>  * @version $Revision: 2787 $  */ interface Sync {   /**    * Acquires this sync    *     * @see #release    * @throws InterruptedException    */   void acquire() throws InterruptedException;   /**    * Releases this sync    *     * @see #acquire    */   void release(); } /*  * JBoss, Home of Professional Open Source Copyright 2005, JBoss Inc., and  * individual contributors as indicated by the @authors tag. See the  * copyright.txt in the distribution for a full listing of individual  * contributors.  *   * This is free software; you can redistribute it and/or modify it under the  * terms of the GNU Lesser General Public License as published by the Free  * Software Foundation; either version 2.1 of the License, or (at your option)  * any later version.  *   * This software is distributed in the hope that it will be useful, but WITHOUT  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS  * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more  * details.  *   * You should have received a copy of the GNU Lesser General Public License  * along with this software; if not, write to the Free Software Foundation,  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF  * site: http://www.fsf.org.  */ /**  * Interface that gives wait - notify primitives to implementors.  *   * @see Semaphore  *   * @author <a href="mailto:simone.bordet@compaq.com">Simone Bordet</a>  * @version $Revision: 2787 $  */ interface WaitSync extends Sync {   /**    * Pone in wait status this sync, until {@link #doNotify} is called to wake it    * up.    *     * @see #doNotify    * @throws InterruptedException    */   void doWait() throws InterruptedException;   /**    * Wakes up this sync that has been posed in wait status by a {@link #doWait}    * call. If this sync is not waiting, invoking this method should have no    * effect.    *     * @see #doWait    * @throws InterruptedException    */   void doNotify() throws InterruptedException; }