JavaDoc


001    /**
002     * Copyright (c) 2003 Daffodil Software Ltd all rights reserved,
003     * Modifications Copyright (c) 2008 Regiscope Digital Imaging Co, LLC, All rights reserved.
004     * This program is free software; you can redistribute it and/or modify
005     * it under the terms of version 2 of the GNU General Public License as
006     * published by the Free Software Foundation.
007     * There are special exceptions to the terms and conditions of the GPL
008     * as it is applied to this software. See the GNU General Public License for more details.
009     *
010     * This program is distributed in the hope that it will be useful,
011     * but WITHOUT ANY WARRANTY; without even the implied warranty of
012     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013     * GNU General Public License for more details.
014     *
015     * You should have received a copy of the GNU General Public License
016     * along with this program; if not, write to the Free Software
017     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018     */
019    
020    package org.dbreplicator.replication.xml;
021    
022    import java.io.*;
023    import java.net.*;
024    import java.sql.*;
025    import java.util.*;
026    
027    import org.dbreplicator.replication.*;
028    import org.dbreplicator.replication.DBHandler.*;
029    import org.dbreplicator.replication.zip.*;
030    import org.apache.log4j.Logger;
031    import java.math.BigDecimal;
032    import java.util.ArrayList;
033    import java.util.HashMap;
034    
035    /**
036     * This class handles all the issues that can occur while creating the XML
037     * file used for synchronization.
038     */
039    
040    public class SyncXMLCreator {
041    
042      String local_pub_sub_name, shadowTable, remoteServerName;
043      ConnectionPool connectionPool;
044      AbstractDataBaseHandler dbDataTypeHandler;
045    //  private ArrayList viewedIds;
046      private HashMap viewedIds;
047      private String[] primaryColNames;
048      private int[] primaryColumnTypes;
049      private PreparedStatement primaryPreparedStatement,primaryPreparedStatementBackwardTraversing ,commonPreparedStatement,
050          commonPreparedStatementForBackwardTraversing, PSForLastRecordSameRecordUpdatedExceptPK,
051          PSToGetSyncidForSameOldPKEqualsNewPks;
052      private Statement commonStatement=null;
053      public ServerSocket serverSocket = null;
054      RepTable repTable;
055      String filterClause;
056      String tableName;
057      SchemaQualifiedName sname;
058      private String[] parameters;
059      private boolean USE_getLastRecord = true;
060      protected static Logger log = Logger.getLogger(SyncXMLCreator.class.getName());
061      int countWriteDelementelement=0,countWriteupdateElement=0,countWriteInsertElement=0,commPreCount=0,primaryCount=0;
062      /**
063       * Creates an XML file for synchronizing data between server and client i.e. Subscriber and publisher.
064       * @param pub_sub_name0
065       * @param connectionPool0
066       * @param dbDataTypeHandler0
067       */
068      public SyncXMLCreator(String pub_sub_name0, ConnectionPool connectionPool0, AbstractDataBaseHandler dbDataTypeHandler0) {
069        local_pub_sub_name = pub_sub_name0;
070        connectionPool = connectionPool0;
071        dbDataTypeHandler = dbDataTypeHandler0;
072      }
073    
074      /**
075          if operation is I , insert the record
076          if operation is U check for its last record and also make extra tag for primary key of the initial record
077          and also make attribute for columns which are updated.
078          if operation id D then mark it for deletion.
079    
080          case1.
081          =====
082           If a user isnert a new record then to delete it. It does not considered
083           and we do not write it in XML file.
084    
085           If a new record is inserted and after that it is updated in that case it
086           considered as a new inserted record not a updated record
087       */
088      public Object[] createXMLFile(String xmlFileURL, String zipFileURL,
089                                     String xmlFileName, String remote_Pub_Sub_Name,
090                                     ArrayList pubRepTables,
091                                     String clientServerName,
092                                     int noOfTables, boolean DeleteXML,
093                                     String local_pub_subName,boolean isSchemaSupported, _FileUpload fileUpload,String localMachineAddress,String remoteMachineAddress) throws RepException {
094            Connection pub_sub_Connection = null;
095        ResultSet rows = null;
096        try {
097          ArrayList EnCodedcols;
098          FileOutputStream fos = new FileOutputStream(xmlFileURL);
099          OutputStreamWriter os = new OutputStreamWriter(fos);
100          BufferedWriter bw = new BufferedWriter(os);
101          pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name);
102          XMLWriter xmlWriter = new XMLWriter(bw, dbDataTypeHandler,pub_sub_Connection);
103          bw.write("<?xml version=\"1.0\" encoding=\"ISO-8859-1\"?>");
104          bw.write("<root>");
105          ArrayList usedActualTables = new ArrayList();
106          String[] primarycols;
107          remoteServerName = clientServerName;
108          Object[] lastIdArray = new Object[noOfTables];
109              /* bjt */ 
110                    /* grab database at this point and ignore further changes until next
111                     * sync session by starting transaction and setting isolation level to
112                     * 'serializable'.  This is SQL92 compliant, so should be fine... */
113                    /* begin work */
114                    /* set transaction isolation level serializable */
115                Statement transtmt = pub_sub_Connection.createStatement();
116                    transtmt.executeUpdate("begin work"); /* bjt */
117                    /* bjt - only do for postgres...until we test on other dbs */
118            if (dbDataTypeHandler.getvendorName() == Utility.DataBase_PostgreSQL) {
119                            transtmt.executeUpdate("set transaction isolation level serializable"); /* bjt */
120                    }
121              /* bjt - set autocommit false for select */
122          for (int i = 0; i < noOfTables; i++) {
123            try {
124              repTable = ( (RepTable) pubRepTables.get(i));
125              if (repTable.getCreateShadowTable().equalsIgnoreCase(RepConstants.NO))
126                continue;
127              sname = repTable.getSchemaQualifiedName();
128              tableName = isSchemaSupported ? sname.toString() : sname.getTableName();
129              EnCodedcols = PathHandler.getEncodedColumns(tableName);
130              shadowTable = RepConstants.shadow_Table(repTable.getSchemaQualifiedName().toString());
131              lastIdArray[i] = getLastUIDFromShadowTable(shadowTable);
132    //          viewedIds = new ArrayList();
133                viewedIds = new HashMap();
134              long lastId = getLastSyncId(pub_sub_Connection, remote_Pub_Sub_Name, tableName);
135              primaryColNames = repTable.getPrimaryColumns();
136              xmlWriter.setNoOFPrimaryColumnNumber(primaryColNames.length);
137              primaryPreparedStatement = dbDataTypeHandler.
138                              makePrimaryPreperedStatement(pub_sub_Connection, primaryColNames, shadowTable, local_pub_sub_name);
139              commonPreparedStatement = makeCommonPreparedStatement(pub_sub_Connection, shadowTable);
140              //statement whose resulset is set in tracer in getlastrecord in update case
141              commonStatement = pub_sub_Connection.createStatement();
142              // deleting records from shadow table with primaryColumns of Main Table as NULL
143             deleteRecordsFromShadowTableWithNullPk();
144    
145                      /* bjt */
146                      pub_sub_Connection.setAutoCommit(false);
147              String query = "Select * from " + shadowTable + " where " +
148                  RepConstants.shadow_sync_id1 +
149                  " > " + lastId + " and " + RepConstants.shadow_serverName_n +
150                  " != '" + remoteServerName + "' order by " +RepConstants.shadow_sync_id1;
151              filterClause = repTable.getFilterClause();
152              rows = getResultSet(pub_sub_Connection, query);
153              primaryColumnTypes = new int[primaryColNames.length];
154              ResultSetMetaData rsmt = rows.getMetaData();
155              int noOfColumns = rsmt.getColumnCount();
156              String operation;
157                      ArrayList updatedPrimaryKey = getUpdatedPrimaryKey(); /* bjt */
158              if (rows.next()) {
159                bw.write("<tableName>");
160                bw.write(tableName);
161                //if (usedActualTables.contains(tableName))
162                            /* bjt - should be negative...if it does not contain the tableName, add it */
163                if (!usedActualTables.contains(tableName))
164                  usedActualTables.add(tableName);
165                do {
166    //            int icount =0;
167    //            long time = System.currentTimeMillis();
168                  operation = rows.getString(RepConstants.shadow_operation3);
169                  if (operation.equalsIgnoreCase(RepConstants.insert_operation)) {
170                    // Write the insert element in XML file.
171                    makeInsertElement(bw, operation, noOfColumns, xmlWriter, rows,rsmt, remoteServerName, EnCodedcols, updatedPrimaryKey); /* bjt */
172                  }
173                  else if (operation.equals(RepConstants.update_operation)) {
174                    // Write the update element in XML file.
175                    makeUpdateElement(bw, noOfColumns, rsmt, xmlWriter, rows,shadowTable, remoteServerName,EnCodedcols, updatedPrimaryKey); /* bjt */
176                  }
177                  else if (operation.equals(RepConstants.delete_operation)) {
178                    //  Write the delete element in XML file.
179                    makeDeleteElement(bw, noOfColumns, xmlWriter, rows, EnCodedcols);
180                  }
181    
182    //            System.out.println((i++)+"  time taken in insert  "+ (System.currentTimeMillis()-time));
183                }
184                while (rows.next());
185                bw.write("</tableName>\r\n");
186              }
187                      /* bjt - set autocommit back to true */
188                      pub_sub_Connection.setAutoCommit(true);
189    
190              // change last_sync id in bookmarks table
191    //        updateBookMarkLastSyncId(shadowTable, tableName, remote_Pub_Sub_Name);
192            }
193            finally {
194              try {
195                if (rows != null) {
196                  Statement st = rows.getStatement();
197                  rows.close();
198                  st.close();
199                }
200                  if (primaryPreparedStatement != null) {
201                    primaryPreparedStatement.close();
202                  }
203                  if (commonPreparedStatement != null) {
204                    commonPreparedStatement.close();
205                  }
206                  if (commonStatement != null) {
207                    commonStatement.close();
208                  }
209                  if(primaryPreparedStatementBackwardTraversing!=null){
210                     primaryPreparedStatementBackwardTraversing.close();
211                  }if(commonPreparedStatementForBackwardTraversing!=null){
212                    commonPreparedStatementForBackwardTraversing.close();
213                 }
214              }
215              catch (SQLException ex1) {
216                //Ignore Exception
217              }
218            }
219    
220          }
221    //commented By Nancy on 29-03-2005
222    //to avoid record skipping during realTime Scheduling
223          /* for (int i = 0; i < noOfTables; i++) {
224            repTable = ( (RepTable) pubRepTables.get(i));
225            tableName = repTable.getSchemaQualifiedName().toString();
226             shadowTable = RepConstants.shadow_Table(repTable.getSchemaQualifiedName().toString());
227             updateBookMarkLastSyncId(shadowTable, tableName, remote_Pub_Sub_Name,lastIdArray[i]);
228           }*/
229    
230              /* bjt */
231                    /* commit */
232                transtmt.executeUpdate("commit"); /* bjt */
233                    transtmt.close();
234          bw.write("</root>");
235          bw.close();
236          os.close();
237          fos.close();
238         if(!localMachineAddress.equalsIgnoreCase(remoteMachineAddress))  {
239           // making zip file from xml file
240           ZipHandler.makeZip(zipFileURL, xmlFileURL, xmlFileName);
241           // writing zip file on socket
242    //      writeZIPFileOnClientSocket(socket, zipFileURL);
243           WriteOnSocket writeOnSocket = new WriteOnSocket(zipFileURL, xmlFileURL,DeleteXML, xmlFileName, fileUpload, true);
244           writeOnSocket.start();
245           writeOnSocket.join();
246         }
247          return new Object[] {
248              usedActualTables, lastIdArray};
249        }
250        catch (Exception ex) {
251          log.error(ex.getMessage(), ex);
252          RepException rep = new RepException("REP057", new Object[] {ex.getMessage()});
253          rep.setStackTrace(ex.getStackTrace());
254          throw rep;
255        }
256       finally {
257                    connectionPool.returnConnection(pub_sub_Connection);
258       }
259      }
260    
261      /**
262       * prepared statement for getting common record for the copmmon id
263       * @param shadowTable
264       * @return
265       * @throws SQLException
266       */
267      private PreparedStatement makeCommonPreparedStatement(Connection pub_sub_Connection, String shadowTable) throws SQLException, RepException {
268        StringBuffer query = new StringBuffer();
269        query.append(" select * from ");
270        query.append(shadowTable);
271        query.append(" where ");
272        query.append(RepConstants.shadow_sync_id1);
273        query.append(" > ");
274        query.append("? ");
275        query.append(" and ");
276        query.append(RepConstants.shadow_common_id2);
277        query.append(" = ");
278        query.append("? ");
279        query.append(" and ");
280        query.append(RepConstants.shadow_status4);
281        query.append(" = '");
282        query.append(RepConstants.afterUpdate);
283        query.append("' ");
284        //check (dbDataTypeHandler.getvendorName() == Utility.DataBase_DB2) added by nancy to handle blob clob case
285    //System.out.println(" commonPreapredStatement::  "+query.toString().toUpperCase());
286    //    if (dbDataTypeHandler.getvendorName() == Utility.DataBase_PostgreSQL ||
287    //        dbDataTypeHandler.getvendorName() == Utility.DataBase_DB2) {
288          return pub_sub_Connection.prepareStatement(query.toString());
289    //    }
290    //    return pub_sub_Connection.prepareStatement(query.toString(),
291    //                                               ResultSet.TYPE_SCROLL_INSENSITIVE,
292    //                                               ResultSet.CONCUR_UPDATABLE);
293    
294      }
295    
296      /**
297       * returns last synchronization id from bookmark TableName
298       * @param remote_Pub_Sub_subName
299       * @param tableName
300       * @return
301       */
302      private long getLastSyncId(Connection pub_sub_Connection, String remote_Pub_Sub_subName, String tableName) {
303        ResultSet rs = null;
304        try {
305          StringBuffer query = new StringBuffer();
306          query.append(" Select ").append(RepConstants.bookmark_lastSyncId4).append(" from ")
307              .append(dbDataTypeHandler.getBookMarkTableName()).append(" where ")
308              .append(RepConstants.bookmark_LocalName1)
309              .append(" = '").append(local_pub_sub_name).append("' and ")
310              .append(RepConstants.bookmark_RemoteName2)
311              .append(" = '").append(remote_Pub_Sub_subName).append("'")
312              .append(" and ").append(RepConstants.bookmark_TableName3)
313              .append("= '")
314              .append(tableName).append("'");
315          log.debug(query.toString());
316          rs = getResultSet(pub_sub_Connection, query.toString());
317          Object lastIdObj = null;
318          if (rs.next()) {
319            // It may rise class cast exception and require casting after check object type
320            // use insetanceof java feature for multiple handling
321            lastIdObj = rs.getObject(RepConstants.bookmark_lastSyncId4);
322          }
323          long lastId = 0;
324          if (lastIdObj instanceof Long) {
325            lastId = lastIdObj == null ? 0 : ( (Long) lastIdObj).longValue();
326          }
327          else if (lastIdObj instanceof BigDecimal) {
328            lastId = lastIdObj == null ? 0 : ( (BigDecimal) lastIdObj).longValue();
329          }
330          else if (lastIdObj instanceof Integer) {
331            lastId = lastIdObj == null ? 0 : ( (Integer) lastIdObj).longValue();
332          }
333          else if (lastIdObj instanceof Double) {
334            lastId = lastIdObj == null ? 0 : ( (Double) lastIdObj).longValue();
335          } else if(lastIdObj instanceof String) {
336            lastId = lastIdObj == null ? 0 : ( Long.parseLong((String)lastIdObj)  );
337          }
338          return lastId;
339        }
340        catch (Exception ex) {
341          return 0;
342        }
343        finally {
344          try {
345            if (rs != null) {
346              Statement st = rs.getStatement();
347              rs.close();
348              if (st != null)
349                st.close();
350            }
351          }
352          catch (SQLException ex1) {
353          }
354        }
355      }
356    
357      private ResultSet getResultSet(Connection pub_sub_Connection, String nonPreparedQuery) throws SQLException, RepException {
358        //Connection pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name);
359            
360        Statement st;
361        if (dbDataTypeHandler.getvendorName() == Utility.DataBase_PostgreSQL ||
362            dbDataTypeHandler.getvendorName() == Utility.DataBase_Cloudscape ||
363            dbDataTypeHandler.getvendorName() == Utility.DataBase_DB2 ||
364            dbDataTypeHandler.getvendorName() == Utility.DataBase_SqlServer) {
365          st = pub_sub_Connection.createStatement();
366        }
367        else {
368          st = pub_sub_Connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE,ResultSet.CONCUR_READ_ONLY);
369        }
370        log.debug(nonPreparedQuery);
371        return st.executeQuery(nonPreparedQuery);
372      }
373    
374      /**
375       * creates and XML Insert Element
376       * @param operation
377       * @param noOfColumns
378       * @param xmlWriter
379       * @param rows_I
380       * @param rsmt
381       * @param remoteServerName
382       * @throws SQLException
383       * @throws IOException
384       * @throws RepException
385       */
386      private void makeInsertElement(Writer bw, String operation, int noOfColumns,
387                                     XMLWriter xmlWriter, ResultSet rows_I,
388                                     ResultSetMetaData rsmt,
389                                     String remoteServerName,
390                                     ArrayList encodedCols, ArrayList updatedPrimaryKey) throws SQLException,
391          IOException, RepException {
392    
393        Object Uid = rows_I.getObject(RepConstants.shadow_sync_id1);
394    
395        if (viewedIds.containsKey(Uid)) {
396          viewedIds.remove(Uid);
397          return;
398        }
399    
400        Object[] primaryColValues = new Object[primaryColNames.length];
401    
402        for (int i = 0; i < primaryColNames.length; i++) {
403          primaryColValues[i] = rows_I.getObject(primaryColNames[i]);
404        }
405    
406        if (USE_getLastRecord) { // can comment this code if no error comes
407          makeInsertElement_GetLastRecord(bw, operation, noOfColumns, xmlWriter,
408                                          rows_I, rsmt, remoteServerName,
409                                          primaryColValues,
410                                          encodedCols, updatedPrimaryKey);
411        }
412        else {
413          if (filterResultSet(rows_I)) {
414            writeInsertElement(bw, operation, noOfColumns, xmlWriter, rows_I, rsmt,
415                               primaryColValues,  encodedCols);
416          }
417        }
418      }
419    
420      /**
421       * creates and XML Insert Element
422       * iF USED with getLast Records for shadow Table
423       * @param operation
424       * @param noOfColumns
425       * @param xmlWriter
426       * @param rows_I
427       * @param rsmt
428       * @param remoteServerName
429       * @param primaryColValues
430       * @throws IOException
431       * @throws RepException
432       * @throws SQLException
433       */
434      private void makeInsertElement_GetLastRecord(Writer bw, String operation,
435                                                   int noOfColumns,
436                                                   XMLWriter xmlWriter,
437                                                   ResultSet rows_I,
438                                                   ResultSetMetaData rsmt,
439                                                   String remoteServerName,
440                                                   Object[] primaryColValues,
441                                                   ArrayList encodedCols, ArrayList updatedPrimaryKey) throws
442          IOException, RepException, SQLException {
443        Object UId = rows_I.getObject(RepConstants.shadow_sync_id1);
444        Tracer tracer = new Tracer();
445        getLastRecord(primaryColValues, UId, tracer, remoteServerName, updatedPrimaryKey);
446        if (!tracer.recordFound) {
447          if (filterResultSet(rows_I)) {
448            writeInsertElement(bw, operation, noOfColumns, xmlWriter, rows_I, rsmt,
449                               primaryColValues,  encodedCols);
450          }
451        }
452        else if (tracer.type.equals(RepConstants.update_operation)) {
453          ResultSet rsTracer = tracer.rs;
454          if (filterResultSet(rsTracer)) {
455            writeInsertElement(bw, operation, noOfColumns, xmlWriter, rsTracer,
456                               rsTracer.getMetaData(), tracer.primaryKeyValues,
457                                encodedCols);
458          }
459        }
460    //    else if (tracer.type.equalsIgnoreCase(RepConstants.delete_operation)) {
461    //    }
462      }
463    
464      /**
465       * creates a XML element for Insert
466       * @param operation
467       * @param noOfColumns
468       * @param xmlWriter
469       * @param rows_I
470       * @param rsmt
471       * @param primaryColValues
472       * @throws SQLException
473       * @throws IOException
474       * @throws RepException
475       */
476      private void writeInsertElement(Writer bw, String operation, int noOfColumns,
477                                      XMLWriter xmlWriter, ResultSet rows_I,
478                                      ResultSetMetaData rsmt,
479                                      Object[] primaryColValues,
480                                      ArrayList encodedCols) throws
481          SQLException, IOException, RepException {
482        /*
483              // just an example not mandat implementation.
484              <operation>Insert
485         <row>
486           <columnName name="c1">
487              <![CDATA[ value value value ]]>
488          </columnName>
489    
490          <columnName name="c2"> // suppose column is of blob/clob type
491             <start> start position index</start>
492             <length> number of characters</length>
493          </columnName>
494         </row>
495         <primary>
496           <primaryColumnNames name="p1">
497                  <![CDATA[ value value value ]]>
498           </primaryColumnNames>
499         </primary>
500              </operation>
501         */
502    
503          long startTime = System.currentTimeMillis();
504          bw.write("<operation>");
505          bw.write(operation);
506          xmlWriter.writeRowElement(noOfColumns, rows_I, rsmt, primaryColNames,primaryColValues, tableName, encodedCols);
507          xmlWriter.writePrimaryKeyElement(primaryColNames, primaryColValues,encodedCols);
508          bw.write("</operation>\r\n");
509          if(countWriteInsertElement<=5) {
510    //System.out.println(" Time taken in write the INSERT element : " +(System.currentTimeMillis() - startTime));
511           countWriteInsertElement++;
512          }
513      }
514    
515      /**
516       * Returns a resutSet for the other common record corresponding to the common Id passed.
517       * @param rs
518       * @param tracer
519       * @return
520       * @throws SQLException
521       */
522      private ResultSet getOtherCommonRecord(ResultSet rs, Tracer tracer) throws SQLException, RepException {
523        // so that if the method is called recursively then result set created is not closed.
524    //    commonPreparedStatement = makeCommonPreparedStatement(shadowTable);
525        Object commonId = rs.getObject(RepConstants.shadow_common_id2);
526        String serverName = rs.getString(RepConstants.shadow_serverName_n);
527        Object UId = rs.getObject(RepConstants.shadow_sync_id1);
528        if (!serverName.equalsIgnoreCase(remoteServerName)) {
529          viewedIds.put(UId,null);
530        }
531        commonPreparedStatement.setObject(1, UId);
532            /* bjt - try for quotes */
533        //commonPreparedStatement.setObject(1, UId.toString());
534        commonPreparedStatement.setObject(2, commonId);
535            /* bjt - try for quotes */
536        //commonPreparedStatement.setObject(2, commonId.toString());
537        long starttime = System.currentTimeMillis();
538         ResultSet resultSet = commonPreparedStatement.executeQuery();
539    //    if(commPreCount<5) {
540    //System.out.println("  TIME TAKNE IN EXECUTION OF COMMON PREPARED STATEMENT : " +(System.currentTimeMillis() - starttime));
541    //     commPreCount++;
542    //     }
543    
544        return resultSet;
545      }
546    
547      /**
548       * creates and XML Element for Update
549       * @param noOfColumns
550       * @param rsmt
551       * @param xmlWriter
552       * @param rows_U
553       * @param shadowTableName
554       * @param remoteServerName
555       * @throws java.lang.Exception
556       */
557      private void makeUpdateElement(Writer bw, int noOfColumns,
558                                     ResultSetMetaData rsmt
559                                     , XMLWriter xmlWriter, ResultSet rows_U,
560                                     String shadowTableName,
561                                     String remoteServerName,
562                                     ArrayList encodedCols, ArrayList updatedPrimaryKey) throws Exception {
563        Tracer tracer = new Tracer();
564        ResultSet rs = null;
565        try {
566          Object Uid = rows_U.getObject(RepConstants.shadow_sync_id1);
567          if (viewedIds.containsKey(Uid)) {
568            viewedIds.remove(Uid);
569            return;
570          }
571    //      if (rows_U.getString(RepConstants.shadow_status4).equalsIgnoreCase(
572    //          RepConstants.afterUpdate)) {
573    //        throw new Exception(" NOT POSSIBLE ");
574    //      }
575    
576          Object[] oldPrimaryColValues = new Object[primaryColNames.length];
577          for (int i = 0; i < primaryColNames.length; i++) {
578            oldPrimaryColValues[i] = rows_U.getObject(primaryColNames[i]);
579          }
580          rs = getOtherCommonRecord(rows_U, tracer);
581          boolean recordPresent = rs.next();
582          Object[] primaryColValues = new Object[primaryColNames.length];
583          for (int i = 0; i < primaryColNames.length; i++) {
584            primaryColValues[i] = rs.getObject(primaryColNames[i]);
585          }
586          String serverName = rs.getString(RepConstants.shadow_serverName_n);
587          Object currentUId = rs.getObject(RepConstants.shadow_sync_id1);
588          if (!serverName.equalsIgnoreCase(remoteServerName)) {
589            viewedIds.put(currentUId,null);
590          }
591    
592          if (USE_getLastRecord) {
593            makeUpdateElement_getLastRecord(bw, noOfColumns, xmlWriter, rows_U,
594                                            remoteServerName, oldPrimaryColValues,
595                                            tracer, rs, primaryColValues,
596                                            currentUId,  encodedCols, updatedPrimaryKey);
597          }
598          else {
599            makeUpdateEment_ForFilter(bw, noOfColumns, xmlWriter, rows_U,
600                                      oldPrimaryColValues, rs, primaryColValues,
601                                      tracer, encodedCols);
602          }
603        }
604        finally {
605          if (tracer.rs != null) {
606            ResultSet rstemp = tracer.rs;
607            rstemp.close();
608          }
609          if (rs != null) {
610            rs.close();
611          }
612        }
613    
614      }
615    
616      /**
617       * creates an XML element for update With filter clause
618       * @param noOfColumns
619       * @param xmlWriter
620       * @param rows_U
621       * @param oldPrimaryColValues
622       * @param newRs
623       * @param primaryColValues
624       * @throws java.lang.Exception
625       */
626      private void makeUpdateEment_ForFilter(Writer bw, int noOfColumns,
627                                             XMLWriter xmlWriter,
628                                             ResultSet rows_U,
629                                             Object[] oldPrimaryColValues,
630                                             ResultSet newRs,
631                                             Object[] primaryColValues,
632                                             Tracer tracer,
633                                             ArrayList encodedCols) throws
634          Exception {
635        boolean oldFilterResult = filterResultSet(rows_U);
636        boolean newFilterResult = filterResultSet(newRs);
637        if (oldFilterResult && newFilterResult) {
638          writeUpdateElement(bw, noOfColumns, xmlWriter, newRs, newRs.getMetaData(),
639                             oldPrimaryColValues, primaryColValues, rows_U, tracer,
640                              encodedCols);
641        }
642        else if (oldFilterResult && (!newFilterResult)) {
643          writeDeleteElement(bw, xmlWriter, rows_U, oldPrimaryColValues,
644                              encodedCols);
645        }
646        else if ( (!oldFilterResult) && newFilterResult) {
647          writeInsertElement(bw, RepConstants.insert_operation,
648                             newRs.getMetaData().getColumnCount(), xmlWriter, newRs,
649                             newRs.getMetaData(), primaryColValues,
650                             encodedCols);
651        }
652      }
653    
654      /**
655       * creates an XML element for Update is Get Last record from shadow Table is Used.
656       * @param noOfColumns
657       * @param xmlWriter
658       * @param rows_U
659       * @param remoteServerName
660       * @param oldPrimaryColValues
661       * @param tracer
662       * @param rs
663       * @param primaryColValues
664       * @param currentUId
665       * @throws java.lang.Exception
666       */
667      private void makeUpdateElement_getLastRecord(Writer bw, int noOfColumns,
668                                                   XMLWriter xmlWriter,
669                                                   ResultSet rows_U,
670                                                   String remoteServerName,
671                                                   Object[] oldPrimaryColValues,
672                                                   Tracer tracer, ResultSet rs,
673                                                   Object[] primaryColValues,
674                                                   Object currentUId,
675                                                   ArrayList encodedCols, ArrayList updatedPrimaryKey) throws
676          Exception {
677        getLastRecord(primaryColValues, currentUId, tracer, remoteServerName, updatedPrimaryKey);
678        if (!tracer.recordFound) {
679          makeUpdateEment_ForFilter(bw, noOfColumns, xmlWriter, rows_U,
680                                    oldPrimaryColValues, rs, primaryColValues,
681                                    tracer,  encodedCols);
682        }
683        else if (tracer.type.equals(RepConstants.delete_operation)) {
684          if (filterResultSet(rows_U)) {
685            writeDeleteElement(bw, xmlWriter, rows_U, oldPrimaryColValues,
686                                encodedCols);
687          }
688        }
689        else if (tracer.type.equals(RepConstants.update_operation)) {
690          ResultSet rsTracer = tracer.rs;
691          makeUpdateEment_ForFilter(bw, noOfColumns, xmlWriter, rows_U,
692                                    oldPrimaryColValues, rsTracer,
693                                    primaryColValues, tracer,
694                                    encodedCols);
695    
696        }
697        else {
698          ResultSet rsTracer = tracer.rs;
699          if (rsTracer == null) {
700            rsTracer = rs;
701            tracer.primaryKeyValues = primaryColValues;
702          }
703          if (filterResultSet(rsTracer)) {
704            writeInsertElement(bw, RepConstants.insert_operation, noOfColumns,
705                               xmlWriter, rsTracer,
706                               rsTracer.getMetaData(), tracer.primaryKeyValues,
707                                encodedCols);
708    
709          }
710        }
711    
712      }
713    
714      // Update Operation Only
715      private void writeUpdateElement(Writer bw, int noOfColumns,
716                                      XMLWriter xmlWriter,
717                                      ResultSet rs,
718                                      ResultSetMetaData rsmt,
719                                      Object[] primaryKeyValues,
720                                      Object[] newprimaryKeyValues,
721                                      ResultSet originalResultSet,
722                                      Tracer tracer
723                                      , ArrayList encodedCols
724                                      ) throws Exception {
725          long startTime =System.currentTimeMillis();
726          bw.write("<operation>");
727          bw.write(RepConstants.update_operation);
728          if (tracer.primaryKeyValues != null) {
729            xmlWriter.writeRowElementForUpdate(noOfColumns, rs, rsmt,
730                                               originalResultSet, primaryColNames,
731                                               tracer.primaryKeyValues, tableName,
732                                               encodedCols);
733          }
734          else {
735            xmlWriter.writeRowElementForUpdate(noOfColumns, rs, rsmt,
736                                               originalResultSet, primaryColNames,
737                                               newprimaryKeyValues, tableName,
738                                               encodedCols);
739          }
740    
741          xmlWriter.writePrimaryKeyElement(primaryColNames, primaryKeyValues,encodedCols);
742          bw.write("</operation>\r\n");
743    //      if(countWriteupdateElement<=5) {
744    //        System.out.println(" TIME TAKEN TO WRITE THE UPDATE ELEMENT : " +(System.currentTimeMillis() - startTime));
745    //        countWriteupdateElement++;
746    //      }
747      }
748    
749      // Updated record is deleted
750      private void writeDeleteElement(Writer bw, XMLWriter xmlWriter, ResultSet rs,
751                                      Object[] primaryKeyValues,
752                                       ArrayList encodedCols) throws
753          Exception {
754        long startTime = System.currentTimeMillis();
755          bw.write("<operation>");
756          bw.write(RepConstants.delete_operation);
757          xmlWriter.writePrimaryKeyElement(primaryColNames, primaryKeyValues, encodedCols);
758          bw.write("</operation>\r\n");
759    //    if(countWriteDelementelement<=5) {
760    //      System.out.println(" TIME TAKEN TO WRITE THE DELETE ELEMENT  " +(System.currentTimeMillis() - startTime));
761    //      countWriteDelementelement++;
762    //    }
763      }
764    
765    // writes a n delete Element in XML file
766      private void makeDeleteElement(Writer bw, int noOfColumns,
767                                     XMLWriter xmlWriter, ResultSet rows_D,
768                                      ArrayList encodedCols) throws
769          Exception {
770        Object Uid = rows_D.getObject(RepConstants.shadow_sync_id1);
771        if (viewedIds.containsKey(Uid)) {
772          viewedIds.remove(Uid);
773          return;
774        }
775    
776        Object[] primaryColValues = new Object[primaryColNames.length];
777        for (int i = 0; i < primaryColNames.length; i++) {
778          primaryColValues[i] = rows_D.getObject(primaryColNames[i]);
779        }
780    //  primaryColValues =  getLastRecordBackwardtraversing(primaryColValues,Uid);
781    //    if (primaryColValues!=null&&filterResultSet(rows_D)) {
782           if (filterResultSet(rows_D)) {
783          writeDeleteElement(bw, xmlWriter, rows_D, primaryColValues, encodedCols);
784        }
785      }
786    
787      /**
788       * get last unique id from the corresponding the shadow Table
789       * @param shadowTable
790       * @return
791       * @throws java.lang.Exception
792       */
793      /*  private Object getLastUIDFromShadowTable(String shadowTable) throws Exception {
794          StringBuffer query = new StringBuffer();
795          ResultSet rs = null;
796          try {
797       query.append(" Select max(").append(RepConstants.shadow_sync_id1).append(
798                ")").append(" from ").append(shadowTable);
799            Connection pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name);
800       rs = pub_sub_Connection.createStatement().executeQuery(query.toString());
801            boolean flag = rs.next();
802            Object lastId = rs.getObject(1);
803            return flag ? (lastId == null ? new Long(0) : lastId) : new Long(0);
804          }
805          finally {
806            Statement st = rs.getStatement();
807            rs.close();
808            st.close();
809          }
810        }
811       */
812    
813      private Object getLastUIDFromShadowTable(String shadowTable) throws Exception {
814            Connection pub_sub_Connection = null;
815        StringBuffer query = new StringBuffer();
816        ResultSet rs = null;
817        boolean flag = false;
818        Object lastId = null;
819        try {
820    //RepConstants.writeMessage_FILE(" pub_sub_Connection ="+pub_sub_Connection);
821          //query.append(" Select max(").append(RepConstants.shadow_sync_id1).append(")").append(" from ").append(shadowTable);
822              /* bjt - for postgres performance */
823          query.append(" Select ").append(RepConstants.shadow_sync_id1).append(" from ").append(shadowTable).append(" order by ").append(RepConstants.shadow_sync_id1).append(" desc limit 1");
824          pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name);
825          rs = pub_sub_Connection.createStatement().executeQuery(query.toString());
826    //RepConstants.writeMessage_FILE("getLastUIDFromShadowTable rs :: "+rs);
827          flag = rs.next();
828              if (flag)
829            lastId = rs.getObject(1);
830        }
831        catch (Exception ex) {
832          RepConstants.writeERROR_FILE(ex);
833        }
834        finally {
835          if (rs != null) {
836            Statement st = rs.getStatement();
837            rs.close();
838            if (st != null)
839              st.close();
840          }
841             connectionPool.returnConnection(pub_sub_Connection);
842        }
843        return flag ? (lastId == null ? new Long(0) : lastId) : new Long(0);
844      }
845    
846      /**
847       * update the bookmarks table after XML file for synchronisation is ready with the last UniqueID.
848       * @param shadowTable
849       * @param tableName
850       * @param remote_Pub_Sub_Name
851       * @throws java.lang.Exception
852       */
853      private void updateBookMarkLastSyncId(String shadowTable, String tableName,
854                                            String remote_Pub_Sub_Name,
855                                            Object lastId) throws Exception {
856    
857        Statement stmt = null;
858            Connection pub_sub_Connection = null;
859        try {
860    //        Object lastId = getLastUIDFromShadowTable(shadowTable);
861          String updateQuery = "update " + dbDataTypeHandler.getBookMarkTableName() +" set " +
862              RepConstants.bookmark_lastSyncId4 + "=" + lastId + " where  " +
863              RepConstants.bookmark_LocalName1 + " = '" + local_pub_sub_name +"' and " +
864              RepConstants.bookmark_RemoteName2 + " = '" + remote_Pub_Sub_Name +"' and " +
865              RepConstants.bookmark_TableName3 + " = '" + tableName + "' ";
866          pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name);
867          stmt = pub_sub_Connection.createStatement();
868          int updateNumber = stmt.executeUpdate(updateQuery);
869        }
870        finally {
871          if (stmt != null)
872            stmt.close();
873                    connectionPool.returnConnection(pub_sub_Connection);
874        }
875      }
876    
877      /**
878       * writes the zip file on the other nodes socket.
879       * @param s
880       * @param xmlFilePath
881       * @throws IOException
882       */
883      private void writeZIPFileOnClientSocket(Socket s, String xmlFilePath) throws IOException {
884        FileInputStream fis = new FileInputStream(xmlFilePath);
885        OutputStream sos = s.getOutputStream();
886        s.setSendBufferSize(Integer.MAX_VALUE);
887        BufferedOutputStream bos = new BufferedOutputStream(sos);
888    
889        byte[] buf = new byte[1024];
890        int len = 0;
891        int kb = 0;
892        while ( (len = fis.read(buf)) > 0) {
893          bos.write(buf, 0, len);
894        }
895        bos.flush();
896        bos.close();
897        sos.close();
898        fis.close();
899      }
900    
901    //  public Object[] createXMLFile(String xmlFileURL, String zipFielURL,
902    //                                String xmlFileName, String remote_Pub_Sub_Name,
903    //                                ArrayList subRepTables,String clientServerName,
904    //                                 int noOfTables, boolean DeleteXML,
905    //                                String local_pub_sub_name,boolean isSchemaSupported,_FileUpload fileUpload) throws RepException {
906    //    return createXMLFile(xmlFileURL, zipFielURL, xmlFileName,
907    //                         remote_Pub_Sub_Name, subRepTables, clientServerName,noOfTables, DeleteXML, local_pub_sub_name,isSchemaSupported,fileUpload);
908    //  }
909    
910      /**
911       * deletes all records from the shadow tablw having null primary key set corresponding to the actual table.
912       * @throws SQLException
913       */
914      private void deleteRecordsFromShadowTableWithNullPk() throws SQLException,
915          RepException {
916        StringBuffer query = new StringBuffer();
917        query.append(" delete from ").append(shadowTable).append(" where ");
918        for (int i = 0; i < primaryColNames.length; i++) {
919          if (i > 0) {
920            query.append(" and ");
921          }
922          query.append(primaryColNames[i]).append(" is null ");
923        }
924        Connection pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name);
925        Statement st = null;
926        try {
927          st = pub_sub_Connection.createStatement();
928          st.executeUpdate(query.toString());
929        }
930        finally {
931          if (st != null)
932            st.close();
933                    connectionPool.returnConnection(pub_sub_Connection);
934        }
935      }
936    
937      /**
938       * filters the result set with the filter clause given.
939       * @param rs
940       * @return
941       * @throws SQLException
942       * @throws SQLException
943       */
944      private boolean filterResultSet(ResultSet rs) throws SQLException, SQLException, RepException {
945            Connection pub_sub_Connection = null;
946        StringBuffer query = new StringBuffer();
947        if ( (filterClause != null) && (!filterClause.trim().equals(""))) {
948          Object Uid = rs.getObject(RepConstants.shadow_sync_id1);
949          query.append("Select * from ").append(shadowTable).append(" where ")
950              .append(RepConstants.shadow_sync_id1)
951              .append(" = ").append(Uid)
952              .append(" and ").append(RepConstants.shadow_serverName_n)
953              .append(" != '")
954              .append(remoteServerName).append("' and ").append(filterClause);
955          PreparedStatement pstmt = null;
956          ResultSet filteredSet = null;
957          try {
958            pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name);
959            pstmt = pub_sub_Connection.prepareStatement(query.toString());
960            if (parameters != null) {
961              for (int i = 0, size = parameters.length; i < size; i++) {
962                pstmt.setString(i + 1, parameters[0]);
963              }
964            }
965            filteredSet = pstmt.executeQuery();
966            boolean f1 = filteredSet.next();
967            return f1;
968          }
969          finally {
970            if (filteredSet != null)
971              filteredSet.close();
972            if (pstmt != null)
973              pstmt.close();
974                      connectionPool.returnConnection(pub_sub_Connection);
975          }
976        }
977        else {
978          return true;
979        }
980      }
981    
982    
983    //  Changed by neeraj sir as stackoverflow was coming
984    //  when a single row was updated more than 2500 times(ANZ case)
985    //  now getlastrecord not called recursively
986    //  but executed in infinite loop
987    
988      private boolean getLastRecord(Object[] primaryColValues0, Object UId0,
989                                    Tracer tracer, String remoteServerName, ArrayList updatedPrimaryKey) throws
990          RepException, SQLException {
991        boolean recordFound = false;
992        Object[] primaryColValues = primaryColValues0;
993        Object[] oldPrimaryColValues = primaryColValues0;
994        Object UId = UId0;
995        String returnOperation = "";
996    //    ArrayList updatedPrimaryKey = getUpdatedPrimaryKey(); /* bjt */
997        boolean isPrimaryKeyUpdated =true;
998        Object[] UIdTemp=null;
999        while (true) {
1000          primaryPreparedStatement.setObject(1, UId);
1001          for (int i = 0; i < primaryColValues.length; i++) {
1002            primaryPreparedStatement.setObject(i + 2, primaryColValues[i]);
1003          }
1004    
1005          ResultSet rs = primaryPreparedStatement.executeQuery();
1006    //    execute prepared statement and check the record in result set
1007          if (!rs.next()) {
1008            rs.close();
1009            break;
1010          }
1011          else {
1012            String operation = rs.getString(RepConstants.shadow_operation3);
1013            String serverName = rs.getString(RepConstants.shadow_serverName_n);
1014            if (operation.equalsIgnoreCase(RepConstants.delete_operation)) {
1015             // Delete Operation
1016              if (!serverName.equalsIgnoreCase(remoteServerName)) {
1017                Object currentUId = rs.getObject(RepConstants.shadow_sync_id1);
1018                viewedIds.put(currentUId,null);
1019                recordFound = true;
1020                returnOperation = RepConstants.delete_operation;
1021              }
1022              break;
1023            }
1024            else if (operation.equals(RepConstants.update_operation)) {
1025              ResultSet resultSet = getOtherCommonRecord(rs, tracer);
1026              resultSet.next();
1027              oldPrimaryColValues = primaryColValues;
1028              primaryColValues = new Object[primaryColNames.length];
1029              for (int i = 0; i < primaryColNames.length; i++) {
1030                primaryColValues[i] = resultSet.getObject(primaryColNames[i]);
1031              }
1032              serverName = resultSet.getString(RepConstants.shadow_serverName_n);
1033              UId = resultSet.getObject(RepConstants.shadow_sync_id1);
1034              if (!serverName.equalsIgnoreCase(remoteServerName)) {
1035                viewedIds.put(UId,null);
1036              }
1037              isPrimaryKeyUpdated = isPrimaryKeyUpdated(primaryColValues,updatedPrimaryKey);
1038    //System.out.println("  isPrimaryKeyUpdated ::  "+isPrimaryKeyUpdated);
1039              if(!isPrimaryKeyUpdated) {
1040              UIdTemp = getUIDRecordUpdatedExceptPK(primaryColNames, primaryColValues, oldPrimaryColValues);
1041                if (UIdTemp != null) {
1042                  UId = UIdTemp[0];
1043                  viewedIds.put(UId,null);
1044                }
1045              }
1046              recordFound = true;
1047              returnOperation = RepConstants.update_operation;
1048              resultSet.close();
1049            }
1050            else {
1051              // for insert type operation case produced by yana's table having triggers       .
1052              primaryColValues = new Object[primaryColNames.length];
1053              for (int i = 0; i < primaryColNames.length; i++) {
1054                primaryColValues[i] = rs.getObject(primaryColNames[i]);
1055              }
1056              serverName = rs.getString(RepConstants.shadow_serverName_n);
1057              UId = rs.getObject(RepConstants.shadow_sync_id1);
1058              if (!serverName.equalsIgnoreCase(remoteServerName)) {
1059                viewedIds.put(UId,null);
1060              }
1061              recordFound = true;
1062              returnOperation = RepConstants.insert_operation;
1063            }
1064          } // end if (!rs.next())
1065              // bjt - close result set 
1066              rs.close();
1067        } // end while (true)
1068        // bjt - move above return to set tracer before returning
1069        if (!recordFound)
1070          return recordFound;
1071        tracer.recordFound = recordFound;
1072        tracer.type = returnOperation;
1073        if (returnOperation.equals(RepConstants.update_operation)) {
1074          tracer.primaryKeyValues = primaryColValues;
1075          if(!isPrimaryKeyUpdated) {
1076            tracer.rs = (ResultSet)UIdTemp[1];
1077          } else {
1078            String finalResultSetQuery = "select * from " + shadowTable + " where " +RepConstants.shadow_sync_id1 + " = " + UId;
1079                    /* bjt - try to make postgres use index */
1080            //String finalResultSetQuery = "select * from " + shadowTable + " where " +RepConstants.shadow_sync_id1 + " = " + UId.toString();
1081            ResultSet resultSet = commonStatement.executeQuery(finalResultSetQuery);
1082            resultSet.next();
1083            tracer.rs = resultSet;
1084                    // bjt - close resultSet 
1085                    //resultSet.close();
1086          }
1087        }
1088        return recordFound;
1089      }
1090    
1091      /**
1092       * This method is implemented to traverse the record backward for writing
1093       * the delete element in XML file.It traverse the record backward.
1094       * If a record found the status 'B' then record is trvarsed continue up to
1095       * original record that have status 'I'.
1096       */
1097      private Object[] getLastRecordBackwardtraversing(Object[] primaryColValues0, Object UId) throws SQLException, RepException {
1098          String serverName =null;
1099          Object[] primaryColValues =new Object[primaryColValues0.length] ;
1100          for (int i = 0; i < primaryColValues0.length; i++) {
1101            primaryColValues[i] =primaryColValues0[i];
1102          }
1103            while(true) {
1104              primaryPreparedStatementBackwardTraversing.setObject(1, UId);
1105              for (int i = 0; i < primaryColValues.length; i++) {
1106                primaryPreparedStatementBackwardTraversing.setObject(i + 2, primaryColValues[i]);
1107              }
1108              ResultSet rs = primaryPreparedStatementBackwardTraversing.executeQuery();
1109              boolean flag = rs.next();
1110              if (!flag) {
1111                rs.close();
1112                return primaryColValues;
1113              }
1114              String operation = rs.getString(RepConstants.shadow_operation3);
1115              if (operation.equals(RepConstants.insert_operation)) {
1116                if(rs!=null)
1117                 rs.close();
1118                return null;
1119              }
1120              else {
1121        //         String operation = rs.getString(RepConstants.shadow_operation3);
1122                if (operation.equalsIgnoreCase(RepConstants.delete_operation)) { // Delete Operation
1123                  if(rs!=null)
1124                   rs.close();
1125                  return primaryColValues;
1126                }
1127                else {
1128                  ResultSet resultSet = getOtherCommonRecordBackwardTraversing(rs);
1129                  boolean updatedREcord = resultSet.next();
1130                  if (!updatedREcord) { // other common record not found
1131                    throw new RepException("REP051",new Object[] {rs.getObject(RepConstants.shadow_common_id2), shadowTable});
1132                  }
1133                  primaryColValues = new Object[primaryColNames.length];
1134                  for (int i = 0; i < primaryColNames.length; i++) {
1135                    primaryColValues[i] = resultSet.getObject(primaryColNames[i]);
1136                  }
1137                  UId= resultSet.getObject(RepConstants.shadow_sync_id1);
1138                  serverName = resultSet.getString(RepConstants.shadow_serverName_n);
1139                  if (!serverName.equalsIgnoreCase(remoteServerName)) {
1140                  viewedIds.put(UId,null);
1141                }
1142                if(resultSet!=null)
1143                  resultSet.close();
1144                }
1145              }
1146            }
1147           }
1148    
1149           private ResultSet getOtherCommonRecordBackwardTraversing(ResultSet rs) throws SQLException,RepException {
1150              Object commonId = rs.getObject(RepConstants.shadow_common_id2);
1151              Object UId = rs.getObject(RepConstants.shadow_sync_id1);
1152              commonPreparedStatementForBackwardTraversing.setObject(1, UId);
1153              commonPreparedStatementForBackwardTraversing.setObject(2, commonId);
1154              for (int i = 0; i < primaryColNames.length; i++) {
1155                commonPreparedStatementForBackwardTraversing.setObject(i + 3,rs.getObject("REP_OLD_" +primaryColNames[i]));
1156              }
1157              return commonPreparedStatementForBackwardTraversing.executeQuery();
1158            }
1159    
1160            private PreparedStatement makeCommonPreparedStatementBackwardTraversing() throws
1161                SQLException, RepException {
1162              StringBuffer query = new StringBuffer();
1163              query.append(" select * from ");
1164              query.append(shadowTable);
1165              query.append(" where ");
1166              query.append(RepConstants.shadow_sync_id1);
1167              query.append(" != ");
1168              query.append(" ? ");
1169              query.append(" and ");
1170              query.append(RepConstants.shadow_common_id2);
1171              query.append(" = ");
1172              query.append(" ? ");
1173          // for matching oldPrimary keys
1174              for (int i = 0; i < primaryColNames.length; i++) {
1175                query.append(" and ");
1176                query.append("REP_OLD_" + primaryColNames[i]);
1177                query.append(" = ");
1178                query.append(" ? ");
1179              }
1180              query.append(" and ");
1181              query.append(RepConstants.shadow_status4);
1182              query.append(" = '");
1183              query.append(RepConstants.beforeUpdate + "'");
1184              query.append(" order by ").append(RepConstants.shadow_sync_id1);
1185              Connection pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name);
1186              return pub_sub_Connection.prepareStatement(query.toString());
1187            }
1188    
1189            /**
1190             * Get the updated priamry key columns values from trackpriamrykey table
1191             */
1192            private ArrayList getUpdatedPrimaryKey() throws SQLException, RepException {
1193              ResultSet rs=null;
1194              Statement stmt=null;
1195              ArrayList list = new ArrayList();
1196              try
1197              {
1198                stmt = ( (Connection) connectionPool.getConnection(local_pub_sub_name)).createStatement();
1199                rs = stmt.executeQuery(makeQueryForUpdatedPriamryKey());
1200                ResultSetMetaData metaData = rs.getMetaData();
1201                int columnCount = metaData.getColumnCount();
1202                while (rs.next()) {
1203                  Object[] columnValues = new Object[columnCount];
1204                  for (int i = 1; i <= columnCount; i++) {
1205                    columnValues[i - 1] = rs.getObject(i);
1206                  }
1207          //       System.out.println("UPDATED PRIMARY KEY ::"+Arrays.asList(columnValues));
1208                  list.add(columnValues);
1209                }
1210              } finally {
1211                try {
1212                  if (rs != null) {
1213                    rs.close();
1214                  }
1215                 if(stmt!=null)  {
1216                   stmt.close();
1217                 }
1218                }
1219                catch (SQLException ex) {
1220                  // Ignore Exception
1221                }
1222              }
1223              return list;
1224            }
1225    
1226            /**
1227             * It return false if record is not updated else return true
1228             */
1229            private boolean isPrimaryKeyUpdated(Object[] priamryColValues,ArrayList updatedPrimaryKey) throws SQLException, RepException {
1230              boolean isPrimaryKeyUpdated = false;
1231              if(updatedPrimaryKey.size()==0) {
1232                 return isPrimaryKeyUpdated;
1233              }
1234    updatedPkList: for (int i = 0; i < updatedPrimaryKey.size(); i++) {
1235                     Object[] updatedPk = (Object[]) updatedPrimaryKey.get(i);
1236                                 /* bjt fix loop */
1237    updatedPkFields: for (int j = 0; j < updatedPk.length; j++) {
1238                       //if (updatedPk[j].equals(priamryColValues[j])) {
1239                       //if (!updatedPk[j].equals(priamryColValues[j])) {
1240                       if (!updatedPk[j].toString().equals(priamryColValues[j].toString())) {
1241                         //isPrimaryKeyUpdated=true;
1242                                             continue updatedPkList; /* field does not equal, so look at next record */
1243                       }
1244                     }
1245                                     /* bjt - if we make it here, all the fields are equal, so the records match */
1246                     //if(isPrimaryKeyUpdated) {
1247                                     /* bjt */
1248                                     isPrimaryKeyUpdated = true; /* now set to true */
1249                     //updatedPrimaryKey.remove(i); /* faster if we could remove, but later records might reference this
1250                                                                    /* same primary key so we can't remove it from the array.  Previous incarnation
1251                                                                                                     * refreshed this array for every record considered, so removing was no issue.  
1252                                                                                                     * refreshing the array for every record seems like a gigantic waste of time, so
1253                                                                                                     * we don't clear this array once we've seen a record, b/c we might see it later - bjt */
1254                     break;
1255                     //}
1256                   }
1257              return isPrimaryKeyUpdated;
1258            }
1259    
1260            /*
1261             SELECT  *  FROM  Rep_Shadow_PUSHTABLE WHERE  (Rep_sync_id = (SELECT  MAX(rep_sync_id)
1262             FROM  Rep_Shadow_PUSHTABLE WHERE   rollno = 11 AND (rep_old_rollNo = 11) AND rep_Status='A'))
1263    
1264             This method is used if same record is updated many times(except primary column i.e primary
1265             column is not updated) This leads to improving performance(As suggested by parveen sir) And
1266             to avoid traversing of the syncIds which couldnt add in the viewId we use
1267             addSyncidToViewIdForSameOldPKEqualsNewPks(String primaryCols[],Object[] primaryColValues,
1268             Object MaxUID) method.
1269             */
1270            private Object[] getUIDRecordUpdatedExceptPK(String
1271                primaryCols[], Object[] primaryColValues, Object[] oldPrimaryColValues) throws SQLException, RepException {
1272              ResultSet rs = null;
1273    
1274                //check if old primary keys and new primary keys are same or not.
1275                // If Yes,then return else coontinue
1276                for (int i = 0; i < primaryColValues.length; i++) {
1277                  if (!primaryColValues[i].toString().equals(oldPrimaryColValues[i].toString()))
1278                    return null;
1279                }
1280                PSForLastRecordSameRecordUpdatedExceptPK = makeQueryToGetRecordIfPrimaryKeyIsNotUpdated(primaryColNames);
1281                int indexPrimaryColumn = 0;
1282                for (indexPrimaryColumn = 0; indexPrimaryColumn < primaryCols.length;indexPrimaryColumn++) {
1283                  PSForLastRecordSameRecordUpdatedExceptPK.setObject(indexPrimaryColumn +1, primaryColValues[indexPrimaryColumn]);
1284                }
1285                for (int j = 0; j < primaryCols.length; j++) {
1286                  PSForLastRecordSameRecordUpdatedExceptPK.setObject(indexPrimaryColumn +j + 1, oldPrimaryColValues[j]);
1287                }
1288                rs = PSForLastRecordSameRecordUpdatedExceptPK.executeQuery();
1289                rs.next();
1290                Object Uid = rs.getObject(1);
1291                //Add the SyncIds to viewIds Which we have considered indirectly through above Query
1292                addSyncidToViewIdForSameOldPKEqualsNewPks(primaryColValues, Uid);
1293                return  new Object[]{ Uid,rs};
1294            }
1295    
1296            /*
1297                Query to get syncIds :
1298                " SELECT REP_SYNC_ID FROM SHADOW_TABLE WHERE
1299                primarycolumns=rep_old_primarycolumns  and pk=?; "
1300                Each sync id returned by this query is added to the
1301                viewed ids if it is not already present there and is less
1302                than MaxUID. MaxUID is the sync id up to which we have
1303                already considered shadow table records for that particular
1304                primary key. We need to add the id to the viewed ids because all of ...
1305    
1306             */
1307    
1308      private void addSyncidToViewIdForSameOldPKEqualsNewPks(Object[]
1309          primaryColValues, Object MaxUID) throws SQLException, RepException {
1310        ResultSet rs = null;
1311        try {
1312          PSToGetSyncidForSameOldPKEqualsNewPks = makeQueryToGetViewId(MaxUID);
1313          for (int i = 0; i < primaryColValues.length; i++)
1314            PSToGetSyncidForSameOldPKEqualsNewPks.setObject(i + 1,primaryColValues[i]);
1315          rs = PSToGetSyncidForSameOldPKEqualsNewPks.executeQuery();
1316          while (rs.next()) {
1317            Object Uid = rs.getObject(1);
1318            if (!viewedIds.containsKey(Uid)) {
1319    //      && ( (Number) Uid).longValue() < ( (Number) MaxUID).longValue()) {
1320              viewedIds.put(Uid, null);
1321            }
1322          }
1323        }
1324        finally {
1325          try {
1326            if (rs != null) {
1327              rs.close();
1328            }
1329            if (PSToGetSyncidForSameOldPKEqualsNewPks != null)
1330              PSToGetSyncidForSameOldPKEqualsNewPks.close();
1331          }
1332          catch (SQLException ex2) {
1333            //ignore SQLException
1334          }
1335        }
1336      }
1337    
1338      private PreparedStatement makeQueryToGetRecordIfPrimaryKeyIsNotUpdated(String[]
1339          primaryColsName) throws SQLException, RepException {
1340        StringBuffer query = new StringBuffer();
1341        query.append("SELECT  * ") /*.append(RepConstants.shadow_sync_id1).*/.
1342            append("  FROM  ").append(shadowTable).append(" WHERE ( ")
1343            .append(RepConstants.shadow_sync_id1)
1344            .append(" = (SELECT  MAX(").append(RepConstants.shadow_sync_id1)
1345            .append(") FROM ")
1346            .append(shadowTable).append(" WHERE ");
1347        for (int i = 0; i < primaryColsName.length; i++) {
1348          if (i != 0) {
1349            query.append(" AND ");
1350          }
1351          query.append(primaryColsName[i]).append("= ? ");
1352        }
1353        query.append(" AND ");
1354        for (int i = 0; i < primaryColsName.length; i++) {
1355          if (i != 0) {
1356            query.append(" AND ");
1357          }
1358          query.append(" REP_OLD_").append(primaryColsName[i]).append("= ? ");
1359        }
1360        query.append(" AND ").append(RepConstants.shadow_status4)
1361            .append(" = '").append(RepConstants.afterUpdate).append(" ')) ");
1362        Connection pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name);
1363    //System.out.println("Get UID  query.toString() :: "+query.toString());
1364        return pub_sub_Connection.prepareStatement(query.toString());
1365      }
1366    
1367      private PreparedStatement makeQueryToGetViewId(Object MaxUID) throws  SQLException, RepException {
1368        StringBuffer query = new StringBuffer();
1369        query.append("SELECT ").append(RepConstants.shadow_sync_id1)
1370            .append("  FROM  ").append(shadowTable).append(" WHERE ");
1371        for (int i = 0; i < primaryColNames.length; i++) {
1372          if (i != 0)
1373            query.append(" AND ");
1374            query.append(primaryColNames[i]).append("= ").append(" REP_OLD_")
1375            .append(primaryColNames[i]);
1376        }
1377        query.append(" AND ");
1378        for (int i = 0; i < primaryColNames.length; i++) {
1379          if (i != 0)
1380            query.append(" AND ");
1381            query.append(primaryColNames[i]).append("= ?");
1382        }
1383        query.append(" AND ")
1384            .append(RepConstants.shadow_sync_id1)
1385            .append(" < ")
1386            .append( ( (Number) MaxUID).longValue());
1387        Connection pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name);
1388    //System.out.println(" makeQuery To get View ID ::  "+query.toString().toUpperCase());
1389        return pub_sub_Connection.prepareStatement(query.toString());
1390      }
1391    
1392            private String makeQueryForUpdatedPriamryKey() {
1393              StringBuffer query = new StringBuffer();
1394              query.append("SELECT ");
1395              for (int i = 0; i < primaryColNames.length; i++) {
1396                if (i != 0)
1397                query.append(",");
1398                query.append(primaryColNames[i]);
1399              }
1400                  query.append(" FROM ")
1401                  .append(dbDataTypeHandler.getShadowTableName(tableName))
1402                  .append(" where ").append(RepConstants.shadow_PK_Changed)
1403                  .append(" = 'Y'");
1404    //System.out.println("  MAKE QUERY FOR UPDATED PRIMARY KEY :: "+query.toString());
1405              return query.toString();
1406            }
1407    
1408         private String makeQueryToSelectRecordsMarkedDeleted(long lastId) {
1409           StringBuffer query =new StringBuffer();
1410          query.append( "Select * from ")
1411               .append(shadowTable)
1412               .append(" where ")
1413               .append(RepConstants.shadow_sync_id1)
1414               .append(" > ")
1415               .append(lastId)
1416               .append(" and ")
1417               .append(RepConstants.shadow_operation3)
1418               .append(" ='D'  order by ")
1419               .append(RepConstants.shadow_sync_id1);
1420             return query.toString();
1421         }
1422    
1423    
1424            /**
1425             * used for debugging. shows values stored in a resultSet.
1426             * @param rs
1427             * @throws SQLException
1428             */
1429          //  public static void showResultSet(ResultSet rs) throws SQLException {
1430          //    ResultSetMetaData metaData = rs.getMetaData();
1431          //    int columnCount = metaData.getColumnCount();
1432          //    Object[] displayColumn = new Object[columnCount];
1433          //    for (int i = 1; i <= columnCount; i++)
1434          //      displayColumn[i - 1] = metaData.getColumnName(i);
1435          //    System.out.println(Arrays.asList(displayColumn));
1436          //    while (rs.next()) {
1437          //      Object[] columnValues = new Object[columnCount];
1438          //      for (int i = 1; i <= columnCount; i++)
1439          //        columnValues[i - 1] = rs.getObject(i);
1440          //      System.out.println(Arrays.asList(columnValues));
1441          //    }
1442          //  }
1443    
1444    
1445     }
1446    





























































Powered by Drupal - Theme by Danger4k