001 /** 002 * Copyright (c) 2003 Daffodil Software Ltd all rights reserved, 003 * Modifications Copyright (c) 2008 Regiscope Digital Imaging Co, LLC, All rights reserved. 004 * This program is free software; you can redistribute it and/or modify 005 * it under the terms of version 2 of the GNU General Public License as 006 * published by the Free Software Foundation. 007 * There are special exceptions to the terms and conditions of the GPL 008 * as it is applied to this software. See the GNU General Public License for more details. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU General Public License for more details. 014 * 015 * You should have received a copy of the GNU General Public License 016 * along with this program; if not, write to the Free Software 017 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 018 */ 019 020 package org.dbreplicator.replication.schedule; 021 022 import java.sql.*; 023 import org.dbreplicator.replication.*; 024 import org.dbreplicator.replication.DBHandler.AbstractDataBaseHandler; 025 import org.apache.log4j.Logger; 026 import java.util.HashMap; 027 028 /** 029 * <p>The actual synchronizer</p> 030 * <p>This class implements Runnable and hence has a Run method overridden 031 * which starts a thread and performs the required replication operation</p> 032 */ 033 034 public class ScheduledSynchronizer implements Runnable { 035 private _ReplicationServer repServer; 036 private Connection connection; 037 private Statement stmt = null; 038 private String subName = null, 039 scheduleName = null, 040 scheduleType = null, 041 publicationServerName = null, 042 publicationPortNo = null, 043 recurrenceType = null, 044 replicationType = null; 045 private long scheduleTime = 0; 046 private int counter = 0; 047 private _Subscription sub =null; 048 private AbstractDataBaseHandler dbHandler =null; 049 protected boolean stopSchedule=false; 050 private HashMap threadMap =null; 051 protected static Logger log =Logger.getLogger(ScheduledSynchronizer.class.getName()); 052 053 /** 054 *This is main class responible for performing the required replication operation at right scheduled time. 055 * and updating the scheduled time for next schedule in Rep_ScheduleTable. 056 * @param repServer0 _ReplicationServer 057 * @param dbh AbstractDatabaseHandler 058 * @param subName0 String 059 * @param scheduleName0 String 060 * @param scheduleType0 String 061 * @param remoteServerName String 062 * @param remotePortNo String 063 * @param recurrenceType0 String 064 * @param repType String 065 * @param schTime long 066 * @param counter0 int 067 * @param threadMap0 HashMap 068 * @throws RepException 069 */ 070 071 public ScheduledSynchronizer(_ReplicationServer repServer0,AbstractDataBaseHandler dbh, String subName0, 072 String scheduleName0, String scheduleType0, 073 String remoteServerName, 074 String remotePortNo, String recurrenceType0, 075 String repType, long schTime, int counter0, HashMap threadMap0) throws 076 RepException { 077 subName = subName0; 078 scheduleName = scheduleName0; 079 scheduleType = scheduleType0; 080 publicationServerName = remoteServerName; 081 publicationPortNo = remotePortNo; 082 recurrenceType = recurrenceType0; 083 replicationType = repType; 084 scheduleTime = schTime; 085 counter = counter0; 086 repServer = repServer0; 087 connection = repServer.getDefaultConnection(); 088 try { 089 stmt = connection.createStatement(); 090 } 091 catch (SQLException ex) { 092 } 093 sub = repServer.getSubscription(subName); 094 sub.setRemoteServerUrl(publicationServerName); 095 sub.setRemoteServerPortNo(Integer.parseInt(publicationPortNo)); 096 dbHandler = dbh; //Utility.getDatabaseHandler(sub.getConnectionPool(), con); 097 threadMap=threadMap0; 098 099 } 100 101 /** 102 * This method performs the required replication operation between publisher and subscriber when the schedule time is reached 103 * It updates schedule time in the Rep_ScheduleTable and then enters into wait till the 104 * time for the next operation. 105 */ 106 107 public void run() { 108 try { 109 //if Schedule Type is realtime 110 if (scheduleType.equalsIgnoreCase(RepConstants.scheduleType_realTime)) { 111 Utility.createTransactionLogFile=false; 112 realTimeSchedule(); 113 } 114 //if Schedule is non realtime 115 else if (scheduleType.equalsIgnoreCase(RepConstants.scheduleType_nonRealTime)) { 116 long startTime = scheduleTime; 117 int updatedQuery = 0; 118 Timestamp currentTime = new Timestamp(System.currentTimeMillis()); 119 long threadSleepTime = startTime - (System.currentTimeMillis()); 120 boolean flag = true; 121 while (flag) { 122 //It means the start time is more then current time,sleep 123 //otherwise depending on counter thread will be terminated or run 124 if (startTime > currentTime.getTime()) { 125 Thread.sleep(threadSleepTime); 126 startTime = System.currentTimeMillis(); 127 } 128 //if schedule time has reached or past away 129 if (startTime <= System.currentTimeMillis()) { 130 //if schedule is to repeated more than once 131 if (counter > 0) { 132 //depending on schedule type and counter update the next schedule time untill it get more or equal than current time. 133 do { 134 Timestamp newTime = new Timestamp(startTime); 135 if (recurrenceType.equalsIgnoreCase(RepConstants.recurrence_yearType)) { 136 newTime = new Timestamp( (newTime.getYear() + counter), 137 newTime.getMonth(), newTime.getDate(), 138 newTime.getHours(), 139 newTime.getMinutes(), 140 newTime.getSeconds(), 141 newTime.getNanos()); 142 } 143 else if (recurrenceType.equalsIgnoreCase(RepConstants.recurrence_monthType)) { 144 newTime = new Timestamp(newTime.getYear(), 145 (newTime.getMonth() + counter), 146 newTime.getDate(), newTime.getHours(), 147 newTime.getMinutes(), 148 newTime.getSeconds(), 149 newTime.getNanos()); 150 } 151 else if (recurrenceType.equalsIgnoreCase(RepConstants.recurrence_dayType)) { 152 newTime = new Timestamp(newTime.getYear(), newTime.getMonth(), 153 (newTime.getDate() + counter), 154 newTime.getHours(), 155 newTime.getMinutes(), 156 newTime.getSeconds(), 157 newTime.getNanos()); 158 } 159 else if (recurrenceType.equalsIgnoreCase(RepConstants.recurrence_hourType)) { 160 newTime = new Timestamp(newTime.getYear(), newTime.getMonth(), 161 newTime.getDate(), 162 (newTime.getHours() + counter), 163 newTime.getMinutes(), 164 newTime.getSeconds(), 165 newTime.getNanos()); 166 } 167 else if (recurrenceType.equalsIgnoreCase(RepConstants.recurrence_minuteType)) { 168 newTime = new Timestamp(newTime.getYear(), newTime.getMonth(), 169 newTime.getDate(), 170 newTime.getHours(), 171 (newTime.getMinutes()+ counter), 172 newTime.getSeconds(), 173 newTime.getNanos()); 174 } 175 176 startTime = newTime.getTime(); 177 } 178 while ( (startTime <= System.currentTimeMillis())); 179 threadSleepTime = startTime - System.currentTimeMillis(); 180 StringBuffer query = new StringBuffer(); 181 query.append("UPDATE ").append(dbHandler.getScheduleTableName()). 182 append(" SET ") 183 .append(RepConstants.schedule_time) 184 .append(" = ") 185 .append(startTime).append(" WHERE ") 186 .append(RepConstants.schedule_Name) 187 .append(" = '") 188 .append(scheduleName).append("' AND ") 189 .append(RepConstants.subscription_subName1).append(" = '") 190 .append(subName).append("'"); 191 updatedQuery = stmt.executeUpdate(query.toString()); 192 log.debug(" UPDATE SCHEDULE TABLE QUERY :: "+query.toString()); 193 //if schedule is already dropped,no updation would have been done,so stopping thread in this case. 194 if (updatedQuery == 0) { 195 flag = false; 196 break; 197 } 198 } 199 //if schedule is for once only and its time has past away,thread will be stopped. 200 else if (counter == 0 && startTime < System.currentTimeMillis()) { 201 break; 202 } 203 //performs the replication operation now 204 if (replicationType.equalsIgnoreCase(RepConstants.replication_snapshotType)) { 205 log.debug(" CALLING SNAPSHOT AT "+new Timestamp(System.currentTimeMillis())); 206 // System.out.println(" GOING TO TAKE SNAPSHOT IN SCHEDULING "); 207 sub.getSnapShot(); 208 // System.out.println(" SNAPSHOT TAKEN SUCCESSFULLY "); 209 log.debug(" SNAPSHOT DONE SUCCESSFULLY AT "+new Timestamp(System.currentTimeMillis())); 210 } 211 else if (replicationType.equalsIgnoreCase(RepConstants.replication_synchronizeType)) { 212 log.debug(" CALLING SYNCHRONIZATION AT "+new Timestamp(System.currentTimeMillis())); 213 sub.synchronize(); 214 log.debug("SYNCHRONIZATION DONE SUCCESSFULLY AT "+new Timestamp(System.currentTimeMillis())); 215 } 216 else if (replicationType.equalsIgnoreCase(RepConstants.replication_pullType)) { 217 log.debug(" CALLING PULL AT "+new Timestamp(System.currentTimeMillis())); 218 sub.pull(); 219 log.debug(" PULL DONE SUCCESSFULLY AT "+new Timestamp(System.currentTimeMillis())); 220 } 221 else if (replicationType.equalsIgnoreCase(RepConstants.replication_pushType)) { 222 log.debug(" CALLING PUSH AT "+new Timestamp(System.currentTimeMillis())); 223 sub.push(); 224 log.debug(" PUSH DONE SUCCESSFULLY AT "+new Timestamp(System.currentTimeMillis())); 225 } 226 if (threadSleepTime == 0 || counter == 0||stopSchedule==true){ 227 stopScheduledThread(); 228 // flag = false; 229 // break; 230 } 231 } 232 } 233 // if(stopSchedule){ 234 // stopScheduledThread(); 235 // } 236 stmt.close(); 237 } 238 } 239 catch (SQLException ex2) { 240 log.error(ex2,ex2); 241 RepConstants.writeERROR_FILE(ex2); 242 // throw new RuntimeException(ex2); 243 } 244 catch (InterruptedException ex3) { 245 log.error(ex3,ex3); 246 RepConstants.writeERROR_FILE(ex3); 247 // throw new RuntimeException(ex3); 248 } 249 catch (Exception ex4) { 250 log.error(ex4,ex4); 251 RepConstants.writeERROR_FILE(ex4); 252 // throw new RuntimeException(ex4); 253 } 254 } 255 256 private void realTimeSchedule() { 257 while (true) { 258 try { 259 //performs the replication operation now 260 if (replicationType.equalsIgnoreCase(RepConstants.replication_snapshotType)) { 261 log.debug(" CALLING SNAPSHOT AT "+new Timestamp(System.currentTimeMillis())); 262 sub.getSnapShot(); 263 log.debug(" SNAPSHOT DONE SUCCESSFULYY AT "+new Timestamp(System.currentTimeMillis())); 264 } 265 else if (replicationType.equalsIgnoreCase(RepConstants.replication_synchronizeType)) { 266 log.debug("CALLING SYNCHRONIZATION AT "+new Timestamp(System.currentTimeMillis())); 267 sub.synchronize(); 268 log.debug(" SYNCHRONIZATION DONE SUCCESSFULLY AT "+new Timestamp(System.currentTimeMillis())); 269 } 270 else if (replicationType.equalsIgnoreCase(RepConstants.replication_pullType)) { 271 log.debug("CALLING PULL AT "+new Timestamp(System.currentTimeMillis())); 272 sub.pull(); 273 log.debug(" PULL DONE SUCCESSFULLY AT "+new Timestamp(System.currentTimeMillis())); 274 } 275 else if (replicationType.equalsIgnoreCase(RepConstants.replication_pushType)) { 276 log.debug(" CALLING PUSH AT "+new Timestamp(System.currentTimeMillis())); 277 sub.push(); 278 log.debug(" PUSH DONE SUCCESSFULLY AT "+new Timestamp(System.currentTimeMillis())); 279 } 280 } 281 catch (Exception ex1) { 282 log.error(ex1,ex1); 283 RepConstants.writeERROR_FILE(ex1); 284 // throw new RuntimeException(ex1); 285 } 286 if(!stopSchedule){ 287 try { 288 Thread.sleep(10000); 289 } 290 catch (InterruptedException ex) { 291 log.error(ex,ex); 292 RepConstants.writeERROR_FILE(ex); 293 } 294 }else{ 295 stopScheduledThread(); 296 } 297 } 298 } 299 private void stopScheduledThread(){ 300 Thread scheduleThread = (Thread) threadMap.get(scheduleName); 301 if(scheduleThread!=null){ 302 Thread thread = (Thread) threadMap.remove(scheduleName); 303 thread.stop(); 304 } 305 } 306 }

