JavaDoc


001    /**
002     * Copyright (c) 2003 Daffodil Software Ltd all rights reserved,
003     * Modifications Copyright (c) 2008 Regiscope Digital Imaging Co, LLC, All rights reserved.
004     * This program is free software; you can redistribute it and/or modify
005     * it under the terms of version 2 of the GNU General Public License as
006     * published by the Free Software Foundation.
007     * There are special exceptions to the terms and conditions of the GPL
008     * as it is applied to this software. See the GNU General Public License for more details.
009     *
010     * This program is distributed in the hope that it will be useful,
011     * but WITHOUT ANY WARRANTY; without even the implied warranty of
012     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013     * GNU General Public License for more details.
014     *
015     * You should have received a copy of the GNU General Public License
016     * along with this program; if not, write to the Free Software
017     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018     */
019    
020    package org.dbreplicator.replication;
021    
022    import java.io.*;
023    import java.net.*;
024    import java.net.UnknownHostException;
025    import java.rmi.*;
026    import java.rmi.server.*;
027    import java.sql.*;
028    import java.util.*;
029    import javax.xml.parsers.*;
030    
031    import org.xml.sax.*;
032    import org.xml.sax.ContentHandler;
033    import org.dbreplicator.replication.DBHandler.*;
034    import org.dbreplicator.replication.xml.*;
035    import org.dbreplicator.replication.zip.ZipHandler;
036    import org.dbreplicator.replication.xml.SnapshotHandler;
037    import org.dbreplicator.replication.xml.DDLHandler;
038    import org.apache.log4j.Logger;
039    import org.dbreplicator.replication.synchronize.AbstractSynchronize;
040    import java.sql.ResultSet;
041    import java.math.BigDecimal;
042    
043    /**
044     * The Subscription class holds all the methods which are required for the physical
045     * creation of the subscription(i.e system tables) and which are used at the time
046     * of synchronization at the subscriber's end. It implements three interfaces,
047     * <p>
048     * _Subscription : It makes this class to implement all the methods which are
049     * called by the user, at the time of subscribing, and for different tasks
050     * related to synchronization.
051     * <p>
052     * _SubImpl : It makes this class to implement the method getServerName which
053     * is called remotely by the publisher to get the local server name at the time
054     * of synchronization.
055     * <p>
056     * _Replicator : It makes this class to implement all the methods which are called
057     * at the time of synchronization to get the subscriber's information.
058     *
059     */
060    
061    public class Subscription
062        extends UnicastRemoteObject
063        implements _SubImpl, _Subscription, _Replicator {
064      public String subName;
065      private int remotePort;
066      private String remoteUrl;
067      private ConnectionPool connectionPool;
068      private ArrayList subRepTables;
069      private String conflictResolver;
070      private String pubName;
071      private _ReplicationServerImpl remoteServer;
072      protected static Logger log = Logger.getLogger(Subscription.class.getName());
073      private ArrayList alterTableAddFKQueries;
074    
075      // To write the file on client socket.
076      private _FileUpload fileUpload;
077    
078      public AbstractDataBaseHandler dbHandler;
079    
080      public void resetRemoteServer() {
081                      remoteServer = null;
082      }
083      SyncXMLCreator syncXMLCreator;
084      private ReplicationServer localServer;
085      HashMap syncIdMap;
086    
/**
 * Creates a subscription without assigning any of its fields.
 *
 * @throws RemoteException if the remote object cannot be exported by the
 *                         UnicastRemoteObject superclass constructor
 */
public Subscription() throws RemoteException {
  super();
}
089    
090      public Subscription(ConnectionPool connectionPool0, String subName0,
091                          String serverName0, ReplicationServer localServer0) throws
092          RemoteException,
093          RepException {
094        subName = subName0;
095        connectionPool = connectionPool0;
096        // make sure whether you need this connection statement or not
097        Connection subConnection = connectionPool.getConnection(subName);
098        dbHandler = Utility.getDatabaseHandler(connectionPool, subName);
099            connectionPool.returnConnection(subConnection);
100        dbHandler.setLocalServerName(serverName0);
101        dbHandler.setLocalServerName(serverName0);
102        syncXMLCreator = new SyncXMLCreator(subName, connectionPool, dbHandler);
103        localServer = localServer0;
104        fileUpload = new FileUpload();
105    
106    
107      }
108    
109      /**
110       * This method is called after parsing to set the repTables with the structures
111       * same as written on XML file.
112       *
113       * @param tableNames0
114       */
115    
116      public void setSubscriptionTables(ArrayList tableNames0) {
117        subRepTables = tableNames0;
118      }
119    
120      /* todo Added by hisar team. */
121    
122      public void setAlterTableAddFKStatements(ArrayList alterTableAddFKQueries0) {
123        alterTableAddFKQueries = alterTableAddFKQueries0;
124      }
125    
126      /**
127       * This method is called at the time of creating the subscriber. This method
128       * sets the url(i p address.) of the publsher's m/c for the subscriber, i.e
129       * on which m/c the publisher exists. And on which m/c the subscriber will
130       * look for this publisher.
131       *
132       * @param remoteUrl0
133       * @throws RepException
134       */
135    
136      public void setRemoteServerUrl(String remoteUrl0) throws RepException {
137        if (remoteUrl0.equalsIgnoreCase("localhost")) {
138          throw new RepException("REP004", null);
139        }
140        remoteUrl = remoteUrl0;
141      }
142    
143      /**
144       * This method is called at the time of creating the subscriber. This method
145       * sets the port no. of the publsher for the subscriber, i.e on which port
146       * the publisher is running. And on which port the subscriber will look
147       * for the replication server of the publisher.
148       *
149       * @param remotePort0
150       */
151    
152      public void setRemoteServerPortNo(int remotePort0) {
153        remotePort = remotePort0;
154      }
155    
156      public String getRemoteServerUrl() {
157        return remoteUrl;
158      }
159    
160      public int getRemoteServerPortNo() {
161        return remotePort;
162      }
163    
164      public ConnectionPool getConnectionPool() {
165        return connectionPool;
166      }
167    
168      public void setPublicatonName(String pubName0) {
169        pubName = pubName0;
170      }
171    
172      /**
173       * This method is responsible for the physical creation of the subscriber.
174       * This method creates replication system tables for the subscription at the
175       * subscriber's end. For getting the publication's replication tables'
176       * structures, it looks up the remote replication server, gets the object
177       * of the publication's class and then calls remote methods which
178       * create the XML file on the publisher's end. This file holds the queries
179       * needed to create the same structures on the client/subscriber system. Then this file
180       * is transferred over the client socket, parsed and replication
181       * tables are generated according to its specifications. Then the proper triggers are created,
182       * and subscription data is saved.
183       *
184       * @throws RepException
185       */
186    
187      public void subscribe() throws RepException {
188        // Create an Instance of DBHandler With Respect to Client Connection
189            Connection connection = null;
190        Statement stmt = null;
191        try {
192          //Creates Subscription Table
193          //Creates BookMark Table
194          //Creates Super Log Table
195          //Creates Rep Table
196          connection = connectionPool.getConnection(subName);
197          stmt = connection.createStatement();
198          dbHandler.createClientSystemTables(subName);
199          HashMap primCols = new HashMap();
200          _PubImpl pub = getPublication();
201          int pubVendorType = pub.getPubVendorName();
202          //Returns [0] array of schemas [1]publicationtablequeries.
203          ArrayList[] schemaTables = getPublicationTableQueries(primCols, pub);
204          dbHandler.createSchemas(pubName, schemaTables[0]);
205          String[] alterTableQueries = alterTableAddFKQueries == null ? null : (String[]) alterTableAddFKQueries.toArray(new String[0]);
206          dbHandler.createSubscribedTablesTriggersAndShadowTables(subName,(String[]) (schemaTables[1]).toArray(new String[0]), alterTableQueries, primCols, pubVendorType, subRepTables);
207          //Do Entry in the Subscription Table , BookMark Table, Rep Table.
208          saveSubscriptionData(dbHandler, connection, stmt);
209          pub.saveSubscriptionData(subName);
210        }
211        catch (RepException ex) {
212          RepConstants.writeERROR_FILE(ex);
213          throw ex;
214        }
215        catch (Exception ex) {
216          RepConstants.writeERROR_FILE(ex);
217          throw new RepException("REP025", new Object[] {ex.getMessage()});
218        }
219        finally {
220          connectionPool.removeSubPubFromMap(subName);
221          if (stmt != null)
222            try {
223              stmt.close();
224            }
225            catch (SQLException ex1) {
226            }
227                    connectionPool.returnConnection(connection);
228        }
229      }
230    
231      /**
232       * Make an entry in the subscriptions table, and insert all the records
233       * corresponding to each replicated table into the reptable and bookmark table.
234       *
235       * @param dbHandler
236       * @throws SQLException
237       * @throws RepException
238       */
239    
240      private void saveSubscriptionData(AbstractDataBaseHandler dbHandler,
241                                        Connection connection, Statement stmt) throws
242          SQLException, RepException {
243    
244        try {
245    
246          //Inserts into Subscription table
247          stmt.execute("insert into " + dbHandler.getSubscriptionTableName() +" values ( '" + subName + "', '"
248                       + pubName + "' , '" + conflictResolver + "' , '" +dbHandler.getLocalServerName() + "' )");
249          log.info("Query exceuted insert into " +
250                   dbHandler.getSubscriptionTableName() +
251                   " values ( '" + subName + "', '" + pubName + "' , '" +
252                   conflictResolver + "' , '" +
253                   dbHandler.getLocalServerName() + "' )");
254          //For each table makes an entry into the BookMark Table and Rep Table
255          for (int i = 0, size = subRepTables.size(); i < size; i++) {
256            RepTable repTable = (RepTable) subRepTables.get(i);
257            repTable.setConflictResolver(conflictResolver);
258            StringBuffer sb2 = new StringBuffer();
259            sb2.append(" Insert into ").append(dbHandler.getBookMarkTableName())
260                .append(" values ( '").append(subName).append("','")
261                .append(pubName).append("','").append(repTable.getSchemaQualifiedName())
262                .append("',").append("0,0,'N')");
263            dbHandler.saveRepTableData(connection, subName, repTable);
264            stmt.execute(sb2.toString());
265            log.debug(sb2.toString());
266          }
267        }
268        catch (SQLException ex) {
269          RepConstants.writeERROR_FILE(ex);
270          throw new RepException("REP022", new Object[] {subName, pubName});
271        }
272        finally {
273          if (stmt != null)
274            stmt.close();
275        }
276      }
277    
278      /**
279       * remove all the records from subscribed tables at subscriber
280       * and update the subscriber side with the latest records from the publisher.
281       *
282       * @throws RepException
283       */
284      public synchronized void getSnapShot() throws RepException {
285        Connection subConnection = null;
286        Statement subStatment = null;
287        boolean islockedTaken = false,isCurrentTableCyclic = false;
288        _PubImpl publication = null;
289        try {
290          String localAddress = null,remoteServerName=null,remoteAddress=null;
291          try {
292            _ReplicationServerImpl remoteServer = getRemoteReplicationServer();
293            remoteServerName = remoteServer.getServerName();
294            publication = remoteServer.getRemotePublication(pubName);
295            publication.checkForLock(subName);
296            islockedTaken = true;
297            localAddress = InetAddress.getLocalHost().getHostAddress();
298            remoteAddress =remoteServer.getServerName();
299            publication.createSnapShot(subName,isSchemaSupported(),fileUpload,localAddress );
300          }
301          catch (RepException ex) {
302            log.error(ex.getMessage(), ex);
303            throw ex;
304          }
305          catch (Exception ex) {
306            RepConstants.writeERROR_FILE(ex);
307            RepException rex = new RepException("REP053", new Object[] {subName,ex.getMessage()});
308            rex.setStackTrace(ex.getStackTrace());
309            throw rex;
310          }
311          try {
312            //unzipping the zip file
313            if(!localAddress.equalsIgnoreCase(remoteAddress)) {
314              ZipHandler.unZip(PathHandler.getDefaultZIPFilePathForClient("snapshot_" + pubName + "_" + subName),
315                               PathHandler.getDefaultFilePathForClient("snapshot_" + pubName +"_" + subName));
316            }
317          }
318          catch (IOException ex) {
319            RepConstants.writeERROR_FILE(ex);
320            RepException rex = new RepException("REP052", new Object[] {subName,ex.getMessage()});
321            rex.setStackTrace(ex.getStackTrace());
322            throw rex;
323          }
324    
325          try {
326            subConnection = connectionPool.getConnection(subName);
327            subStatment = subConnection.createStatement();
328                    /* bjt - start transaction */
329            for (int i = 0; i < subRepTables.size(); i++) {
330              RepTable repTable = ( (RepTable) subRepTables.get(i));
331              isCurrentTableCyclic = repTable.getCyclicDependency().equalsIgnoreCase(RepConstants.YES);
332              if(isCurrentTableCyclic)
333              break;
334            }
335            SAXParser saxParser = SAXParserFactory.newInstance().newSAXParser();
336            XMLReader reader = saxParser.getXMLReader();
337            // Delete records from all tables
338            deleteAllRecordsFromMainTables(subStatment);
339            //Instance for content handler
340                    //bjt
341                    /*
342                DatabaseMetaData dbmd = subConnection.getMetaData();
343                    int batchMax = (dbmd.supportsBatchUpdates() ? 1000 : 0);
344                    log.debug("Batch Max = " + batchMax);
345                    if (batchMax > 0) {
346                            log.debug("Using statement pooling in batches of " + batchMax);
347                    } else {
348                            log.debug("NOT Using statement pooling, database does not seem to support it");
349                    }
350            SnapshotHandler ch = new SnapshotHandler(true, subConnection, this,dbHandler, remoteServerName, batchMax); //  instance for content hanedler
351                    */
352            SnapshotHandler ch = new SnapshotHandler(true, subConnection, this,dbHandler, remoteServerName); //  instance for content hanedler
353            //  instance for content hanedler
354            ch.setPubName(pubName);
355            ch.setSubName(subName);
356            reader.setContentHandler(ch);
357            reader.parse(PathHandler.getDefaultFilePathForClient("snapshot_" + pubName + "_" +subName));
358            ch.closeAllStatementAndResultset();
359            /**
360             * To handle the cyclic table case referenced
361             * columns value is updated in second pass.
362             */
363            if(isCurrentTableCyclic){
364            SnapshotHandler ch1 = new SnapshotHandler(false, subConnection, this,dbHandler, remoteServerName); //  instance for content hanedler
365            ch1.setPubName(pubName);
366            ch1.setSubName(subName);
367            reader.setContentHandler(ch1);
368            reader.parse(PathHandler.getDefaultFilePathForClient("snapshot_" + pubName + "_" +subName));
369            ch1.closeAllStatementAndResultset();
370            }
371            if (_Subscription.xmlAndShadow_entries) {
372              // deleting xml file
373              deleteFile(PathHandler.getDefaultFilePathForClient("snapshot_" + pubName + "_" +subName));
374              // deleting zip file
375              deleteFile(PathHandler.getDefaultZIPFilePathForClient("snapshot_" + pubName + "_" +subName));
376            }
377            dbHandler.deleteRecordsFromSuperLogTable(subStatment);
378                    /* bjt - finish transaction */
379          }
380          catch (Exception ex) {
381            RepConstants.writeERROR_FILE(ex);
382            RepException rex = new RepException("REP053", new Object[] {subName,ex.getMessage()});
383            rex.setStackTrace(ex.getStackTrace());
384            throw rex;
385          }
386        }
387        catch (RepException rex) {
388          RepConstants.writeERROR_FILE(rex);
389          throw rex;
390        }
391        finally {
392          connectionPool.removeSubPubFromMap(subName);
393          try {
394            if (islockedTaken)
395              publication.releaseLOCK();
396          }
397          catch (RemoteException ex2) {
398            RepConstants.writeERROR_FILE(ex2);
399          }
400    
401          try {
402            if (subStatment != null) {
403              subStatment.close();
404    
405            }
406          }
407          catch (SQLException ex1) {
408          }
409             connectionPool.returnConnection(subConnection);
410        }
411      }
412    
413      /**
414       * Delete records from all main tables
415       *
416       * @throws SQLException
417       */
418      private void deleteAllRecordsFromMainTables(Statement subStatment) throws SQLException,  RepException {
419        for (int i = subRepTables.size() - 1; i >= 0; i--) {
420          RepTable repTable = (RepTable) subRepTables.get(i);
421          subStatment.execute("delete from " + repTable.getSchemaQualifiedName());
422        }
423      }
424    
425      /**
426       * This method first gets the object of the publications object,
427       * and makes an entry in the publisher server's bookmark table.
428       * Next, it creates struct_pubname.xml,struct_pubname.zip file , blob.lob, clob.lob
429       * on the publisher and writes these on socket to the subscriber side, then parses
430       * them.
431       *
432       * @param primColMap
433       * @return ArrayList[]
434       * @throws SQLException
435       * @throws RemoteException
436       * @throws RepException
437       */
438    
439      private ArrayList[] getPublicationTableQueries(HashMap primColMap, _PubImpl pub) throws
440          SQLException, RemoteException, RepException {
441    
442        if (pub == null) {
443          throw new RepException("REP036", new Object[] {pubName});
444        }
445        conflictResolver = pub.getConflictResolver();
446        String address = null,remoteMachineAddresss =null;
447          _ReplicationServerImpl replicationServer = getRemoteReplicationServer();
448          remoteMachineAddresss =   replicationServer.getServerName();
449    
450        try {
451          //address = InetAddress.getLocalHost().getHostAddress();
452          address = InetAddress.getLocalHost().getHostName();
453        }
454        catch (UnknownHostException ex) {
455          RepConstants.writeERROR_FILE(ex);
456          throw new RepException("REP025", new Object[] {subName, ex.getMessage()});
457        }
458        //Do entry in the server's bookmark table.
459        //Creates struct_pubname.xml,struct_pubname.zip file , blob.lob, clob.lob on the publisher.
460        //Writes tablequery file i.e XMLfile on the subscribers socket
461        pub.createStructure(Utility.getVendorType(connectionPool, subName), subName,address, isSchemaSupported(),fileUpload,address);
462        ArrayList schemas = null;
463        ArrayList queries = null;
464            Connection connection = null;
465        try {
466              connection = connectionPool.getConnection(subName);
467              MetaDataInfo mdi = Utility.getDatabaseMataData(connection);
468          String filePath = PathHandler.getDefaultFilePathForCreateStructure("struct_" + pubName + "_" + subName);
469          //Creates xml,blob.lob,clob.lob files over the subscriber
470          if(!address.equalsIgnoreCase(remoteMachineAddresss)) {
471            String zipFilePath = PathHandler.getDefaultZIPFilePathForCreateStructure("struct_" + pubName + "_" + subName);
472            ZipHandler.unStructZip(zipFilePath, filePath);
473          }
474          SAXParser saxParser = SAXParserFactory.newInstance().newSAXParser();
475          //Returns XMLReaderImpl class' object in XMLReader interface's reference.
476          XMLReader reader = saxParser.getXMLReader();
477          schemas = new ArrayList();
478          queries = new ArrayList();
479          ContentHandler ddh = new DDLHandler(this, schemas, queries,conflictResolver, primColMap, mdi);
480          reader.setContentHandler(ddh);
481          //Actually called method is XMLReaderImpl->parse.
482          //internally calls startDicument,endDocument methods of DDLHandler.
483          reader.parse(filePath);
484        }
485        catch (SAXException ex) {
486          RepConstants.writeERROR_FILE(ex);
487          throw new RepException("REP081", null);
488        }
489        catch (ParserConfigurationException ex) {
490          RepConstants.writeERROR_FILE(ex);
491          throw new RepException("REP081", null);
492        }
493        catch (FileNotFoundException ex) {
494          RepConstants.writeERROR_FILE(ex);
495          throw new RepException("REP082", null);
496        }
497        catch (IOException ex) {
498          RepConstants.writeERROR_FILE(ex);
499          throw new RepException("REP083", null);
500        }
501            finally {
502                    connectionPool.returnConnection(connection);
503            }
504        return new ArrayList[] {schemas, queries};
505      }
506    
507      /**
508       * Gets the object of the replication server class by looking up the
509       * rmi registry listening at the specified port and url.
510       *
511       * @return
512       * @throws NotBoundException
513       * @throws MalformedURLException
514       * @throws RemoteException
515       * @throws RepException
516       */
517    
518      private _ReplicationServerImpl getRemoteReplicationServer() throws
519          RepException {
520        //remoteUrl & remotePort are set by the user it comes from the functions setRemoteServerPortNo.
521        //& setRemoteServerURL...
522        try {
523          if (remoteServer != null) {
524            return remoteServer;
525          }
526          String ipadd = null;
527          try {
528            //ipadd = InetAddress.getByName(remoteUrl).getHostAddress();
529                    // bjt - remote
530            ipadd = InetAddress.getByName(remoteUrl).getHostName();
531          }
532          catch (UnknownHostException ex) {
533            RepConstants.writeERROR_FILE(ex);
534            throw new RepException("REP001", new Object[] {ex.getMessage()});
535          }
536          String name = "rmi://" + ipadd + ":" + Integer.toString(remotePort) + "/" + remoteUrl + "_" + remotePort;
537              log.debug(" bjt - rmi line ");
538          log.debug("Naming.lookup(" + name + ")");
539          remoteServer = (_ReplicationServerImpl) Naming.lookup(name);
540              log.debug(" bjt - returning remoteServer " + remoteServer);
541          return remoteServer;
542        }
543        catch (RemoteException ex) {
544          RepConstants.writeERROR_FILE(ex);
545          throw new RepException("REP025", new Object[] {subName, ex.getMessage()});
546        }
547        catch (MalformedURLException ex) {
548          RepConstants.writeERROR_FILE(ex);
549          throw new RepException("REP025", new Object[] {subName, ex.getMessage()});
550        }
551    
552        catch (NotBoundException ex1) {
553          RepConstants.writeERROR_FILE(ex1);
554          throw new RepException("REP025", new Object[] {subName, ex1.getMessage()});
555        }
556        catch (RepException ex1) {
557          throw ex1;
558        }
559      }
560    
561      /**
562       * Takes changed records from publisher, transfers them to subscriber and applies them,
563       * then takes changed records from subscriber and applies them back to the publisher.  This
564       * is the main function called for merge replication, and does everything according to the
565       * rules set forth in the conflict resolution algorithm chosen for this publication.
566       *
567       * @throws RepException
568       */
569      public synchronized void synchronize() throws RepException {
570        _PubImpl publication = null;
571        Object[] serverInfo = null;
572        Object[] pubLastSyncId = null;
573        BufferedWriter bw = null;
574            Connection subConnection = null;
575        Statement stmt = null;
576        ResultSet rs = null;
577        boolean isCurrentTableCyclic = false, islockedTaken = false;
578        String localMachineAddress=null,remoteMachineAddress=null;
579        try {
580          try {
581            _ReplicationServerImpl remoteRepServer =  getRemoteReplicationServer();
582            publication = remoteRepServer.getRemotePublication(pubName);
583           remoteMachineAddress = remoteRepServer.getRemoteAddress();
584            publication.checkForLock(subName);
585            islockedTaken = true;
586            //Creates synchronization related files over server and transfers it over client socket. 
587                    //Besides it creates server socket over server and returns serversocket related information.
588            localMachineAddress = InetAddress.getLocalHost().getHostName();
589            serverInfo = publication.createXMLForClient(subName, getServerName(),isSchemaSupported(),fileUpload,localMachineAddress);
590            pubLastSyncId = ( (Object[]) serverInfo[2]);
591          }
592          catch (RemoteException ex1) {
593            RepConstants.writeERROR_FILE(ex1);
594            RepException rex = new RepException("REP001",new Object[] {ex1.getMessage()});
595            rex.setStackTrace(ex1.getStackTrace());
596            throw rex;
597          }
598          catch (Exception ex1) {
599            RepConstants.writeERROR_FILE(ex1);
600            RepException rex = new RepException("REP057", new Object[] {ex1.getMessage()});
601            rex.setStackTrace(ex1.getStackTrace());
602            throw rex;
603          }
604    
605          try {
606            // unzipping zip file
607           if(!localMachineAddress.equalsIgnoreCase(remoteMachineAddress)) {
608                     log.debug(" bjt - localMachineAddress = " + localMachineAddress + ", remoteMachineAddress = " + remoteMachineAddress);
609             ZipHandler.unZip(PathHandler.getDefaultZIPFilePathForClient("server_" +pubName + "_" + subName),PathHandler.getDefaultFilePathForClient("server_" +pubName + "_" + subName));
610           }
611          }
612          catch (IOException ex) {
613            RepConstants.writeERROR_FILE(ex);
614            throw new RepException("REP084", null);
615          }
616    
617          try {
618            subConnection = connectionPool.getConnection(subName);
619                    /* bjt - start transaction here or below */
620            stmt = subConnection.createStatement();
621            SAXParser saxParser = SAXParserFactory.newInstance().newSAXParser();
622            XMLReader reader = saxParser.getXMLReader();
623    
624            MergeHandler mg = new MergeHandler(true,
625                                               subConnection, this,
626                                               publication.getServerName(),
627                                               dbHandler, bw, "MERGE REPLICATION",
628                                               PathHandler.
629                                               fullOrPartialTransactionLogFile(),
630                                               Utility.getDatabaseMataData(subConnection));
631            mg.setLocalName(subName);
632            mg.setRemoteName(pubName);
633            ContentHandler ch = mg;
634            reader.setContentHandler(ch);
635    
636            //initializing hashmap for maxSyncId for updating consideredId of bookMarkTable further
637            syncIdMap = new HashMap();
638            for (int i = 0; i < subRepTables.size(); i++) {
639                            /* bjt - start transaction here or above - spot 2 */
640                    RepTable repTable  =( (RepTable) subRepTables.get(i));
641                    String tableName = repTable.getSchemaQualifiedName().toString();
642                    StringBuffer query = new StringBuffer();
643                    isCurrentTableCyclic = repTable.getCyclicDependency().equalsIgnoreCase(RepConstants.YES);
644                    query.append(" select ").append(RepConstants.shadow_sync_id1).
645                                append(" from ").append(RepConstants.shadow_Table(tableName)).
646                                    append(" order by ").append(RepConstants.shadow_sync_id1).
647                                    append(" desc limit 1");
648                    rs = stmt.executeQuery(query.toString());
649                            long syncIdValue = new Long(0);
650                    if(rs.next()) {
651                                    syncIdValue = rs.getLong(1);
652                            }
653                    syncIdMap.put(tableName, syncIdValue);
654                    log.debug("tableName:" + tableName + " syncid: " + syncIdValue);
655            }
656                    /* bjt - do the work of absorbing the sync file into the database here */
657            reader.parse(PathHandler.getDefaultFilePathForClient("server_" + pubName + "_" + subName));
658            mg.closeAllStatementAndResultset();
659             makeSubscriberTransactionLgFile(subName,mg,bw,"MERGE");
660    
661            // Second pass
662                    /* bjt - not being used for cyclic depenency anyway...*/
663            /* if (isCurrentTableCyclic) { */
664              MergeHandler mg1 = new MergeHandler(false,
665                                                  subConnection, this,
666                                                  publication.getServerName(),
667                                                  dbHandler, bw,
668                                                  "MERGE REPLICATION",
669                                                  PathHandler.
670                                                  fullOrPartialTransactionLogFile(),
671                                                  Utility.getDatabaseMataData(subConnection));
672              mg1.setLocalName(subName);
673              mg1.setRemoteName(pubName);
674              ContentHandler ch1 = mg1;
675              reader.setContentHandler(ch1);
676              reader.parse(PathHandler.getDefaultFilePathForClient("server_" +pubName+ "_" + subName));
677              makeSubscriberTransactionLgFile(subName,mg,bw,"MERGE");
678              mg1.closeAllStatementAndResultset();
679            /* } */
680    
681            //updating consideredId on subscriber side
682            for (int i = 0; i < subRepTables.size(); i++) {
683              String tableName = ( (RepTable) subRepTables.get(i)).getSchemaQualifiedName().toString();
684              if ( ( (RepTable) subRepTables.get(i)).getCreateShadowTable().equalsIgnoreCase(RepConstants.NO))
685                continue;
686              UpdateConisderedForBookMarksTable(pubName, subName, tableName,(Long) syncIdMap.get(tableName),stmt);
687            }
688            syncIdMap.clear();
689          }
690          catch (Exception ex) {
691            if(Utility.createTransactionLogFile) {
692              AbstractSynchronize.writeUnsuccessfullOperationInTransaction(bw);
693            }
694            RepConstants.writeERROR_FILE(ex);
695            RepException rep = null;
696            if (ex instanceof SAXException) {
697              Exception e = ( (SAXException) ex).getException();
698              if (e instanceof RepException) {
699                throw (RepException) e;
700              }
701              else {
702                rep = new RepException("REP057", new Object[] {e.getMessage()});
703                rep.setStackTrace(e.getStackTrace());
704                throw rep;
705              }
706            }
707            else {
708              rep = new RepException("REP057", new Object[] {ex.getMessage()});
709              rep.setStackTrace(ex.getStackTrace());
710            }
711            throw rep;
712          }
713    
714          if (_Subscription.xmlAndShadow_entries) {
715            // deleting xml file
716            deleteFile(PathHandler.getDefaultFilePathForClient("server_" +pubName + "_" + subName));
717            // deleting zip file
718            deleteFile(PathHandler.getDefaultZIPFilePathForClient("server_" +pubName + "_" + subName));
719          }
720          //going to update lastsyncId in bookmark table on publisher side
721    
722          try {
723            publication.updateBookMarkLastSyncId(subName, pubLastSyncId);
724          }
725          catch (Exception ex) {
726            RepConstants.writeERROR_FILE(ex);
727            throw new RepException("REP057", new Object[] {subName, ex.getMessage()});
728          }
729    
730    // CREATE n Write XML File from Client Side to Socket for server Side.
731          try {
732            if (subRepTables.size() > 0) {
733              // creating and writing XML file ON SOCKET for server
734             Object[] clientTablesAndLastId =
735             syncXMLCreator.createXMLFile(PathHandler.getDefaultFilePathForClient("client_" + subName + "_" + pubName),
736                                        PathHandler.getDefaultZIPFilePathForClient("client_" + subName +"_" + pubName),
737                                        "client_" + subName + "_" + pubName /*+ ".xml"*/, pubName,subRepTables,publication.getServerName(),subRepTables.size(),
738             _Subscription.xmlAndShadow_entries,subName,isSchemaSupported(),publication.getFileUploader(),localMachineAddress,remoteMachineAddress);
739              ArrayList usedActualTables = (ArrayList) clientTablesAndLastId[0];
740              Object[] subLastSyncId = (Object[]) clientTablesAndLastId[1];
741              dbHandler.deleteRecordsFromSuperLogTable(stmt);
742              //This method let publisher to accept the data written on the publisher's socket by the subscriber. And parse it.
743              //And deletes records from pub's log table.And perform other table operations.
744              publication.synchronize(subName, getServerName(),Utility.createTransactionLogFile,localMachineAddress);
745              for (int i = 0; i < subRepTables.size(); i++) {
746                RepTable repTable = (RepTable) subRepTables.get(i);
747                if (repTable.getCreateShadowTable().equalsIgnoreCase(RepConstants.NO))
748                  continue;
749                String tableName = ( (RepTable) subRepTables.get(i)).getSchemaQualifiedName().toString();
750                updateBookMarkLastSyncId(tableName, pubName, subLastSyncId[i],stmt);
751              }
752              if (_Subscription.xmlAndShadow_entries) {
753                deletingRecordsFromShadowTable(usedActualTables, stmt);
754              }
755            }
756          }
757          catch (Exception ex) {
758            RepConstants.writeERROR_FILE(ex);
759            if (ex instanceof RepException) {
760              throw (RepException) ex;
761            }
762            RepException rex = new RepException("REP054", new Object[] {subName,ex.getMessage()});
763            rex.setStackTrace(ex.getStackTrace());
764            throw rex;
765          }
766        }
767        catch (RepException rex) {
768          RepConstants.writeERROR_FILE(rex);
769          throw rex;
770        }
771        finally {
772          connectionPool.removeSubPubFromMap(subName);
773          try {
774            if (islockedTaken)
775              publication.releaseLOCK();
776          }
777          catch (RemoteException ex2) {
778            RepConstants.writeERROR_FILE(ex2);
779          }
780          try {
781            if (bw != null)
782              bw.close();
783            if (rs != null) {
784              rs.close();
785            }
786            if (stmt != null)
787              stmt.close();
788          }
789          catch (IOException ex4) {
790          }
791          catch (SQLException ex4) {
792          }
793             connectionPool.returnConnection(subConnection);
794        }
795      }
796    
797      /**
798       * In push replication, records are uploaded to the publisher database.
799       * The subscriber creates an XML file of the records that are considered for
800       * synchronization and writes it to the client synchronization socket.
801       * The publisher reads the zip file of client records, parses it and
802       * performs the included operations on the published tables in accordance
803       * with the conflict resolution algorithm chosen for this publication.
804       *
805       * @throws RepException
806       */
807    
808      public synchronized void push() throws RepException {
809        _PubImpl publication = null;
810        Statement stmt = null;
811        ResultSet rs = null;
812        Connection subConnection = null;
813        boolean islockedTaken = false;
814        String localMachineAddress=null,remoteMachineAddress=null;
815        try {
816          subConnection = connectionPool.getConnection(subName);
817    
818          try {
819            _ReplicationServerImpl remoteRepServer =getRemoteReplicationServer();
820            publication = remoteRepServer.getRemotePublication(pubName);
821            remoteMachineAddress =remoteRepServer.getServerName();
822             localMachineAddress =InetAddress.getLocalHost().getHostAddress();
823          }
824          catch (RemoteException ex1) {
825            RepConstants.writeERROR_FILE(ex1);
826            RepException rex = new RepException("REP001",new Object[] {ex1.getMessage()});
827            rex.setStackTrace(ex1.getStackTrace());
828            throw rex;
829          }
830          catch (Exception ex1) {
831            RepConstants.writeERROR_FILE(ex1);
832            RepException rex = new RepException("REP0106",new Object[] {ex1.getMessage()});
833            rex.setStackTrace(ex1.getStackTrace());
834            throw rex;
835          }
836    // CREATE n Write XML File from Client Side to Socket for server Side.
837          try {
838            stmt = subConnection.createStatement();
839            publication.checkForLock(subName);
840            islockedTaken = true;
841            if (subRepTables.size() > 0) {
842              // creating and writing XML file ON SOCKET for server
843              Object[] clientTablesAndLastId = syncXMLCreator.createXMLFile(
844                  PathHandler.getDefaultFilePathForClient("client_" + subName + "_" +pubName),
845                  PathHandler.getDefaultZIPFilePathForClient("client_" + subName +"_" + pubName), "client_" + subName + "_" + pubName /*+ ".xml"*/,
846                  pubName,subRepTables,publication.getServerName(),subRepTables.size(),
847                  _Subscription.xmlAndShadow_entries, subName,isSchemaSupported(),publication.getFileUploader(),localMachineAddress,remoteMachineAddress);
848              ArrayList usedActualTables = (ArrayList) clientTablesAndLastId[0];
849              Object[] subLastSyncId = (Object[]) clientTablesAndLastId[1];
850              dbHandler.deleteRecordsFromSuperLogTable(stmt);
851              /* This method let publisher to accept the data written on the
852                 publisher's socket by the subscriber. And parse it.
853                 And deletes records from pub's log table.
854                 And perform other table operations.
855               */
856              //initializing hashmap for maxSyncId for updating consideredId of bookMarkTable further
857    
858              syncIdMap = new HashMap();
859              for (int i = 0; i < subRepTables.size(); i++) {
860                String tableName = ( (RepTable) subRepTables.get(i)).getSchemaQualifiedName().toString();
861                StringBuffer query = new StringBuffer();
862                query.append(" select ").append(RepConstants.shadow_sync_id1).
863                                append(" from ").append(RepConstants.shadow_Table(tableName)).
864                                    append(" order by ").append(RepConstants.shadow_sync_id1).
865                                    append(" desc limit 1");
866                rs = stmt.executeQuery(query.toString());
867                if (rs.next()) 
868                syncIdMap.put(tableName, new Long(rs.getLong(1)));
869              }
870              publication.push(subName, getServerName(),Utility.createTransactionLogFile,localMachineAddress);
871              for (int i = 0; i < subRepTables.size(); i++) {
872                String tableName = ( (RepTable) subRepTables.get(i)).getSchemaQualifiedName().toString();
873                updateBookMarkLastSyncId(tableName, pubName, subLastSyncId[i],stmt);
874                UpdateConisderedForBookMarksTable(pubName, subName, tableName, (Long) syncIdMap.get(tableName),stmt);
875              }
876              syncIdMap.clear();
877              if (_Subscription.xmlAndShadow_entries) {
878                deletingRecordsFromShadowTable(usedActualTables, stmt);
879              }
880            }
881          }
882          catch (Exception ex) {
883            RepConstants.writeERROR_FILE(ex);
884            if (ex instanceof RepException) {
885              throw (RepException) ex;
886            }
887            RepException rex = new RepException("REP0105", new Object[] {subName, ex.getMessage()});
888            rex.setStackTrace(ex.getStackTrace());
889            throw rex;
890          }
891        }
892        catch (RepException rex) {
893          RepConstants.writeERROR_FILE(rex);
894          throw rex;
895        }
896        finally {
897          connectionPool.removeSubPubFromMap(subName);
898    
899          try {
900            if (islockedTaken)
901              publication.releaseLOCK();
902          }
903          catch (RemoteException ex2) {
904            RepConstants.writeERROR_FILE(ex2);
905          }
906             connectionPool.returnConnection(subConnection);
907        }
908      }
909    
910      /**
911       * In pull replication, records are downloaded from the publisher database.
912       * The publisher creates an XML file of the records that are considered for
913       * synchronization and transfers it to the subscriber.
914       * The subscriber then unzips the file of records, parses it and
915       * performs the included operations on the subscribed tables in accordance
916       * with the conflict resolution algorithm chosen for this publication.
917       *
918       * @throws RepException
919       */
920    
921    
922      public synchronized void pull() throws RepException {
923        _PubImpl publication = null;
924        String localAddress = null,remoteMachineAddress=null;
925        Object[] pubLastSyncId;
926        BufferedWriter bw = null;
927        Statement stmt = null;
928        ResultSet resultSet = null;
929        Connection subConnection = null;
930        boolean islockedTaken = false, isCurrentTableCyclic = false;
931        try {
932          try {
933            subConnection = connectionPool.getConnection(subName);
934            stmt = subConnection.createStatement();
935            _ReplicationServerImpl remoteRepServer = getRemoteReplicationServer();
936            publication = remoteRepServer.getRemotePublication(pubName);
937            remoteMachineAddress = remoteRepServer.getServerName();
938            publication.checkForLock(subName);
939            islockedTaken = true;
940            localAddress = InetAddress.getLocalHost().getHostAddress();
941    
942            /*
943             Creates synchronization related files over server and transfers
944             it over client socket. Besides it creates server socket over server
945             and returns serversocket related information.
946             */
947            long startTime = System.currentTimeMillis();
948            Object[] serverInfo = publication.createXMLForClient(subName, getServerName(),isSchemaSupported(),fileUpload,localAddress);
949            pubLastSyncId = (Object[]) serverInfo[2];
950          }
951          catch (RemoteException ex1) {
952            RepConstants.writeERROR_FILE(ex1);
953            RepException rex = new RepException("REP001",new Object[] {ex1.getMessage()});
954            rex.setStackTrace(ex1.getStackTrace());
955            throw rex;
956          }
957          catch (Exception ex1) {
958            RepConstants.writeERROR_FILE(ex1);
959            RepException rex = new RepException("REP0152", new Object[] {ex1.getMessage()});
960            rex.setStackTrace(ex1.getStackTrace());
961            throw rex;
962          }
963    
964          try {
965            // unzipping zip file
966            if(!localAddress.equalsIgnoreCase(remoteMachineAddress)) {
967              ZipHandler.unZip(PathHandler.getDefaultZIPFilePathForClient("server_" +pubName + "_" + subName),PathHandler.getDefaultFilePathForClient("server_" +pubName + "_" + subName));
968            }
969          }
970          catch (IOException ex) {
971            RepConstants.writeERROR_FILE(ex);
972            throw new RepException("REP084", null);
973          }
974          try {
975    
976            SAXParser saxParser = SAXParserFactory.newInstance().newSAXParser();
977            XMLReader reader = saxParser.getXMLReader();
978            MergeHandler mg = new MergeHandler(true,
979                                                                                       subConnection, this,
980                                                                                       publication.getServerName(),
981                                                                                       dbHandler, bw, "PULL REPLICATION",
982                                                                                       PathHandler.fullOrPartialTransactionLogFile(),
983                                                                                       Utility.getDatabaseMataData(subConnection));
984            mg.setLocalName(subName);
985            mg.setRemoteName(pubName);
986            ContentHandler ch = mg;
987            reader.setContentHandler(ch);
988            //initializing hashmap for maxSyncId for updating consideredId of bookMarkTable further
989    
990            syncIdMap = new HashMap();
991    
992            for (int i = 0; i < subRepTables.size(); i++) {
993              RepTable repTable = ( (RepTable) subRepTables.get(i));
994              String tableName = repTable.getSchemaQualifiedName().toString();
995              StringBuffer query = new StringBuffer();
996              isCurrentTableCyclic = repTable.getCyclicDependency().
997                  equalsIgnoreCase(RepConstants.YES);
998              query.append(" select ").append(RepConstants.shadow_sync_id1).
999                  append(" from ").append(RepConstants.shadow_Table(tableName)).
1000                              append(" order by ").append(RepConstants.shadow_sync_id1).
1001                              append(" desc limit 1");
1002              resultSet = stmt.executeQuery(query.toString());
1003              if(resultSet.next())
1004                    syncIdMap.put(tableName, new Long(resultSet.getLong(1)));
1005            }
1006            reader.parse(PathHandler.getDefaultFilePathForClient("server_" +pubName + "_" + subName));
1007            mg.closeAllStatementAndResultset();
1008    
1009            makeSubscriberTransactionLgFile(subName,mg,bw,"PULL");
1010    
1011    
1012            /**
1013             * Following code has been written to handler the case
1014             * of cyclic table and complete the cyclic work in two
1015             * pass.In second pass value of all the colums is updated
1016             * by actual values that are set to null in first pass.
1017                     *
1018             */
1019            if (isCurrentTableCyclic) {
1020              MergeHandler mg1 = new MergeHandler(false,
1021                                                                                              subConnection, this,
1022                                                  publication.getServerName(),
1023                                                  dbHandler, bw,
1024                                                  "PULL REPLICATION",
1025                                                  PathHandler.fullOrPartialTransactionLogFile(),
1026                                                  Utility.getDatabaseMataData(subConnection));
1027              mg1.setLocalName(subName);
1028              mg1.setRemoteName(pubName);
1029              ContentHandler ch1 = mg1;
1030              reader.setContentHandler(ch1);
1031              AbstractSynchronize.writeDateInTransactionLogFile(bw);
1032              reader.parse(PathHandler.getDefaultFilePathForClient("server_" +pubName+ "_" + subName));
1033              mg1.closeAllStatementAndResultset();
1034            }
1035            try {
1036              for (int i = 0; i < subRepTables.size(); i++) {
1037                String tableName = ( (RepTable) subRepTables.get(i)).getSchemaQualifiedName().toString();
1038                UpdateConisderedForBookMarksTable(pubName, subName, tableName,(Long) syncIdMap.get(tableName),stmt);
1039              }
1040              syncIdMap.clear();
1041            }
1042            catch (SQLException ex) {
1043              RepConstants.writeERROR_FILE(ex);
1044              throw new RepException("REP0152", new Object[] {subName,ex.getMessage()});
1045            }
1046            makeSubscriberTransactionLgFile(subName,mg,bw,"PULL");
1047          }
1048          catch (Exception ex) {
1049            if(Utility.createTransactionLogFile) {
1050              AbstractSynchronize.writeUnsuccessfullOperationInTransaction(bw);
1051            }
1052            RepConstants.writeERROR_FILE(ex);
1053            RepException rep = null;
1054            if (ex instanceof SAXException) {
1055              Exception e = ( (SAXException) ex).getException();
1056              if (e instanceof RepException) {
1057                throw (RepException) e;
1058              }
1059              else {
1060                rep = new RepException("REP0152", new Object[] {e.getMessage()});
1061                rep.setStackTrace(e.getStackTrace());
1062                throw rep;
1063              }
1064            }
1065            else {
1066              rep = new RepException("REP0152", new Object[] {ex.getMessage()});
1067              rep.setStackTrace(ex.getStackTrace());
1068            }
1069            throw rep;
1070          }
1071    
1072          if (_Subscription.xmlAndShadow_entries) {
1073            // deleting xml file
1074            deleteFile(PathHandler.getDefaultFilePathForClient("server_" +pubName + "_" + subName));
1075            // deleting zip file
1076            deleteFile(PathHandler.getDefaultZIPFilePathForClient("server_" +pubName + "_" + subName));
1077          }
1078          try {
1079            publication.updatePublisherShadowAndBookmarkTableAfterPullOnSubscriber(subName, pubLastSyncId);
1080          }
1081          catch (Exception ex) {
1082            RepConstants.writeERROR_FILE(ex);
1083            throw new RepException("REP0152", new Object[] {subName,ex.getMessage()});
1084          }
1085        }
1086        catch (RepException rex) {
1087          RepConstants.writeERROR_FILE(rex);
1088          throw rex;
1089        }
1090        finally {
1091          try {
1092            if (bw != null)
1093              bw.close();
1094            connectionPool.removeSubPubFromMap(subName);
1095          }
1096          catch (Exception ex) {
1097            RepConstants.writeERROR_FILE(ex);
1098          }
1099          try {
1100            if (islockedTaken)
1101              publication.releaseLOCK();
1102          }
1103          catch (RemoteException ex2) {
1104            RepConstants.writeERROR_FILE(ex2);
1105          }
1106             connectionPool.returnConnection(subConnection);
1107        }
1108      }
1109    
1110      /**
1111       * Return the instance of RepTable corresponding to table name passed
1112       *
1113       * @param tableName String
1114       * @throws RepException
1115       * @return RepTable
1116       *
1117       */
1118      public RepTable getRepTable(String tableName) throws RepException {
1119        for (int i = 0; i < subRepTables.size(); i++) {
1120          if ( ( (RepTable) subRepTables.get(i)).getSchemaQualifiedName().
1121              toString().equalsIgnoreCase(tableName)) {
1122            return (RepTable) subRepTables.get(i);
1123          }
1124        }
1125        RepException rep = new RepException("REP017", new Object[] {tableName});
1126        RepConstants.writeERROR_FILE(rep);
1127        throw rep;
1128      }
1129    
1130      public void setFilterClause(String tableName, String filterClause) {
1131        for (int i = 0; i < subRepTables.size(); i++) {
1132          if ( ( (RepTable) subRepTables.get(i)).getSchemaQualifiedName().
1133              toString().equalsIgnoreCase(tableName)) {
1134            ( (RepTable) subRepTables.get(i)).setFilterClause(filterClause);
1135          }
1136        }
1137      }
1138    
1139      public String getServerName() throws RemoteException {
1140        return dbHandler.getLocalServerName();
1141      }
1142    
1143      public AbstractDataBaseHandler getDBDataTypeHandler() {
1144        return dbHandler;
1145      }
1146    
1147      public String getPub_SubName() {
1148        return subName;
1149      }
1150    
1151      private void deleteFile(String fileName) {
1152        File f = new File(fileName);
1153        boolean deleted = f.delete();
1154      }
1155    
1156      private void deleteALLRecordsFromSuperLogTable(Statement stmt) throws  SQLException {
1157        stmt.executeUpdate("delete from " + dbHandler.getLogTableName());
1158      }
1159    
1160      private void deleteRecordsFromSuperLogTable1(Statement subStatment) throws SQLException {
1161        // insert one record in superLogTable
1162        StringBuffer query = new StringBuffer();
1163        query.append("insert into ").append(dbHandler.getLogTableName()).append(" (").
1164            append(RepConstants.logTable_tableName2).append(") values  ('$$$$$$')");
1165    
1166        subStatment.execute(query.toString());
1167    
1168        query = new StringBuffer();
1169        // deleting all but one last record from super log table where commonid is maximum
1170        query.append("Select max (").append(RepConstants.logTable_commonId1).
1171            append(") from ").append(dbHandler.getLogTableName());
1172        ResultSet rs = subStatment.executeQuery(query.toString());
1173        rs.next();
1174        long maxCID = rs.getLong(1);
1175    
1176        query = new StringBuffer();
1177    
1178        query.append("delete from ").append(dbHandler.getLogTableName()).append(
1179            " where ")
1180            .append(RepConstants.logTable_commonId1).append(" !=").append(maxCID);
1181        int deletedNo = subStatment.executeUpdate(query.toString());
1182        log.debug(query.toString());
1183      }
1184    
1185      private void deletingRecordsFromShadowTable(ArrayList usedActualTables,Statement stmt) throws SQLException, RepException {
1186    
1187          int noOFTables = usedActualTables.size();
1188          StringBuffer query;
1189          if (noOFTables > 0) {
1190            for (int i = 0; i < noOFTables; i++) {
1191              Object minValue= dbHandler.getMinValOfSyncIdTodeleteRecordsFromShadowTable((String)
1192                         usedActualTables.get(i),stmt);
1193                     if (minValue instanceof BigDecimal) {
1194                       minValue = new Long( ( (BigDecimal) minValue).longValue());
1195                     }
1196                     else if (minValue instanceof Double) {
1197                       minValue = new Long( ( (Double) minValue).longValue());
1198                     }else if (minValue instanceof Long) {
1199                       minValue = new Long( ( (Long) minValue).longValue());
1200                     }else if (minValue instanceof Integer) {
1201                       minValue = new Long( ( (Integer) minValue).longValue());
1202                     }
1203    
1204              // deleting records from shadow table for that table
1205              query = new StringBuffer();
1206              query.append("delete from ").append(RepConstants.shadow_Table( (
1207                  String) usedActualTables.get(i))).append(" where  ").append(
1208                  RepConstants.shadow_sync_id1).append(" < ").append(minValue);
1209                      log.debug(query.toString());
1210              int count = stmt.executeUpdate(query.toString());
1211            }
1212          }
1213        }
1214    
1215      private void deletingRecordsFromShadowTable_old(ArrayList usedActualTables) {
1216            Connection conn = null;
1217        Statement stmt = null;
1218        try {
1219          int noOFTables = usedActualTables.size();
1220          StringBuffer query;
1221          if (noOFTables > 0) {
1222            conn = connectionPool.getConnection(subName);
1223            stmt = conn.createStatement();
1224            for (int i = 0; i < noOFTables; i++) {
1225              String shadowTable = RepConstants.shadow_Table( (String)
1226                  usedActualTables.get(i));
1227              query = new StringBuffer();
1228              query.append("SELECT MAX(").append(RepConstants.shadow_sync_id1).
1229                  append(") from ").append(shadowTable);
1230              ResultSet rs = stmt.executeQuery(query.toString());
1231              rs.next();
1232              long maxSyncId = rs.getLong(1);
1233    
1234              query = new StringBuffer();
1235              query.append("delete from ").append(shadowTable).append(" where ")
1236                  .append(RepConstants.shadow_sync_id1).append(" != ").append(maxSyncId);
1237              int count = stmt.executeUpdate(query.toString());
1238              log.debug(query.toString());
1239            }
1240          }
1241        }
1242        catch (Exception ex) {
1243          log.debug(ex);
1244          RepConstants.writeERROR_FILE(ex);
1245        }
1246        finally {
1247          connectionPool.removeSubPubFromMap(subName);
1248          try {
1249            if (stmt != null)
1250              stmt.close();
1251                try {
1252                            connectionPool.returnConnection(conn);
1253                    }
1254                    catch (RepException ex) {
1255                            // Ignore the exception
1256                    }
1257          }
1258          catch (SQLException ex1) {
1259          }
1260        }
1261      }
1262    
1263      private void UpdateConisderedForBookMarksTable(String pubName,
1264                                                     String subName,
1265                                                     String tableName,
1266                                                     Long syncId, Statement stmt) throws
1267          SQLException, RepException {
1268    
1269        StringBuffer query = new StringBuffer();
1270    
1271        Long maxSyncId = syncId;
1272        log.debug("maxSyncId" + maxSyncId);
1273        query.append(" UPDATE  ").append(dbHandler.getBookMarkTableName()).append(
1274            " set  ").append(RepConstants.bookmark_ConisderedId5)
1275            .append(" = ").append(maxSyncId).append("  where ").append(
1276            RepConstants.
1277            bookmark_LocalName1).append(" = '").append(subName).append("' and ").
1278            append(RepConstants.bookmark_RemoteName2)
1279            .append(" = '").append(pubName).append("' and ")
1280            .append(RepConstants.bookmark_TableName3).append(" = '").append(
1281            tableName).append("'");
1282        stmt.executeUpdate(query.toString());
1283        log.debug(query.toString());
1284    
1285      }
1286    
1287      /**
1288       * This method first gets the object of the replication server by
1289       * looking up the remote url and port. Then gets the publication object
1290       * from that remote object.
1291       *
1292       * @return the remote publication registered under this subscription's publication name
1293       * @throws RepException
1294       * @throws RemoteException
1295       */
1296    
1297      private _PubImpl getPublication() throws RepException, RemoteException {
1298        _ReplicationServerImpl serverRS = null;
1299         serverRS = getRemoteReplicationServer();
1300        _PubImpl pub = serverRS.getRemotePublication(pubName);
1301        if (pub == null) {
1302          throw new RepException("REP048", new Object[] {subName, pubName});
1303        }
1304        return pub;
1305      }
1306    
1307      /* Unsubscribes the subscriber and drops all related schedules. */
1308      /**
1309       * This method removes the subscribed tables from the publication server's bookmark table,
1310       * then drops all replication related tables in the subscribed database
1311       *
1312       * @throws RepException
1313       * @throws RemoteException
1314       */
1315    
1316      public void unsubscribe() throws RepException {
1317        Connection con = null;
1318        Statement stt = null;
1319        try {
1320          try {
1321            getPublication().dropSubscription(subName);
1322          }
1323          catch (RepException ex) {
1324            RepConstants.writeERROR_FILE(ex);
1325            throw ex;
1326          }
1327          catch (Exception ex) {
1328            RepConstants.writeERROR_FILE(ex);
1329            throw new RepException("REP046", new Object[] {subName, ex.getMessage()});
1330          }
1331          try {
1332            con = connectionPool.getFreshConnection(subName);
1333            dbHandler.setDefaultSchema(con);
1334            stt = con.createStatement();
1335            try {
1336              stt.execute(" delete from " + dbHandler.getSubscriptionTableName() +
1337                          " where " + RepConstants.subscription_subName1 + " = '" +
1338                          subName + "'");
1339    
1340              log.debug(" delete from " + dbHandler.getSubscriptionTableName() +
1341                        " where " + RepConstants.subscription_subName1 + " = '" +
1342                        subName + "'");
1343            }
1344            catch (SQLException ex) {
1345              RepConstants.writeERROR_FILE(ex);
1346              throw new RepException("REP037", new Object[] {subName});
1347            }
1348            try {
1349              localServer.getScheduleHandler().dropSchedule(subName);
1350            }
1351            catch (RepException ex) {
1352              if (! (ex.getRepCode() == "REP203")) {
1353                RepConstants.writeERROR_FILE(ex);
1354                throw ex;
1355              }
1356            }
1357            deleteNonSharedPublishedSubscribedTables(con, subName);
1358            localServer.refershSubscription(subName);
1359          }
1360          catch (SQLException ex) {
1361            RepConstants.writeERROR_FILE(ex);
1362          }
1363        }
1364        finally {
1365          connectionPool.removeSubPubFromMap(subName);
1366          try {
1367            if (stt != null)
1368              stt.close();
1369          }
1370          catch (SQLException ex1) {
1371          }
1372             connectionPool.returnConnection(con);
1373        }
1374      }
1375    
1376      private void deleteNonSharedPublishedSubscribedTables(Connection con,
1377          String pubsubName) throws SQLException, RepException {
1378    
1379        //" Select Table_Name from RepTable Where PubSub_Name = '"+pubName+"' and Table_Name Not In "
1380        //"( Select Distinct Table_Name from RepTable Where PubSub_Name <> '"+pubName+"') ";
1381    
1382        Statement stt = null;
1383        ResultSet rs = null;
1384        ResultSet rsPub = null;
1385    
1386        try {
1387          StringBuffer tablesToDelete = new StringBuffer();
1388          tablesToDelete.append(" Select ").append(RepConstants.repTable_tableName2)
1389              .append(" from ").append(dbHandler.getRepTableName()).append(
1390              " where ")
1391              .append(RepConstants.repTable_pubsubName1).append(" = '").append(
1392              pubsubName)
1393              .append("' and ").append(RepConstants.repTable_tableName2)
1394              .append(" not in ( Select Distinct ").append(RepConstants.
1395              repTable_tableName2)
1396              .append(" from ").append(dbHandler.getRepTableName())
1397              .append(" Where ").append(RepConstants.repTable_pubsubName1)
1398              .append(" <> '").append(pubsubName).append("') ");
1399          stt = con.createStatement();
1400          rs = stt.executeQuery(tablesToDelete.toString());
1401              Connection pubsubConnection = null;
1402              try
1403              {
1404                pubsubConnection = connectionPool.getConnection(pubsubName);
1405          while (rs.next()) {
1406                    MetaDataInfo mdi = Utility.getDatabaseMataData(pubsubConnection);
1407            SchemaQualifiedName sname = new SchemaQualifiedName(mdi,
1408                rs.getString(RepConstants.repTable_tableName2));
1409            deleteAll(pubsubName, sname.toString(), con);
1410                    }
1411          }
1412              finally {
1413                    connectionPool.returnConnection(pubsubConnection);
1414              }
1415          stt.execute(" delete from " + dbHandler.getRepTableName() + " where " +
1416                      RepConstants.repTable_pubsubName1 + " = '" + pubsubName +"'");
1417          log.debug(" delete from " + dbHandler.getRepTableName() + " where " +
1418                    RepConstants.repTable_pubsubName1 + " = '" + pubsubName + "'");
1419          stt.execute(" delete from " + dbHandler.getBookMarkTableName() +
1420                      " where " +
1421                      RepConstants.bookmark_LocalName1 + " = '" + pubsubName +"'");
1422          log.debug(" delete from " + dbHandler.getBookMarkTableName() +
1423                    " where " +
1424                    RepConstants.bookmark_LocalName1 + " = '" + pubsubName + "'");
1425          rs = stt.executeQuery("select * from " +
1426                                dbHandler.getSubscriptionTableName());
1427    
1428          boolean issubTableExist = rs != null ? rs.next() ? true : false : false;
1429    
1430          try {
1431            rsPub = stt.executeQuery("select * from " +
1432                                     dbHandler.getPublicationTableName());
1433          }
1434          catch (SQLException ex) {
1435            /**
1436             *  Ignore the exception because Publication table
1437             *  does not exist in Subscriber Database. It is the
1438             *  case when table is published and subscribed in
1439             *  same database.
1440             */
1441          }
1442          boolean ispubTableExist = rsPub != null ? rsPub.next() ? true : false : false;
1443          if (!ispubTableExist && !issubTableExist) {
1444            dbHandler.dropSubscriberSystemTables(con);
1445          }
1446        }
1447        finally {
1448          if (rs != null)
1449            rs.close();
1450          if (rsPub != null)
1451            rsPub.close();
1452          if (stt != null)
1453            stt.close();
1454        }
1455      }
1456    
1457      private void deleteAll(String pubsubName, String table, Connection con) throws
1458          SQLException,
1459    
1460          RepException {
1461    
1462        dbHandler.dropTriggersAndShadowTable(con, table, pubsubName);
1463    
1464      }
1465    
1466      public ArrayList getRepTables() {
1467        return subRepTables;
1468      }
1469    
1470      /* It saves the schedule information in the schedule table*/
1471    
1472      private void saveScheduleData(String schName, String subName,
1473                                    String scheduleType,
1474                                    String publicationServerName,
1475                                    String publicationPortNo,
1476                                    String recurrenceType,
1477                                    String repType, long schTime, int counter) throws
1478          RepException {
1479    
1480        Connection connection = connectionPool.getConnection(subName);
1481        Statement stt = null;
1482        ResultSet rs = null;
1483        try {
1484          stt = connection.createStatement();
1485          StringBuffer insertQuery = new StringBuffer();
1486          insertQuery = insertQuery.append("insert into ").append(dbHandler.
1487              getScheduleTableName())
1488              .append(" values ( '").append(schName).append("', '")
1489              .append(subName).append("' , '").append(scheduleType).append(
1490              "' , '")
1491              .append(publicationServerName).append("', '")
1492              .append(publicationPortNo).append("', '")
1493              .append(recurrenceType).append("' , '")
1494              .append(repType).append("' , ").append(schTime).append(" , ").
1495              append(
1496              counter).append(" )");
1497          stt.executeUpdate(insertQuery.toString());
1498          log.info("add schedule query " + insertQuery.toString());
1499    
1500          localServer.getScheduleHandler().startSchedule(subName, schName,
1501              scheduleType,
1502              publicationServerName, publicationPortNo, recurrenceType, repType,
1503              schTime, counter);
1504        }
1505        catch (SQLException ex) {
1506          log.error(ex.getMessage(), ex);
1507          try {
1508            StringBuffer query = new StringBuffer();
1509            query = query.append("select ").append(RepConstants.subscription_subName1).append(
1510                " from ").append(dbHandler.getScheduleTableName()).append(
1511                " where ").
1512                append(RepConstants.subscription_subName1).append(" = '").append(
1513                subName).append("'");
1514            rs = stt.executeQuery(query.toString());
1515            if (rs.next() == true) {
1516              throw new RepException("REP201", new Object[] {subName});
1517            }
1518          }
1519          catch (SQLException ex1) {
1520            RepConstants.writeERROR_FILE(ex1);
1521          }
1522        }
1523        catch (RepException ex) {
1524          RepConstants.writeERROR_FILE(ex);
1525          throw ex;
1526        }
1527        finally {
1528          connectionPool.removeSubPubFromMap(subName);
1529          try {
1530            if (stt != null)
1531              stt.close();
1532            if (rs != null)
1533              rs.close();
1534          }
1535          catch (SQLException ex2) {
1536          }
1537             connectionPool.returnConnection(connection);
1538        }
1539      }
1540    
1541      /**
1542       * Add a schedule entry to the replication tables, which will be looked at
1543       * when the replication instance is started, and obeyed according to its
1544       * specification
1545       *
1546       * @param scheduleName           String
1547       * @param subscriptionName       String
1548       * @param scheduleType0          String
1549       * @param publicationServerName  String
1550       * @param publicationPortNo      String
1551       * @param recurrenceType         String
1552       * @param replicationType        String
1553       * @param startDateTime          Timestamp
1554       * @param scheduleCounter        int
1555       * @throws RepException
1556       */
1557      public void addSchedule(String scheduleName, String subscriptionName,
1558                              String scheduleType0, String publicationServerName,
1559                              String publicationPortNo,
1560                              String recurrenceType, String replicationType,
1561                              Timestamp startDateTime, int scheduleCounter) throws
1562          RepException {
1563        String scheduleType = null;
1564        scheduleType = scheduleType0;
1565        try {
1566          if (scheduleName == null) {
1567            throw new RepException("REP215", null);
1568          }
1569          _Subscription sub = localServer.getSubscription(subscriptionName);
1570          if (sub == null) {
1571            throw new RepException("REP037", new Object[] {subscriptionName});
1572          }
1573          if (publicationServerName == null) {
1574            throw new RepException("REP094", null);
1575          }
1576          if (publicationPortNo == null) {
1577            throw new RepException("REP095", null);
1578          }
1579          try {
1580            sub.setRemoteServerPortNo(Integer.parseInt(publicationPortNo));
1581          }
1582          catch (NumberFormatException ex) {
1583            throw new RepException("REP220", new Object[] {null});
1584          }
1585          if (replicationType == null) {
1586            throw new RepException("REP212", null);
1587          }
1588          if (scheduleType == null) {
1589            throw new RepException("REP222", new Object[] {null});
1590          }
1591          if (! ( (scheduleType.equalsIgnoreCase(RepConstants.scheduleType_nonRealTime)) ||
1592                 (scheduleType.equalsIgnoreCase(RepConstants.scheduleType_realTime)))) {
1593            throw new RepException("REP223", new Object[] {null});
1594          }
1595          if (! (replicationType.equalsIgnoreCase(RepConstants.replication_snapshotType) ||
1596                 replicationType.equalsIgnoreCase(RepConstants.replication_synchronizeType) ||
1597                 replicationType.equalsIgnoreCase(RepConstants.replication_pullType) ||
1598                 replicationType.equalsIgnoreCase(RepConstants.replication_pushType))) {
1599            throw new RepException("REP216", null);
1600          }
1601          long startTime = 0;
1602          if (scheduleType.equalsIgnoreCase(RepConstants.scheduleType_nonRealTime)) {
1603            startTime = startDateTime.getTime();
1604            if (startTime <= System.currentTimeMillis()) {
1605              throw new RepException("REP202", null);
1606            }
1607            if (recurrenceType == null) {
1608              throw new RepException("REP213", null);
1609            }
1610            if (! (recurrenceType.equalsIgnoreCase(RepConstants.recurrence_yearType) ||
1611                   recurrenceType.equalsIgnoreCase(RepConstants.recurrence_monthType) ||
1612                   recurrenceType.equalsIgnoreCase(RepConstants.recurrence_dayType) ||
1613                   recurrenceType.equalsIgnoreCase(RepConstants.recurrence_hourType)||
1614                   recurrenceType.equalsIgnoreCase(RepConstants.recurrence_minuteType))) {
1615              throw new RepException("REP218", null);
1616            }
1617    
1618            if (startDateTime == null) {
1619              throw new RepException("REP214", null);
1620            }
1621            if (! (scheduleCounter >= 0)) {
1622              throw new RepException("REP217", null);
1623            }
1624          }
1625    
1626          saveScheduleData(scheduleName, subscriptionName, scheduleType,
1627                           publicationServerName,
1628                           publicationPortNo, recurrenceType, replicationType,
1629                           startTime, scheduleCounter);
1630        }
1631        catch (RepException ex) {
1632          RepConstants.writeERROR_FILE(ex);
1633          throw ex;
1634        }
1635        finally {
1636          connectionPool.removeSubPubFromMap(subName);
1637        }
1638      }
1639    
1640      /**
1641       * Remove a schedule entry from the schedule table
1642       *
1643       * @param scheduleName0 String
1644       * @param subName0 String
1645       */
1646      public void removeSchedule(String scheduleName0, String subName0) throws
1647          RepException {
1648        try {
1649          if (scheduleName0 == null) {
1650            throw new RepException("REP215", null);
1651          }
1652          _Subscription sub = localServer.getSubscription(subName0);
1653          if (sub == null) {
1654            throw new RepException("REP037", new Object[] {subName0});
1655          }
1656    
1657          localServer.getScheduleHandler().dropSchedule(subName0);
1658        }
1659        catch (RepException ex) {
1660          RepConstants.writeERROR_FILE(ex);
1661          throw ex;
1662        }
1663        finally {
1664          connectionPool.removeSubPubFromMap(subName);
1665        }
1666      }
1667    
1668      /**
1669       * Edit an existing schedule entry
1670       *
1671       * @param scheduleName0 String
1672       * @param subName0 String
1673       * @param newPubServerName String
1674       * @param newPubPortNo String
1675       */
1676      public void editSchedule(String scheduleName0, String subName0,
1677                               String newPubServerName, String newPubPortNo) throws
1678          RepException {
1679        try {
1680          if (scheduleName0 == null) {
1681            throw new RepException("REP215", null);
1682          }
1683          _Subscription sub = localServer.getSubscription(subName0);
1684          if (sub == null) {
1685            throw new RepException("REP037", new Object[] {subName0});
1686          }
1687          if (newPubServerName == null) {
1688            throw new RepException("REP094", null);
1689          }
1690          if (newPubPortNo == null) {
1691            throw new RepException("REP095", null);
1692          }
1693          try {
1694            sub.setRemoteServerPortNo(Integer.parseInt(newPubPortNo));
1695          }
1696          catch (NumberFormatException ex) {
1697            throw new RepException("REP220", new Object[] {null});
1698          }
1699          localServer.getScheduleHandler().editSchedule(scheduleName0, subName0,
1700              newPubServerName, newPubPortNo);
1701        }
1702        catch (RepException ex) {
1703          RepConstants.writeERROR_FILE(ex);
1704          throw ex;
1705        }
1706        finally {
1707          connectionPool.removeSubPubFromMap(subName);
1708        }
1709      }
1710    
1711      private void updateBookMarkLastSyncId(String tableName,
1712                                            String remote_Pub_Sub_Name,
1713                                            Object lastId, Statement stmt) throws
1714          Exception {
1715        String updateQuery = "update " + dbHandler.getBookMarkTableName() +
1716            " set " +
1717            RepConstants.bookmark_lastSyncId4 + "=" + lastId + " where  " +
1718            RepConstants.bookmark_LocalName1 + " = '" + subName + "' and " +
1719            RepConstants.bookmark_RemoteName2 + " = '" + remote_Pub_Sub_Name +
1720            "' and " + RepConstants.bookmark_TableName3 + " = '" + tableName +
1721            "' ";
1722        stmt.executeUpdate(updateQuery);
1723      }
1724    
1725      public synchronized void updateSubscription() throws RepException {
1726        /* NEEDS WORK...bjt */
1727            Connection subConnection = null;
1728        Statement stt = null;
1729        ResultSet rs = null;
1730        try {
1731              subConnection = connectionPool.getConnection(subName);
1732              MetaDataInfo mdi = Utility.getDatabaseMataData(subConnection);
1733          HashMap primCols = new HashMap();
1734          _PubImpl pub = getPublication();
1735          int pubVendorType = pub.getPubVendorName();
1736          //getting list of tables which after subscribing were dropped(even if dropped and added again by pub)
1737          ArrayList dropTableListFromSub = pub.dropTableListForSub(subName);
1738          //dropping table and unsubscribing them from replicator tables
1739          if (!dropTableListFromSub.isEmpty()){
1740            UnsubscribeAndDropTablesDroppedFromPub(subConnection,dropTableListFromSub);
1741          }
1742            //Returns [0] array of schemas [1]publicationtablequeries.
1743          ArrayList[] schemaTables = getPublicationTableQueries(primCols, pub);
1744          ArrayList dropTableList = new ArrayList();
1745          ArrayList oldSubRepTableList = new ArrayList();
1746          ArrayList newSubRepTableList = new ArrayList();
1747          stt = subConnection.createStatement();
1748          StringBuffer sb = new StringBuffer();
1749          //getting all the old subscribed tables list
1750          sb.append("select ").append(RepConstants.repTable_tableName2).append(
1751              " from ").append(dbHandler.getRepTableName()).append(" where ")
1752              .append(RepConstants.repTable_pubsubName1).append(" = '")
1753              .append(subName).append("'");
1754          rs = stt.executeQuery(sb.toString());
1755          while (rs.next()) {
1756            SchemaQualifiedName sname = new SchemaQualifiedName(mdi, rs.getString(1));
1757            String schema = sname.getSchemaName();
1758            String table = sname.getTableName();
1759            RepTable repTable = new RepTable(sname, RepConstants.publisher);
1760            //Sets Sorted primary key columns in repTable.
1761            mdi.setPrimaryColumns(repTable, schema, table);
1762            oldSubRepTableList.add(repTable.getSchemaQualifiedName());
1763          }
1764    
1765          String[] alterTableQueries = alterTableAddFKQueries == null ? null :
1766              (String[]) alterTableAddFKQueries.toArray(new String[0]);
1767    
1768          dbHandler.createSubscribedTablesTriggersAndShadowTables(subName,
1769              (String[]) (schemaTables[1]).toArray(new String[0]),
1770              alterTableQueries, primCols, pubVendorType, subRepTables);
1771    
1772          //Do Entry in the Subscription Table , BookMark Table, Rep Table.
1773          saveSubscriptionNewData(dbHandler);
1774          //get new subReptable list
1775          for (int i = 0; i < subRepTables.size(); i++) {
1776            RepTable repTable = (RepTable) subRepTables.get(i);
1777            newSubRepTableList.add(repTable.getSchemaQualifiedName());
1778          }
1779          //find if any table if was dropped from publisher.If any found,add to dropTableList
1780          for (int j = 0; j < oldSubRepTableList.size(); j++) {
1781            if (!newSubRepTableList.contains(oldSubRepTableList.get(j))) {
1782              dropTableList.add(oldSubRepTableList.get(j));
1783            }
1784          }
1785          StringBuffer deleteQuery = new StringBuffer();
1786          for (int j = 0; j < dropTableList.size(); j++) {
1787            //drop triggers,shadow tables,delete from logtable
1788            dbHandler.dropTriggersAndShadowTable(subConnection,
1789                                                 dropTableList.get(j).toString(),subName);
1790            //delete entry from bookmarkTable
1791            deleteQuery.append(" delete from ").append(dbHandler.
1792                getBookMarkTableName()).append(" where ").append(RepConstants.
1793                bookmark_LocalName1).append(" = '").append(subName)
1794                .append("'").append(" and ").append(RepConstants.repTable_tableName2).append(
1795                " ='").append(dropTableList.get(j).toString()).append(" '").
1796                append(" and ").append(RepConstants.bookmark_RemoteName2).append(" ='").
1797                append(pubName).append(" '");
1798            stt.execute(deleteQuery.toString());
1799          }
1800          pub.saveSubscriptionNewData(subName);
1801        }
1802    
1803        catch (RepException ex) {
1804          RepConstants.writeERROR_FILE(ex);
1805          throw ex;
1806        }
1807        catch (Exception ex) {
1808          RepConstants.writeERROR_FILE(ex);
1809          throw new RepException("REP318", new Object[] {ex.getMessage()});
1810        }
1811        finally {
1812          connectionPool.removeSubPubFromMap(subName);
1813          try {
1814            if (rs != null)
1815              rs.close();
1816          }
1817          catch (SQLException ex1) {
1818          }
1819          try {
1820            if (stt != null) {
1821              stt.close();
1822            }
1823          }
1824          catch (SQLException ex2) {
1825          }
1826             connectionPool.returnConnection(subConnection);
1827        }
1828      }
1829    
1830      /**
1831       * Make an entry in the subscriptions table. Inserts all the records
1832       * corresponding to each replication table into the reptabe and Bookmark table.
1833       *
1834       * @param dbHandler
1835       * @throws SQLException
1836       * @throws RepException
1837       */
1838    
1839      private void saveSubscriptionNewData(AbstractDataBaseHandler dbHandler) throws
1840          SQLException, RepException {
1841        Connection connection = connectionPool.getConnection(subName);
1842        Statement stt = null;
1843        try {
1844          stt = connection.createStatement();
1845          //Delete all entries from Rep_table
1846          StringBuffer sb = new StringBuffer();
1847          sb.append("delete from ").append(dbHandler.getRepTableName()).append(" where ")
1848              .append(RepConstants.repTable_pubsubName1).append(" = '")
1849              .append(subName).append("'");
1850          log.debug("Delete all entries from Rep_table::" + sb.toString());
1851          stt.execute(sb.toString());
1852    
1853          //For each table makes an entry into the BookMark Table and Rep Table
1854          for (int i = 0, size = subRepTables.size(); i < size; i++) {
1855            RepTable repTable = (RepTable) subRepTables.get(i);
1856            repTable.setConflictResolver(conflictResolver);
1857            StringBuffer sb2 = new StringBuffer();
1858            sb2.append(" Insert into ").append(dbHandler.getBookMarkTableName())
1859                .append(" values ( '").append(subName).append("','")
1860                .append(pubName).append("','").append(repTable.getSchemaQualifiedName())
1861                .append("',").append("0,0,'N')");
1862            try {
1863              log.debug("Table Entry in BookMarkTable::" + sb2.toString());
1864              stt.execute(sb2.toString());
1865            }
1866            catch (SQLException ex1) {
1867            }
1868            dbHandler.saveRepTableData(connection, subName, repTable);
1869          }
1870        }
1871        catch (SQLException ex) {
1872          throw ex;
1873        }
1874        finally {
1875          connectionPool.removeSubPubFromMap(subName);
1876          try {
1877            if (stt != null) {
1878              stt.close();
1879            }
1880          }
1881          catch (SQLException ex2) {
1882          }
1883             connectionPool.returnConnection(connection);
1884        }
1885      }
1886    
1887      /**
1888       * Get a snapshot of newly added tables or tables
1889       * on which no replication operation was done
1890       *
1891       * @throws RepException
1892       */
1893      public synchronized void getSnapShotAfterUpdatingSubscriber() throws
1894                    /* NEEDS WORK...bjt */
1895          RepException {
1896        Connection subConnection = null;
1897        Statement stmt = null;
1898        _PubImpl publication = null;
1899        boolean islockedTaken = false,isCurrentTableCyclic = false;
1900        try {
1901          ServerSocket serverSocket = null;
1902          Socket socket = null;
1903          serverSocket = connectionPool.startServerSocket();
1904          String remoteServerName;
1905    
1906          String localAddress = null,remoteMachineAddress=null;
1907          try {
1908            ArrayList tablesForSnapShot = getTableListForSnapShotAfterUpdatingSub();
1909            _ReplicationServerImpl remoteServer = getRemoteReplicationServer();
1910            remoteServerName = remoteServer.getServerName();
1911            remoteMachineAddress = remoteServer.getServerName();
1912            publication = remoteServer.getRemotePublication(pubName);
1913            publication.checkForLock(subName);
1914            islockedTaken = true;
1915            localAddress = InetAddress.getLocalHost().getHostName();
1916            publication.createSnapShotAfterUpdateSub(localAddress,serverSocket.getLocalPort(),subName, tablesForSnapShot,isSchemaSupported(),fileUpload,localAddress);
1917          }
1918          catch (RepException ex) {
1919            log.error(ex.getMessage(), ex);
1920            throw ex;
1921          }
1922          catch (Exception ex) {
1923            RepConstants.writeERROR_FILE(ex);
1924            RepException rex = new RepException("REP053", new Object[] {subName,ex.getMessage()});
1925            rex.setStackTrace(ex.getStackTrace());
1926            throw rex;
1927          }
1928          try {
1929    // bjt - still need to fix this routine...leave commented code until fixed
1930    //        socket = serverSocket.accept();
1931    //        InputStream is = socket.getInputStream();
1932    //        FileOutputStream fos = new FileOutputStream(PathHandler.
1933    //            getDefaultZIPFilePathForClient("snapshot_" + pubName+ "_" +subName));
1934    //        byte[] buf = new byte[1024];
1935    //        int len = 0;
1936    //        while ( (len = is.read(buf)) > 0) {
1937    //          fos.write(buf, 0, len);
1938    //        }
1939    //        fos.close();
1940    //        is.close();
1941    //        serverSocket.close();
1942    //        socket.close();
1943            //unzipping the zip file
1944            if(!localAddress.equalsIgnoreCase(remoteMachineAddress)) {
1945              ZipHandler.unZip(PathHandler.getDefaultZIPFilePathForClient("snapshot_" + pubName + "_" + subName),PathHandler.getDefaultFilePathForClient("snapshot_" +pubName + "_" + subName));
1946            }
1947    
1948          }
1949          catch (IOException ex) {
1950            RepConstants.writeERROR_FILE(ex);
1951            RepException rex = new RepException("REP052", new Object[] {subName,ex.getMessage()});
1952            rex.setStackTrace(ex.getStackTrace());
1953            throw rex;
1954          }
1955    
1956          try {
1957    
1958            for (int i = 0; i < subRepTables.size(); i++) {
1959              RepTable repTable = ( (RepTable) subRepTables.get(i));
1960              isCurrentTableCyclic = repTable.getCyclicDependency().equalsIgnoreCase(RepConstants.YES);
1961              if(isCurrentTableCyclic)
1962              break;
1963            }
1964            SAXParser saxParser = SAXParserFactory.newInstance().newSAXParser();
1965            XMLReader reader = saxParser.getXMLReader();
1966            subConnection = connectionPool.getConnection(subName);
1967            stmt = subConnection.createStatement();
1968            //Instance for content handler
1969                    //bjt
1970                    /*
1971                DatabaseMetaData dbmd = subConnection.getMetaData();
1972                    int batchMax = (dbmd.supportsBatchUpdates() ? 1000 : 0);
1973                    log.debug("Batch Max = " + batchMax);
1974                    if (batchMax > 0) {
1975                            log.debug("Using statement pooling in batches of " + batchMax);
1976                    } else {
1977                            log.debug("NOT Using statement pooling, database does not seem to support it");
1978                    }
1979                    */
1980            //SnapshotHandler ch = new SnapshotHandler(true, subConnection, this,dbHandler, remoteServerName, batchMax);
1981            SnapshotHandler ch = new SnapshotHandler(true, subConnection, this,dbHandler, remoteServerName);
1982            //  instance for content hanedler
1983            ch.setPubName(pubName);
1984            ch.setSubName(subName);
1985            reader.setContentHandler(ch);
1986            reader.parse(PathHandler.getDefaultFilePathForClient("snapshot_" +pubName+ "_" +subName));
1987            ch.closeAllStatementAndResultset();
1988    
1989            if(isCurrentTableCyclic){
1990            SnapshotHandler ch1 = new SnapshotHandler(false, subConnection, this,dbHandler, remoteServerName); //  instance for content hanedler
1991            ch1.setPubName(pubName);
1992            ch1.setSubName(subName);
1993            reader.setContentHandler(ch1);
1994            reader.parse(PathHandler.getDefaultFilePathForClient("snapshot_" + pubName + "_" +subName+ "_" +subName));
1995            ch1.closeAllStatementAndResultset();
1996            }
1997    
1998    
1999            if (_Subscription.xmlAndShadow_entries) {
2000              // deleting xml file
2001              deleteFile(PathHandler.getDefaultFilePathForClient("snapshot_" +pubName+ "_" +subName));
2002              // deleting zip file
2003              deleteFile(PathHandler.getDefaultZIPFilePathForClient("snapshot_" +pubName+ "_" +subName));
2004            }
2005            dbHandler.deleteRecordsFromSuperLogTable(stmt);
2006          }
2007          catch (Exception ex) {
2008            RepConstants.writeERROR_FILE(ex);
2009            RepException rex = new RepException("REP053", new Object[] {subName,ex.getMessage()});
2010            rex.setStackTrace(ex.getStackTrace());
2011            throw rex;
2012          }
2013        }
2014        catch (RepException rex) {
2015          RepConstants.writeERROR_FILE(rex);
2016          throw rex;
2017        }
2018        finally {
2019          connectionPool.removeSubPubFromMap(subName);
2020          try {
2021            if (islockedTaken)
2022              publication.releaseLOCK();
2023          }
2024          catch (RemoteException ex2) {
2025            RepConstants.writeERROR_FILE(ex2);
2026          }
2027             connectionPool.returnConnection(subConnection);
2028    
2029        }
2030      }
2031    
2032      private ArrayList getTableListForSnapShotAfterUpdatingSub() throws RepException {
2033        ResultSet rs = null;
2034        Connection connection = null;
2035        Statement stt = null;
2036        ArrayList tablesForSnapShot = new ArrayList();
2037        StringBuffer selectTableNames = new StringBuffer();
2038        selectTableNames.append("select ").append(RepConstants.bookmark_TableName3)
2039            .append(" from ").append(RepConstants.bookmark_TableName).append(" where ")
2040            .append(RepConstants.bookmark_lastSyncId4).append(" = 0 ")
2041            .append(" and ").append(RepConstants.bookmark_ConisderedId5).append(" = 0 ");
2042        try {
2043          connection = connectionPool.getConnection(subName);
2044          stt = connection.createStatement();
2045          rs = stt.executeQuery(selectTableNames.toString());
2046          while (rs.next()) {
2047            tablesForSnapShot.add(rs.getString(RepConstants.bookmark_TableName3).toLowerCase());
2048          }
2049        }
2050        catch (SQLException ex) {
2051    
2052        }
2053        catch (RepException ex1) {
2054          throw ex1;
2055        }
2056        finally {
2057          try {
2058            if (rs != null) {
2059              rs.close();
2060            }
2061            if (stt != null) {
2062              stt.close();
2063            }
2064          }
2065          catch (SQLException ex2) {
2066          }
2067              connectionPool.returnConnection(connection);
2068          connectionPool.removeSubPubFromMap(subName);
2069        }
2070        return tablesForSnapShot;
2071      }
2072    
2073      private void UnsubscribeAndDropTablesDroppedFromPub(Connection
2074          subConnection, ArrayList dropTableList) throws RepException,
2075          RepException, SQLException {
2076        Statement stt = null;
2077        try {
2078          stt = subConnection.createStatement();
2079          for (int i = 0; i < dropTableList.size(); i++) {
2080            String tableName = dropTableList.get(i).toString();
2081            //drop triggers,shadow tables,delete from logtable
2082            dbHandler.dropTriggersAndShadowTable(subConnection, tableName,subName);
2083            StringBuffer deleteQuery = new StringBuffer();
2084            //delete entry from bookmarkTable
2085            deleteQuery.append(" delete from ").append(dbHandler.
2086                getBookMarkTableName()).append(
2087                " where ").append(RepConstants.bookmark_LocalName1).append(" = '").append(
2088                subName).append("'").append(" and ").append(RepConstants.repTable_tableName2).
2089                append(" ='").append(tableName).append("'").append(
2090                " and ").append(RepConstants.bookmark_RemoteName2).append(" ='").
2091                append(pubName).append("'");
2092            log.debug(deleteQuery.toString());
2093            try {
2094              stt.execute(deleteQuery.toString());
2095            }
2096            catch (SQLException ex1) {
2097            }
2098    
2099            StringBuffer deleteRepTableQuery = new StringBuffer();
2100          //delete entry from bookmarkTable
2101          deleteRepTableQuery.append(" delete from ").append(dbHandler.getRepTableName())
2102             .append( " where ").append(RepConstants.repTable_pubsubName1).append( " = '").append(subName)
2103            .append( "'").append(" and ").append(RepConstants.repTable_tableName2).append(" ='")
2104            .append(tableName).append("'");
2105          log.debug(deleteRepTableQuery.toString());
2106          try {
2107            stt.execute(deleteRepTableQuery.toString());
2108          }
2109          catch (SQLException ex1) {
2110            throw ex1;
2111          }
2112    
2113            //dropping table from database
2114            String dropTable = "Drop table " + tableName;
2115            log.debug("dropTable::" + dropTable);
2116            try {
2117              stt.execute(dropTable);
2118            }
2119            catch (SQLException ex) {
2120    
2121            }
2122          }
2123        }
2124        finally {
2125          try {
2126            if (stt != null)
2127              stt.close();
2128          }
2129          catch (Exception ex) {
2130          }
2131        }
2132      }
2133    
2134      private void makeSubscriberTransactionLgFile(String subName,MergeHandler mg,BufferedWriter bw,String replicationType) throws  Exception {
2135        if(Utility.createTransactionLogFile) {
2136          String transactionLogURL = PathHandler.getDefaultTransactionLogFilePathForSubscriber(subName);
2137          FileOutputStream fos = new FileOutputStream(transactionLogURL, true);
2138          OutputStreamWriter os = new OutputStreamWriter(fos);
2139          bw = new BufferedWriter(os);
2140          AbstractSynchronize.writeDateInTransactionLogFile(bw);
2141          AbstractSynchronize.writeOperationInTransactionLogFile(bw, mg.insert,mg.update, mg.delete, replicationType);
2142          bw.flush();
2143        }
2144     }
2145    
2146     private boolean isSchemaSupported() {
2147     return  dbHandler.isSchemaSupported();
2148     }
2149    
2150    
2151    
2152    
2153    }





























































Powered by Drupal - Theme by Danger4k