JavaDoc


001    /**
002     * Copyright (c) 2003 Daffodil Software Ltd all rights reserved,
003     * Modifications Copyright (c) 2008 Regiscope Digital Imaging Co, LLC, All rights reserved.
004     * This program is free software; you can redistribute it and/or modify
005     * it under the terms of version 2 of the GNU General Public License as
006     * published by the Free Software Foundation.
007     * There are special exceptions to the terms and conditions of the GPL
008     * as it is applied to this software. See the GNU General Public License for more details.
009     *
010     * This program is distributed in the hope that it will be useful,
011     * but WITHOUT ANY WARRANTY; without even the implied warranty of
012     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013     * GNU General Public License for more details.
014     *
015     * You should have received a copy of the GNU General Public License
016     * along with this program; if not, write to the Free Software
017     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018     */
019    
020    package org.dbreplicator.replication.DBHandler;
021    
022    import java.sql.*;
023    import java.util.*;
024    
025    import org.dbreplicator.replication.*;
026    import org.dbreplicator.replication.column.*;
027    import org.apache.log4j.Logger;
028    
029    /**
030     * Method overrides specific to the PostgreSQL database engine, from versions 7.3 to 8.3.
031     */
032    public class PostgreSQLHandler
033        extends AbstractDataBaseHandler  {
034    
035      private String pg_publication_TableName;
036      private String pg_subscription_TableName;
037      private String pg_bookmark_TableName;
038      private String pg_rep_TableName;
039      private String pg_log_Table;
040      private String pg_schedule_Table;
041      private String pg_ignoredColumns_Table;
042      private String pg_trackReplicationTablesUpdation_Table;
043      private String pg_trackPrimaryKeyUpdation_Table;
044      protected static Logger log = Logger.getLogger(PostgreSQLHandler.class.
045                                                     getName());
046      public PostgreSQLHandler() {}
047    
048      public PostgreSQLHandler(ConnectionPool connectionPool0) {
049        connectionPool = connectionPool0;
050        vendorType = Utility.DataBase_PostgreSQL;
051        pg_publication_TableName = "public." + publication_TableName;
052        pg_subscription_TableName = "public." + subscription_TableName;
053        pg_bookmark_TableName = "public." + bookmark_TableName;
054        pg_rep_TableName = "public." + rep_TableName;
055        pg_log_Table = "public." + log_Table;
056        pg_schedule_Table = "public." + Schedule_TableName;
057        pg_ignoredColumns_Table="public."+ignoredColumns_Table;
058        pg_trackReplicationTablesUpdation_Table="public."+trackReplicationTablesUpdation_Table;
059        pg_trackPrimaryKeyUpdation_Table ="public."+trackPrimaryKeyUpdation_Table;
060      }
061    
062      protected void createSuperLogTable(String pubName) throws SQLException,
063          RepException {
064        StringBuffer logTableQuery = new StringBuffer();
065        logTableQuery.append(" Create Table ")
066            .append(pg_log_Table)
067            .append(" ( " + RepConstants.logTable_commonId1 + " bigserial , " +
068                    RepConstants.logTable_tableName2 + " varchar(255) ) ");
069        runDDL(pubName, logTableQuery.toString());
070        StringBuffer indexQuery = new StringBuffer();
071        indexQuery.append("CREATE INDEX ")
072            .append(RepConstants.log_Index)
073            .append(" ON " + getLogTableName())
074            .append("(")
075            .append(RepConstants.logTable_commonId1)
076            .append(")");
077    //System.out.println(" Create Index on LogTable : "+indexQuery.toString());
078        runDDL(pubName, indexQuery.toString());
079    
080      }
081    
082      /**
083       * The structure of the RepTable was changed by the Hisar team,
084       * so the old method below has been commented out. After proper
085       * testing with all databases it should be deleted.
086       */
087    
088      /*  protected void createRepTable(String pubName) throws SQLException,
089            RepException
090        {
091            StringBuffer repTableQuery = new StringBuffer();
092            repTableQuery.append(" Create Table ").append(getRepTableName())
093       .append("(  "+RepConstants.repTable_pubsubName1+" varchar(255) ,  "+RepConstants.repTable_tableId2+"  bigserial, ")
094                .append("  "+RepConstants.repTable_tableName2+"  varchar(255) ,  "+RepConstants.repTable_filter_clause3+"  varchar(255), ")
095                .append("  "+RepConstants.repTable_conflict_resolver4+"  varchar(255) , Primary Key ( "+RepConstants.repTable_pubsubName1+" ,  "+RepConstants.repTable_tableName2+" ) ) ");
096    
097            runDDL(pubName, repTableQuery.toString());
098        } */
099    
100      protected void createRepTable(String pubName) throws SQLException,
101          RepException {
102        StringBuffer repTableQuery = new StringBuffer();
103        repTableQuery.append(" Create Table ").append(getRepTableName()).append(
104            " ( ")
105            .append(RepConstants.repTable_pubsubName1).append(" varchar(255) , ")
106            .append(RepConstants.repTable_tableId2).append("  bigserial, ")
107            .append(RepConstants.repTable_tableName2).append("  varchar(255) , ")
108            .append(RepConstants.repTable_filter_clause3).append(
109            "  varchar(255) , ")
110            .append(RepConstants.repTable_createshadowtable6).append(
111            "  char(1) Default 'Y', ")
112            .append(RepConstants.repTable_cyclicdependency7).append(
113            "  char(1) Default 'N', ")
114            .append(RepConstants.repTable_conflict_resolver4).append(
115            "  varchar(255), ")
116            .append("   Primary Key (").append(RepConstants.repTable_pubsubName1).
117            append(" , ")
118            .append(RepConstants.repTable_tableName2).append(" ) ) ");
119        runDDL(pubName, repTableQuery.toString());
120      }
121    
122      protected void createBookMarkTable(String pubName) throws SQLException,
123          RepException {
124        StringBuffer bookmarkTableQuery = new StringBuffer();
125        bookmarkTableQuery.append(" Create Table ")
126            .append(getBookMarkTableName())
127            .append(" (  " + RepConstants.bookmark_LocalName1 +
128                    "  varchar(255) ,  " + RepConstants.bookmark_RemoteName2 +
129                    "  varchar(255) , ")
130            .append("  " + RepConstants.bookmark_TableName3 + "  varchar(255) ,  " +
131                    RepConstants.bookmark_lastSyncId4 + "  bigint , ")
132            .append(
133            "  " + RepConstants.bookmark_ConisderedId5 + "  bigint ," +
134            RepConstants.bookmark_IsDeletedTable +
135            " char(1) default 'N' , Primary Key ( " +
136            RepConstants.bookmark_LocalName1 + " ,  " +
137            RepConstants.bookmark_RemoteName2 + " ,  " +
138            RepConstants.bookmark_TableName3 + " ) ) ");
139        runDDL(pubName, bookmarkTableQuery.toString());
140      }
141    
142      public void createShadowTable(String pubsubName, String tableName,
143                                    String allColseq,String[] primaryColumns) throws RepException {
144        StringBuffer shadowTableQuery = new StringBuffer();
145        shadowTableQuery.append(" Create Table ")
146            .append(RepConstants.shadow_Table(tableName))
147            .append(" (  " + RepConstants.shadow_sync_id1 + "  bigserial , ")
148            .append("    " + RepConstants.shadow_common_id2 + "   bigint , ")
149            .append("    " + RepConstants.shadow_operation3 + "   character(1) , ")
150            .append("    " + RepConstants.shadow_status4 + "   character(1) ")
151            .append(allColseq)
152            .append(" ,  " + RepConstants.shadow_serverName_n + "  varchar(255) ")
153            .append(" ,  " + RepConstants.shadow_PK_Changed + "  char(1) ) ");
154        try {
155    //System.out.println(" shadowTableQuery.toString() ="+shadowTableQuery.toString());
156          runDDL(pubsubName, shadowTableQuery.toString());
157    
158        }
159        catch (RepException ex) {
160          throw ex;
161        }
162        catch (SQLException ex) {
163    //          ex.printStackTrace();
164          // Ignore the Exception
165        }
166        createIndex(pubsubName,RepConstants.shadow_Table(tableName));
167            /* bjt - create pk index to mirror main table index, only not unique */
168            createPkIndex(pubsubName,RepConstants.shadow_Table(tableName),primaryColumns);
169      }
170    
171      protected void createScheduleTable(String subName) throws SQLException,
172          RepException {
173        StringBuffer ScheduleTableQuery = new StringBuffer();
174    
175        ScheduleTableQuery.append(" Create Table ")
176            .append(pg_schedule_Table)
177            .append(" ( " + RepConstants.schedule_Name + " varchar(255) , " +
178                    RepConstants.subscription_subName1 + " varchar(255) unique , ")
179            .append("  " + RepConstants.schedule_type + " varchar(255) , ")
180            .append(" " + RepConstants.publication_serverName3 + " varchar (255) ," +
181                    RepConstants.publication_portNo + " varchar(255) ,")
182            .append(" " + RepConstants.recurrence_type + " varchar(255) , " +
183                    RepConstants.replication_type + " varchar(255) ,")
184            .append(" " + RepConstants.schedule_time + " bigint , ")
185            .append(" " + RepConstants.schedule_counter + " bigint , Primary Key (" +
186                    RepConstants.schedule_Name + " , " +
187                    RepConstants.subscription_subName1 + ") ) ");
188        runDDL(subName, ScheduleTableQuery.toString());
189      }
190    
191      public void createShadowTableTriggers(String pubsubName, String tableName,
192                                            ArrayList colInfoList,
193                                            String[] primCols) throws RepException {
194    
195        String serverName = getLocalServerName();
196    //    RepPrinter.print(" Columns are :::::: "  + java.util.Arrays.asList(columnTypeInfoMap.keySet().toArray(new String[0])));
197    //    String[] colNames = (String[]) columnTypeInfoMap.keySet().toArray(new String[0]);
198        int size = colInfoList.size();
199        String[] colNames = new String[size];
200        for (int i = 0; i < size; i++) {
201          colNames[i] = ( (ColumnsInfo) colInfoList.get(i)).getColumnName();
202        }
203        //RepPrinter.print(" Columns are :::::: "  + java.util.Arrays.asList(colNames));
204        String colNameSeq = getColumnNameSequence(colNames, "").toString();
205        String colNameSeqPrefixOldRow = getColumnNameSequence(colNames, "old.").
206            toString();
207        String colNameSeqPrefixNewRow = getColumnNameSequence(colNames, "new.").
208            toString();
209        String shadowTableName = RepConstants.shadow_Table(tableName);
210        String primColumnNamesSeq = getColumnNameSequence(primCols, "rep_old_");
211        String primColNameSeqPrefixOldRow = getColumnNameSequence(primCols, "old.").toString();
212        String primColNameSeqPrefixNewRow = getColumnNameSequence(primCols, "new.").toString();
213    
214        String[] primColsOld = getColumnNameWithOldOrNewPrefix(primCols, "old.");
215        String[] primColsNew = getColumnNameWithOldOrNewPrefix(primCols, "new.");
216    
217    //create trigger abc after delete on t2 for each row  execute procedure delete_insert();
218        String table = tableName.substring(tableName.indexOf('.') + 1);
219        StringBuffer insTriggerQuery = new StringBuffer();
220        insTriggerQuery.append(" Create trigger ")
221            .append(getInsertTriggerName(table))
222            .append(" after insert on ").append(tableName)
223            .append(" For each Row execute procedure  insert_" + table + "()");
224    
225        StringBuffer delTriggerQuery = new StringBuffer();
226        delTriggerQuery.append(" Create trigger ")
227            .append(getDeleteTriggerName(table))
228            .append(" after delete on ").append(tableName)
229            .append(" For each Row execute  procedure  delete_" + table + "()");
230    
231        StringBuffer updTriggerQuery = new StringBuffer();
232        updTriggerQuery.append(" Create trigger ")
233            .append(getUpdateTriggerName(table))
234            .append(" after update on  ").append(tableName)
235            .append(" For each Row  execute  procedure  update_" + table + "()");
236    
237        try {
238          runDDL(pubsubName, createFunctionHandler());
239        }
240        catch (RepException ex1) {
241        }
242        catch (SQLException ex1) {
243        }
244    
245        try {
246          runDDL(pubsubName, createFunctionLanguage());
247        }
248        catch (RepException ex2) {
249        }
250        catch (SQLException ex2) {
251    
252        }
253    
254        try {
255          runDDL(pubsubName,
256                 functionForInsertTrigger(tableName, shadowTableName,
257                                          primColumnNamesSeq,
258                                          colNameSeq, colNameSeqPrefixNewRow,
259                                          primColNameSeqPrefixNewRow, serverName));
260        }
261        catch (RepException ex) {
262          throw ex;
263        }
264        catch (SQLException ex) {
265    //          ex.printStackTrace();
266          // Ignore Exception
267        } try {
268               runDDL(pubsubName,
269                 functionForDeleteTrigger(tableName, colNameSeqPrefixOldRow,
270                                          primColNameSeqPrefixOldRow,
271                                          shadowTableName, colNameSeq,
272                                          primColumnNamesSeq, serverName));
273    
274        }
275        catch (RepException ex) {
276          throw ex;
277        }
278        catch (SQLException ex) {
279    //          ex.printStackTrace();
280          // Ignore Exception
281        }
282        try {
283                 runDDL(pubsubName,
284                    functionForUpdateTrigger(tableName, colNameSeqPrefixOldRow,
285                                             primColNameSeqPrefixOldRow,
286                                             shadowTableName, colNameSeq,
287                                             primColumnNamesSeq, serverName,
288                                             colNameSeqPrefixNewRow,primColsOld,primColsNew));
289    
290             }
291           catch (RepException ex) {
292             throw ex;
293           }
294           catch (SQLException ex) {
295    //          ex.printStackTrace();
296             // Ignore Exception
297           }
298           try {
299                    runDDL(pubsubName, insTriggerQuery.toString());
300              }
301              catch (RepException ex) {
302                throw ex;
303              }
304              catch (SQLException ex) {
305    //          ex.printStackTrace();
306                // Ignore Exception
307              }
308              try {
309                   runDDL(pubsubName, delTriggerQuery.toString());
310                 }
311                 catch (RepException ex) {
312                   throw ex;
313                 }
314                 catch (SQLException ex) {
315    //          ex.printStackTrace();
316                   // Ignore Exception
317                 }
318                 try {
319                      runDDL(pubsubName, updTriggerQuery.toString());
320                    }
321                    catch (RepException ex) {
322                      throw ex;
323                    }
324                    catch (SQLException ex) {
325    //          ex.printStackTrace();
326                      // Ignore Exception
327                    }
328    }
329    
330      public String createFunctionHandler() {
331        StringBuffer sb = new StringBuffer();
332        sb.append("CREATE FUNCTION plpgsql_call_handler() RETURNS language_handler")
333            .append("  AS '$libdir/plpgsql' ")
334            .append(" LANGUAGE C ");
335        return sb.toString();
336      }
337    
338      public String createFunctionLanguage() {
339        StringBuffer sb = new StringBuffer();
340        sb.append("CREATE LANGUAGE plpgsql ")
341            .append(" HANDLER plpgsql_call_handler");
342        return sb.toString();
343      }
344    
345      public String functionForInsertTrigger(String tableName,
346                                             String shadowTableName,
347                                             String primColumnNamesSeq,
348                                             String colNameSeq,
349                                             String colNameSeqPrefixNewRow,
350                                             String primColNameSeqPrefixNewRow,
351                                             String serverName) {
352    
353        String table = tableName.substring(tableName.indexOf('.') + 1);
354        StringBuffer sb = new StringBuffer();
355        sb.append("CREATE FUNCTION " + "\"" + "insert_" + table + "\"" +
356                  "() RETURNS Trigger AS '")
357            .append(" BEGIN ")
358            .append(insertRecordIntoLogTable(tableName)).append(" Insert Into ")
359            .append(shadowTableName).append(" ( ")
360            .append(RepConstants.shadow_common_id2).append(", ")
361            .append(RepConstants.shadow_operation3).append(", ")
362            .append(RepConstants.shadow_status4).append(", ")
363            .append(colNameSeq).append(primColumnNamesSeq)
364            .append(RepConstants.shadow_serverName_n)
365            .append(" ) Values ( null , ''I'' , null , ")
366            .append(colNameSeqPrefixNewRow).append(primColNameSeqPrefixNewRow)
367            .append(" ''").append(serverName).append("'') ; ")
368            .append("  RETURN NULL;")
369            .append(" END;")
370            .append(" ' LANGUAGE 'plpgsql' VOLATILE ");
371        return sb.toString();
372      }
373    
374      public String functionForUpdateTrigger(String tableName,
375                                             String colNameSeqPrefixOldRow,
376                                             String primColNameSeqPrefixOldRow,
377                                             String shadowTableName,
378                                             String colNameSeq,
379                                             String primColumnNamesSeq,
380                                             String serverName,
381                                             String colNameSeqPrefixNewRow,String[] primColsOld,String[] primColsNew) {
382        String table = tableName.substring(tableName.indexOf('.') + 1);
383        StringBuffer sb = new StringBuffer();
384        sb.append("CREATE FUNCTION " + "\"" + "update_" + table + "\"" +
385                  "() RETURNS Trigger AS '")
386            .append(" Declare ")
387            .append(" maxlogid bigint; pkchanged char(1); ")
388            .append(" BEGIN ")
389            .append(insertRecordIntoLogTable(tableName))
390            .append(" Select max( " + RepConstants.logTable_commonId1 +
391                    " ) into maxlogid from ")
392            .append(pg_log_Table).append(" ; ")
393            .append(" if( ");
394            for (int i = 0; i < primColsOld.length; i++) {
395              if (i != 0)
396                sb.append(" or "); // bjt - changed from 'and' to 'or'.  if ANY of the values change, the pk changes
397                sb.append(primColsOld[i] )
398                  .append("!=" )
399                  .append(primColsNew[i]);
400            }
401            sb.append(" ) then ")
402            .append(" pkchanged = ''Y''; end if;") // bjt - need two single quotes instead of one
403            .append(" Insert Into ")
404            .append(shadowTableName).append(" ( ")
405            .append(RepConstants.shadow_common_id2).append(", ")
406            .append(RepConstants.shadow_operation3).append(", ")
407            .append(RepConstants.shadow_status4).append(", ")
408            .append(colNameSeq).append(primColumnNamesSeq)
409            .append(RepConstants.shadow_serverName_n)
410            .append(" ) Values ( maxlogid , ''U'' , ''B'' , ")
411            .append(colNameSeqPrefixOldRow).append(primColNameSeqPrefixOldRow)
412            .append(" ''").append(serverName).append("'') ; Insert Into ")
413            .append(shadowTableName).append(" ( ")
414            .append(RepConstants.shadow_common_id2).append(", ")
415            .append(RepConstants.shadow_operation3).append(", ")
416            .append(RepConstants.shadow_status4).append(", ")
417            .append(colNameSeq).append(primColumnNamesSeq)
418            .append(RepConstants.shadow_serverName_n).append(" , ")
419            .append(RepConstants.shadow_PK_Changed)
420            .append(" ) Values ( maxlogid , ''U'' , ''A'' , ")
421            .append(colNameSeqPrefixNewRow).append(primColNameSeqPrefixOldRow)
422            .append(" ''").append(serverName).append("'',pkchanged) ;")
423            .append(" RETURN NULL;")
424            .append(" END;")
425            .append(" ' LANGUAGE 'plpgsql' VOLATILE ");
426        return sb.toString();
427      }
428    
429      public String functionForDeleteTrigger(String tableName,
430                                             String colNameSeqPrefixOldRow,
431                                             String primColNameSeqPrefixOldRow,
432                                             String shadowTableName,
433                                             String colNameSeq,
434                                             String primColumnNamesSeq,
435                                             String serverName) {
436    
437        String table = tableName.substring(tableName.indexOf('.') + 1);
438        StringBuffer sb = new StringBuffer();
439        sb.append("CREATE FUNCTION  " + "\"" + "delete_" + table + "\"" +
440                  "() RETURNS Trigger AS '")
441            .append(" BEGIN ")
442            .append(insertRecordIntoLogTable(tableName)).append("  Insert Into ")
443            .append(shadowTableName).append(" ( ")
444            .append(RepConstants.shadow_common_id2).append(", ")
445            .append(RepConstants.shadow_operation3).append(", ")
446            .append(RepConstants.shadow_status4).append(", ")
447            .append(colNameSeq).append(primColumnNamesSeq)
448            .append(RepConstants.shadow_serverName_n)
449            .append(" ) Values ( null , ''D'' , null , ")
450            .append(colNameSeqPrefixOldRow).append(primColNameSeqPrefixOldRow)
451            .append("''").append(serverName).append("'') ; ")
452            .append(" RETURN NULL ;")
453            .append(" END;")
454            .append("'  LANGUAGE 'plpgsql' VOLATILE ");
455        return sb.toString();
456      }
457    
458      public String insertRecordIntoLogTable(String tableName) {
459        StringBuffer insertLogTable = new StringBuffer();
460        insertLogTable.append(" Insert into ")
461            .append(pg_log_Table)
462            .append(" ( ").append(RepConstants.logTable_tableName2)
463            .append(" ) values ( ''")
464            .append(tableName).append("''); ");
465        return insertLogTable.toString();
466    
467      }
468    
469      private String getInsertTriggerName(String schematable) {
470        String tableInsert;
471        int index = schematable.indexOf('.');
472        if (index == -1) {
473          tableInsert = schematable;
474        }
475        tableInsert = schematable.substring(schematable.indexOf(".") + 1).trim();
476        return getObjectName(tableInsert, "tri_");
477      }
478    
479      private String getDeleteTriggerName(String schematable) {
480        String tableDelete;
481        int index = schematable.indexOf('.');
482        if (index == -1) {
483          tableDelete = schematable;
484        }
485        tableDelete = schematable.substring(schematable.indexOf(".") + 1).trim();
486        return getObjectName(tableDelete, "trd_");
487      }
488    
489      private String getUpdateTriggerName(String schematable) {
490        String tableUpdate;
491        int index = schematable.indexOf('.');
492        if (index == -1) {
493          tableUpdate = schematable;
494        }
495        tableUpdate = schematable.substring(schematable.indexOf(".") + 1).trim();
496        return getObjectName(tableUpdate, "tru_");
497      }
498    
499      public void dropTriggersAndShadowTable(Connection connection, String table,
500                                             String pubsubName) throws SQLException,
501          RepException {
502        String tableName = table.substring(table.indexOf('.') + 1);
503        String insertTriggerName = getInsertTriggerName(tableName);
504        String updateTriggerName = getDeleteTriggerName(tableName);
505        String DeleteTriggerName = getUpdateTriggerName(tableName);
506    
507        fireDropQuery(connection,
508                      " drop trigger " + insertTriggerName + " on " + table);
509    
510        fireDropQuery(connection,
511                      " drop trigger " + updateTriggerName + " on " + table);
512    
513        fireDropQuery(connection,
514                      " drop trigger " + DeleteTriggerName + " on " + table);
515    
516        fireDropQuery(connection, " drop table  " + RepConstants.shadow_Table(table));
517    
518        fireDropQuery(connection, " drop function " + "insert_" + tableName + "()");
519    
520        fireDropQuery(connection, " drop function " + "delete_" + tableName + "()");
521    
522        fireDropQuery(connection, " drop function " + "update_" + tableName + "()");
523    
524        fireDropQuery(connection, " delete from " + getLogTableName() + " where " +
525                      RepConstants.logTable_tableName2 + " = '" + table + "'");
526      }
527    
528      private static String getObjectName(String schematable, String prefix) {
529        int index = schematable.indexOf('.');
530        if (index == -1) {
531          return prefix + schematable;
532        }
533        String schema = schematable.substring(0, index);
534        String table = schematable.substring(index + 1);
535        return schema + "." + prefix + table;
536      }
537    
538      public void createSubscribedTablesTriggersAndShadowTables(String subName,
539          String[] pubTableQueries, HashMap primCols, int pubVendorType,ArrayList repTables) throws
540          RepException,SQLException {
541        Connection connection = connectionPool.getConnection(subName);
542            MetaDataInfo mdi = Utility.getDatabaseMataData(connection);
543        Statement stt = connection.createStatement();
544        try {
545          for (int i = 0; i < pubTableQueries.length; i++) {
546            try {
547              stt.execute(pubTableQueries[i].toLowerCase());
548            }
549            catch (SQLException ex) {
550              SchemaQualifiedName sname = new SchemaQualifiedName(mdi,
551                  getTableName(pubTableQueries[i]));
552              String existing = null;
553              try {
554                existing = mdi.getExistingTableQuery(this, sname, pubVendorType).
555                    toLowerCase().replaceAll(" ", "");
556              }
557              catch (RepException ex1) {
558                if (ex1.getRepCode().equalsIgnoreCase("REP033")) {
559                  throw ex;
560                }
561              }
562              if (pubTableQueries[i].toLowerCase().replaceAll(" ",
563                  "").indexOf(existing.toLowerCase()) != 0) {
564                throw new RepException("REP024", new Object[] {subName,
565                                       sname.toString()});
566              }
567              // Ignore the exception if the same table exists with same colums and primary key
568            }
569          }
570        }
571        finally {
572          if (stt != null)
573            stt.close();
574                    connectionPool.returnConnection(connection);
575        }
576        // Just Get the Column Sequenes from Create Table Queries
577        // and create Shadow Table and Triggers on Shadow Table
578        for (int i = 0; i < pubTableQueries.length; i++) {
579          String tableName = getTableName(pubTableQueries[i]);
580          SchemaQualifiedName sname = new SchemaQualifiedName(mdi,
581              getTableName(pubTableQueries[i]));
582          String colSeqenceWithDataType = getColumnSequenceWithDataTypes(
583              pubTableQueries[i]);
584          ArrayList colInfoList = getColumnNamesAndDataTypes(colSeqenceWithDataType.
585              trim());
586          String[] pcols = (String[]) ( (ArrayList) primCols.get(tableName.
587              toLowerCase())).toArray(new String[0]);
588          String allColSequence = getShadowTableColumnDataTypeSequence(colInfoList,
589              pcols,(RepTable)repTables.get(i));
590          createShadowTable(subName, sname.toString(), allColSequence,pcols);
591          makeProvisionForLOBDataTypes(colInfoList);
592          createShadowTableTriggers(subName, sname.toString(), colInfoList, pcols);
593          //createShadowTablesAndSubscriptionTableTriggers(subName,tableName,colSeqenceWithDataType,(ArrayList) primCols.get(tableName.toLowerCase()));
594        }
595      }
596    
597      public boolean isDataTypeOptionalSizeSupported(TypeInfo typeInfo) {
598        int sqlType = typeInfo.getSqlType();
599        String typeName = typeInfo.getTypeName();
600        switch (sqlType) {
601          case 4:
602          case 5:
603          case 7:
604          case -4:
605          case -5:
606          case -7:
607          case 91:
608          case 92:
609          case 93:
610          case -6:
611          case 8:
612          case -2:
613          case -3:
614          case 2000:
615          case 16:
616          case 2004:
617          case 2005:
618          case 1111:
619          case 2006:
620          case -1:
621    
622    //            case 1 :
623            return false;
624    
625          case 12:
626            if (typeInfo.getTypeName().equalsIgnoreCase("text")) {
627              return false;
628            }
629          default:
630            return true;
631        }
632      }
633    
634      public void setTypeInfo(TypeInfo typeInfo, ResultSet rs) throws RepException,
635          SQLException {
636    //System.out.println("PostgreSQLHandler = "+typeInfo);
637        switch (typeInfo.getSqlType()) {
638          case Types.BOOLEAN:
639            typeInfo.setTypeName("boolean");
640            break; // //16;
641          case Types.BIT:
642            if (typeInfo.getTypeName().equalsIgnoreCase("bool")) {
643              typeInfo.setTypeName("bool");
644              return;
645            }
646            typeInfo.setTypeName("bit");
647            break; // //-7;
648          case Types.TINYINT:
649            typeInfo.setTypeName("int2");
650            break; //-6;
651          case Types.SMALLINT:
652            typeInfo.setTypeName("int2");
653            break; // // 5;
654          case Types.INTEGER:
655            typeInfo.setTypeName("int4");
656            break; // // 4;
657          case Types.BIGINT:
658            typeInfo.setTypeName("int8");
659            break; // //-5;
660          case Types.FLOAT:
661            typeInfo.setTypeName("numeric");
662            break; // // 6;
663          case Types.REAL:
664            typeInfo.setTypeName("float4");
665            break; // // 7;
666          case Types.DOUBLE:
667            typeInfo.setTypeName("float8");
668            break; // // 8;
669          case Types.NUMERIC:
670            typeInfo.setTypeName("numeric");
671            break; // // 2;
672          case Types.DECIMAL:
673            typeInfo.setTypeName("numeric");
674            break; // // 3;
675          case Types.CHAR:
676            typeInfo.setTypeName(" character");
677            break; // // 1;
678          case Types.VARCHAR:
679            if (typeInfo.getTypeName().equalsIgnoreCase("text")) {
680              typeInfo.setTypeName("text");
681              return;
682            }
683            typeInfo.setTypeName("varchar");
684            break; // //12;
685          case Types.LONGVARCHAR:
686            typeInfo.setTypeName("text");
687            break; // //-1;
688          case Types.DATE:
689            typeInfo.setTypeName("date");
690            break; // //91;
691          case Types.TIME:
692            typeInfo.setTypeName("time");
693            break; // //92;
694          case Types.TIMESTAMP:
695            if (typeInfo.getTypeName().equalsIgnoreCase("DATE")) {
696              typeInfo.setTypeName("DATE");
697              typeInfo.setSqlType(Types.DATE);
698              return; //  92
699            }
700            typeInfo.setTypeName("timestamp");
701            break; // //93;
702          case Types.BINARY:
703            typeInfo.setTypeName("bytea");
704            break; // //-2;
705          case Types.VARBINARY:
706    
707    //                typeInfo.setTypeName("bytea");
708            // commented by sunil to test the
709            // case of RAW data type in oracle
710            if (typeInfo.getTypeName().equalsIgnoreCase("RAW")) {
711              typeInfo.setTypeName("text");
712              typeInfo.setSqlType(Types.VARCHAR);
713              return; //  12
714            }
715            typeInfo.setTypeName("bytea");
716            break; // //-3;
717          case Types.LONGVARBINARY:
718            typeInfo.setTypeName("bytea");
719            break; // //-4;
720          case Types.BLOB:
721            typeInfo.setTypeName("bytea");
722            break; // //2004;
723          case Types.CLOB:
724            typeInfo.setTypeName("text");
725            break; //2005;
726          case Types.OTHER:
727            if (typeInfo.getTypeName().equalsIgnoreCase("FLOAT")) { //1111
728              typeInfo.setTypeName("numeric");
729              typeInfo.setSqlType(Types.FLOAT);
730              return; //  6
731            }
732            else if (typeInfo.getTypeName().equalsIgnoreCase("clob")) {
733              typeInfo.setTypeName("text");
734              typeInfo.setSqlType(Types.CLOB); //2005
735              return;
736            }
737            else if (typeInfo.getTypeName().equalsIgnoreCase("blob")) {
738              throw new RepException("REP031", new Object[] {typeInfo.getTypeName()});
739            }
740            typeInfo.setTypeName("text");
741            break; //1111;
742          case Types.NULL: //0
743          case Types.DISTINCT: //2001;
744          case Types.STRUCT: //2002;
745          case Types.ARRAY: //2003;
746          case Types.DATALINK: //70;
747          case Types.REF:
748          case Types.JAVA_OBJECT:
749    //      case Types.BLOB           :
750          default:
751            throw new RepException("REP031", new Object[] {typeInfo.getTypeName()});
752        }
753      }
754    
755      public AbstractColumnObject getColumnObject(TypeInfo typeInfo) throws
756          RepException {
757    //System.out.println("PostgreSQL  typeInfo :: "+typeInfo);
758        int sqlType = typeInfo.getSqlType();
759        switch (sqlType) {
760          case 1: // char
761          case -1: // long varchar
762            return new StringObject(sqlType,this);
763          case 12: // char varying
764            if (typeInfo.getTypeName().equalsIgnoreCase("text")) {
765              return new ClobStreamObject(sqlType,this);
766            }
767            else {
768              return new StringObject(sqlType,this);
769            }
770          case 2005: // clob
771          case 1111:
772          case 2000: // long varbinary
773          case 2006: //
774            return new ClobStreamObject(sqlType,this);
775          case -3: // bitvarying
776          case 2004: // blob
777          case -2: // binary
778          case -4: // long varbiary
779            return new BlobObject(sqlType,this);
780          case 4: // int
781          case 5: // small int
782          case -6: // tinyint
783            return new IntegerObject(sqlType,this);
784          case -5: // long
785            return new LongObject(sqlType,this);
786          case 3: // decimal
787          case 8: // double precision
788          case 2: // numeric
789          case 6: // float
790            return new DoubleObject(sqlType,this);
791          case 7: // real
792            return new FloatObject(sqlType,this);
793          case -7: // bit
794          case 16: // boolean
795            return new BooleanObject(sqlType,this);
796          case 91: // date
797            return new DateObject(sqlType,this);
798          case 92: // time
799            return new TimeObject(sqlType,this);
800          case 93: // time stamp
801            return new TimeStampObject(sqlType,this);
802          default:
803            throw new RepException("REP031", new Object[] {new Integer(sqlType)});
804        }
805      }
806    
807      public void createSchemas(String pubName, ArrayList schemas) throws
808          SQLException, RepException {
809        int num = schemas.size();
810        if (num == 0) {
811          return;
812        }
813        Connection con = connectionPool.getConnection(pubName);
814        String authorization = connectionPool.getUserName();
815        for (int i = 0; i < num; i++) {
816          String schemaName = (String) schemas.get(i);
817          if (schemaName.equalsIgnoreCase("users")) {
818            continue;
819          }
820          StringBuffer sb = new StringBuffer();
821          sb.append(" Create Schema ").append(schemaName)
822              .append(" authorization ").append(authorization);
823          try {
824            runDDL(pubName, sb.toString());
825          }
826          catch (SQLException ex) {
827            RepConstants.writeERROR_FILE(ex);
828            // Ignore the Exception
829          }
830          catch (RepException ex) {
831                    connectionPool.returnConnection(con);
832            throw ex;
833          }
834        }
835            connectionPool.returnConnection(con);
836      }
837    
838      public String getPublicationTableName() {
839        return pg_publication_TableName;
840      }
841    
842      public String getSubscriptionTableName() {
843        return pg_subscription_TableName;
844      }
845    
846      public String getRepTableName() {
847        return pg_rep_TableName;
848      }
849    
850      public String getLogTableName() {
851        return pg_log_Table;
852      }
853    
854      public String getBookMarkTableName() {
855        return pg_bookmark_TableName;
856      }
857    
858      public boolean isColumnSizeExceedMaximumSize(TypeInfo typeInfo) throws
859          SQLException, RepException {
860        boolean flag = false;
861        int sqlType = typeInfo.getSqlType();
862        int columnsize = typeInfo.getcolumnSize();
863        switch (sqlType) {
864          case 1: // char
865          case 12: //varchar
866            if (columnsize > 4192) {
867              flag = true;
868              break;
869            }
870          case -6: // tinyint
871            if (columnsize > 127) {
872              flag = true;
873              break;
874            }
875        }
876        return flag;
877      }
878    
879      public boolean isColumnSizeExceedMaximumSize1(ResultSet rs, TypeInfo typeInfo) throws
880          SQLException, RepException {
881        boolean flag = false;
882        int sqlType = typeInfo.getSqlType();
883        int columnsize = rs.getInt("COLUMN_SIZE");
884        switch (sqlType) {
885          case 1: // char
886          case 12: //varchar
887            if (columnsize > 4192) {
888              flag = true;
889              break;
890            }
891          case -6: // tinyint
892            if (columnsize > 127) {
893              flag = true;
894              break;
895            }
896        }
897        return flag;
898      }
899    
900      public boolean getPrimaryKeyErrorCode(SQLException ex) throws SQLException {
901        if (ex.getMessage().indexOf("_pkey") != -1) {
902          return true;
903        }
904        return false;
905      }
906    
907      public int getAppropriatePrecision(int columnSize, String datatypeName) {
908        if (datatypeName.equalsIgnoreCase("numeric") && columnSize > 38) {
909          columnSize = 38;
910        }
911            // new jdbc driver for postgres returns 2147483647 as size of character varying fields with no predetermined size
912            if (datatypeName.equalsIgnoreCase("varchar") && columnSize > 2147483646)
913              columnSize = 0;
914        return columnSize;
915    
916      }
917    
918      protected void createIndex(String pubsubName, String tableName) throws
919          RepException {
920        StringBuffer createIndexQuery = new StringBuffer();
921    //      create index ind on cmsadm2.R_S_Bank(Rep_sync_id);
922        createIndexQuery.append("create index  ")
923            .append(RepConstants.Index_Name(tableName))
924            .append(" on ")
925            .append(tableName)
926            .append("(")
927            .append(RepConstants.shadow_sync_id1)
928                    .append(",")
929            .append(RepConstants.shadow_common_id2)
930                    .append(",")
931            .append(RepConstants.shadow_status4)
932            .append(")");
933    // System.out.println(" createIndexQuery : "+createIndexQuery.toString());
934        try {
935          runDDL(pubsubName, createIndexQuery.toString());
936        }
937        catch (RepException ex) {
938          // Ignore the Exception
939        }
940        catch (SQLException ex) {
941          // Ignore the Exception
942        }
943      }
944    
945      /* bjt - create pk index to mirror main table index, only not unique */
946      protected void createPkIndex(String pubsubName, String tableName, String[] primaryColumns) throws
947          RepException {
948        StringBuffer createPkIndexQuery = new StringBuffer();
949        createPkIndexQuery.append("create index  ")
950            .append(RepConstants.Pk_Index_Name(tableName))
951            .append(" on ")
952            .append(tableName)
953            .append("(");
954                    int i;
955                    for (i = 0; i < primaryColumns.length - 1; i++) {
956                    createPkIndexQuery.append(primaryColumns[i]);
957                    createPkIndexQuery.append(",");
958                    }
959            createPkIndexQuery.append(primaryColumns[i]);
960            createPkIndexQuery.append(")");
961     System.out.println(" createPkIndexQuery : "+createPkIndexQuery.toString());
962        try {
963          runDDL(pubsubName, createPkIndexQuery.toString());
964        }
965        catch (RepException ex) {
966          // Ignore the Exception
967        }
968        catch (SQLException ex) {
969          // Ignore the Exception
970        }
971      }
972    
973      //scale 0 to 23 only
974      public int getAppropriateScale(int columnScale) throws RepException {
975        if (columnScale < 0) {
976          throw new RepException("REP026", new Object[] {"1", "23"});
977        }
978        else if (columnScale >= 23) {
979          columnScale = 23;
980        }
981        else if (columnScale >= 0 && columnScale < 23)
982          columnScale = columnScale;
983        log.debug("returning columnScale::" + columnScale);
984        return columnScale;
985      }
986    
987      public PreparedStatement makePrimaryPreperedStatement(Connection pub_sub_Connection, String[] primaryColumns, String shadowTable, String local_pub_sub_name) throws SQLException, RepException {
988                      /* bjt */
989        StringBuffer query = new StringBuffer();
990       query.append(" select * from ");
991       query.append(shadowTable);
992       query.append(" where ");
993       query.append(RepConstants.shadow_sync_id1);
994       query.append(" > ");
995       query.append(" ? ");
996       for (int i = 0; i < primaryColumns.length; i++) {
997         query.append(" and ");
998         query.append(primaryColumns[i]);
999         query.append("= ? ");
1000       }
1001       query.append(" order by " + RepConstants.shadow_sync_id1);
1002       query.append(" limit 1 ");
1003        return pub_sub_Connection.prepareStatement(query.toString());
1004      }
1005    
1006    
1007    
1008      public boolean isForeignKeyException(SQLException ex) throws SQLException {
1009        //System.out.println(" ex Message  : "+ex.getMessage()+" ex Error code : "+ex.getErrorCode());
1010        if (ex.getErrorCode() == 0)
1011          return true;
1012        else
1013          return false;
1014      }
1015    
1016      /**
1017       * isPrimaryKeyException
1018       *
1019       * @param ex SQLException
1020       * @return boolean
1021       */
1022      public boolean isPrimaryKeyException(SQLException ex) throws SQLException
1023       {
1024           if (ex.getMessage().indexOf("_pkey") != -1)
1025           {
1026               return true;
1027           }
1028           return false;
1029       }
1030    
1031       public String getIgnoredColumns_Table() {
1032         return pg_ignoredColumns_Table;
1033       }
1034    
1035       public String getTrackReplicationTablesUpdation_Table() {
1036         return pg_trackReplicationTablesUpdation_Table;
1037       }
1038    
1039       public String getTrackPrimayKeyUpdation_Table() {
1040        return pg_trackPrimaryKeyUpdation_Table;
1041      }
1042    
1043       protected void createIgnoredColumnsTable(String pubName) throws SQLException,
1044           RepException {
1045         StringBuffer ignoredColumnsQuery = new StringBuffer();
1046         ignoredColumnsQuery.append(" Create Table ").append(getIgnoredColumns_Table()).
1047             append(" ( ")
1048             .append(RepConstants.ignoredColumnsTable_tableId1).append("  int , ")
1049             .append(RepConstants.ignoredColumnsTable_ignoredcolumnName2).append(
1050             "  varchar(255) , ")
1051             .append(" Primary Key (").append(RepConstants.
1052                                                ignoredColumnsTable_tableId1).append(
1053             " , ")
1054             .append(RepConstants.ignoredColumnsTable_ignoredcolumnName2).append(
1055             " ) ) ");
1056         runDDL(pubName, ignoredColumnsQuery.toString());
1057       }
1058    
1059       protected void createTrackReplicationTablesUpdationTable(String pubSubName) throws
1060             RepException, SQLException {
1061           StringBuffer trackRepTablesUpdationQuery = new StringBuffer();
1062           trackRepTablesUpdationQuery.append(" CREATE  TABLE ").append(getTrackReplicationTablesUpdation_Table()).append(" ( " +
1063               RepConstants.trackUpdation + " smallint  PRIMARY KEY) ");
1064           runDDL(pubSubName, trackRepTablesUpdationQuery.toString());
1065           runDDL(pubSubName,"Insert into "+getTrackReplicationTablesUpdation_Table()+" values(1)" );
1066         }
1067       //implement this method for providing provision to stop updations done on shadow table
1068         protected  void createTriggerForTrackReplicationTablesUpdationTable(String
1069              pubSubName) throws RepException, SQLException {
1070        /*    StringBuffer trackRepTablesUpdationTriggerQuery = new StringBuffer();
1071            trackRepTablesUpdationTriggerQuery.append(" CREATE  TRIGGER TRI_")
1072                .append(getTrackReplicationTablesUpdation_Table()).append(
1073                    " ON " + getTrackReplicationTablesUpdation_Table())
1074                .append(" AFTER INSERT AS  DELETE FROM " +
1075                        getTrackReplicationTablesUpdation_Table() + " WHERE ")
1076                .append(RepConstants.trackUpdation + " NOT IN(SELECT * FROM inserted)");
1077            runDDL(pubSubName, trackRepTablesUpdationTriggerQuery.toString());*/
1078          }
1079    
1080          public PreparedStatement makePrimaryPreperedStatementBackwardTraversing(String[] primaryColumns, long lastId, String local_pub_sub_name, String shadowTable) throws SQLException, RepException {
1081            StringBuffer query = new StringBuffer();
1082            query.append(" select top 1 * from ")
1083            .append(shadowTable)
1084            .append(" where ")
1085            .append(RepConstants.shadow_sync_id1)
1086            .append(" < ?  ")
1087            .append(" and ")
1088            .append(RepConstants.shadow_sync_id1)
1089            .append(" > ")
1090            .append(lastId);
1091            for (int i = 0; i < primaryColumns.length; i++) {
1092              query.append(" and ")
1093              .append(primaryColumns[i])
1094              .append(" = ?  ");
1095            }
1096            query.append(" order by limit 1 ")
1097            .append(RepConstants.shadow_sync_id1)
1098            .append(" desc ");
1099            log.debug(query.toString());
1100    //System.out.println("PostgreSQLHandler  makePrimaryPreperedStatementDelete  ::  " +query.toString());
1101            Connection pub_sub_Connection = connectionPool.getConnection(local_pub_sub_name);
1102            return pub_sub_Connection.prepareStatement(query.toString());
1103          }
1104    
1105      /**
1106       * isSchemaSupported
1107       *
1108       * @return boolean
1109       */
1110      public boolean isSchemaSupported() {
1111        return true;
1112      }
1113    
1114      /*
1115          CREATE FUNCTION merge_db (key INT, data TEXT) RETURNS VOID AS
1116           $$
1117           BEGIN
1118               LOOP
1119                   UPDATE db SET b = data WHERE a = key;
1120                   IF found THEN
1121                       RETURN;
1122                   END IF;
1123    
1124                   BEGIN
1125                       INSERT INTO db(a,b) VALUES (key, data);
1126                       RETURN;
1127                   EXCEPTION WHEN unique_violation THEN
1128                       -- do nothing
1129                   END;
1130               END LOOP;
1131           END;
1132           $$
1133       */
1134    
1135     }





























































Powered by Drupal - Theme by Danger4k