Configuration Options
Configuration Parameters
Key User Configuration Properties
Note
plog.location.uri is the location in which the Dbvisit Replicate Connector searches for and reads the PLOG files delivered by the Dbvisit Replicate application. This directory must be local to the filesystem on which Kafka Connect is running, or accessible to it via a mounted filesystem. It cannot be an otherwise remote filesystem.
Note
plog.data.flush.size: for low transactional volumes (and for testing) it is best to change the default value of plog.data.flush.size in the configuration file to a value less than your total number of change records, e.g. for manual testing you can use 1 to verify that each and every record is emitted correctly. This configuration parameter is used as the internal PLOG reader’s cache size, which translates at run time to the size of the polled batch of Kafka messages. Under high transactional load it is more efficient to publish to Kafka in batches, particularly in distributed mode where network latency might be an issue. Please note that with this approach the data from the PLOG reader is only emitted once the cache is full (for the specific Kafka source task) or the PLOG is done, that is, an Oracle redo log switch has occurred. This means that if plog.data.flush.size is greater than the total number of LCRs in the cache, the connector will wait for more data to arrive or for a log switch to occur.
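The flush behaviour described in this note can be illustrated with a small simulation. This is only a sketch of the stated rule (emit when the cache is full or when the PLOG is done), not the connector's implementation; the `emit_batches` function and the `LOG_SWITCH` marker are hypothetical names introduced for the example.

```python
from typing import Iterable, Iterator, List

# Hypothetical marker standing in for "the PLOG is done" (an Oracle redo log switch).
LOG_SWITCH = object()


def emit_batches(records: Iterable[object], flush_size: int) -> Iterator[List[object]]:
    """Yield batches the way the note describes: when the cache holds
    flush_size records, or when a log switch is seen."""
    cache: List[object] = []
    for record in records:
        if record is LOG_SWITCH:
            # A log switch flushes whatever is cached, even a partial batch.
            if cache:
                yield cache
                cache = []
            continue
        cache.append(record)
        if len(cache) >= flush_size:
            yield cache
            cache = []
    # Anything still in the cache here has not been emitted: it is waiting
    # for more data to arrive or for the next log switch.


stream = [1, 2, 3, LOG_SWITCH, 4]
print(list(emit_batches(stream, flush_size=1000)))  # [[1, 2, 3]]
print(list(emit_batches(stream, flush_size=1)))     # [[1], [2], [3], [4]]
```

With a flush size of 1000 the three records are only emitted because of the log switch, and record 4 stays in the cache; with a flush size of 1 every record is emitted immediately.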
All User Configuration Properties
tasks.max
Maximum number of tasks to start for processing PLOGs.
- Type: string
- Default: 4
topic.prefix
Prefix for all topic names.
- Type: string
- Default: REP-
plog.location.uri
Replicate PLOG location URI, output of Replicate MINE.
- Type: string
- Default: file:/home/oracle/ktest/mine
plog.data.flush.size
Number of LCRs to cache before flushing; for the connector this is the batch size. Choose this value according to transactional volume: for high throughput to Kafka the default value may suffice, while for low or sporadic volume you should lower it, e.g. for testing use 1, which bypasses the cache and emits every record immediately.
- Type: string
- Default: 1000
plog.interval.time.ms
Time in milliseconds for one wait interval, used by scans and health check.
- Type: string
- Default: 500
plog.scan.interval.count
Number of intervals between scans, e.g. 5 * 0.5s = 2.5s scan wait time.
- Type: string
- Default: 5
plog.health.check.interval
Number of intervals between health checks. These are used when initially waiting for MINE to produce PLOGs, e.g. 10 * 0.5s = 5.0s.
- Type: string
- Default: 10
plog.scan.offline.interval
Number of health check scans after which Replicate is considered offline; this is used as a timeout value. NOTE: for testing use 1, i.e. quit after the first health check: 1 * 10 * 0.5s = 5s, where 10 is the plog.health.check.interval value and 0.5s is the plog.interval.time.ms value.
- Type: string
- Default: 1000
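The interval arithmetic used in the descriptions of plog.interval.time.ms, plog.scan.interval.count, plog.health.check.interval and plog.scan.offline.interval can be checked with a short calculation. This is a sketch that only multiplies the documented values together; the function name `plog_wait_times` and the returned key names are hypothetical.

```python
def plog_wait_times(interval_time_ms: int = 500,
                    scan_interval_count: int = 5,
                    health_check_interval: int = 10,
                    scan_offline_interval: int = 1000) -> dict:
    """Derive wait times in seconds from the four interval properties.
    Defaults are the documented default values."""
    interval_s = interval_time_ms / 1000.0
    # Wait between scans: plog.scan.interval.count intervals.
    scan_wait_s = scan_interval_count * interval_s
    # Wait between health checks: plog.health.check.interval intervals.
    health_check_s = health_check_interval * interval_s
    # Offline timeout: plog.scan.offline.interval health checks.
    offline_timeout_s = scan_offline_interval * health_check_s
    return {
        "scan_wait_s": scan_wait_s,
        "health_check_s": health_check_s,
        "offline_timeout_s": offline_timeout_s,
    }


print(plog_wait_times())
print(plog_wait_times(scan_offline_interval=1))
```

With the defaults this gives a 2.5s scan wait, a 5.0s health check wait and an offline timeout of 5000s; setting plog.scan.offline.interval to 1 reduces the timeout to 5s, as in the testing note.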
topic.name.transaction.info
Topic name for the transaction metadata stream.
- Type: string
- Default: TX.META
plog.global.scn.cold.start
Global SCN from which to start loading data during a cold start.
- Type: string
- Default: 0
Data Types
Oracle Data Type | Connect Data Type | Default Value | Conversion Rule |
---|---|---|---|
NUMBER | Int32 | -1 | scale <= 0 and precision - scale < 10 |
NUMBER | Int64 | -1L | scale <= 0 and precision - scale > 10 and < 20 |
NUMBER | Decimal | BigDecimal.ZERO | scale > 0 or precision - scale > 20 |
CHAR | Type.String | Empty string (zero length) | Encoded as UTF8 string |
VARCHAR | “” | “” | “” |
VARCHAR2 | “” | “” | “” |
LONG | “” | “” | “” |
NCHAR | Type.String | Empty string (zero length) | Encoded as UTF8, attempt is made to auto-detect if national character set was UTF-16 |
NVARCHAR | “” | “” | “” |
NVARCHAR2 | “” | “” | “” |
INTERVAL DAY TO SECOND | Type.String | Empty string (zero length) | |
INTERVAL YEAR TO MONTH | “” | “” | |
CLOB | Type.String | Empty string (zero length) | UTF8 string |
NCLOB | “” | “” | “” |
DATE | Timestamp | Epoch time | |
TIMESTAMP | “” | “” | |
TIMESTAMP WITH TIME ZONE | “” | “” | |
TIMESTAMP WITH LOCAL TIME ZONE | “” | “” | |
BLOB | Bytes | Empty byte array (zero length) | Converted from SerialBlob to bytes |
RAW | Bytes | Empty byte array (zero length) | No conversion |
LONG RAW | “” | “” | “” |
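The three NUMBER rows in the table can be read as a decision rule on precision and scale. The sketch below encodes the conversion rules exactly as written in the table; the function name `connect_type_for_number` is hypothetical. Note that the table does not say what happens when scale <= 0 and precision - scale is exactly 10 or exactly 20, so the sketch returns None for those cases rather than guessing.

```python
from typing import Optional


def connect_type_for_number(precision: int, scale: int) -> Optional[str]:
    """Return the Connect data type named in the table for an Oracle
    NUMBER(precision, scale), or None where the table gives no rule."""
    digits = precision - scale
    # Row 3: scale > 0 or precision - scale > 20
    if scale > 0 or digits > 20:
        return "Decimal"
    # Row 1: scale <= 0 and precision - scale < 10
    if digits < 10:
        return "Int32"
    # Row 2: scale <= 0 and precision - scale > 10 and < 20
    if 10 < digits < 20:
        return "Int64"
    # precision - scale is exactly 10 or 20: not covered by the table.
    return None


print(connect_type_for_number(9, 0))   # Int32
print(connect_type_for_number(18, 0))  # Int64
print(connect_type_for_number(10, 2))  # Decimal
print(connect_type_for_number(10, 0))  # None
```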
Distributed Mode Settings
Use the following to start the Dbvisit Replicate Connector for Kafka in distributed mode, once the Kafka Connect worker has been started on the host node. Postman is an excellent utility for working with cURL commands.
➜ curl -v -H "Content-Type: application/json" -X PUT 'http://localhost:8083/connectors/kafka-connect-dbvisitreplicate/config' -d '{
"connector.class": "com.dbvisit.replicate.kafkaconnect.ReplicateSourceConnector",
"tasks.max": "2",
"topic.prefix": "REP-",
"plog.location.uri": "file:/foo/bar",
"plog.data.flush.size": "1",
"plog.interval.time.ms": "500",
"plog.scan.interval.count": "5",
"plog.health.check.interval": "10",
"plog.scan.offline.interval": "1000",
"topic.name.transaction.info": "TX.META"
}'
Or save this to a file <json_file>:
{
"name": "TSource",
"config": {
"connector.class": "com.dbvisit.replicate.kafkaconnect.ReplicateSourceConnector",
"tasks.max": "2",
"topic.prefix": "REP-",
"plog.location.uri": "file:/foo/bar",
"plog.data.flush.size": "1",
"plog.interval.time.ms": "500",
"plog.scan.interval.count": "5",
"plog.health.check.interval": "10",
"plog.scan.offline.interval": "1000",
"topic.name.transaction.info": "TX.META"
}
}
➜ curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors --data "@<json_file>"
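The same request can also be prepared from a script. The following is a minimal sketch using only the Python standard library, assuming the worker listens on http://localhost:8083 as in the examples here and that the connector is created through the standard Kafka Connect REST endpoint POST /connectors; the helper name `build_create_request` is hypothetical. It wraps the properties in the "name"/"config" structure shown in the JSON file and builds the request without sending it.

```python
import json
import urllib.request


def build_create_request(name: str, config: dict,
                         base_url: str = "http://localhost:8083") -> urllib.request.Request:
    """Build (but do not send) a POST request that creates a connector."""
    payload = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/connectors",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


config = {
    "connector.class": "com.dbvisit.replicate.kafkaconnect.ReplicateSourceConnector",
    "tasks.max": "2",
    "topic.prefix": "REP-",
    "plog.location.uri": "file:/foo/bar",
    "plog.data.flush.size": "1",
    "plog.interval.time.ms": "500",
    "plog.scan.interval.count": "5",
    "plog.health.check.interval": "10",
    "plog.scan.offline.interval": "1000",
    "topic.name.transaction.info": "TX.META",
}
request = build_create_request("TSource", config)
print(request.get_method(), request.full_url)
```

To actually submit it against a running worker, pass the request to `urllib.request.urlopen(request)`.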