JavaDoc


001    /**
002     * Copyright (c) 2003 Daffodil Software Ltd all rights reserved,
003     * Modifications Copyright (c) 2008 Regiscope Digital Imaging Co, LLC, All rights reserved.
004     * This program is free software; you can redistribute it and/or modify
005     * it under the terms of version 2 of the GNU General Public License as
006     * published by the Free Software Foundation.
007     * There are special exceptions to the terms and conditions of the GPL
008     * as it is applied to this software. See the GNU General Public License for more details.
009     *
010     * This program is distributed in the hope that it will be useful,
011     * but WITHOUT ANY WARRANTY; without even the implied warranty of
012     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013     * GNU General Public License for more details.
014     *
015     * You should have received a copy of the GNU General Public License
016     * along with this program; if not, write to the Free Software
017     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018     */
019    
020    
021    package org.dbreplicator.replication.synchronize;
022    
023    import java.io.BufferedWriter;
024    import org.dbreplicator.replication.RepConstants;
025    import org.dbreplicator.replication.RepException;
026    import java.io.*;
027    import java.sql.Timestamp;
028    import org.apache.log4j.Logger;
029    import org.dbreplicator.replication.DBHandler.AbstractDataBaseHandler;
030    import org.dbreplicator.replication.column.AbstractColumnObject;
031    import org.dbreplicator.replication.DBHandler.PostgreSQLHandler;
032    import org.dbreplicator.replication.column.BlobObject;
033    import org.dbreplicator.replication.column.ClobObject;
034    import java.sql.Connection;
035    import java.sql.SQLException;
036    import org.dbreplicator.replication.Utility;
037    import java.util.TreeMap;
038    import org.dbreplicator.replication.MetaDataInfo;
039    import org.dbreplicator.replication.RepTable;
040    import org.dbreplicator.replication.xml.XMLElement;
041    import java.util.ArrayList;
042    import java.sql.ResultSet;
043    import java.sql.PreparedStatement;
044    import java.sql.Statement;
045    import java.sql.ResultSetMetaData;
046    
047    public class AbstractSynchronize {
048      protected static Logger log = Logger.getLogger(AbstractSynchronize.class.getName());
049    
050      protected boolean setAutoCommitFlag = true;
051      protected boolean isFirstPass = false, isCurrentTableCyclic = false;
052      protected TreeMap allColumnsMap;
053      protected MetaDataInfo mdi;
054      protected BufferedWriter bw;
055      protected AbstractDataBaseHandler dbHandler;
056      protected RepTable repTable;
057      protected Connection connection;
058      protected TreeMap columnObjectTreeMap;
059      protected String remoteServerName, shadowTable, tableName,replicationType, transactionLogType;
060      public int insertCount = 0,updateCount = 0, deleteCount=0;
061      protected String[] primaryColumnNames, changedColumnNames, changedColumnValues, tableColumnNames;
062      protected Object conisderedId, lastShadowUid, lastSyncId; /* bjt - add last sync id */
063      protected XMLElement xmlElement_NULL;
064      protected AbstractColumnObject[] primaryKeyColumnsObject;
065      protected PreparedStatement PSForLastRecordSameRecordUpdatedExceptPK,PSToGetSyncidForSameOldPKEqualsNewPks;
066      protected String localName;
067      protected String remoteName;
068    
069      public AbstractSynchronize() {
070      }
071    
072      /**
073       * This method has been implemented to
074       * write the insert operation in transaction
075       * file.
076       */
077      protected void writeInsertOperationInTransactionLogFile(BufferedWriter bw,
078          String tableName, Object[] insertedRecrods, String replicationType,
079          String transactionLogType) throws RepException {
080    
081        if (Utility.createTransactionLogFile  &&  transactionLogType.equalsIgnoreCase("true")) {
082          try {
083    //       Timestamp dt = new Timestamp(System.currentTimeMillis());
084    //       bw.write("\n\n");
085    //       bw.write( (dt + "\n"));
086    
087            bw.write("\n");
088            bw.write("[" + replicationType + "]");
089            bw.write("[" + tableName + "]");
090            bw.write("[" + RepConstants.insert_operation + "]");
091            bw.write("[");
092            for (int i = 0; i < insertedRecrods.length; i++) {
093              if (i != 0) {
094                bw.write("," + insertedRecrods[i]);
095              }
096              bw.write("" + insertedRecrods[i]);
097            }
098            bw.write("]");
099            bw.flush();
100          }
101          catch (IOException ex1) {
102            RepConstants.writeERROR_FILE(ex1);
103            throw new RepException("REP351", new Object[] {ex1.getMessage()});
104          }
105          catch (Exception ex) {
106            RepConstants.writeERROR_FILE(ex);
107          }
108        }
109      }
110    
111      /**
112       * This method has been implemented to
113       * write the Delete operation in transaction
114       * file.
115       */
116    
117      protected void writeDeleteOperationInTransactionLogFile(BufferedWriter bw,
118          String tableName, String[] pkCols, Object[] deletedRecordPky,
119          String replicationType, String transactionLogType) throws RepException {
120        if (Utility.createTransactionLogFile && transactionLogType.equalsIgnoreCase("true")) {
121          try {
122    //      Timestamp dt = new Timestamp(System.currentTimeMillis());
123    //      bw.write("\n\n");
124    //      bw.write( (dt + "\n"));
125    
126            bw.write("\n");
127            bw.write("[" + replicationType + "]");
128            bw.write("[" + tableName + "]");
129            bw.write("[" + RepConstants.delete_operation + "]");
130            bw.write("[PRIMARY KEY VALUES  ");
131            for (int i = 0; i < deletedRecordPky.length; i++) {
132              if (i != 0) {
133                bw.write("," + pkCols[i] + " = " + deletedRecordPky[i]);
134              }
135              bw.write(pkCols[i] + " = " + deletedRecordPky[i]);
136            }
137            bw.write("]");
138            bw.flush();
139          }
140          catch (IOException ex1) {
141            RepConstants.writeERROR_FILE(ex1);
142            throw new RepException("REP351", new Object[] {ex1.getMessage()});
143          }
144          catch (Exception ex) {
145            RepConstants.writeERROR_FILE(ex);
146          }
147        }
148      }
149    
150      /**
151       * This method has been implemented to
152       * write the Update operation in transaction
153       * file.
154       */
155      protected void writeUpdateOperationInTransactionLogFile(BufferedWriter bw,
156          String tableName, String[] pkCols, Object[] oldPky,
157          String updatedColsName[], String[] updatedValues, String replicationType,
158          String transactionLogType) throws RepException {
159        if (Utility.createTransactionLogFile && transactionLogType.equalsIgnoreCase("true")) {
160          try {
161    //        Timestamp dt = new Timestamp(System.currentTimeMillis());
162    //        bw.write("\n\n");
163    //        bw.write( (dt + "\n"));
164    
165            bw.write("\n");
166            bw.write("[" + replicationType + "]");
167            bw.write("[" + tableName + "]");
168            bw.write("[" + RepConstants.update_operation + "]");
169            bw.write("[PRIMARY KEY VALUES  ");
170            for (int i = 0; i < oldPky.length; i++) {
171              if (i != 0) {
172                bw.write("," + pkCols[i] + " = " + oldPky[i]);
173              }
174              bw.write(pkCols[i] + " = " + oldPky[i]);
175            }
176            bw.write("]");
177    
178            bw.write("[ CHANGED COLUMNS ");
179            for (int i = 0; i < updatedColsName.length; i++) {
180              if (i != 0) {
181                bw.write("," + updatedColsName[i] + " = " + updatedValues[i]);
182              }
183              bw.write("" + updatedColsName[i] + " = " + updatedValues[i]);
184            }
185            bw.write("]");
186            bw.flush();
187          }
188          catch (IOException ex1) {
189            RepConstants.writeERROR_FILE(ex1);
190            throw new RepException("REP351", new Object[] {ex1.getMessage()});
191          }
192    
193          catch (Exception ex) {
194            RepConstants.writeERROR_FILE(ex);
195          }
196    
197        }
198      }
199    
200      public static void writeOperationInTransactionLogFile(BufferedWriter bw,
201          int insertOperation,
202          int updateOperation,
203          int deleteOperation,
204          String replicationType) throws Exception {
205         bw.write("\n\n");
206         bw.write(" Synchronization Type [" + replicationType + "]");
207         bw.write("\n\n");
208         bw.write(" Inserts [" + insertOperation + "]");
209         bw.write("\n");
210         bw.write(" Updates [" + updateOperation + "]");
211         bw.write("\n");
212         bw.write(" Deletes [" + deleteOperation + "]");
213         bw.write("\n\n");
214         bw.write(" [" + replicationType + " COMPLETED SUCCESSFULLY]");
215         bw.write("\n");
216    
217      }
218    
219      public static void writeDateInTransactionLogFile(BufferedWriter bw) throws
220          Exception {
221        if(Utility.createTransactionLogFile){
222          Timestamp dt = new Timestamp(System.currentTimeMillis());
223          bw.write("\n\n");
224          bw.write(" Operation Performed on Date: ");
225          bw.write( (dt + "\n"));
226        }
227      }
228    
229      public static void writeUnsuccessfullOperationInTransaction(BufferedWriter bw) {
230        try {
231          if(Utility.createTransactionLogFile) {
232            bw.write("\n\n");
233            bw.write(" [ SYNCHRONIZE OPERATION NOT COMPLETED SUCCESSFULLY]");
234            bw.write("\n");
235          }
236        }
237        catch (Exception ex1) {
238        }
239      }
240    
241      protected void loggingInsertOperation(String tableName,
242                                            Object[] insertedRecrods,
243                                            String replicationType) {
244    
245        log.debug("[" + replicationType + "]" + "[" + tableName + "]" + "[" +
246                  RepConstants.insert_operation + "]");
247        for (int i = 0; i < insertedRecrods.length; i++) {
248          log.debug("" + insertedRecrods[i]);
249        }
250      }
251    
252      protected void loggingDeleteOperation(String tableName, String[] pkCols,
253                                            Object[] deletedRecordPky,
254                                            String replicationType) {
255        log.debug("[" + replicationType + "]" + "[" + tableName + "]" + "[" +
256                  RepConstants.delete_operation + "]" + "[PRIMARY KEY VALUES ");
257        for (int i = 0; i < deletedRecordPky.length; i++) {
258          log.debug(pkCols[i] + " = " + deletedRecordPky[i]);
259    
260        }
261      }
262    
263      protected void loggingUpdateOperation(String tableName, String[] pkCols,
264                                            Object[] oldPky, String updatedColsName[],
265                                            String[] updatedValues,
266                                            String replicationType) {
267        log.debug("[" + replicationType + "]" + "[" + tableName + "]" + "[" +RepConstants.update_operation + "]" + "PRIMARY KEY VALUES ");
268        for (int i = 0; i < oldPky.length; i++) {
269          log.debug(pkCols[i] + " = " + oldPky[i]);
270        }
271        log.debug(" CHANGED COLUMNS ");
272        for (int i = 0; i < updatedColsName.length; i++) {
273          log.debug("" + updatedColsName[i] + " = " + updatedValues[i]);
274        }
275      }
276    
277      /**
278       * This has been implemented to handle the problem
279       * related to CLOB and BLOB data type. Postgre
280       * do not to insert LOB object in autocommit mode.
281       * @param dbHandler   AbstractDataBaseHandler
282       * @param aco                 AbstractColumnObject
283       * @return boolean
284       */
285      public boolean checkAutocommit(AbstractDataBaseHandler dbHandler,
286                                     AbstractColumnObject aco) {
287        if (dbHandler instanceof PostgreSQLHandler &&
288            (aco instanceof ClobObject || aco instanceof BlobObject)) {
289          return false;
290        }
291        return true;
292      }
293    
294      public void setAutocomitTrueAndCommitRecord(Connection conn) throws
295          SQLException {
296        conn.commit();
297        conn.setAutoCommit(true);
298      }
299    
300      //////////////////////////////////
301    
302    
303      /**
304     * It return false if record is not updated else return true
305     */
306    protected boolean isPrimaryKeyUpdated(Object[] priamryColValues,ArrayList updatedPrimaryKey) throws SQLException, RepException {
307      boolean isPrimaryKeyUpdated = false;
308      if(updatedPrimaryKey.size()==0) {
309         return isPrimaryKeyUpdated;
310      }
311    
312      for (int i = 0; i < updatedPrimaryKey.size(); i++) {
313        Object[] updatedPk = (Object[]) updatedPrimaryKey.get(i);
314        for (int j = 0; j < updatedPk.length; j++) {
315          if (updatedPk[j].equals(priamryColValues[j])) {
316            isPrimaryKeyUpdated=true;
317          }
318        }
319        if(isPrimaryKeyUpdated) {
320          updatedPrimaryKey.remove(i);
321          break;
322        }
323      }
324      return isPrimaryKeyUpdated;
325    }
326    
327    /*
328     SELECT  *  FROM  Rep_Shadow_PUSHTABLE WHERE  (Rep_sync_id = (SELECT  MAX(rep_sync_id)
329     FROM  Rep_Shadow_PUSHTABLE WHERE   rollno = 11 AND (rep_old_rollNo = 11) AND rep_Status='A'))
330    
331     This method is used if same record is updated many times(except primary column i.e primary
332     column is not updated) This leads to improving performance(As suggested by parveen sir) And
333     to avoid traversing of the syncIds which couldnt add in the viewId we use
334     addSyncidToViewIdForSameOldPKEqualsNewPks(String primaryCols[],Object[] primaryColValues,
335     Object MaxUID) method.
336     */
337    protected Object getUIDRecordUpdatedExceptPK(String
338        primaryCols[], Object[] primaryColValues, Object[] oldPrimaryColValues) throws SQLException, RepException {
339      ResultSet rs = null;
340      try {
341        //check if old primary keys and new primary keys are same or not.
342        // If Yes,then return else coontinue
343        for (int i = 0; i < primaryColValues.length; i++) {
344          if (!primaryColValues[i].toString().equals(oldPrimaryColValues[i].toString()))
345            return null;
346        }
347        PSForLastRecordSameRecordUpdatedExceptPK = makeQueryToGetRecordIfPrimaryKeyIsNotUpdated(primaryColumnNames);
348        int indexPrimaryColumn = 0;
349        for (indexPrimaryColumn = 0; indexPrimaryColumn < primaryCols.length;indexPrimaryColumn++) {
350          PSForLastRecordSameRecordUpdatedExceptPK.setObject(indexPrimaryColumn +1, primaryColValues[indexPrimaryColumn]);
351        }
352        for (int j = 0; j < primaryCols.length; j++) {
353          PSForLastRecordSameRecordUpdatedExceptPK.setObject(indexPrimaryColumn +j + 1, oldPrimaryColValues[j]);
354        }
355    //    long startTime =System.currentTimeMillis();
356        rs = PSForLastRecordSameRecordUpdatedExceptPK.executeQuery();
357    //System.out.println(" SSSSSSSSSS TIME TAKEN IN UID QUERY EXECUTION :: "+(System.currentTimeMillis()-startTime));
358        rs.next();
359        Object Uid = rs.getObject(1);
360        //Add the SyncIds to viewIds Which we have considered indirectly through above Query
361    //    addSyncidToViewIdForSameOldPKEqualsNewPks(primaryColValues, Uid);
362        return Uid;
363      }
364      finally {
365        if (rs != null) {
366          try {
367            rs.close();
368          }
369          catch (SQLException ex2) { //ignore SQLException
370          }
371        }
372        if (PSForLastRecordSameRecordUpdatedExceptPK != null) {
373          try {
374            PSForLastRecordSameRecordUpdatedExceptPK.close();
375          }
376          catch (SQLException ex2) { //ignore SQLException
377          }
378    
379        }
380      }
381    }
382    
383    /*
384        Query to get syncIds :
385        " SELECT REP_SYNC_ID FROM SHADOW_TABLE WHERE
386        primarycolumns=rep_old_primarycolumns  and pk=?; "
387        SyncID which we get through this query if not
388        already present in viewIDs and less than the MaxUId
389        then add into viewID. MaxUId is the SyncId upto which
390        we have considered shadow table record for that particular
391        primary key
392     */
393    
394    protected void addSyncidToViewIdForSameOldPKEqualsNewPks(Object[]
395        primaryColValues, Object MaxUID,ArrayList viewedIds) throws SQLException, RepException {
396      ResultSet rs = null;
397      try {
398        PSToGetSyncidForSameOldPKEqualsNewPks = makeQueryToGetViewId();
399        for (int i = 0; i < primaryColValues.length; i++)
400          PSToGetSyncidForSameOldPKEqualsNewPks.setObject(i + 1,primaryColValues[i]);
401        rs = PSToGetSyncidForSameOldPKEqualsNewPks.executeQuery();
402        while (rs.next()) {
403          Object Uid = rs.getObject(1);
404          if (!viewedIds.contains(Uid) &&
405              ( (Number) Uid).longValue() < ( (Number) MaxUID).longValue()) {
406            viewedIds.add(Uid);
407          }
408        }
409      }
410      finally {
411        if (rs != null) {
412          try {
413            rs.close();
414          }
415          catch (SQLException ex2) { //ignore SQLException
416          }
417        }
418      }
419    }
420    
421    protected PreparedStatement makeQueryToGetRecordIfPrimaryKeyIsNotUpdated(String[] primaryColsName) throws SQLException, RepException {
422      StringBuffer query = new StringBuffer();
423      query.append("SELECT ").append(RepConstants.shadow_sync_id1).append("  FROM  ")
424          .append(shadowTable).append(" WHERE  (").append(RepConstants.shadow_sync_id1)
425          .append(" = (SELECT  MAX(").append(RepConstants.shadow_sync_id1).append(") FROM ")
426          .append(shadowTable).append(" WHERE ");
427      for (int i = 0; i < primaryColsName.length; i++) {
428        if (i != 0) {
429          query.append(" AND ");
430        }
431        query.append(primaryColsName[i]).append("= ? ");
432      }
433      query.append(" AND ");
434      for (int i = 0; i < primaryColsName.length; i++) {
435        if (i != 0) {
436          query.append(" AND ");
437        }
438        query.append(" REP_OLD_").append(primaryColsName[i]).append("= ? ");
439      }
440      query.append(" AND ").append(RepConstants.shadow_status4)
441          .append(" = '").append(RepConstants.afterUpdate).append(" ')) ");
442      return connection.prepareStatement(query.toString());
443    }
444    
445    protected PreparedStatement makeQueryToGetViewId() throws SQLException, RepException {
446      StringBuffer query = new StringBuffer();
447      query.append("SELECT ").append(RepConstants.shadow_sync_id1)
448          .append("  FROM  ").append(shadowTable).append(" WHERE ");
449      for (int i = 0; i < primaryColumnNames.length; i++) {
450        if (i != 0)
451        query.append(" AND ");
452        query.append(primaryColumnNames[i]).append("= ").append(" REP_OLD_").append(primaryColumnNames[i]);
453      }
454      query.append(" AND ");
455      for (int i = 0; i < primaryColumnNames.length; i++) {
456        if (i != 0)
457        query.append(" AND ");
458        query.append(primaryColumnNames[i]).append("= ?");
459      }
460      return connection.prepareStatement(query.toString());
461    }
462    
463    
464    }





























































Powered by Drupal - Theme by Danger4k