JavaDoc


001    /**
002     * Copyright (c) 2003 Daffodil Software Ltd all rights reserved,
003     * Modifications Copyright (c) 2008 Regiscope Digital Imaging Co, LLC, All rights reserved.
004     * This program is free software; you can redistribute it and/or modify
005     * it under the terms of version 2 of the GNU General Public License as
006     * published by the Free Software Foundation.
007     * There are special exceptions to the terms and conditions of the GPL
008     * as it is applied to this software. See the GNU General Public License for more details.
009     *
010     * This program is distributed in the hope that it will be useful,
011     * but WITHOUT ANY WARRANTY; without even the implied warranty of
012     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013     * GNU General Public License for more details.
014     *
015     * You should have received a copy of the GNU General Public License
016     * along with this program; if not, write to the Free Software
017     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018     */
019    
020    package org.dbreplicator.replication.xml;
021    
022    import java.sql.*;
023    import java.util.*;
024    
025    import org.xml.sax.*;
026    import org.xml.sax.helpers.*;
027    import org.dbreplicator.replication.*;
028    import org.dbreplicator.replication.DBHandler.*;
029    import org.dbreplicator.replication.column.*;
030    import org.apache.log4j.Logger;
031    
032    /**
033     * This class is the ContentHandler implementation used for parsing the
034     * snapshot XML file. It implements the different event methods that are called
035     * automatically by the parser, and uses them to read the values stored in the
036     * XML file.
037     * These methods are used while a snapshot is being taken: when the parser finds
038     * some information, they store the corresponding value and perform operations
039     * such as inserting, updating and deleting records in the respective tables.
040     */
041    
042    public class SnapshotHandler
043        extends DefaultHandler {
044      XMLElement currentElement;
045      Connection subConnection;
046      Statement statement;
047      Subscription subscription;
048      TreeMap treeMap;
049      PreparedStatement preparedStatement, psForUpdateBookMarksTable;
050      String remoteServerName,subName, pubName;
051      AbstractDataBaseHandler dbHandler;
052      boolean setAutoCommitFlag = true;
053      protected static Logger log = Logger.getLogger(SnapshotHandler.class.getName());
054      private boolean isFirstPass, isCurrentTableCyclic;
055      private RepTable currentRepTable;
056      private XMLElement tableElement;
057      private TreeMap allColumnsMap;
058      int batchCntr = 0; int batchMax = 0;
059    
060      /**
061       * Default Handler for parsing and reading the contents from XML file
062       * for getting Snapshot
063       * @param subConnection0
064       * @param subscription0
065       * @throws SQLException
066       */
067      public SnapshotHandler(boolean isFirstPass0, Connection subConnection0,
068                             Subscription subscription0,
069                             AbstractDataBaseHandler dbHandler0,
070                             String remoteServerName0) throws SQLException {
071        currentElement = new XMLElement("root");
072        subConnection = subConnection0;
073        statement = subConnection.createStatement();
074        subscription = subscription0;
075        dbHandler = dbHandler0;
076        remoteServerName = remoteServerName0;
077        isFirstPass = isFirstPass0;
078            //bjt
079        DatabaseMetaData dbmd = subConnection.getMetaData();
080            //batchMax = (dbmd.supportsBatchUpdates() ? 10000 : 0);
081            batchMax = 0;
082            log.debug("Batch Max = " + batchMax);
083            if (batchMax > 0) {
084                    log.debug("Using statement pooling in batches of " + batchMax);
085            } else {
086                    log.debug("NOT Using batch updates, database does not seem to support it");
087            }
088      }
089    
090      /**
091       * Initilazing an XML element and adding its childs.
092       * @param namespace
093       * @param localname
094       * @param qname
095       * @param atts
096       * @throws SAXException
097       */
098      public void startElement(String namespace, String localname, String qname, Attributes atts) throws SAXException {
099        try {
100          XMLElement childElement = null;
101    
102          /*
103            Below given if condition and structure of XML file has been changed
104            after merging of branch "Vinesreplication".
105            A new element Operation is added to make the
106            common element of row and primary key element.
107            Note : - to get the copy of old source check out
108            it from branch replicator1_9
109           */
110    
111         if ( (currentElement == null ||
112             !currentElement.elementName.equals("operation")) &&
113             qname.equals("tableName")) {
114    
115          /**
116           * In place of "XMLElement" class "XMLTableElement"
117           * class is used to control the outofmemory error.
118           * If a table have records in lakh then outofmemoryerror
119           * occurs because in "XMLElement" class one by one all the
120           *  element are added in elementlist.
121           */
122          childElement = new XMLTableElement(qname);
123    //       childElement = new XMLElement(qname);
124            tableElement = childElement;
125          }
126          else {
127            if (tableElement != null) {
128              createStatmement();
129            }
130            childElement = new XMLElement(qname);
131            tableElement = null;
132          }
133          currentElement.addChild(childElement);
134          childElement.setParentElement(currentElement);
135          childElement.addAtt(atts.getValue("name"));
136    
137          //add Encode
138          childElement.addEncodeAtt(atts.getValue("Encode"));
139          currentElement = childElement;
140        }
141        catch (NullPointerException ex) {
142          RepConstants.writeERROR_FILE(ex);
143          throw new SAXException(ex);
144        }
145        catch (Exception ex) {
146          RepConstants.writeERROR_FILE(ex);
147          throw new SAXException(ex);
148        }
149      }
150    
151      /**
152       * Called after end of an element is reached for calling delete and then insert on the respective tables on client side.
153       * @param namespace
154       * @param localname
155       * @param qname
156       * @throws SAXException
157       */
158      public void endElement(String namespace, String localname, String qname) throws SAXException {
159        try {
160          currentElement.checkEncoding();
161          XMLElement parentElement = currentElement.getParentElement();
162          //log.debug(" parentElement  "+parentElement);
163          //log.debug("qname::::"+qname);
164          /*
165            Below given if condition and structure of XML file has been changed
166            after merging of branch "Vinesreplication".
167            A new element Operation is added to make the
168            common element of row and primary key element.
169            Note : - to get the copy of old source check out
170            it from branch replicator1_9
171           */
172          if ( (parentElement == null ||
173              parentElement.elementName.equals("tableName")) &&
174              qname.equals("operation")) {
175    
176    //      currentElement = parentElement;
177    
178            createQuery();
179            currentElement.elementList.clear();
180          }
181          if ( (parentElement == null ||
182                !parentElement.elementName.equals("row") ||
183                !qname.equals("primary")) &&
184              qname.equals("tableName")) {
185            try {
186              String tableNaam = currentElement.elementValue;
187    //        String shadowTableName = RepConstants.shadow_Table(tableNaam);
188              String shadowTableName = dbHandler.getShadowTableName(tableNaam);
189              // delete all entries from Shadow Table as snapshot of table is taken by now
190              StringBuffer query = new StringBuffer();
191              query.append("SELECT MAX(").append(RepConstants.shadow_sync_id1).append(") from ").append(shadowTableName);
192              ResultSet rs = statement.executeQuery(query.toString());
193              rs.next();
194              long lastSyncId = rs.getLong(1);
195              rs.close();
196    
197              query = new StringBuffer();
198              query.append("delete from ").append(shadowTableName).append(" where ")
199                  .append(RepConstants.shadow_sync_id1).append(" != ").append(lastSyncId);
200              statement.execute(query.toString());
201    
202              String updateServerNameQuery = "update " + shadowTableName + " set " +
203                  RepConstants.shadow_serverName_n + " =  '" + remoteServerName +"'";
204              statement.execute(updateServerNameQuery);
205    
206              // updating records Lastsyncid and Concidered Id on BookMarks Table
207              if (psForUpdateBookMarksTable == null) {
208                psForUpdateBookMarksTable = makeUpdateBookMarksTable();
209              }
210              psForUpdateBookMarksTable.setLong(1, lastSyncId);
211              psForUpdateBookMarksTable.setLong(2, lastSyncId);
212              psForUpdateBookMarksTable.setString(3, subName);
213              psForUpdateBookMarksTable.setString(4, pubName);
214              psForUpdateBookMarksTable.setString(5, tableNaam);
215              int count = psForUpdateBookMarksTable.executeUpdate();
216              currentElement.elementList.clear();
217    
218              // inserting and deleting records from log Table..
219              insert_dummyRecordInLogTable();
220              deleteRecordsFromSuperLogTable(tableNaam);
221            }
222            catch (Exception ex) {
223              RepConstants.writeERROR_FILE(ex);
224              throw new SAXException(ex.getMessage(), ex);
225            }
226            if (preparedStatement != null) {
227              try {
228                            if (batchMax > 0) {
229                            log.debug("Sending last " + batchCntr + " records to database with batch processing");
230                            preparedStatement.executeBatch();
231                    setAutocomitTrueAndCommitRecord();
232                            preparedStatement.clearBatch();
233                            log.debug("Batch submitted");
234                            batchCntr = 0;
235                            }
236                preparedStatement.close();
237              }
238              catch (SQLException ex2) {
239              }
240            }
241          }
242          if (parentElement != null) {
243            currentElement = parentElement;
244          }
245        }
246        catch (ClassCastException ex) {
247          RepConstants.writeERROR_FILE(ex);
248          throw new SAXException(ex.getMessage(), ex);
249        }
250        catch (NullPointerException ex1) {
251          ex1.printStackTrace();
252          RepConstants.writeERROR_FILE(ex1);
253          throw new SAXException(ex1);
254        }
255        catch (Exception ex1) {
256          ex1.printStackTrace();
257          RepConstants.writeERROR_FILE(ex1);
258          throw new SAXException(ex1);
259        }
260      }
261    
262      /**
263       * getting the value for XML element.
264       * @param ch
265       * @param start
266       * @param len
267       * @throws SAXException
268       */
269      public void characters(char[] ch, int start, int len) throws SAXException {
270        try {
271          String elementValue = new String(ch, start, len);
272          if (elementValue.equalsIgnoreCase("") ||
273              elementValue.equalsIgnoreCase("\n")) {
274            return;
275          }
276          currentElement.setElementValue(elementValue);
277        }
278        catch (NullPointerException ex1) {
279          throw new SAXException(ex1.getMessage(), ex1);
280        }
281      }
282    
283      private void createStatmement() throws SAXException {
284        try {
285          String elementValue = currentElement.elementValue;
286          XMLElement parentElement = currentElement.getParentElement();
287          if ( (parentElement == null ||
288                !parentElement.elementName.equals("row")) &&
289              currentElement.elementName.equals("tableName")) {
290            RepTable repTable = subscription.getRepTable(elementValue);
291            isCurrentTableCyclic = repTable.getCyclicDependency().equalsIgnoreCase(RepConstants.YES);
292            if (isFirstPass) {
293    //     statement.execute("DELETE FROM " + elementValue);
294    //     ResultSet rs = statement.getResultSet();
295              treeMap = repTable.getColumnTreeMap(subConnection,subscription.getDBDataTypeHandler());
296              String preparedQuery = repTable.createInsertQueryForSnapShot();
297              preparedStatement = subConnection.prepareStatement(preparedQuery);
298              //log.debug(preparedQuery);
299            }
300            else {
301              if (isCurrentTableCyclic) {
302                treeMap = repTable.getColumnTreeMap(subConnection,subscription.getDBDataTypeHandler());
303                String preparedQuery = repTable.createUpdateQueryForSnapShot();
304                preparedStatement = subConnection.prepareStatement(preparedQuery);
305              }
306            }
307            currentRepTable = repTable;
308            allColumnsMap=currentRepTable.getAllColumns();
309          }
310        }
311        catch (ClassCastException ex) {
312          RepConstants.writeERROR_FILE(ex);
313          throw new SAXException(ex.getMessage(), ex);
314        }
315        catch (NullPointerException ex1) {
316          ex1.printStackTrace();
317          log.error(ex1.getMessage(), ex1);
318          throw new SAXException(ex1.getMessage(), ex1);
319        }
320        catch (Exception ex) {
321          RepConstants.writeERROR_FILE(ex);
322          throw new SAXException(ex.getMessage(), ex);
323        }
324    
325      }
326    
327      /**
328       * creating and firing Insert Query for client subscribed tables.
329       * @throws SAXException
330       */
331      public void createQuery() throws SAXException, SQLException {
332    
333        try {
334          ArrayList elements = currentElement.getChildElements();
335    //     XMLElement  parentElement =currentElement.getParentElement();
336    //     ArrayList elements = parentElement.getChildElements();
337          ArrayList InsertElements = ( (XMLElement) elements.get(0)).getChildElements();
338    //      ArrayList InsertElements = elements;
339          //log.debug(" Is First pass  :  " + isFirstPass);
340    
341          if (isFirstPass) {
342            int j = 0;
343            for (int i = 0; i < InsertElements.size(); i++) {
344              XMLElement element = (XMLElement) InsertElements.get(i);
345              String columnName = element.elementName;
346              columnName = (String) allColumnsMap.get(columnName);
347              //log.debug(" allColumnsMap : "+allColumnsMap);
348              //String value = element.elementValue;
349              //log.debug(" XML Element : " + element);
350              //log.debug(" columnName : " + columnName);
351              //log.debug(" value :  " + value);
352              if (currentRepTable.isIgnoredColumn(columnName)) {
353                //log.debug("isIgnoredColumn: " + columnName);
354                continue;
355              }
356              //log.debug(" isCurrentTableCyclic : " + isCurrentTableCyclic);
357              //log.debug(columnName + " isForeignKeyColumn : " +currentRepTable.isForeignKeyColumn(columnName));
358              if (isCurrentTableCyclic && currentRepTable.isForeignKeyColumn(columnName)) {
359                //log.debug(" Is isCurrentTableCyclic :  YES");
360                //log.debug(" currentRepTable.isForeignKeyColumn(columnName) : "+currentRepTable.isForeignKeyColumn(columnName));
361                AbstractColumnObject columnObject = (AbstractColumnObject) treeMap.get(columnName);
362                //once setAutoCommitFlag is set to false,we shouldn't change it to true
363                // by checking for other columns for that 'if' check is used
364                checkAutocommit(columnObject);
365                columnObject.setColumnObject(preparedStatement, "NULL", j + 1);
366                //log.debug("setting  null to " + columnName);
367              }
368              else {
369                AbstractColumnObject columnObject = (AbstractColumnObject) treeMap.get(columnName);
370                //once setAutoCommitFlag is set to false,we shouldn't change it to true
371                // by checking for other columns for that 'if' check is used
372                checkAutocommit(columnObject);
373                columnObject.setColumnObject(preparedStatement, element, j + 1);
374                //log.debug("setting " + columnName + " to " + element.elementValue);
375              }
376              j++;
377            }
378    
379            try {
380                      if (batchMax > 0) {
381                            if (batchCntr < batchMax) {
382                                    preparedStatement.addBatch();
383                                    batchCntr++;
384                            } else {
385                                    log.debug("Sending next " + batchCntr + " records to database with batch processing");
386                                    preparedStatement.executeBatch();
387                            setAutocomitTrueAndCommitRecord();
388                                    preparedStatement.clearBatch();
389                                    log.debug("Batch submitted");
390                                    batchCntr = 0;
391                            }
392                      } else {
393                    preparedStatement.execute();
394                      }
395            }
396            catch (SQLException ex1) {
397              ex1.printStackTrace();
398            }
399          }
400          else {
401            //log.debug("IS CURRENT TABLE CYCLIC  "+isCurrentTableCyclic);
402            if (isCurrentTableCyclic) {
403              int columnIndex = 0;
404              String[] foreignKeyCols = currentRepTable.getForeignKeyCols();
405              for (int j = 0, size = foreignKeyCols.length; j < size; j++) {
406                for (int i = 0; i < InsertElements.size(); i++) {
407                  XMLElement element = (XMLElement) InsertElements.get(i);
408                   String ColumnName = element.elementName;
409                   ColumnName = (String) allColumnsMap.get(ColumnName);
410                  //log.debug("Second Pass " + ColumnName + " isForeignKeyColumn : " +currentRepTable.isForeignKeyColumn(ColumnName));
411                  if (ColumnName.equalsIgnoreCase(foreignKeyCols[j])) {
412                    //String value = element.elementValue;
413                    ( (AbstractColumnObject) treeMap.get(ColumnName)).setColumnObject(preparedStatement, element, columnIndex + 1);
414                    //log.debug("Second Pass ColumnName::" + ColumnName + ":" + value);
415                    //log.debug("Second Pass columnName : " + ColumnName);
416                    //log.debug("Second Pass value :  " + value);
417                    break;
418                  }
419                  //log.debug("Second Pass XML Element : " + element);
420                }
421                columnIndex++;
422              }
423              // set object for where cluase
424    
425              ArrayList primaryKeyElements = ( (XMLElement) elements.get(1)).getChildElements();
426              //log.debug("Create Query  Primary Key Element  : "+primaryKeyElements);
427              String[] primaryKeyValues = new String[primaryKeyElements.size()];
428              for (int i = 0; i < primaryKeyElements.size(); i++) {
429                String ColumnName = ( ( (XMLElement) primaryKeyElements.get(i)).getAttribute());
430                //log.debug("Create Query   Coulmns  :  "+ColumnName);
431                XMLElement prKeyValuesElement = (XMLElement) primaryKeyElements.get(i);
432                //log.debug("prKeyValuesElement.elementValue  SnapshotHandler.createQuery() "+prKeyValuesElement.elementValue);
433                primaryKeyValues[i] = prKeyValuesElement.elementValue;
434                //log.debug("treeMap::::"+treeMap);
435                ( (AbstractColumnObject) treeMap.get(ColumnName)).setColumnObject(preparedStatement, prKeyValuesElement, columnIndex + 1);
436                columnIndex++;
437              }
438              preparedStatement.execute();
439            }
440          }
441        }
442        catch (SQLException ex) {
443          //exception is dumped in case when  parent table is not included in the publisher
444          /*     RepConstants.writeERROR_FILE(ex);
445               try {
446                 if (!dbHandler.getPrimaryKeyErrorCode(ex)) {
447                   throw new SAXException(ex.getMessage(), ex);
448                 }
449              }
450              catch (SQLException ex1) {
451                  RepConstants.writeERROR_FILE(ex1);
452               } */
453        }
454        finally {
455              if (batchMax != 0) {
456          setAutocomitTrueAndCommitRecord();
457              }
458        }
459      }
460    
461      private PreparedStatement makeUpdateBookMarksTable() throws Exception {
462        StringBuffer query = new StringBuffer();
463        query.append(" UPDATE  ").append(dbHandler.getBookMarkTableName()).append(" set   ")
464            .append(RepConstants.bookmark_ConisderedId5)
465            .append(" = ? , ").append(RepConstants.bookmark_lastSyncId4).append(" = ?   where ( ")
466            .append(RepConstants.bookmark_LocalName1).append(" = ? and ")
467            .append(RepConstants.bookmark_RemoteName2).append(" = ? and ")
468            .append(RepConstants.bookmark_TableName3).append(" = ? ) ");
469        return subConnection.prepareStatement(query.toString());
470      }
471    
472      public void setPubName(String pubName0) {
473        pubName = pubName0;
474      }
475    
476      public void setSubName(String subName0) {
477        subName = subName0;
478      }
479    
480      /**
481       * deletes records from super log table
482       * @param tableName
483       * @throws java.lang.Exception
484       */
485      private void deleteRecordsFromSuperLogTable(String tableName) throws Exception {
486        Statement stmt = subConnection.createStatement();
487        try {
488          // deleting all records from super log table where tableName is passed one
489          // or for whihc xml file has been written.
490          StringBuffer query = new StringBuffer();
491          query.append("delete from ").append(dbHandler.getLogTableName()).append(" where ")
492              .append(RepConstants.logTable_tableName2).append(" = '")
493              .append(tableName).append("'");
494          stmt.executeUpdate(query.toString());
495        }
496        finally {
497          if (stmt != null)
498            stmt.close();
499        }
500      }
501    
502      /**
503       * insert a dummy record in super log table for getting a unique id after snapshot for inserting records in shadow Table.
504       * @throws SQLException
505       */
506      private void insert_dummyRecordInLogTable() throws SQLException {
507        Statement stmt = subConnection.createStatement();
508        StringBuffer query = new StringBuffer();
509        query.append("insert into ").append(dbHandler.getLogTableName()).append(" (")
510            .append(RepConstants.logTable_tableName2).append(") values  ('$$$$$$')");
511        stmt.execute(query.toString());
512        query = new StringBuffer();
513        query.append("Select max(").append(RepConstants.logTable_commonId1).append(") from ")
514            .append(dbHandler.getLogTableName());
515        ResultSet rs = stmt.executeQuery(query.toString());
516        rs.next();
517        long maxCID = rs.getLong(1);
518        rs.close();
519        query = new StringBuffer();
520        query.append("delete from ").append(dbHandler.getLogTableName())
521            .append(" where ").append(RepConstants.logTable_tableName2).append(" = '")
522            .append("$$$$$$").append("' and ").append(RepConstants.logTable_commonId1)
523            .append(" != ").append(maxCID);
524        stmt.executeUpdate(query.toString());
525      }
526    
527      public void closeAllStatementAndResultset() {
528        try {
529          if (statement != null)
530            statement.close();
531        }
532        catch (SQLException ex) {
533        }
534      }
535    
536      /**
537       * This has been implemented to handle the problem
538       * related to CLOB and BLOB data type. Postgre
539       * do not to insert LOB object in autocommit mode.
540       * @param <any> Abs
541       * @return boolean
542       */
543      private void checkAutocommit(AbstractColumnObject aco) throws SQLException {
544        if (setAutoCommitFlag) {
545          if (dbHandler instanceof PostgreSQLHandler &&
546              (aco instanceof ClobObject || aco instanceof BlobObject)) {
547            setAutoCommitFlag = false;
548            subConnection.setAutoCommit(false);
549          }
550        }
551    
552      }
553    
554      private void setAutocomitTrueAndCommitRecord() throws SQLException {
555        if (setAutoCommitFlag == false) {
556          subConnection.commit();
557          subConnection.setAutoCommit(true);
558          setAutoCommitFlag = true;
559        }
560      }
561    
562    }





























































Powered by Drupal - Theme by Danger4k