源库: IP 192.168.1.26/1923 database mydb table test 1.2 目标库 IP 192.168.1.26/1923 database skytf table test_tf 备注: 关于 pgsql_fdw 安装略,可以参考之前写的 BLOG:https://postgres.fun/20120607112420.html
目标库表信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14
mydb=> /d test Table "mydb.test" Column | Type | Modifiers --------+-----------------------+----------- id | integer | name | character varying(32) | Indexes: "idx_test_1" btree (id)
mydb=> select count(*) from test; count --------- 1990000 (1 row)
skytf=> /c skytf postgres You are now connected to database "skytf"as user "postgres".
skytf=# grant usage on foreign data wrapper pgsql_fdw to skytf; GRANT
skytf=# /c skytf skytf You are now connected to database "skytf"as user "skytf".
skytf=> CREATE SERVER pgsql_srv FOREIGN DATA WRAPPER pgsql_fdw skytf-> OPTIONS (host '127.0.0.1', port '1923', dbname 'mydb'); CREATE SERVER
skytf=> /des List of foreign servers Name | Owner | Foreign-data wrapper --------------+-------+---------------------- file_srv | skytf | file_fdw mysql_svr_25 | skytf | mysql_fdw pgsql_srv | skytf | pgsql_fdw (3 rows)
create mapping user
1 2 3
skytf=> CREATEUSERMAPPINGFORpublic SERVER pgsql_srv skytf-> OPTIONS (user'mydb', password'mydb'); CREATEUSERMAPPING
create foreign table
1 2 3 4 5 6
skytf=> CREATE FOREIGN TABLE ft_test ( skytf(> id integer , skytf(> name character varying(32) ) skytf-> SERVER pgsql_srv skytf-> OPTIONS (nspname 'mydb', relname 'test'); CREATE FOREIGN TABLE
查询外部表测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
skytf=> /d ft_test; Foreign table "skytf.ft_test" Column | Type | Modifiers --------+-----------------------+----------- id | integer | name | character varying(32) | Server: pgsql_srv
skytf=> select * From ft_test limit 3; id | name -------+------ 54241 | AAA 54242 | AAA 54243 | AAA (3 rows)
创建中间表
1 2
skytf=> create table test_tf (id int4,name varchar(32)); CREATE TABLE
创建函数
1 2 3 4 5 6 7 8
CREATEorreplaceFUNCTION func_sync_bill() RETURNSINTEGERAS $$ BEGIN begin insertinto test_tf (id,name) selectid,namefrom ft_test; return 0; end; END; $$ LANGUAGE 'plpgsql';
执行函数
1 2 3 4 5 6 7 8 9 10
skytf=> select func_sync_bill() ; ERROR: cache lookup failed for type 0 CONTEXT: SQL statement "insert into test_tf (id,name) select id,name from ft_test" PL/pgSQL function "func_sync_bill" line 4 at SQL statement
skytf=> select count(*) from test_tf; count ------- 0 (1 row)
备注:错误出现在语句 “insert into test_tf (id,name) select id,name from ft_test” 而这个语句单独在 session 中执行是正确的。
2.8 在会话中执行插入语句
1 2 3 4 5 6 7 8
skytf=> selectcount(*) from test_tf; count ------- 0 (1 row)
skytf=> insertinto test_tf (id,name) select id,name from ft_test; INSERT01990000
skytf=> CREATE or replace FUNCTION func_sync_bill() RETURNS INTEGER AS $$ skytf$> BEGIN skytf$> begin skytf$> EXECUTE 'insert into test_tf (id,name) select id,name from ft_test'; skytf$> return0; skytf$> end; skytf$> END; skytf$> $$ LANGUAGE 'plpgsql'; CREATE FUNCTION