JavaDoc


001    /**
002     * Copyright (c) 2003 Daffodil Software Ltd all rights reserved,
003     * Modifications Copyright (c) 2008 Regiscope Digital Imaging Co, LLC, All rights reserved.
004     * This program is free software; you can redistribute it and/or modify
005     * it under the terms of version 2 of the GNU General Public License as
006     * published by the Free Software Foundation.
007     * There are special exceptions to the terms and conditions of the GPL
008     * as it is applied to this software. See the GNU General Public License for more details.
009     *
010     * This program is distributed in the hope that it will be useful,
011     * but WITHOUT ANY WARRANTY; without even the implied warranty of
012     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013     * GNU General Public License for more details.
014     *
015     * You should have received a copy of the GNU General Public License
016     * along with this program; if not, write to the Free Software
017     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018     */
019    
020    package org.dbreplicator.replication.schedule;
021    
022    import java.sql.*;
023    import org.dbreplicator.replication.*;
024    import org.dbreplicator.replication.DBHandler.AbstractDataBaseHandler;
025    import org.apache.log4j.Logger;
026    import java.util.HashMap;
027    
028    /**
029     * <p>The actual synchronizer</p>
030     * <p>This class implements Runnable and hence has a Run method overridden
031     * which starts a thread and performs the required replication operation</p>
032     */
033    
034    public class ScheduledSynchronizer implements Runnable {
035      private  _ReplicationServer repServer;
036      private Connection connection;
037      private Statement stmt = null;
038      private String subName = null,
039             scheduleName = null,
040             scheduleType = null,
041             publicationServerName = null,
042             publicationPortNo = null,
043             recurrenceType = null,
044             replicationType = null;
045      private long scheduleTime = 0;
046      private int counter = 0;
047      private _Subscription sub =null;
048      private AbstractDataBaseHandler dbHandler =null;
049      protected boolean stopSchedule=false;
050      private HashMap threadMap =null;
051      protected static Logger log =Logger.getLogger(ScheduledSynchronizer.class.getName());
052    
053      /**
054       *This is main class responible for performing the required replication operation at right scheduled time.
055       * and updating the scheduled time for next schedule in Rep_ScheduleTable.
056       * @param repServer0                  _ReplicationServer
057       * @param dbh                                 AbstractDatabaseHandler
058       * @param subName0                    String
059       * @param scheduleName0               String
060       * @param scheduleType0               String
061       * @param remoteServerName    String
062       * @param remotePortNo                String
063       * @param recurrenceType0             String
064       * @param repType                     String
065       * @param schTime                     long
066       * @param counter0                    int
067       * @param threadMap0                  HashMap
068       * @throws RepException
069       */
070    
071      public ScheduledSynchronizer(_ReplicationServer repServer0,AbstractDataBaseHandler dbh, String subName0,
072                                   String scheduleName0, String scheduleType0,
073                                   String remoteServerName,
074                                   String remotePortNo, String recurrenceType0,
075                                   String repType, long schTime, int counter0, HashMap threadMap0) throws
076          RepException {
077          subName = subName0;
078          scheduleName = scheduleName0;
079          scheduleType = scheduleType0;
080          publicationServerName = remoteServerName;
081          publicationPortNo = remotePortNo;
082          recurrenceType = recurrenceType0;
083          replicationType = repType;
084          scheduleTime = schTime;
085          counter = counter0;
086          repServer = repServer0;
087          connection = repServer.getDefaultConnection();
088          try {
089            stmt = connection.createStatement();
090          }
091          catch (SQLException ex) {
092          }
093          sub = repServer.getSubscription(subName);
094          sub.setRemoteServerUrl(publicationServerName);
095          sub.setRemoteServerPortNo(Integer.parseInt(publicationPortNo));
096          dbHandler = dbh; //Utility.getDatabaseHandler(sub.getConnectionPool(), con);
097          threadMap=threadMap0;
098    
099      }
100    
101      /**
102       * This method performs the required replication operation between publisher and subscriber when the schedule time is reached
103       * It updates schedule time in the Rep_ScheduleTable and then enters into wait till the
104       * time for the next operation.
105       */
106    
107      public void run() {
108        try {
109          //if Schedule Type is realtime
110          if (scheduleType.equalsIgnoreCase(RepConstants.scheduleType_realTime)) {
111            Utility.createTransactionLogFile=false;
112            realTimeSchedule();
113          }
114          //if Schedule is non realtime
115          else if (scheduleType.equalsIgnoreCase(RepConstants.scheduleType_nonRealTime)) {
116            long startTime = scheduleTime;
117            int updatedQuery = 0;
118            Timestamp currentTime = new Timestamp(System.currentTimeMillis());
119            long threadSleepTime = startTime - (System.currentTimeMillis());
120            boolean flag = true;
121            while (flag) {
122              //It means the start time is more then current time,sleep
123              //otherwise depending on counter thread will be terminated or run
124              if (startTime > currentTime.getTime()) {
125                Thread.sleep(threadSleepTime);
126                startTime = System.currentTimeMillis();
127              }
128              //if schedule time has reached or past away
129              if (startTime <= System.currentTimeMillis()) {
130                //if schedule is to repeated more than once
131                if (counter > 0) {
132                  //depending on schedule type and counter update the next schedule time untill it get more or equal than current time.
133                  do {
134                   Timestamp newTime = new Timestamp(startTime);
135                    if (recurrenceType.equalsIgnoreCase(RepConstants.recurrence_yearType)) {
136                      newTime = new Timestamp( (newTime.getYear() + counter),
137                                              newTime.getMonth(), newTime.getDate(),
138                                              newTime.getHours(),
139                                              newTime.getMinutes(),
140                                              newTime.getSeconds(),
141                                              newTime.getNanos());
142                    }
143                    else if (recurrenceType.equalsIgnoreCase(RepConstants.recurrence_monthType)) {
144                      newTime = new Timestamp(newTime.getYear(),
145                                              (newTime.getMonth() + counter),
146                                              newTime.getDate(), newTime.getHours(),
147                                              newTime.getMinutes(),
148                                              newTime.getSeconds(),
149                                              newTime.getNanos());
150                    }
151                    else if (recurrenceType.equalsIgnoreCase(RepConstants.recurrence_dayType)) {
152                      newTime = new Timestamp(newTime.getYear(), newTime.getMonth(),
153                                              (newTime.getDate() + counter),
154                                              newTime.getHours(),
155                                              newTime.getMinutes(),
156                                              newTime.getSeconds(),
157                                              newTime.getNanos());
158                    }
159                    else if (recurrenceType.equalsIgnoreCase(RepConstants.recurrence_hourType)) {
160                      newTime = new Timestamp(newTime.getYear(), newTime.getMonth(),
161                                              newTime.getDate(),
162                                              (newTime.getHours() + counter),
163                                              newTime.getMinutes(),
164                                              newTime.getSeconds(),
165                                              newTime.getNanos());
166                    }
167                    else if (recurrenceType.equalsIgnoreCase(RepConstants.recurrence_minuteType)) {
168                     newTime = new Timestamp(newTime.getYear(), newTime.getMonth(),
169                                             newTime.getDate(),
170                                             newTime.getHours(),
171                                             (newTime.getMinutes()+ counter),
172                                             newTime.getSeconds(),
173                                             newTime.getNanos());
174                   }
175    
176                    startTime = newTime.getTime();
177                  }
178                  while ( (startTime <= System.currentTimeMillis()));
179                  threadSleepTime = startTime - System.currentTimeMillis();
180                  StringBuffer query = new StringBuffer();
181                  query.append("UPDATE ").append(dbHandler.getScheduleTableName()).
182                      append(" SET ")
183                      .append(RepConstants.schedule_time)
184                      .append(" = ")
185                      .append(startTime).append(" WHERE ")
186                      .append(RepConstants.schedule_Name)
187                      .append(" = '")
188                      .append(scheduleName).append("' AND ")
189                      .append(RepConstants.subscription_subName1).append(" = '")
190                      .append(subName).append("'");
191                  updatedQuery = stmt.executeUpdate(query.toString());
192                  log.debug(" UPDATE SCHEDULE TABLE QUERY ::  "+query.toString());
193                  //if schedule is already dropped,no updation would have been done,so stopping thread in this case.
194                  if (updatedQuery == 0) {
195                    flag = false;
196                    break;
197                  }
198                }
199                //if schedule is for once only and its time has past away,thread will be stopped.
200                else if (counter == 0 && startTime < System.currentTimeMillis()) {
201                  break;
202                }
203                //performs the replication operation now
204             if (replicationType.equalsIgnoreCase(RepConstants.replication_snapshotType)) {
205               log.debug(" CALLING SNAPSHOT AT "+new Timestamp(System.currentTimeMillis()));
206    //           System.out.println(" GOING TO TAKE SNAPSHOT IN  SCHEDULING ");
207               sub.getSnapShot();
208    //           System.out.println(" SNAPSHOT TAKEN SUCCESSFULLY ");
209               log.debug(" SNAPSHOT DONE SUCCESSFULLY AT "+new Timestamp(System.currentTimeMillis()));
210             }
211             else if (replicationType.equalsIgnoreCase(RepConstants.replication_synchronizeType)) {
212               log.debug(" CALLING SYNCHRONIZATION AT "+new Timestamp(System.currentTimeMillis()));
213               sub.synchronize();
214               log.debug("SYNCHRONIZATION DONE SUCCESSFULLY AT  "+new Timestamp(System.currentTimeMillis()));
215             }
216             else if (replicationType.equalsIgnoreCase(RepConstants.replication_pullType)) {
217               log.debug(" CALLING PULL AT  "+new Timestamp(System.currentTimeMillis()));
218               sub.pull();
219               log.debug(" PULL DONE SUCCESSFULLY AT  "+new Timestamp(System.currentTimeMillis()));
220             }
221             else if (replicationType.equalsIgnoreCase(RepConstants.replication_pushType)) {
222               log.debug(" CALLING PUSH AT  "+new Timestamp(System.currentTimeMillis()));
223               sub.push();
224               log.debug(" PUSH DONE SUCCESSFULLY AT  "+new Timestamp(System.currentTimeMillis()));
225             }
226                if (threadSleepTime == 0 || counter == 0||stopSchedule==true){
227                  stopScheduledThread();
228    //              flag = false;
229    //              break;
230                }
231              }
232            }
233    //        if(stopSchedule){
234    //      stopScheduledThread();
235    //    }
236            stmt.close();
237          }
238        }
239        catch (SQLException ex2) {
240          log.error(ex2,ex2);
241          RepConstants.writeERROR_FILE(ex2);
242    //       throw new RuntimeException(ex2);
243        }
244        catch (InterruptedException ex3) {
245          log.error(ex3,ex3);
246          RepConstants.writeERROR_FILE(ex3);
247    //      throw new RuntimeException(ex3);
248        }
249        catch (Exception ex4) {
250          log.error(ex4,ex4);
251          RepConstants.writeERROR_FILE(ex4);
252    //      throw new RuntimeException(ex4);
253        }
254      }
255    
256      private void realTimeSchedule() {
257        while (true) {
258          try {
259            //performs the replication operation now
260            if (replicationType.equalsIgnoreCase(RepConstants.replication_snapshotType)) {
261              log.debug(" CALLING SNAPSHOT AT  "+new Timestamp(System.currentTimeMillis()));
262              sub.getSnapShot();
263              log.debug(" SNAPSHOT DONE SUCCESSFULYY AT  "+new Timestamp(System.currentTimeMillis()));
264            }
265            else if (replicationType.equalsIgnoreCase(RepConstants.replication_synchronizeType)) {
266             log.debug("CALLING SYNCHRONIZATION AT  "+new Timestamp(System.currentTimeMillis()));
267              sub.synchronize();
268            log.debug(" SYNCHRONIZATION DONE SUCCESSFULLY AT "+new Timestamp(System.currentTimeMillis()));
269            }
270            else if (replicationType.equalsIgnoreCase(RepConstants.replication_pullType)) {
271              log.debug("CALLING PULL AT  "+new Timestamp(System.currentTimeMillis()));
272              sub.pull();
273              log.debug(" PULL DONE SUCCESSFULLY AT  "+new Timestamp(System.currentTimeMillis()));
274            }
275            else if (replicationType.equalsIgnoreCase(RepConstants.replication_pushType)) {
276              log.debug(" CALLING PUSH AT  "+new Timestamp(System.currentTimeMillis()));
277              sub.push();
278              log.debug(" PUSH DONE SUCCESSFULLY AT  "+new Timestamp(System.currentTimeMillis()));
279            }
280          }
281          catch (Exception ex1) {
282            log.error(ex1,ex1);
283            RepConstants.writeERROR_FILE(ex1);
284    //      throw new RuntimeException(ex1);
285          }
286          if(!stopSchedule){
287          try {
288            Thread.sleep(10000);
289          }
290          catch (InterruptedException ex) {
291            log.error(ex,ex);
292          RepConstants.writeERROR_FILE(ex);
293          }
294        }else{
295          stopScheduledThread();
296        }
297        }
298      }
299      private void stopScheduledThread(){
300        Thread scheduleThread = (Thread) threadMap.get(scheduleName);
301          if(scheduleThread!=null){
302            Thread thread = (Thread) threadMap.remove(scheduleName);
303            thread.stop();
304          }
305        }
306    }





























































Powered by Drupal - Theme by Danger4k